Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import json | |
| from PIL import Image | |
| import os | |
| from collections import defaultdict | |
# Stylesheet handed to gr.Blocks(css=...): styles the '#custom-gallery'
# element plus the '.gallery-container' and '.thumbnails' classes.
css = """
#custom-gallery{--row-height:180px;display:grid;grid-auto-rows:min-content;gap:10px}#custom-gallery .thumbnail-item{height:var(--row-height);width:100%;position:relative;overflow:hidden;border-radius:8px;box-shadow:0 2px 5px rgb(0 0 0 / .1);transition:transform 0.2s ease,box-shadow 0.2s ease}#custom-gallery .thumbnail-item:hover{transform:translateY(-3px);box-shadow:0 4px 12px rgb(0 0 0 / .15)}#custom-gallery .thumbnail-item img{width:auto;height:100%;max-width:100%;max-height:var(--row-height);object-fit:contain;margin:0 auto;display:block}#custom-gallery .thumbnail-item img.portrait{max-width:100%}#custom-gallery .thumbnail-item img.landscape{max-height:100%}.gallery-container{max-height:500px;overflow-y:auto;padding-right:0;--size-80:500px}.thumbnails{display:flex;position:absolute;bottom:0;width:120px;overflow-x:scroll;padding-top:320px;padding-bottom:280px;padding-left:4px;flex-wrap:wrap}
"""

# Placeholder returned when there is nothing to show: one "Not Available"
# entry for each of the 15 values produced per image.
EMPTY_RESULT = tuple("Not Available" for _ in range(15))
| # ---------- EXTRACTION FUNCTIONS ---------- | |
def read_metadata(file_path):
    """Return the ``info`` mapping of the image stored at ``file_path``.

    Args:
        file_path: Location of the image file to open.

    Returns:
        The ``info`` attribute of the opened image. If anything goes wrong
        while opening or reading, a dict with a single ``"error"`` key
        holding a human-readable message is returned instead.
    """
    try:
        with Image.open(file_path) as picture:
            info = picture.info
    except Exception as exc:
        # Best effort: report the failure to the caller instead of raising.
        return {"error": f"Error reading file: {str(exc)}"}
    return info
def extract_workflow_data(file_path):
    """Load the JSON workflow embedded in an image's metadata.

    The ``prompt`` metadata entry is tried first; after that every string
    entry whose stripped text starts with ``{`` is tried in metadata order.
    The first value that parses as JSON is returned.

    Args:
        file_path: Location of the image file.

    Returns:
        The parsed JSON value, or a dict with an ``"error"`` key when the
        file could not be read or no entry could be parsed.
    """
    info = read_metadata(file_path)
    if "error" in info:
        return {"error": info["error"]}

    # Ordered list of raw texts worth attempting to parse.
    candidates = []
    if 'prompt' in info:
        candidates.append(info['prompt'])
    candidates.extend(
        text
        for text in info.values()
        if isinstance(text, str) and text.strip().startswith('{')
    )

    for raw in candidates:
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            # Not valid JSON after all; move on to the next candidate.
            continue

    return {"error": "No workflow data found"}
def extract_ksampler_params(workflow_data):
    """Read the sampling settings from the first KSampler node.

    Only nodes whose ``class_type`` is exactly ``"KSampler"`` or
    ``"KSampler (Efficient)"`` are considered; the first match wins.

    Args:
        workflow_data: Mapping of node id to node dict.

    Returns:
        A 6-tuple of strings ``(seed, steps, cfg, sampler, scheduler,
        denoise)``. Any value that is unavailable is ``"Not found"``.
    """
    missing = "Not found"
    input_keys = ("seed", "steps", "cfg", "sampler_name", "scheduler", "denoise")
    sampler_types = ("KSampler", "KSampler (Efficient)")

    values = [missing] * len(input_keys)
    if isinstance(workflow_data, dict):
        for entry in workflow_data.values():
            if not isinstance(entry, dict):
                continue
            if entry.get("class_type", "") not in sampler_types:
                continue
            params = entry.get("inputs", {})
            values = [params.get(key, missing) for key in input_keys]
            break  # only the first sampler node is reported

    return tuple(str(value) for value in values)
def extract_prompts(workflow_data):
    """Collect the positive and negative prompt texts from a workflow.

    Three node families are inspected, in this order, for every node:
    ``Text to Conditioning`` and ``DPRandomGenerator`` nodes contribute
    their ``text`` input according to whether the node title contains
    ``POSITIVE`` or ``NEGATIVE``; ``ShowText|pysssss`` nodes contribute
    ``text_1`` (positive) and ``text_2`` (negative). Later nodes overwrite
    earlier ones.

    Args:
        workflow_data: Mapping of node id to node dict.

    Returns:
        ``(positive, negative)`` as strings, ``"Not found"`` when absent.
    """
    positive = negative = "Not found"
    if not isinstance(workflow_data, dict):
        return positive, negative

    # Order matters: a later family may overwrite what an earlier one set.
    families = ("Text to Conditioning", "ShowText|pysssss", "DPRandomGenerator")

    for entry in workflow_data.values():
        if not isinstance(entry, dict):
            continue
        kind = entry.get("class_type", "")
        params = entry.get("inputs", {})
        meta = entry.get("_meta")
        label = meta.get("title", "") if meta else ""

        for family in families:
            if family not in kind:
                continue
            if family == "ShowText|pysssss":
                if "text_1" in params:
                    positive = params["text_1"]
                if "text_2" in params:
                    negative = params["text_2"]
            elif "POSITIVE" in label:
                positive = params.get("text", "Not found")
            elif "NEGATIVE" in label:
                negative = params.get("text", "Not found")

    return str(positive), str(negative)
def extract_loras(workflow_data):
    """List the LoRAs referenced by a workflow.

    Two sources are combined, node by node: nodes whose ``class_type``
    contains ``LoraLoader`` (reported as ``"<name> (Strength: <value>)"``)
    and any string input of any node that contains ``lora:`` in any
    letter case (reported verbatim).

    Args:
        workflow_data: Mapping of node id to node dict.

    Returns:
        The entries joined by newlines, or ``"None found"``.
    """
    if not isinstance(workflow_data, dict):
        return "None found"

    found = []
    for entry in workflow_data.values():
        if not isinstance(entry, dict):
            continue
        params = entry.get("inputs", {})

        if "LoraLoader" in entry.get("class_type", ""):
            name = params.get("lora_name", "Unknown")
            strength = params.get("strength_model", "Unknown")
            found.append(f"{name} (Strength: {strength})")

        # Inline references such as prompt text mentioning "lora:".
        found.extend(
            text
            for text in params.values()
            if isinstance(text, str) and "lora:" in text.lower()
        )

    if not found:
        return "None found"
    return "\n".join(found)
def extract_model_info(workflow_data):
    """List the base models named by a workflow.

    Nodes whose ``class_type`` contains ``CheckpointLoader`` contribute
    their ``ckpt_name`` input; nodes containing ``Model Mecha Recipe``
    contribute their ``model_path`` input. A missing input is reported as
    ``"Unknown"``.

    Args:
        workflow_data: Mapping of node id to node dict.

    Returns:
        The model names joined by newlines, or ``"Not found"``.
    """
    if not isinstance(workflow_data, dict):
        return "Not found"

    # (substring of class_type, input key holding the model name)
    sources = (
        ("CheckpointLoader", "ckpt_name"),
        ("Model Mecha Recipe", "model_path"),
    )

    names = []
    for entry in workflow_data.values():
        if not isinstance(entry, dict):
            continue
        params = entry.get("inputs", {})
        kind = entry.get("class_type", "")
        for marker, input_key in sources:
            if marker in kind:
                names.append(params.get(input_key, "Unknown"))

    if not names:
        return "Not found"
    return "\n".join(names)
def extract_image_info_from_file(image_path):
    """Read the pixel dimensions of an image straight from the file.

    Args:
        image_path: Location of the image file.

    Returns:
        ``(width, height)`` as strings, or ``("Not found", "Not found")``
        when the file cannot be opened or its size cannot be read.
    """
    try:
        with Image.open(image_path) as picture:
            pixel_width, pixel_height = picture.size
            dimensions = (str(pixel_width), str(pixel_height))
    except Exception:
        # Any failure is reported as unknown dimensions.
        return "Not found", "Not found"
    return dimensions
def extract_batch_size(workflow_data):
    """Return the batch size declared by the first ``EmptyLatentImage`` node.

    Args:
        workflow_data: Mapping of node id to node dict.

    Returns:
        The node's ``batch_size`` input as a string, or ``"Not found"``
        when there is no such node or the input is absent.
    """
    if isinstance(workflow_data, dict):
        for entry in workflow_data.values():
            if not isinstance(entry, dict):
                continue
            if entry.get("class_type", "") == "EmptyLatentImage":
                # Only the first matching node is consulted.
                params = entry.get("inputs", {})
                return str(params.get("batch_size", "Not found"))
    return "Not found"
def extract_nodes_info(workflow_data):
    """Summarise how many nodes of each type a workflow contains.

    Args:
        workflow_data: Mapping of node id to node dict.

    Returns:
        A multi-line string: a ``Total Nodes`` line counting every entry of
        the mapping, followed by one ``<class_type>: <count>`` line per
        type in sorted order. Dict nodes without a ``class_type`` are
        counted under ``Unknown``; non-dict entries only affect the total.
        ``"Not found"`` is returned when the input is not a dict.
    """
    if not isinstance(workflow_data, dict):
        return "Not found"

    tally = {}
    for entry in workflow_data.values():
        if isinstance(entry, dict):
            kind = entry.get("class_type", "Unknown")
            tally[kind] = tally.get(kind, 0) + 1

    lines = [f"Total Nodes: {len(workflow_data)}"]
    lines.extend(f"{kind}: {count}" for kind, count in sorted(tally.items()))
    return "\n".join(lines).strip()
def extract_workflow_as_json(workflow_data):
    """Serialise a workflow dict as indented JSON text.

    Args:
        workflow_data: The workflow mapping to serialise.

    Returns:
        JSON text with two-space indentation and non-ASCII characters left
        unescaped, or ``"{}"`` when the input is not a dict.
    """
    if not isinstance(workflow_data, dict):
        return "{}"
    return json.dumps(workflow_data, ensure_ascii=False, indent=2)
| # ---------- EXTRACTION FUNCTIONS ---------- | |
| # | |
| # ---------- IMAGE PROCESSING ---------- | |
def process_single_image(image_path):
    """Extract all workflow info from a single image path.

    Args:
        image_path: Location of the image file; a falsy value yields
            ``EMPTY_RESULT``.

    Returns:
        A 15-tuple of strings in this order: seed, steps, cfg, sampler,
        scheduler, denoise, positive prompt, negative prompt, LoRAs,
        models, width, height, batch size, node summary, workflow JSON.
        When the workflow cannot be extracted, every position holds the
        same error message.
    """
    if not image_path:
        return EMPTY_RESULT

    workflow_data = extract_workflow_data(image_path)

    if not isinstance(workflow_data, dict):
        # The embedded JSON parsed to something other than an object (for
        # example a list or null). Report that instead of calling .get()
        # on it, which would raise AttributeError.
        return ("Workflow data is not a JSON object",) * len(EMPTY_RESULT)

    if "error" in workflow_data:
        error = str(workflow_data.get("error", "Unknown error"))
        return (error,) * len(EMPTY_RESULT)

    seed, steps, cfg, sampler, scheduler, denoise = extract_ksampler_params(workflow_data)
    positive, negative = extract_prompts(workflow_data)
    loras = extract_loras(workflow_data)
    models = extract_model_info(workflow_data)
    # Get actual image dimensions instead of workflow dimensions
    width, height = extract_image_info_from_file(image_path)
    batch = extract_batch_size(workflow_data)
    nodes = extract_nodes_info(workflow_data)
    full_json = extract_workflow_as_json(workflow_data)

    return (
        seed, steps, cfg, sampler, scheduler, denoise,
        positive, negative, loras, models, width, height, batch, nodes, full_json,
    )
def append_gallery(gallery: list, image: str):
    """Add a single image to the gallery.

    Args:
        gallery: Current gallery list, or ``None`` for an empty gallery.
            The list is appended to in place.
        image: Image to add; a falsy value leaves the gallery unchanged.

    Returns:
        ``(gallery, None)``: the gallery list followed by ``None``.
    """
    items = [] if gallery is None else gallery
    if image:
        items.append(image)
    return items, None
def extend_gallery(gallery, images):
    """Extend gallery preserving uniqueness and upload order.

    Args:
        gallery: Current gallery entries; each is either a path or a
            ``(path, caption)`` pair. ``None`` is treated as empty.
        images: Newly supplied image(s): a single path string, or a list
            whose elements are paths or tuples/lists with the path first.

    Returns:
        A new list holding the existing entries followed by a
        ``(path, '')`` entry for every incoming path not already present,
        in the order they were supplied. When ``images`` is falsy the
        gallery is returned as-is.
    """
    if gallery is None:
        gallery = []
    if not images:
        return gallery

    # Normalize input - Gradio might pass various formats
    if isinstance(images, str):  # Single image path
        incoming_paths = [images]
    elif isinstance(images, list):
        # Elements could be tuples from Gallery; the path comes first.
        incoming_paths = [
            str(img[0]) if isinstance(img, (tuple, list)) else str(img)
            for img in images
        ]
    else:
        incoming_paths = []

    # Compare as strings so existing entries match the normalized incoming paths.
    seen_paths = {
        str(item[0]) if isinstance(item, (list, tuple)) else str(item)
        for item in gallery
    }

    # dict.fromkeys drops duplicates while keeping first-seen order; a set
    # would shuffle the images.
    new_entries = [
        path for path in dict.fromkeys(incoming_paths) if path not in seen_paths
    ]

    # Create entries matching expected gallery style
    return list(gallery) + [(path, '') for path in new_entries]
def process_gallery(gallery, results_state):
    """Process all images and populate metadata in session.

    Every gallery image that has no cached entry in ``results_state`` is
    processed with ``process_single_image`` and cached under its path.

    Args:
        gallery: Gallery entries; each is a path string or a sequence with
            the path as its first element.
        results_state: Dict mapping image path to its 15-value result. It
            is updated in place, and cleared when the gallery is empty.

    Returns:
        The 15 result values of the first gallery image followed by
        ``results_state`` (16 values in total). ``EMPTY_RESULT`` takes the
        place of the result values when the gallery is empty or when
        processing raises.
    """
    if not gallery:
        # Clear results if nothing left
        results_state.clear()
        return EMPTY_RESULT + (results_state,)

    # None marks "not set yet", so the first image's result is always the
    # one shown, whatever its content.
    first_image_result = None
    try:
        for item in gallery:
            path = item if isinstance(item, str) else item[0]
            if path not in results_state:
                results_state[path] = process_single_image(path)
            if first_image_result is None:
                first_image_result = results_state[path]
        return tuple(first_image_result) + (results_state,)
    except Exception as e:
        # UI boundary: log and fall back to placeholders instead of
        # failing the event handler.
        print("[ERROR]", str(e))
        return EMPTY_RESULT + (results_state,)
def get_selection_from_gallery(gallery, results_state, evt: gr.SelectData):
    """Fetch result for selected image in gallery.

    Args:
        gallery: Gallery entries; each is a path or a sequence with the
            path as its first element.
        results_state: Dict mapping image path to its cached result.
        evt: Selection event. Its ``value`` may be a dict with an
            ``image`` entry holding a ``path``, a list/tuple with the path
            first, or any other object, which is converted with ``str``.

    Returns:
        The cached result of the selected image as a list. Without a
        selection, the first gallery image is used. ``list(EMPTY_RESULT)``
        is returned when no cached result matches.
    """
    if evt is None or evt.value is None:
        # No selection: use first image
        if gallery and len(gallery) > 0:
            head = gallery[0]
            key = str(head[0] if isinstance(head, (list, tuple)) else head)
            if key in results_state:
                return list(results_state[key])
        return list(EMPTY_RESULT)

    # Handle selection event
    try:
        picked = evt.value
        if isinstance(picked, dict) and 'image' in picked:
            key = picked['image']['path']
        elif isinstance(picked, (list, tuple)):
            key = picked[0]
        else:
            key = str(picked)
        if key in results_state:
            return list(results_state[key])
    except Exception as e:
        print(f"Selection error: {e}")

    # Return empty if no image found
    return list(EMPTY_RESULT)
| # ---------- IMAGE PROCESSING ---------- | |
| # | |
def create_multi_comfy():
    """Build the Gradio interface of the workflow extractor.

    The layout is one row with two columns: an upload button and image
    gallery on the left, and tabbed read-outs (sampling parameters,
    prompts, models/LoRAs, image info, full workflow JSON) on the right.
    Uploading extends the gallery and then processes it; gallery changes
    re-run processing; selecting an image shows its cached result.

    Returns:
        The assembled ``gr.Blocks`` instance.
    """
    with gr.Blocks(css=css, fill_width=True) as demo:
        gr.Markdown("# 🛠️ ComfyUI Workflow Information Extractor")
        gr.Markdown("Upload Multiple ComfyUI-generated images. Extract prompts, parameters, models, and full workflows.")

        with gr.Row():
            # Left column: upload control and thumbnail grid.
            with gr.Column(scale=2):
                upload_button = gr.UploadButton(
                    "📁 Upload Multiple Images",
                    file_types=["image"],
                    file_count="multiple",
                    size='lg',
                )
                gallery = gr.Gallery(
                    columns=3,
                    show_share_button=False,
                    interactive=True,
                    height='auto',
                    label='Grid of images',
                    preview=False,
                    elem_id='custom-gallery',
                )

            # Right column: extracted details, grouped in tabs.
            with gr.Column(scale=3):
                with gr.Tabs():
                    with gr.Tab("Sampling Parameters"):
                        with gr.Row():
                            with gr.Column():
                                seed_out = gr.Textbox(label="Seed", interactive=False, show_copy_button=True)
                                steps_out = gr.Textbox(label="Steps", interactive=False, show_copy_button=True)
                                cfg_out = gr.Textbox(label="CFG Scale", interactive=False)
                            with gr.Column():
                                sampler_out = gr.Textbox(label="Sampler", interactive=False)
                                scheduler_out = gr.Textbox(label="Scheduler", interactive=False)
                                denoise_out = gr.Textbox(label="Denoise", interactive=False)
                    with gr.Tab("Prompts"):
                        pos_prompt = gr.Textbox(label="Positive Prompt", lines=4, interactive=False, show_copy_button=True)
                        neg_prompt = gr.Textbox(label="Negative Prompt", lines=4, interactive=False, show_copy_button=True)
                    with gr.Tab("Models & LoRAs"):
                        with gr.Row():
                            lora_out = gr.Textbox(label="LoRAs", lines=5, interactive=False, show_copy_button=True)
                            model_out = gr.Textbox(label="Base Models", lines=5, interactive=False, show_copy_button=True)
                    with gr.Tab("Image Info"):
                        with gr.Row():
                            with gr.Column():
                                width_out = gr.Textbox(label="Width", interactive=False)
                                height_out = gr.Textbox(label="Height", interactive=False)
                                batch_out = gr.Textbox(label="Batch Size", interactive=False)
                            with gr.Column():
                                nodes_out = gr.Textbox(label="Node Counts", lines=15, interactive=True, show_copy_button=True)
                    with gr.Tab("Full Workflow"):
                        json_out = gr.Textbox(label="Workflow JSON", lines=20, interactive=True, show_copy_button=True)

        # State to store results per image
        results_state = gr.State({})

        # The 15 read-out components, in the order the processing
        # functions return their values.
        detail_fields = [
            seed_out, steps_out, cfg_out, sampler_out, scheduler_out, denoise_out,
            pos_prompt, neg_prompt, lora_out, model_out, width_out, height_out,
            batch_out, nodes_out, json_out,
        ]

        # Event Connections
        # Upload: add the files to the gallery, then process the gallery.
        upload_button.upload(
            fn=extend_gallery,
            inputs=[gallery, upload_button],
            outputs=gallery,
            queue=False,
        ).then(
            fn=process_gallery,
            inputs=[gallery, results_state],
            outputs=detail_fields + [results_state],
        )

        # Any gallery change re-runs processing.
        gallery.change(
            fn=process_gallery,
            inputs=[gallery, results_state],
            outputs=detail_fields + [results_state],
            queue=True,
        )

        # Selecting a thumbnail displays that image's cached result.
        gallery.select(
            get_selection_from_gallery,
            inputs=[gallery, results_state],
            outputs=list(detail_fields),
        )

        gr.Markdown("---\n💡 **Note:** It's under development.")

    return demo