Spaces:
Running
Running
import os | |
import json | |
import requests | |
import gradio as gr | |
import random | |
import time | |
import logging | |
import shutil | |
import zipfile | |
from typing import List | |
import subprocess | |
# Define constants | |
COMFYUI_DIR = 'ComfyUI' | |
WORKFLOW_PATH = os.path.join(COMFYUI_DIR, 'sd3.json') | |
OUTPUT_DIR = os.path.join(COMFYUI_DIR, 'output') | |
URL = "http://127.0.0.1:8188/prompt" | |
# Configure logging | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
# Shared state for cancellation | |
cancel_processing = False | |
def check_gpu(): | |
try: | |
import torch | |
if torch.cuda.is_available(): | |
logging.info("GPU is available") | |
else: | |
logging.warning("GPU is not available") | |
except ImportError: | |
logging.warning("PyTorch is not installed, cannot check GPU availability") | |
def start_comfyui(): | |
if not is_comfyui_running(): | |
logging.info("Starting ComfyUI...") | |
subprocess.Popen(["python", "main.py"], cwd=COMFYUI_DIR) | |
time.sleep(10) # Wait for ComfyUI to start | |
if is_comfyui_running(): | |
logging.info("ComfyUI started successfully.") | |
else: | |
logging.error("Failed to start ComfyUI.") | |
def is_comfyui_running(): | |
try: | |
response = requests.get(URL) | |
if response.status_code == 200: | |
return True | |
except requests.ConnectionError: | |
return False | |
return False | |
def read_workflow(file_path): | |
logging.info(f"Reading workflow from {file_path}") | |
if not os.path.exists(file_path): | |
logging.error(f"Workflow file not found: {file_path}") | |
raise FileNotFoundError(f"Workflow file not found: {file_path}") | |
with open(file_path, 'r') as file: | |
workflow = json.load(file) | |
return workflow | |
def update_workflow(workflow, prompt, negative_prompt): | |
logging.info(f"Updating workflow with new prompts: {prompt}, negative: {negative_prompt}") | |
workflow["6"]["inputs"]["text"] = prompt | |
workflow["71"]["inputs"]["text"] = negative_prompt | |
return workflow | |
def write_workflow(workflow, file_path): | |
logging.info(f"Writing updated workflow to {file_path}") | |
with open(file_path, 'w') as file: | |
json.dump(workflow, file, indent=4) | |
def send_workflow_to_comfyui(workflow, url): | |
headers = {"Content-Type": "application/json"} | |
try: | |
response = requests.post(url, headers=headers, json={"prompt": workflow}) | |
response.raise_for_status() | |
logging.info(f"Workflow sent successfully: {response.status_code}") | |
logging.debug(f"Response content: {response.content}") | |
except requests.exceptions.RequestException as e: | |
logging.error(f"Error sending workflow to ComfyUI: {e}") | |
raise | |
def monitor_output_images(output_dir, previous_images, timeout=60): | |
start_time = time.time() | |
logging.info(f"Monitoring {output_dir} for new images...") | |
while time.time() - start_time < timeout: | |
current_images = os.listdir(output_dir) | |
new_images = [img for img in current_images if img not in previous_images] | |
if new_images: | |
latest_image = new_images[-1] | |
logging.info(f"New image found: {latest_image}") | |
return latest_image | |
time.sleep(1) | |
logging.info(f"Timeout while waiting for new images in {output_dir}") | |
return None | |
def copy_file_with_retry(src, dst_dir, file_index, retries=5, delay=1): | |
dst = os.path.join(dst_dir, f"SD3_{file_index:05d}.png") | |
for _ in range(retries): | |
try: | |
shutil.copy(src, dst) | |
return dst | |
except PermissionError: | |
time.sleep(delay) | |
raise PermissionError(f"Failed to copy {src} to {dst} after {retries} retries") | |
def zip_files(output_images: List[str], zip_interval: int, zip_folder: str): | |
zip_files = [] | |
for i in range(0, len(output_images), zip_interval): | |
zip_filename = os.path.join(zip_folder, f"images_{i//zip_interval + 1}_{time.time_ns()}.zip") | |
with zipfile.ZipFile(zip_filename, 'w') as zipf: | |
for img in output_images[i:i+zip_interval]: | |
zipf.write(img, os.path.basename(img)) | |
zip_files.append(zip_filename) | |
return zip_files | |
def process_prompts(prompts_text, negative_prompt_text, user_folder, zip_interval): | |
global cancel_processing | |
prompts = [line.strip() for line in prompts_text.split('\n\n') if line.strip()] | |
negative_prompts = [line.strip() for line in negative_prompt_text.split('\n') if line.strip()] | |
output_images = [] | |
zip_files_list = [] | |
file_index = 1 | |
workflow = read_workflow(WORKFLOW_PATH) | |
total_prompts = len(prompts) | |
previous_images = os.listdir(OUTPUT_DIR) | |
logs = "" | |
try: | |
for i, prompt in enumerate(prompts): | |
if cancel_processing: | |
logging.info("Processing cancelled by user.") | |
break | |
if not prompt.strip(): | |
continue | |
negative_prompt = negative_prompts[i] if i < len(negative_prompts) else "" | |
updated_workflow = update_workflow(workflow, prompt, negative_prompt) | |
write_workflow(updated_workflow, WORKFLOW_PATH) | |
logging.debug(f"Updated workflow: {json.dumps(updated_workflow, indent=4)}") | |
send_workflow_to_comfyui(updated_workflow, URL) | |
logging.info(f"Sent workflow to ComfyUI for prompt {i + 1}/{total_prompts}") | |
new_image = None | |
retries = 0 | |
while new_image is None and retries < 5: | |
new_image = monitor_output_images(OUTPUT_DIR, previous_images) | |
if new_image is None: | |
retries += 1 | |
logging.warning(f"Retrying ({retries}/5)...") | |
time.sleep(5) | |
else: | |
time.sleep(2) | |
if new_image is None: | |
logging.error("Error monitoring output images: Timed out waiting for new image.") | |
continue | |
new_image_path = os.path.join(OUTPUT_DIR, new_image) | |
try: | |
copied_image_path = copy_file_with_retry(new_image_path, user_folder, file_index) | |
logging.info(f"New image generated and copied to user folder: {copied_image_path}") | |
except PermissionError as e: | |
logging.error(f"Failed to copy file after retries: {e}") | |
continue | |
output_images.append(copied_image_path) | |
previous_images.append(new_image) | |
file_index += 1 | |
if len(output_images) % zip_interval == 0 and not cancel_processing: | |
zip_folder = os.path.join(user_folder, "zipped_images") | |
os.makedirs(zip_folder, exist_ok=True) | |
new_zip_files = zip_files(output_images[-zip_interval:], zip_interval, zip_folder) | |
zip_files_list.extend(new_zip_files) | |
logs += f"Processed {i + 1}/{total_prompts} - Done: {i + 1}, Left: {total_prompts - (i + 1)}\n" | |
yield output_images, zip_files_list, logs | |
if cancel_processing or (len(output_images) % zip_interval != 0): | |
zip_folder = os.path.join(user_folder, "zipped_images") | |
os.makedirs(zip_folder, exist_ok=True) | |
new_zip_files = zip_files(output_images, zip_interval, zip_folder) | |
zip_files_list.extend(new_zip_files) | |
except KeyboardInterrupt: | |
logging.info("Script interrupted by user.") | |
return output_images, zip_files_list | |
def cancel_processing_fn(): | |
global cancel_processing | |
cancel_processing = True | |
def reset_cancel_processing_fn(): | |
global cancel_processing | |
cancel_processing = False | |
def main(): | |
check_gpu() # Check if GPU is available | |
start_comfyui() # Start ComfyUI if not already running | |
with gr.Blocks(css=""" | |
.gradio-container {font-family: Arial, sans-serif;} | |
.psychedelic-text span { | |
animation: colorchange 10s infinite; | |
} | |
@keyframes colorchange { | |
0% { color: #ff69b4; } | |
10% { color: #ba55d3; } | |
20% { color: #7b68ee; } | |
30% { color: #00bfff; } | |
40% { color: #3cb371; } | |
50% { color: #ffff54; } | |
60% { color: #ffa500; } | |
70% { color: #ff4500; } | |
80% { color: #ff1493; } | |
90% { color: #da70d6; } | |
100% { color: #ff69b4; } | |
} | |
.image-container img { | |
width: 250px; | |
height: 250px; | |
} | |
""") as demo: | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("### beWiZ's GroOvy SD3 Batch Imagine") | |
gr.HTML('<div class="image-container"><img src="https://raw.githubusercontent.com/downlifted/Groovy-StyleSuite/main/groovy.png" alt="GroOvy - SD3 Batch Imagine Logo"></div>') | |
with gr.Accordion("Developer Information", open=False): | |
gr.Markdown("### Made by BeWiZ") | |
gr.Markdown('<div class="image-container"><a href="https://twitter.com/AiAnarchist"><img src="https://raw.githubusercontent.com/downlifted/pictoprompt/master/aia.png" alt="BeWiZ Logo"></a></div>') | |
gr.Markdown("Contact: [downlifted@gmail.com](mailto:downlifted@gmail.com)") | |
gr.Markdown("Twitter: [@AiAnarchist](https://x.com/AiAnarchist)") | |
with gr.Accordion("About SD3 Batch Imagine", open=False): | |
gr.Markdown(""" | |
### SD3 Batch Imagine: Batch Image Generation | |
Produce large batches of images using the latest SD3 Medium model. This tool allows you to generate images quickly and efficiently. | |
- **ComfyUI**: For seamless integration and image processing. | |
- **Hugging Face**: For state-of-the-art language models. | |
- **Gradio**: For an intuitive user interface. | |
""") | |
with gr.Accordion("Instructions", open=True): | |
gr.Markdown(""" | |
**SD3 Batch Imagine Instructions** | |
- Enter your prompts below, one per empty line. | |
- Enter your negative prompts below, one per line. (Optional) | |
- Set the zip interval to determine how many images will be included in each zip file. | |
- Click "Process Prompts" to start generating images. | |
- Click "Cancel Processing" to stop the current batch run. | |
- Watch the progress as images are generated in real-time. | |
- At the end of the process, zip files containing your images will be available for download. | |
""") | |
with gr.Column(scale=2): | |
gr.Markdown("### Enter Prompts") | |
prompts_text = gr.Textbox(lines=20, placeholder="Enter your prompts here, one per empty line.", label="Prompts") | |
negative_prompts_text = gr.Textbox(lines=5, placeholder="Enter your negative prompts here, one per line.", label="Negative Prompts") | |
zip_interval = gr.Number(value=10, label="Zip Interval", precision=0) | |
process_btn = gr.Button("Process Prompts") | |
cancel_btn = gr.Button("Cancel Processing") | |
progress_text = gr.Markdown("Progress") | |
gallery_output = gr.Gallery(label="Generated Images") | |
zip_files_output = gr.Files(label="Zip Files") | |
with gr.Column(scale=1): | |
gr.Markdown("### Detailed Logs") | |
logs_output = gr.Textbox(lines=10, interactive=False, label="Logs") | |
def generate_user_folder(): | |
user_folder = os.path.normpath(os.path.join(OUTPUT_DIR, f'SD3{random.randint(1000, 9999)}')) | |
os.makedirs(user_folder, exist_ok=True) | |
logging.info(f"Generated user folder: {user_folder}") | |
return user_folder | |
def on_click(prompts_text, negative_prompts_text, zip_interval): | |
reset_cancel_processing_fn() | |
user_folder = generate_user_folder() | |
output_images, zip_files_list = [], [] | |
logs = "" | |
for images, zip_files, log_msg in process_prompts(prompts_text, negative_prompts_text, user_folder, zip_interval): | |
output_images = images | |
zip_files_list = zip_files | |
logs = log_msg | |
yield images, zip_files_list, logs | |
return output_images, zip_files_list, logs | |
process_btn.click( | |
fn=on_click, | |
inputs=[prompts_text, negative_prompts_text, zip_interval], | |
outputs=[gallery_output, zip_files_output, logs_output] | |
) | |
cancel_btn.click( | |
fn=cancel_processing_fn, | |
inputs=[], | |
outputs=[] | |
) | |
demo.launch() | |
if __name__ == "__main__": | |
main() | |