bewizz's picture
Upload folder using huggingface_hub
b966a36 verified
import os
import json
import requests
import gradio as gr
import random
import time
import logging
import shutil
import zipfile
from typing import List
import subprocess
# Filesystem layout: everything lives under the local ComfyUI checkout.
COMFYUI_DIR = 'ComfyUI'
WORKFLOW_PATH = os.path.join(COMFYUI_DIR, 'sd3.json')
OUTPUT_DIR = os.path.join(COMFYUI_DIR, 'output')

# ComfyUI HTTP endpoint; used both to probe the server and to submit prompts.
URL = "http://127.0.0.1:8188/prompt"

# Root logger: INFO and above, timestamped.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-wide flag polled by the batch loop; flipped by the cancel/reset helpers.
cancel_processing = False
def check_gpu():
    """Log whether PyTorch reports a usable CUDA device.

    Nothing is returned and nothing is raised for a missing PyTorch install;
    the outcome is only written to the log.
    """
    try:
        import torch
        cuda_ready = torch.cuda.is_available()
    except ImportError:
        logging.warning("PyTorch is not installed, cannot check GPU availability")
        return

    if not cuda_ready:
        logging.warning("GPU is not available")
        return
    logging.info("GPU is available")
def start_comfyui(timeout=60, poll_interval=1):
    """Launch ComfyUI in the background unless it is already answering.

    Instead of sleeping for a fixed period, the server is polled until it
    responds, the child process exits, or ``timeout`` seconds have elapsed.

    Args:
        timeout: Maximum number of seconds to wait for ComfyUI to come up.
        poll_interval: Seconds to sleep between readiness probes.

    Returns:
        None. Success or failure is reported through the log only.
    """
    import sys  # local import: only needed to locate the running interpreter

    if is_comfyui_running():
        return

    logging.info("Starting ComfyUI...")
    # Launch with the interpreter running this script so ComfyUI sees the same
    # environment; a bare "python" may resolve to a different installation.
    process = subprocess.Popen([sys.executable or "python", "main.py"], cwd=COMFYUI_DIR)

    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if is_comfyui_running():
            logging.info("ComfyUI started successfully.")
            return
        if process.poll() is not None:
            # The child already exited, so waiting any longer cannot help.
            break
        time.sleep(poll_interval)

    logging.error("Failed to start ComfyUI.")
def is_comfyui_running(timeout=5):
    """Report whether the ComfyUI endpoint at ``URL`` answers with HTTP 200.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled server cannot block the caller indefinitely.

    Returns:
        True when the GET request succeeds with status 200; False for any
        other status or for any request-level failure (refused connection,
        timeout, and so on).
    """
    try:
        response = requests.get(URL, timeout=timeout)
    except requests.RequestException:
        # Any transport problem simply means "not reachable" for a probe.
        return False
    return response.status_code == 200
def read_workflow(file_path):
    """Load a workflow description from a JSON file.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The decoded JSON content.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
    """
    logging.info(f"Reading workflow from {file_path}")
    try:
        # JSON is read as UTF-8 explicitly rather than with the platform default.
        with open(file_path, 'r', encoding='utf-8') as file:
            return json.load(file)
    except FileNotFoundError:
        logging.error(f"Workflow file not found: {file_path}")
        raise FileNotFoundError(f"Workflow file not found: {file_path}") from None
def update_workflow(workflow, prompt, negative_prompt):
    """Write the prompt pair into the workflow's text nodes, in place.

    Node "6" receives ``prompt`` and node "71" receives ``negative_prompt``.
    The same ``workflow`` object that was passed in is returned.
    """
    logging.info(f"Updating workflow with new prompts: {prompt}, negative: {negative_prompt}")
    for node_id, text in (("6", prompt), ("71", negative_prompt)):
        workflow[node_id]["inputs"]["text"] = text
    return workflow
def write_workflow(workflow, file_path):
    """Serialize ``workflow`` as 4-space-indented JSON, overwriting ``file_path``."""
    logging.info(f"Writing updated workflow to {file_path}")
    with open(file_path, 'w') as destination:
        json.dump(workflow, destination, indent=4)
def send_workflow_to_comfyui(workflow, url, timeout=30):
    """POST a workflow to ComfyUI as ``{"prompt": workflow}``.

    Args:
        workflow: JSON-serializable workflow to submit.
        url: Endpoint to post to.
        timeout: Seconds to wait for the HTTP request; without a limit a
            stalled server would block the batch loop indefinitely.

    Raises:
        requests.exceptions.RequestException: Logged and re-raised on any
            transport failure or non-success HTTP status.
    """
    headers = {"Content-Type": "application/json"}
    try:
        response = requests.post(
            url, headers=headers, json={"prompt": workflow}, timeout=timeout
        )
        response.raise_for_status()
        logging.info(f"Workflow sent successfully: {response.status_code}")
        logging.debug(f"Response content: {response.content}")
    except requests.exceptions.RequestException as e:
        logging.error(f"Error sending workflow to ComfyUI: {e}")
        raise
def monitor_output_images(output_dir, previous_images, timeout=60):
    """Poll ``output_dir`` until a file not listed in ``previous_images`` appears.

    Only regular files are considered, so sub-folders created inside the
    output directory are never reported as images. When several new files are
    present, the one with the most recent modification time is returned
    (``os.listdir`` order is arbitrary and cannot be used to find the newest).

    Args:
        output_dir: Directory to watch.
        previous_images: Entry names that are already known and must be ignored.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The name (not the full path) of the newest new file, or None if
        nothing new appeared before the timeout.
    """
    start_time = time.time()
    logging.info(f"Monitoring {output_dir} for new images...")
    known = set(previous_images)  # O(1) membership tests while polling

    def _modified_at(name):
        # A file may vanish between listing and stat; treat it as oldest.
        try:
            return os.path.getmtime(os.path.join(output_dir, name))
        except OSError:
            return 0.0

    while time.time() - start_time < timeout:
        new_images = [
            name
            for name in os.listdir(output_dir)
            if name not in known and os.path.isfile(os.path.join(output_dir, name))
        ]
        if new_images:
            # Name is the tie-breaker so equal timestamps stay deterministic.
            latest_image = max(new_images, key=lambda name: (_modified_at(name), name))
            logging.info(f"New image found: {latest_image}")
            return latest_image
        time.sleep(1)
    logging.info(f"Timeout while waiting for new images in {output_dir}")
    return None
def copy_file_with_retry(src, dst_dir, file_index, retries=5, delay=1):
    """Copy ``src`` into ``dst_dir`` as ``SD3_<index>.png``, retrying on PermissionError.

    The index is zero-padded to five digits. After each failed attempt the
    function sleeps ``delay`` seconds; once ``retries`` attempts have failed a
    PermissionError is raised. Returns the destination path on success.
    """
    target_name = f"SD3_{file_index:05d}.png"
    dst = os.path.join(dst_dir, target_name)
    for _attempt in range(retries):
        try:
            shutil.copy(src, dst)
        except PermissionError:
            time.sleep(delay)
        else:
            return dst
    else:
        raise PermissionError(f"Failed to copy {src} to {dst} after {retries} retries")
def zip_files(output_images: List[str], zip_interval: int, zip_folder: str):
    """Pack ``output_images`` into zip archives of at most ``zip_interval`` files each.

    Archives are written to ``zip_folder`` and named
    ``images_<batch number>_<nanosecond timestamp>.zip``; each image is stored
    under its base name. Returns the list of archive paths in creation order.
    """
    archives = []
    batch_starts = range(0, len(output_images), zip_interval)
    for batch_number, start in enumerate(batch_starts, start=1):
        archive_path = os.path.join(zip_folder, f"images_{batch_number}_{time.time_ns()}.zip")
        batch = output_images[start:start + zip_interval]
        with zipfile.ZipFile(archive_path, 'w') as archive:
            for image_path in batch:
                archive.write(image_path, os.path.basename(image_path))
        archives.append(archive_path)
    return archives
def process_prompts(prompts_text, negative_prompt_text, user_folder, zip_interval):
    """Generate one image per prompt through ComfyUI, yielding progress as it goes.

    Prompts are separated by blank lines; negative prompts are one per line
    and are paired with prompts by position (missing ones default to "").
    For each prompt the workflow is updated, written back to ``WORKFLOW_PATH``,
    submitted to ComfyUI, and the resulting image is copied into
    ``user_folder``. Every ``zip_interval`` images are zipped into
    ``<user_folder>/zipped_images``.

    Images left over at the end (or at cancellation) are zipped exactly once
    and delivered through one final ``yield``. A consumer that only iterates
    the generator therefore receives the last archive too, and images that
    were already archived are not zipped a second time.

    Args:
        prompts_text: Raw prompt text, prompts separated by an empty line.
        negative_prompt_text: Raw negative-prompt text, one per line.
        user_folder: Directory receiving the copied images and the zips.
        zip_interval: Number of images per archive. Values that are missing,
            non-numeric, or not positive fall back to a single archive
            covering the whole run.

    Yields:
        ``(output_images, zip_files_list, logs)`` after every successfully
        processed prompt, plus once more if a final archive was created.

    Returns:
        ``(output_images, zip_files_list)`` as the generator's return value.
    """
    global cancel_processing
    prompts = [line.strip() for line in prompts_text.split('\n\n') if line.strip()]
    negative_prompts = [line.strip() for line in negative_prompt_text.split('\n') if line.strip()]

    # The UI number field can deliver None, 0, or a float; a non-positive
    # interval would make the batching arithmetic below fail.
    try:
        zip_interval = int(zip_interval)
    except (TypeError, ValueError):
        zip_interval = 0
    if zip_interval < 1:
        zip_interval = max(1, len(prompts))

    output_images = []
    zip_files_list = []
    zipped_count = 0  # how many entries of output_images are already archived
    file_index = 1
    workflow = read_workflow(WORKFLOW_PATH)
    total_prompts = len(prompts)
    previous_images = os.listdir(OUTPUT_DIR)
    logs = ""
    zip_folder = os.path.join(user_folder, "zipped_images")

    def _zip_pending():
        """Archive every image that has not been zipped yet."""
        nonlocal zipped_count
        pending = output_images[zipped_count:]
        if not pending:
            return
        os.makedirs(zip_folder, exist_ok=True)
        zip_files_list.extend(zip_files(pending, zip_interval, zip_folder))
        zipped_count = len(output_images)

    try:
        for i, prompt in enumerate(prompts):
            if cancel_processing:
                logging.info("Processing cancelled by user.")
                break

            negative_prompt = negative_prompts[i] if i < len(negative_prompts) else ""
            updated_workflow = update_workflow(workflow, prompt, negative_prompt)
            write_workflow(updated_workflow, WORKFLOW_PATH)
            logging.debug(f"Updated workflow: {json.dumps(updated_workflow, indent=4)}")
            send_workflow_to_comfyui(updated_workflow, URL)
            logging.info(f"Sent workflow to ComfyUI for prompt {i + 1}/{total_prompts}")

            # Wait for the image: up to five monitoring rounds with a pause
            # between failed rounds.
            new_image = None
            for attempt in range(1, 6):
                new_image = monitor_output_images(OUTPUT_DIR, previous_images)
                if new_image is not None:
                    # Short pause before copying; presumably lets ComfyUI
                    # finish writing the file (TODO confirm).
                    time.sleep(2)
                    break
                logging.warning(f"Retrying ({attempt}/5)...")
                time.sleep(5)
            if new_image is None:
                logging.error("Error monitoring output images: Timed out waiting for new image.")
                continue

            new_image_path = os.path.join(OUTPUT_DIR, new_image)
            try:
                copied_image_path = copy_file_with_retry(new_image_path, user_folder, file_index)
                logging.info(f"New image generated and copied to user folder: {copied_image_path}")
            except PermissionError as e:
                logging.error(f"Failed to copy file after retries: {e}")
                continue

            output_images.append(copied_image_path)
            previous_images.append(new_image)
            file_index += 1

            # A full batch is ready; skip while cancelling so the leftovers
            # are handled once, after the loop.
            if len(output_images) - zipped_count >= zip_interval and not cancel_processing:
                _zip_pending()

            logs += f"Processed {i + 1}/{total_prompts} - Done: {i + 1}, Left: {total_prompts - (i + 1)}\n"
            yield output_images, zip_files_list, logs

        # Archive whatever is left and push one more update so the consumer
        # actually receives the final zip.
        if zipped_count < len(output_images):
            _zip_pending()
            yield output_images, zip_files_list, logs
    except KeyboardInterrupt:
        logging.info("Script interrupted by user.")
    return output_images, zip_files_list
def cancel_processing_fn():
    """Raise the module-level ``cancel_processing`` flag (set it to True)."""
    global cancel_processing
    cancel_processing = True
def reset_cancel_processing_fn():
    """Clear the module-level ``cancel_processing`` flag (set it to False)."""
    global cancel_processing
    cancel_processing = False
def main():
    """Run the start-up checks, then build and launch the Gradio interface.

    Order of work: log GPU availability, start ComfyUI if it is not already
    answering, declare the Blocks layout, wire the two buttons to their
    handlers, and finally call ``demo.launch()``.
    """
    check_gpu() # Logs GPU availability; the result is informational only
    start_comfyui() # Start ComfyUI if not already running
    # NOTE(review): the nesting of the layout below was reconstructed from a
    # source whose indentation had been flattened - confirm against the
    # rendered UI. The triple-quoted literals are kept flush-left on purpose:
    # indenting them would add leading whitespace to the CSS/Markdown text.
    with gr.Blocks(css="""
.gradio-container {font-family: Arial, sans-serif;}
.psychedelic-text span {
animation: colorchange 10s infinite;
}
@keyframes colorchange {
0% { color: #ff69b4; }
10% { color: #ba55d3; }
20% { color: #7b68ee; }
30% { color: #00bfff; }
40% { color: #3cb371; }
50% { color: #ffff54; }
60% { color: #ffa500; }
70% { color: #ff4500; }
80% { color: #ff1493; }
90% { color: #da70d6; }
100% { color: #ff69b4; }
}
.image-container img {
width: 250px;
height: 250px;
}
""") as demo:
        with gr.Row():
            # Side column: branding, developer info, about text, instructions.
            with gr.Column(scale=1):
                gr.Markdown("### beWiZ's GroOvy SD3 Batch Imagine")
                gr.HTML('<div class="image-container"><img src="https://raw.githubusercontent.com/downlifted/Groovy-StyleSuite/main/groovy.png" alt="GroOvy - SD3 Batch Imagine Logo"></div>')
                with gr.Accordion("Developer Information", open=False):
                    gr.Markdown("### Made by BeWiZ")
                    gr.Markdown('<div class="image-container"><a href="https://twitter.com/AiAnarchist"><img src="https://raw.githubusercontent.com/downlifted/pictoprompt/master/aia.png" alt="BeWiZ Logo"></a></div>')
                    gr.Markdown("Contact: [downlifted@gmail.com](mailto:downlifted@gmail.com)")
                    gr.Markdown("Twitter: [@AiAnarchist](https://x.com/AiAnarchist)")
                with gr.Accordion("About SD3 Batch Imagine", open=False):
                    gr.Markdown("""
### SD3 Batch Imagine: Batch Image Generation
Produce large batches of images using the latest SD3 Medium model. This tool allows you to generate images quickly and efficiently.
- **ComfyUI**: For seamless integration and image processing.
- **Hugging Face**: For state-of-the-art language models.
- **Gradio**: For an intuitive user interface.
""")
                with gr.Accordion("Instructions", open=True):
                    gr.Markdown("""
**SD3 Batch Imagine Instructions**
- Enter your prompts below, one per empty line.
- Enter your negative prompts below, one per line. (Optional)
- Set the zip interval to determine how many images will be included in each zip file.
- Click "Process Prompts" to start generating images.
- Click "Cancel Processing" to stop the current batch run.
- Watch the progress as images are generated in real-time.
- At the end of the process, zip files containing your images will be available for download.
""")
            # Main column: inputs, action buttons, and result widgets.
            with gr.Column(scale=2):
                gr.Markdown("### Enter Prompts")
                prompts_text = gr.Textbox(lines=20, placeholder="Enter your prompts here, one per empty line.", label="Prompts")
                negative_prompts_text = gr.Textbox(lines=5, placeholder="Enter your negative prompts here, one per line.", label="Negative Prompts")
                zip_interval = gr.Number(value=10, label="Zip Interval", precision=0)
                process_btn = gr.Button("Process Prompts")
                cancel_btn = gr.Button("Cancel Processing")
                # Created here but not listed as an output of either click
                # handler wired below.
                progress_text = gr.Markdown("Progress")
                gallery_output = gr.Gallery(label="Generated Images")
                zip_files_output = gr.Files(label="Zip Files")
            # Log column: read-only textbox fed by the processing handler.
            with gr.Column(scale=1):
                gr.Markdown("### Detailed Logs")
                logs_output = gr.Textbox(lines=10, interactive=False, label="Logs")

        def generate_user_folder():
            """Create and return a per-run folder under OUTPUT_DIR named SD3<4 digits>."""
            # exist_ok=True means a repeated random number reuses that folder.
            user_folder = os.path.normpath(os.path.join(OUTPUT_DIR, f'SD3{random.randint(1000, 9999)}'))
            os.makedirs(user_folder, exist_ok=True)
            logging.info(f"Generated user folder: {user_folder}")
            return user_folder

        def on_click(prompts_text, negative_prompts_text, zip_interval):
            """Handle "Process Prompts": relay each process_prompts update to the UI."""
            # Clear any cancellation left over from a previous run.
            reset_cancel_processing_fn()
            user_folder = generate_user_folder()
            output_images, zip_files_list = [], []
            logs = ""
            # The loop variable `zip_files` shadows the module-level function
            # of the same name inside this handler only.
            for images, zip_files, log_msg in process_prompts(prompts_text, negative_prompts_text, user_folder, zip_interval):
                output_images = images
                zip_files_list = zip_files
                logs = log_msg
                yield images, zip_files_list, logs
            # NOTE(review): this is a generator's return value; presumably it
            # is not delivered to the output components - confirm.
            return output_images, zip_files_list, logs

        # Yielded triples map, in order, onto gallery, zip list, and log box.
        process_btn.click(
            fn=on_click,
            inputs=[prompts_text, negative_prompts_text, zip_interval],
            outputs=[gallery_output, zip_files_output, logs_output]
        )
        # Cancelling only sets the shared flag; no component is updated.
        cancel_btn.click(
            fn=cancel_processing_fn,
            inputs=[],
            outputs=[]
        )
    demo.launch()
# Run the app only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()