"""
MOUSE Workflow - Visual Workflow Builder with UI Execution
@Powered by VIDraft
✓ Visual workflow designer with drag-and-drop
✓ Import/Export JSON with copy-paste support
✓ Auto-generate UI from workflow for end-user execution
"""
import os, json, typing, tempfile, traceback | |
import gradio as gr | |
from gradio_workflowbuilder import WorkflowBuilder | |
# Optional imports for LLM APIs | |
try: | |
from openai import OpenAI | |
OPENAI_AVAILABLE = True | |
except ImportError: | |
OPENAI_AVAILABLE = False | |
print("OpenAI library not available. Install with: pip install openai") | |
# Anthropic 관련 코드 주석 처리 | |
# try: | |
# import anthropic | |
# ANTHROPIC_AVAILABLE = True | |
# except ImportError: | |
# ANTHROPIC_AVAILABLE = False | |
# print("Anthropic library not available. Install with: pip install anthropic") | |
ANTHROPIC_AVAILABLE = False | |
try: | |
import requests | |
REQUESTS_AVAILABLE = True | |
except ImportError: | |
REQUESTS_AVAILABLE = False | |
print("Requests library not available. Install with: pip install requests") | |
try: | |
from huggingface_hub import HfApi, create_repo, upload_file | |
HF_HUB_AVAILABLE = True | |
except ImportError: | |
HF_HUB_AVAILABLE = False | |
print("Huggingface Hub not available. Install with: pip install huggingface-hub") | |
# ------------------------------------------------------------------- | |
# 🛠️ 헬퍼 함수들 | |
# ------------------------------------------------------------------- | |
def export_pretty(data: typing.Dict[str, typing.Any]) -> str:
    """Render a workflow dictionary as indented JSON text.

    Returns the placeholder ``"No workflow to export"`` when *data* is falsy
    (``None`` or an empty dict); otherwise the 2-space indented JSON dump with
    non-ASCII characters left unescaped.
    """
    if not data:
        return "No workflow to export"
    pretty_json = json.dumps(data, ensure_ascii=False, indent=2)
    return pretty_json
def export_file(data: typing.Dict[str, typing.Any]) -> typing.Optional[str]:
    """Export a workflow to a temporary JSON file.

    Args:
        data: Workflow dictionary to serialise.

    Returns:
        The path of the written ``workflow_*.json`` temp file, or ``None``
        when *data* is empty or the file could not be created/written (the
        error is printed rather than raised).
    """
    if not data:
        return None

    path = None
    try:
        # Create the temp file; the descriptor is wrapped and closed by fdopen.
        fd, path = tempfile.mkstemp(suffix=".json", prefix="workflow_", text=True)
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        return path
    except Exception as e:
        print(f"Error exporting file: {e}")
        # Do not leave a half-written file behind when serialisation fails.
        if path is not None:
            try:
                os.remove(path)
            except OSError:
                pass
        return None
def load_json_from_text_or_file(
    json_text: str, file_obj
) -> typing.Tuple[typing.Optional[typing.Dict[str, typing.Any]], str]:
    """Load a workflow from pasted JSON text or from an uploaded file.

    An uploaded file takes priority over the pasted text.

    Args:
        json_text: JSON document as text (may be empty or ``None``).
        file_obj: ``None``, a filesystem path (``str`` / ``os.PathLike``), or
            an object exposing the path through a ``name`` attribute.

    Returns:
        ``(data, status)``. ``data`` is the parsed dict with ``nodes`` and
        ``edges`` guaranteed to be present as lists, or ``None`` on failure;
        ``status`` is a human-readable message describing the outcome.
    """
    # A file, when given, wins over the text box.
    if file_obj is not None:
        try:
            # The upload widget in this file is declared with type="filepath",
            # which hands over a plain path string; other callers may pass an
            # object whose ``name`` attribute holds the path.
            if isinstance(file_obj, (str, os.PathLike)):
                file_path = file_obj
            else:
                file_path = file_obj.name
            with open(file_path, "r", encoding="utf-8") as f:
                json_text = f.read()
        except Exception as e:
            return None, f"❌ Error reading file: {str(e)}"

    # Nothing to parse.
    if not json_text or json_text.strip() == "":
        return None, "No JSON data provided"

    try:
        data = json.loads(json_text.strip())

        if not isinstance(data, dict):
            return None, "Invalid format: not a dictionary"

        # Fill in the required top-level fields when they are missing.
        nodes = data.setdefault('nodes', [])
        edges = data.setdefault('edges', [])
        if not isinstance(nodes, list) or not isinstance(edges, list):
            return None, "Invalid format: 'nodes' and 'edges' must be lists"

        return data, f"✅ Loaded: {len(nodes)} nodes, {len(edges)} edges"
    except json.JSONDecodeError as e:
        return None, f"❌ JSON parsing error: {str(e)}"
    except Exception as e:
        return None, f"❌ Error: {str(e)}"
def create_sample_workflow(example_type="basic"):
    """Build one of the bundled sample workflows.

    Recognised values of *example_type* are ``"basic"``, ``"vidraft"``,
    ``"multi_input"`` and ``"chain"``; any other value yields the basic
    workflow. Every call returns a freshly built ``{"nodes": [...],
    "edges": [...]}`` dictionary.
    """
    gemma = "Gemma-3-r1984-27B"

    def _position(x, y):
        return {"x": x, "y": y}

    def _input(node_id, node_type, x, y, label, default):
        # Input nodes carry their default text under template.input_value.
        return {
            "id": node_id,
            "type": node_type,
            "position": _position(x, y),
            "data": {
                "label": label,
                "template": {"input_value": {"value": default}},
            },
        }

    def _text(node_id, x, y, label, text):
        return {
            "id": node_id,
            "type": "textNode",
            "position": _position(x, y),
            "data": {
                "label": label,
                "template": {"text": {"value": text}},
            },
        }

    def _llm(node_id, x, y, label, provider, model, temperature, system_prompt):
        return {
            "id": node_id,
            "type": "llmNode",
            "position": _position(x, y),
            "data": {
                "label": label,
                "template": {
                    "provider": {"value": provider},
                    "model": {"value": model},
                    "temperature": {"value": temperature},
                    "system_prompt": {"value": system_prompt},
                },
            },
        }

    def _output(node_id, x, y, label):
        return {
            "id": node_id,
            "type": "ChatOutput",
            "position": _position(x, y),
            "data": {"label": label},
        }

    def _edges(*pairs):
        # Edge ids are e1, e2, ... in declaration order.
        return [
            {"id": f"e{number}", "source": source, "target": target}
            for number, (source, target) in enumerate(pairs, start=1)
        ]

    if example_type == "vidraft":
        # VIDraft example (Korean prompt and system message).
        return {
            "nodes": [
                _input("input_1", "ChatInput", 100, 200, "User Input",
                       "AI와 머신러닝의 차이점을 설명해주세요."),
                _llm("llm_1", 400, 200, "VIDraft AI (Gemma)", "VIDraft", gemma, 0.8,
                     "당신은 전문적이고 친절한 AI 교육자입니다. 복잡한 개념을 쉽게 설명해주세요."),
                _output("output_1", 700, 200, "AI Explanation"),
            ],
            "edges": _edges(("input_1", "llm_1"), ("llm_1", "output_1")),
        }

    if example_type == "multi_input":
        # Multi-input example: three text inputs merged by a text node.
        return {
            "nodes": [
                _input("name_input", "textInput", 100, 100, "Your Name", "John"),
                _input("topic_input", "textInput", 100, 250, "Topic", "Python programming"),
                _input("level_input", "textInput", 100, 400, "Skill Level", "beginner"),
                _text("combiner", 350, 250, "Combine Inputs",
                      "Create a personalized learning plan"),
                _llm("llm_1", 600, 250, "Generate Learning Plan", "VIDraft", gemma, 0.7,
                     "You are an expert educational consultant. Create personalized learning plans based on the user's name, topic of interest, and skill level."),
                _output("output_1", 900, 250, "Your Learning Plan"),
            ],
            "edges": _edges(
                ("name_input", "combiner"),
                ("topic_input", "combiner"),
                ("level_input", "combiner"),
                ("combiner", "llm_1"),
                ("llm_1", "output_1"),
            ),
        }

    if example_type == "chain":
        # Chain example: translate, then analyse, with two outputs.
        return {
            "nodes": [
                _input("input_1", "ChatInput", 50, 200, "Original Text",
                       "The quick brown fox jumps over the lazy dog."),
                _llm("translator", 300, 200, "Translate to Korean", "VIDraft", gemma, 0.3,
                     "You are a professional translator. Translate the given English text to Korean accurately."),
                _llm("analyzer", 600, 200, "Analyze Translation", "OpenAI", "gpt-4.1-mini", 0.5,
                     "You are a linguistic expert. Analyze the Korean translation and explain its nuances and cultural context."),
                _output("output_translation", 450, 350, "Korean Translation"),
                _output("output_analysis", 900, 200, "Translation Analysis"),
            ],
            "edges": _edges(
                ("input_1", "translator"),
                ("translator", "analyzer"),
                ("translator", "output_translation"),
                ("analyzer", "output_analysis"),
            ),
        }

    # "basic" and every unrecognised type: simple Q&A using VIDraft.
    return {
        "nodes": [
            _input("input_1", "ChatInput", 100, 200, "User Question",
                   "What is the capital of Korea?"),
            _llm("llm_1", 400, 200, "AI Processing", "VIDraft", gemma, 0.7,
                 "You are a helpful assistant."),
            _output("output_1", 700, 200, "Answer"),
        ],
        "edges": _edges(("input_1", "llm_1"), ("llm_1", "output_1")),
    }
# 배포를 위한 독립 앱 생성 함수 | |
def generate_standalone_app(workflow_data: dict, app_name: str, app_description: str) -> str:
    """Render a workflow as the source code of a self-contained Gradio app.

    Args:
        workflow_data: Workflow dictionary (``nodes`` / ``edges``); it must be
            JSON-serialisable.
        app_name: Title shown in the generated app.
        app_description: Description shown under the title.

    Returns:
        Python source text for an ``app.py`` that rebuilds the input/output UI
        from the embedded workflow and executes it on button click.

    The workflow is embedded as a JSON string that the generated app parses
    with ``json.loads``; pasting the JSON text directly as a Python literal
    would break on ``true`` / ``false`` / ``null``. The name and description
    are embedded as ``repr()`` literals (and escaped inside the module
    docstring) so quotes, backslashes or newlines in them cannot break or
    alter the generated code.
    """

    def _docstring_safe(text) -> str:
        # Escape backslashes and double quotes so the text can neither close
        # the generated module docstring nor introduce escape sequences.
        return str(text).replace("\\", "\\\\").replace('"', '\\"')

    # Serialise the workflow and wrap it as a Python string literal.
    workflow_json = json.dumps(workflow_data, indent=2)
    workflow_literal = repr(workflow_json)

    doc_name = _docstring_safe(app_name)
    doc_description = _docstring_safe(app_description)
    title_literal = repr(str(app_name))
    heading_literal = repr(f"# {app_name}")
    description_literal = repr(str(app_description))

    app_code = f'''"""
{doc_name}
{doc_description}
Generated by MOUSE Workflow
"""
import os
import json
import gradio as gr
import requests

# Workflow configuration
WORKFLOW_DATA = json.loads({workflow_literal})


def execute_workflow(*input_values):
    """Execute the workflow with given inputs"""
    # API keys from environment
    vidraft_token = os.getenv("FRIENDLI_TOKEN")
    openai_key = os.getenv("OPENAI_API_KEY")

    nodes = WORKFLOW_DATA.get("nodes", [])
    edges = WORKFLOW_DATA.get("edges", [])

    results = {{}}

    # Get input nodes
    input_nodes = [n for n in nodes if n.get("type") in ["ChatInput", "textInput", "Input", "numberInput"]]

    # Map inputs to node IDs
    for i, node in enumerate(input_nodes):
        if i < len(input_values):
            results[node["id"]] = input_values[i]

    # Process nodes
    for node in nodes:
        node_id = node.get("id")
        node_type = node.get("type", "")
        node_data = node.get("data", {{}})
        template = node_data.get("template", {{}})

        if node_type == "textNode":
            # Combine connected inputs
            base_text = template.get("text", {{}}).get("value", "")
            connected_inputs = []
            for edge in edges:
                if edge.get("target") == node_id:
                    source_id = edge.get("source")
                    if source_id in results:
                        connected_inputs.append(f"{{source_id}}: {{results[source_id]}}")
            if connected_inputs:
                results[node_id] = f"{{base_text}}\\n\\nInputs:\\n" + "\\n".join(connected_inputs)
            else:
                results[node_id] = base_text

        elif node_type in ["llmNode", "OpenAIModel", "ChatModel"]:
            # Get provider and model - VIDraft as default
            provider = template.get("provider", {{}}).get("value", "VIDraft")
            if provider not in ["VIDraft", "OpenAI"]:
                provider = "VIDraft"  # Default to VIDraft
            temperature = template.get("temperature", {{}}).get("value", 0.7)
            system_prompt = template.get("system_prompt", {{}}).get("value", "")

            # Get input text
            input_text = ""
            for edge in edges:
                if edge.get("target") == node_id:
                    source_id = edge.get("source")
                    if source_id in results:
                        input_text = results[source_id]
                        break

            # Call API
            if provider == "OpenAI" and openai_key:
                try:
                    from openai import OpenAI
                    client = OpenAI(api_key=openai_key)
                    messages = []
                    if system_prompt:
                        messages.append({{"role": "system", "content": system_prompt}})
                    messages.append({{"role": "user", "content": input_text}})
                    response = client.chat.completions.create(
                        model="gpt-4.1-mini",
                        messages=messages,
                        temperature=temperature,
                        max_tokens=1000
                    )
                    results[node_id] = response.choices[0].message.content
                except Exception as e:
                    results[node_id] = f"[OpenAI Error: {{str(e)}}]"

            elif provider == "VIDraft" and vidraft_token:
                try:
                    headers = {{
                        "Authorization": f"Bearer {{vidraft_token}}",
                        "Content-Type": "application/json"
                    }}
                    messages = []
                    if system_prompt:
                        messages.append({{"role": "system", "content": system_prompt}})
                    messages.append({{"role": "user", "content": input_text}})
                    payload = {{
                        "model": "dep89a2fld32mcm",
                        "messages": messages,
                        "max_tokens": 16384,
                        "temperature": temperature,
                        "top_p": 0.8,
                        "stream": False
                    }}
                    response = requests.post(
                        "https://api.friendli.ai/dedicated/v1/chat/completions",
                        headers=headers,
                        json=payload,
                        timeout=30
                    )
                    if response.status_code == 200:
                        results[node_id] = response.json()["choices"][0]["message"]["content"]
                    else:
                        results[node_id] = f"[VIDraft Error: {{response.status_code}}]"
                except Exception as e:
                    results[node_id] = f"[VIDraft Error: {{str(e)}}]"

            else:
                # Show which API key is missing
                if provider == "OpenAI":
                    results[node_id] = "[OpenAI API key not found. Please set OPENAI_API_KEY in Space secrets]"
                elif provider == "VIDraft":
                    results[node_id] = "[VIDraft API key not found. Please set FRIENDLI_TOKEN in Space secrets]"
                else:
                    results[node_id] = f"[No API key found for {{provider}}. Using simulated response: {{input_text[:50]}}...]"

        elif node_type in ["ChatOutput", "textOutput", "Output"]:
            # Get connected result
            for edge in edges:
                if edge.get("target") == node_id:
                    source_id = edge.get("source")
                    if source_id in results:
                        results[node_id] = results[source_id]
                        break

    # Return outputs
    output_nodes = [n for n in nodes if n.get("type") in ["ChatOutput", "textOutput", "Output"]]
    return [results.get(n["id"], "") for n in output_nodes]


# Build UI
with gr.Blocks(title={title_literal}, theme=gr.themes.Soft()) as demo:
    gr.Markdown({heading_literal})
    gr.Markdown({description_literal})

    # API Status Check
    vidraft_token = os.getenv("FRIENDLI_TOKEN")
    openai_key = os.getenv("OPENAI_API_KEY")

    with gr.Accordion("🔑 API Status", open=False):
        if vidraft_token:
            gr.Markdown("✅ **VIDraft API**: Connected (Gemma-3-r1984-27B)")
        else:
            gr.Markdown("❌ **VIDraft API**: Not configured")

        if openai_key:
            gr.Markdown("✅ **OpenAI API**: Connected (gpt-4.1-mini)")
        else:
            gr.Markdown("⚠️ **OpenAI API**: Not configured (optional)")

    if not vidraft_token:
        gr.Markdown("""
        **⚠️ Important**: Please add FRIENDLI_TOKEN to Space secrets for the app to work properly.

        Go to: Space settings → Repository secrets → Add secret
        """)
    elif not openai_key:
        gr.Markdown("""
        **💡 Tip**: The app will work with VIDraft alone. Add OPENAI_API_KEY if you need OpenAI features.
        """)
    else:
        gr.Markdown("**✨ All APIs configured! Your app is fully functional.**")

    # Extract nodes
    nodes = WORKFLOW_DATA.get("nodes", [])
    input_nodes = [n for n in nodes if n.get("type") in ["ChatInput", "textInput", "Input", "numberInput"]]
    output_nodes = [n for n in nodes if n.get("type") in ["ChatOutput", "textOutput", "Output"]]

    # Create inputs
    inputs = []
    if input_nodes:
        gr.Markdown("### 📥 Inputs")
        for node in input_nodes:
            label = node.get("data", {{}}).get("label", node.get("id"))
            template = node.get("data", {{}}).get("template", {{}})
            default_value = template.get("input_value", {{}}).get("value", "")

            if node.get("type") == "numberInput":
                inp = gr.Number(label=label, value=float(default_value) if default_value else 0)
            else:
                inp = gr.Textbox(label=label, value=default_value, lines=2)
            inputs.append(inp)

    # Execute button
    btn = gr.Button("🚀 Execute Workflow", variant="primary")

    # Create outputs
    outputs = []
    if output_nodes:
        gr.Markdown("### 📤 Outputs")
        for node in output_nodes:
            label = node.get("data", {{}}).get("label", node.get("id"))
            out = gr.Textbox(label=label, interactive=False, lines=3)
            outputs.append(out)

    # Connect
    btn.click(fn=execute_workflow, inputs=inputs, outputs=outputs)

    gr.Markdown("---")
    gr.Markdown("*Powered by MOUSE Workflow*")

if __name__ == "__main__":
    demo.launch()
'''
    return app_code
def generate_requirements_txt() -> str:
    """Return the ``requirements.txt`` content for a generated standalone app.

    One requirement per line, each line newline-terminated.
    """
    requirement_lines = ("gradio==5.34.2", "openai", "requests")
    return "".join(f"{requirement}\n" for requirement in requirement_lines)
def deploy_to_huggingface(workflow_data: dict, app_name: str, app_description: str,
                          hf_token: str, space_name: str, is_private: bool = False,
                          api_keys: dict = None) -> dict:
    """Deploy a workflow as a Gradio Space on Hugging Face.

    Creates (or reuses) the Space, uploads ``app.py``, ``requirements.txt``
    and ``README.md``, then stores every non-empty entry of *api_keys* as a
    Space secret.

    Args:
        workflow_data: Workflow dictionary (``nodes`` / ``edges``).
        app_name: Title used in the app and in the README.
        app_description: Description used in the app and in the README.
        hf_token: Token passed to ``HfApi``.
        space_name: Repository id of the Space to create or update.
        is_private: Whether the Space is created as private.
        api_keys: Mapping of secret name to value; ``None`` means no secrets.

    Returns:
        On success: ``{"success": True, "space_url", "message",
        "added_secrets", "failed_secrets", "providers_used"}``.
        On failure: ``{"success": False, "error": <message>}``. Exceptions are
        reported through this dictionary rather than raised.
    """
    if not HF_HUB_AVAILABLE:
        return {"success": False, "error": "huggingface-hub library not installed"}
    if api_keys is None:
        api_keys = {}

    try:
        # Initialize the HF API and create the Space (no error if it exists).
        api = HfApi(token=hf_token)
        repo_url = api.create_repo(
            repo_id=space_name,
            repo_type="space",
            space_sdk="gradio",
            private=is_private,
            exist_ok=True
        )
        space_id = repo_url.repo_id

        # Detect which providers are used in the workflow.
        detected = set()
        for node in workflow_data.get("nodes", []):
            if node.get("type") in ["llmNode", "OpenAIModel", "ChatModel"]:
                template = node.get("data", {}).get("template", {})
                provider = template.get("provider", {}).get("value", "")
                if provider:
                    detected.add(provider)
        # Sort so the README and the returned list do not depend on set
        # iteration order.
        providers = sorted(detected, key=str)

        # Generate files.
        app_code = generate_standalone_app(workflow_data, app_name, app_description)
        requirements = generate_requirements_txt()

        # README section describing which API keys will be configured.
        api_status = []
        if api_keys.get("FRIENDLI_TOKEN"):
            api_status.append("- **FRIENDLI_TOKEN**: ✅ Will be configured automatically")
        else:
            api_status.append("- **FRIENDLI_TOKEN**: ⚠️ Not provided (VIDraft won't work)")
        if api_keys.get("OPENAI_API_KEY"):
            api_status.append("- **OPENAI_API_KEY**: ✅ Will be configured automatically")
        elif "OpenAI" in detected:
            api_status.append("- **OPENAI_API_KEY**: ❌ Required but not provided")

        status_block = "\n".join(api_status)
        provider_summary = (
            ", ".join(str(p) for p in providers) if providers else "No LLM providers detected"
        )
        # A JSON string is a valid double-quoted YAML scalar, so names that
        # contain ':' or '#' cannot corrupt the front matter.
        yaml_title = json.dumps(str(app_name), ensure_ascii=False)

        readme = f"""---
title: {yaml_title}
emoji: 🐭
colorFrom: blue
colorTo: green
sdk: gradio
sdk_version: 5.34.2
app_file: app.py
pinned: false
---

# {app_name}

{app_description}

## 🔑 API Configuration Status

{status_block}

## 📝 Providers Used in This Workflow

{provider_summary}

## 🚀 Default Configuration

This app is configured to use **VIDraft (Gemma-3-r1984-27B)** as the default LLM provider for optimal performance.

---
Generated by MOUSE Workflow
"""

        # Upload files.
        for path_in_repo, content in (
            ("app.py", app_code),
            ("requirements.txt", requirements),
            ("README.md", readme),
        ):
            api.upload_file(
                path_or_fileobj=content.encode(),
                path_in_repo=path_in_repo,
                repo_id=space_id,
                repo_type="space"
            )

        # Add all provided API keys as secrets; a failing secret is recorded
        # but does not abort the deployment.
        added_secrets = []
        failed_secrets = []
        for key_name, key_value in api_keys.items():
            if not key_value:  # Only add non-empty keys
                continue
            try:
                api.add_space_secret(
                    repo_id=space_id,
                    key=key_name,
                    value=key_value
                )
                added_secrets.append(key_name)
            except Exception as e:
                failed_secrets.append(f"{key_name}: {str(e)}")
                print(f"Warning: Could not add {key_name} secret: {e}")

        space_url = f"https://huggingface.co/spaces/{space_id}"
        return {
            "success": True,
            "space_url": space_url,
            "message": f"Successfully deployed to {space_url}",
            "added_secrets": added_secrets,
            "failed_secrets": failed_secrets,
            "providers_used": providers
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }
# UI 실행을 위한 실제 워크플로우 실행 함수 | |
# Marker returned when no incoming edge of a node has a computed result yet.
_NO_UPSTREAM_RESULT = object()


def _order_nodes_by_dependencies(nodes: list, edges: list) -> list:
    """Return *nodes* reordered so each node follows the nodes feeding it.

    Ties are broken by original list position, so a list that is already in
    dependency order comes back unchanged. Nodes that sit on a cycle are
    appended at the end in their original order.
    """
    import heapq

    first_index = {}
    for index, node in enumerate(nodes):
        first_index.setdefault(node.get("id"), index)

    waiting_on = [0] * len(nodes)
    feeds = [[] for _ in nodes]
    for edge in edges:
        source = first_index.get(edge.get("source"))
        target = first_index.get(edge.get("target"))
        # Ignore edges that reference unknown nodes, and self-loops.
        if source is None or target is None or source == target:
            continue
        feeds[source].append(target)
        waiting_on[target] += 1

    ready = [index for index, count in enumerate(waiting_on) if count == 0]
    heapq.heapify(ready)
    emitted = []
    done = [False] * len(nodes)
    while ready:
        current = heapq.heappop(ready)
        done[current] = True
        emitted.append(current)
        for target in feeds[current]:
            waiting_on[target] -= 1
            if waiting_on[target] == 0:
                heapq.heappush(ready, target)

    emitted.extend(index for index in range(len(nodes)) if not done[index])
    return [nodes[index] for index in emitted]


def _first_upstream_result(node_id, edges: list, results: dict):
    """Return the result of the first incoming edge whose source is computed."""
    for edge in edges:
        if edge.get("target") == node_id:
            source_id = edge.get("source")
            if source_id in results:
                return results[source_id]
    return _NO_UPSTREAM_RESULT


def _template_value(template: dict, field: str, default):
    """Read ``template[field]["value"]``, falling back to *default*."""
    info = template.get(field, {})
    return info.get("value", default) if isinstance(info, dict) else default


def _call_openai(client_cls, api_key: str, messages: list, temperature) -> str:
    """Run a chat completion on the fixed OpenAI model; errors become text."""
    try:
        client = client_cls(api_key=api_key)
        response = client.chat.completions.create(
            model="gpt-4.1-mini",  # fixed model name
            messages=messages,
            temperature=temperature,
            max_tokens=1000
        )
        return response.choices[0].message.content
    except Exception as e:
        return f"[OpenAI Error: {str(e)}]"


def _call_vidraft(token: str, messages: list, temperature) -> str:
    """Run a chat completion on the VIDraft endpoint; errors become text."""
    try:
        import requests

        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": "dep89a2fld32mcm",  # VIDraft model ID
            "messages": messages,
            "max_tokens": 16384,
            "temperature": temperature,
            "top_p": 0.8,
            "stream": False  # non-streaming so the call returns the full answer
        }
        response = requests.post(
            "https://api.friendli.ai/dedicated/v1/chat/completions",
            headers=headers,
            json=payload,
            timeout=30
        )
        if response.status_code == 200:
            return response.json()["choices"][0]["message"]["content"]
        return f"[VIDraft API Error: {response.status_code} - {response.text}]"
    except Exception as e:
        return f"[VIDraft Error: {str(e)}]"


def execute_workflow_simple(workflow_data: dict, input_values: dict) -> dict:
    """Execute a workflow and return the result of every processed node.

    Nodes are processed in dependency order derived from the edges, so a node
    listed before the node that feeds it still receives its input.

    Node handling:
      * input nodes (``ChatInput``, ``textInput``, ``Input``, ``numberInput``)
        take their value from *input_values* or, failing that, from the
        node's ``template.input_value.value`` default;
      * ``textNode`` joins its base text with all computed upstream results;
      * LLM nodes (``llmNode``, ``OpenAIModel``, ``ChatModel``) call OpenAI
        or VIDraft (any other provider value is treated as VIDraft) when the
        matching key is present in ``OPENAI_API_KEY`` / ``FRIENDLI_TOKEN``,
        and otherwise produce a ``[Simulated ...]`` placeholder;
      * output nodes copy the first computed upstream result.

    Args:
        workflow_data: Workflow dictionary with ``nodes`` and ``edges``.
        input_values: Mapping of input-node id to the value entered in the UI
            (``None`` is treated as empty).

    Returns:
        Mapping of node id to result. A node that raises is recorded as
        ``"[Node Error: ...]"`` and processing continues.
    """
    import traceback

    # API keys come from the environment.
    vidraft_token = os.getenv("FRIENDLI_TOKEN")  # VIDraft/Friendli token
    openai_key = os.getenv("OPENAI_API_KEY")

    # The OpenAI client is optional.
    try:
        from openai import OpenAI
    except ImportError:
        OpenAI = None
        print("OpenAI library not available")

    input_values = input_values or {}
    results = {}
    nodes = workflow_data.get("nodes", [])
    edges = workflow_data.get("edges", [])

    try:
        ordered_nodes = _order_nodes_by_dependencies(nodes, edges)
    except (TypeError, AttributeError):
        # Malformed ids/edges: fall back to plain list order.
        ordered_nodes = list(nodes)

    for node in ordered_nodes:
        node_id = node.get("id")
        node_type = node.get("type", "")
        node_data = node.get("data", {})
        try:
            if node_type in ["ChatInput", "textInput", "Input", "numberInput"]:
                # Prefer the value supplied by the UI, else the node default.
                if node_id in input_values:
                    results[node_id] = input_values[node_id]
                else:
                    template = node_data.get("template", {})
                    results[node_id] = template.get("input_value", {}).get("value", "")

            elif node_type == "textNode":
                # A text node combines every connected, already computed input.
                template = node_data.get("template", {})
                base_text = template.get("text", {}).get("value", "")
                connected_inputs = [
                    f"{edge.get('source')}: {results[edge.get('source')]}"
                    for edge in edges
                    if edge.get("target") == node_id and edge.get("source") in results
                ]
                if connected_inputs:
                    results[node_id] = f"{base_text}\n\nInputs:\n" + "\n".join(connected_inputs)
                else:
                    results[node_id] = base_text

            elif node_type in ["llmNode", "OpenAIModel", "ChatModel"]:
                template = node_data.get("template", {})

                # Only VIDraft and OpenAI are supported; the model is fixed
                # per provider (gpt-4.1-mini / Gemma-3-r1984-27B).
                provider = _template_value(template, "provider", "VIDraft")
                if provider not in ["VIDraft", "OpenAI"]:
                    provider = "VIDraft"
                temperature = _template_value(template, "temperature", 0.7)
                system_prompt = _template_value(template, "system_prompt", "")

                upstream = _first_upstream_result(node_id, edges, results)
                input_text = "" if upstream is _NO_UPSTREAM_RESULT else upstream
                # Upstream values may be numbers; prompts must be text.
                prompt_text = input_text if isinstance(input_text, str) else str(input_text)

                messages = []
                if system_prompt:
                    messages.append({"role": "system", "content": system_prompt})
                messages.append({"role": "user", "content": prompt_text})

                if provider == "OpenAI" and openai_key and OpenAI is not None:
                    results[node_id] = _call_openai(OpenAI, openai_key, messages, temperature)
                elif provider == "VIDraft" and vidraft_token:
                    results[node_id] = _call_vidraft(vidraft_token, messages, temperature)
                else:
                    # No usable API key: return a simulated answer.
                    results[node_id] = f"[Simulated {provider} Response to: {prompt_text[:50]}...]"

            elif node_type in ["ChatOutput", "textOutput", "Output"]:
                # An output node mirrors the first computed upstream result.
                upstream = _first_upstream_result(node_id, edges, results)
                if upstream is not _NO_UPSTREAM_RESULT:
                    results[node_id] = upstream

        except Exception as e:
            results[node_id] = f"[Node Error: {str(e)}]"
            print(f"Error processing node {node_id}: {traceback.format_exc()}")

    return results
# ------------------------------------------------------------------- | |
# 🎨 CSS | |
# ------------------------------------------------------------------- | |
# Stylesheet passed to gr.Blocks(css=CSS). Each selector matches a class name
# attached to a component through ``elem_classes`` or inline HTML in this file
# (e.g. "main-container", "status-box", "save-indicator", "workflow-info").
CSS = """
.main-container{max-width:1600px;margin:0 auto;}
.workflow-section{margin-bottom:2rem;min-height:500px;}
.button-row{display:flex;gap:1rem;justify-content:center;margin:1rem 0;}
.status-box{
padding:10px;border-radius:5px;margin-top:10px;
background:#f0f9ff;border:1px solid #3b82f6;color:#1e40af;
}
.component-description{
padding:24px;background:linear-gradient(135deg,#f8fafc 0%,#e2e8f0 100%);
border-left:4px solid #3b82f6;border-radius:12px;
box-shadow:0 2px 8px rgba(0,0,0,.05);margin:16px 0;
}
.workflow-container{position:relative;}
.ui-execution-section{
background:linear-gradient(135deg,#f0fdf4 0%,#dcfce7 100%);
padding:24px;border-radius:12px;margin:24px 0;
border:1px solid #86efac;
}
.powered-by{
text-align:center;color:#64748b;font-size:14px;
margin-top:8px;font-style:italic;
}
.sample-buttons{
display:grid;grid-template-columns:1fr 1fr;gap:0.5rem;
margin-top:0.5rem;
}
.deploy-section{
background:linear-gradient(135deg,#fef3c7 0%,#fde68a 100%);
padding:24px;border-radius:12px;margin:24px 0;
border:1px solid #fbbf24;
}
.save-indicator{
text-align:right;
font-size:14px;
color:#16a34a;
padding:8px 16px;
background:#f0fdf4;
border-radius:20px;
display:inline-block;
margin-left:auto;
}
.workflow-info{
font-size:14px;
color:#475569;
background:#f8fafc;
padding:8px 16px;
border-radius:8px;
display:inline-block;
margin-bottom:16px;
}
"""
# ------------------------------------------------------------------- | |
# 🖥️ Gradio 앱 | |
# ------------------------------------------------------------------- | |
with gr.Blocks(title="🐭 MOUSE Workflow", theme=gr.themes.Soft(), css=CSS) as demo: | |
with gr.Column(elem_classes=["main-container"]): | |
gr.Markdown("# 🐭 MOUSE Workflow") | |
gr.Markdown("**Visual Workflow Builder with Interactive UI Execution**") | |
gr.HTML('<p class="powered-by">@Powered by VIDraft & Huggingface gradio</p>') | |
html_content = """<div class="component-description"> | |
<p style="font-size:16px;margin:0;">Build sophisticated workflows visually • Import/Export JSON • Generate interactive UI for end-users • Default LLM: VIDraft (Gemma-3-r1984-27B)</p> | |
<p style="font-size:14px;margin-top:8px;color:#64748b;">💡 Tip: Your workflow is automatically saved as you make changes. The JSON preview updates in real-time!</p> | |
</div>""" | |
gr.HTML(html_content) | |
# API Status Display | |
with gr.Accordion("🔌 API Status", open=False): | |
gr.Markdown(f""" | |
**Available APIs:** | |
- FRIENDLI_TOKEN (VIDraft): {'✅ Connected' if os.getenv("FRIENDLI_TOKEN") else '❌ Not found'} | |
- OPENAI_API_KEY: {'✅ Connected' if os.getenv("OPENAI_API_KEY") else '❌ Not found'} | |
**Libraries:** | |
- OpenAI: {'✅ Installed' if OPENAI_AVAILABLE else '❌ Not installed'} | |
- Requests: {'✅ Installed' if REQUESTS_AVAILABLE else '❌ Not installed'} | |
- Hugging Face Hub: {'✅ Installed' if HF_HUB_AVAILABLE else '❌ Not installed (needed for deployment)'} | |
**Available Models:** | |
- OpenAI: gpt-4.1-mini (fixed) | |
- VIDraft: Gemma-3-r1984-27B (model ID: dep89a2fld32mcm) | |
**Sample Workflows:** | |
- Basic Q&A: Simple question-answer flow (VIDraft) | |
- VIDraft: Korean language example with Gemma model | |
- Multi-Input: Combine multiple inputs for personalized output (VIDraft) | |
- Chain: Sequential processing with multiple outputs (VIDraft + OpenAI) | |
**Note**: All examples prioritize VIDraft for optimal performance. Friendli API token will be automatically configured during deployment. | |
""") | |
# State for storing workflow data | |
loaded_data = gr.State(None) | |
trigger_update = gr.State(False) | |
save_status = gr.State("Ready") | |
# ─── Dynamic Workflow Container ───
with gr.Column(elem_classes=["workflow-container"]):
    # Header row: section title next to the auto-save status indicator.
    with gr.Row():
        gr.Markdown("### 🎨 Visual Workflow Designer")
        save_indicator = gr.Markdown("💾 Auto-save: Ready", elem_classes=["save-indicator"])

    # Register the function as a dynamic render block. Without this
    # registration nothing ever calls render_workflow, so the builder would
    # never appear. It re-runs when either input state changes; the `trigger`
    # flag exists only so a reload of identical data still forces a re-render.
    @gr.render(inputs=[loaded_data, trigger_update])
    def render_workflow(data, trigger):
        """Render the WorkflowBuilder for the current workflow.

        Args:
            data: Workflow dict to display; a falsy value shows an empty
                workflow (no nodes, no edges).
            trigger: Unused in the body; present only as a re-render input.

        Returns:
            The WorkflowBuilder component that was created.
        """
        workflow_value = data if data else {"nodes": [], "edges": []}
        wb = WorkflowBuilder(
            label="",
            info="Drag nodes → Connect edges → Edit properties → Changes auto-save!",
            value=workflow_value,
            elem_id="main_workflow"
        )

        # Automatically store WorkflowBuilder changes in loaded_data.
        def update_workflow_data(workflow_data):
            """Pass the edited workflow through and report the save time."""
            import time
            # Show the saved status immediately, stamped with the local time.
            return workflow_data, f"💾 Auto-save: Saved ✓ ({time.strftime('%H:%M:%S')})"

        wb.change(
            fn=update_workflow_data,
            inputs=wb,
            outputs=[loaded_data, save_indicator]
        )
        return wb
# ─── Import Section ───
# Widgets for loading a workflow: pasted JSON, an uploaded .json file, or one
# of four sample templates.
# NOTE(review): indentation was lost in this copy of the file; the nesting of
# the rows/columns below (in particular where the sample buttons and the
# status box live) is reconstructed — confirm against the real layout.
with gr.Accordion("📥 Import Workflow", open=True):
    gr.Markdown("*Load an existing workflow from JSON or start with a sample template*")
    with gr.Row():
        with gr.Column(scale=2):
            # JSON editor pre-filled with an empty workflow skeleton.
            import_json_text = gr.Code(
                language="json",
                label="Paste JSON here",
                lines=8,
                value='{\n "nodes": [],\n "edges": []\n}'
            )
        with gr.Column(scale=1):
            # type="filepath": handlers receive a path string, not file contents.
            file_upload = gr.File(
                label="Or upload JSON file",
                file_types=[".json"],
                type="filepath"
            )
            btn_load = gr.Button("📥 Load Workflow", variant="primary", size="lg")
            # Sample buttons
            gr.Markdown("**Sample Workflows:**")
            with gr.Row():
                btn_sample_basic = gr.Button("🎯 Basic Q&A", variant="secondary", scale=1)
                btn_sample_vidraft = gr.Button("🤖 VIDraft", variant="secondary", scale=1)
            with gr.Row():
                btn_sample_multi = gr.Button("📝 Multi-Input", variant="secondary", scale=1)
                btn_sample_chain = gr.Button("🔗 Chain", variant="secondary", scale=1)
    # Status
    # Read-only box that the load handlers write their result message to.
    status_text = gr.Textbox(
        label="Status",
        value="Ready",
        elem_classes=["status-box"],
        interactive=False
    )
# ─── Export Section ───
# Read-only JSON preview of the current workflow plus refresh/download buttons.
# NOTE(review): indentation was lost in this copy of the file; the row/column
# nesting below is reconstructed — confirm against the real layout.
gr.Markdown("## 💾 Export / Live Preview")
gr.Markdown("*Your workflow is automatically saved. The JSON below shows your current workflow in real-time.*")
# Workflow info display
# One-line node/edge summary, updated by the handlers further down.
workflow_info = gr.Markdown("📊 Empty workflow", elem_classes=["workflow-info"])
with gr.Row():
    with gr.Column(scale=3):
        export_preview = gr.Code(
            language="json",
            label="Current Workflow JSON (Live Preview)",
            lines=8,
            interactive=False
        )
        gr.Markdown("*💡 This JSON updates automatically as you modify the workflow above*")
    with gr.Column(scale=1):
        # Re-runs the preview on demand (wired to force_refresh_preview below).
        btn_preview = gr.Button("🔄 Force Refresh", size="lg", variant="secondary")
        # Receives the exported file path as an output of loaded_data.change.
        btn_download = gr.DownloadButton(
            "💾 Download JSON",
            size="lg",
            variant="primary",
            visible=True
        )
# ─── Deploy Section ───
# Form for publishing the current workflow as a standalone Hugging Face Space.
# NOTE(review): indentation was lost in this copy of the file; the row/column
# nesting below is reconstructed — confirm against the real layout.
with gr.Accordion("🚀 Deploy to Hugging Face Space", open=False, elem_classes=["deploy-section"]):
    gr.Markdown("""
Deploy your **current workflow** as an independent Hugging Face Space app.
The workflow shown in the JSON preview above will be deployed exactly as is.
""")
    gr.Markdown("*⚠️ Make sure to save/finalize your workflow design before deploying!*")
    with gr.Row():
        with gr.Column(scale=2):
            deploy_name = gr.Textbox(
                label="App Name",
                placeholder="My Awesome Workflow App",
                value="My Workflow App"
            )
            deploy_description = gr.Textbox(
                label="App Description",
                placeholder="Describe what your workflow does...",
                lines=3,
                value="A workflow application created with MOUSE Workflow builder."
            )
            deploy_space_name = gr.Textbox(
                label="Space Name (your-username/space-name)",
                placeholder="username/my-workflow-app",
                info="This will be the URL of your Space"
            )
        with gr.Column(scale=1):
            deploy_token = gr.Textbox(
                label="Hugging Face Token",
                type="password",
                placeholder="hf_...",
                info="Get your token from huggingface.co/settings/tokens"
            )
            # API keys configuration section
            gr.Markdown("### 🔑 API Keys Configuration")
            # SECURITY: never pre-fill these fields from os.getenv(). A
            # component's initial value is sent to every browser that opens
            # the page (type="password" only masks it visually), which would
            # expose the server's own secrets. The deploy handler already
            # falls back to the server environment when a field is left empty.
            # FRIENDLI_TOKEN setting
            friendli_token_input = gr.Textbox(
                label="FRIENDLI_TOKEN (VIDraft/Gemma)",
                type="password",
                placeholder="flp_...",
                value="",
                info="Required for VIDraft. Will be added as secret. Leave empty to use the token configured on this server."
            )
            # OpenAI API key setting
            openai_token_input = gr.Textbox(
                label="OPENAI_API_KEY (Optional)",
                type="password",
                placeholder="sk-...",
                value="",
                info="Optional. Leave empty if not using OpenAI (a key configured on this server is used when present)."
            )
            deploy_private = gr.Checkbox(
                label="Make Space Private",
                value=False
            )
    btn_deploy = gr.Button("🚀 Deploy to HF Space", variant="primary", size="lg")
    # Deploy status
    deploy_status = gr.Markdown("")
    # Preview generated code
    with gr.Accordion("📄 Preview Generated Code", open=False):
        generated_code_preview = gr.Code(
            language="python",
            label="app.py (This will be deployed)",
            lines=20
        )
# ─── UI Execution Section ───
with gr.Column(elem_classes=["ui-execution-section"]):
    gr.Markdown("## 🚀 UI Execution")
    gr.Markdown("Test your workflow instantly! Click below to generate and run the UI from your current workflow design.")
    btn_execute_ui = gr.Button("▶️ Generate & Run UI from Current Workflow", variant="primary", size="lg")
    # UI execution state: the workflow snapshot the generated UI is built from.
    ui_workflow_data = gr.State(None)

    # Dynamic UI container. Registered as a render block so it is rebuilt
    # whenever ui_workflow_data changes; without the registration nothing ever
    # calls this function and the generated UI would never appear.
    @gr.render(inputs=[ui_workflow_data])
    def render_execution_ui(workflow_data):
        """Build input/output widgets for a workflow and wire an Execute button.

        Args:
            workflow_data: Workflow dict with a "nodes" list. When it is falsy
                or has no nodes, only a hint message is rendered.
        """
        if not workflow_data or not workflow_data.get("nodes"):
            gr.Markdown("*Load a workflow first, then click 'Generate & Run UI'*")
            return

        gr.Markdown("### 📋 Generated UI")

        # Split nodes into UI inputs and outputs. Every other type (e.g.
        # "textNode", an intermediate processing node) gets no widget.
        input_types = ("ChatInput", "textInput", "Input", "numberInput")
        output_types = ("ChatOutput", "textOutput", "Output")
        all_nodes = workflow_data.get("nodes", [])
        input_nodes = [n for n in all_nodes if n.get("type", "") in input_types]
        output_nodes = [n for n in all_nodes if n.get("type", "") in output_types]

        # Create input components, keyed by node id (insertion order is the
        # order in which values are later passed to the handler).
        input_components = {}
        if input_nodes:
            gr.Markdown("#### 📥 Inputs")
            for node in input_nodes:
                node_id = node.get("id")
                node_data = node.get("data", {})
                label = node_data.get("label", node_id)
                # Default value comes from the node template, if any.
                template = node_data.get("template", {})
                default_value = template.get("input_value", {}).get("value", "")
                if node.get("type") == "numberInput":
                    # A non-numeric default must not break UI generation.
                    try:
                        number_default = float(default_value) if default_value else 0
                    except (TypeError, ValueError):
                        number_default = 0
                    input_components[node_id] = gr.Number(
                        label=label,
                        value=number_default
                    )
                else:
                    input_components[node_id] = gr.Textbox(
                        label=label,
                        value=default_value,
                        lines=2,
                        placeholder="Enter your input..."
                    )

        # Execute button
        execute_btn = gr.Button("🎯 Execute", variant="primary")

        # Create output components, keyed by node id.
        output_components = {}
        if output_nodes:
            gr.Markdown("#### 📤 Outputs")
            for node in output_nodes:
                node_id = node.get("id")
                label = node.get("data", {}).get("label", node_id)
                output_components[node_id] = gr.Textbox(
                    label=label,
                    interactive=False,
                    lines=3
                )

        # Execution log
        gr.Markdown("#### 📊 Execution Log")
        log_output = gr.Textbox(
            label="Log",
            interactive=False,
            lines=5
        )

        def execute_ui_workflow(*input_values):
            """Run the workflow with the widget values.

            Returns one value per output component followed by the log text,
            matching the order of the `outputs` list wired below.
            """
            # Pair each input node id with the value in the same position.
            inputs_dict = dict(zip(input_components, input_values))

            log = "=== Workflow Execution Started ===\n"
            log += f"Inputs provided: {len(inputs_dict)}\n"

            # Check API status
            vidraft_token = os.getenv("FRIENDLI_TOKEN")
            openai_key = os.getenv("OPENAI_API_KEY")
            log += "\nAPI Status:\n"
            log += f"- FRIENDLI_TOKEN (VIDraft): {'✅ Found' if vidraft_token else '❌ Not found'}\n"
            log += f"- OPENAI_API_KEY: {'✅ Found' if openai_key else '❌ Not found'}\n"
            if not vidraft_token and not openai_key:
                log += "\n⚠️ No API keys found. Results will be simulated.\n"
                log += "To get real AI responses, set API keys in environment variables.\n"
                log += "Minimum requirement: FRIENDLI_TOKEN for VIDraft\n"
            elif vidraft_token and not openai_key:
                log += "\n✅ VIDraft API connected - Basic functionality available\n"
                log += "💡 Add OPENAI_API_KEY for full functionality\n"
            log += "\n--- Processing Nodes ---\n"

            try:
                results = execute_workflow_simple(workflow_data, inputs_dict)
                # Prepare outputs
                output_values = []
                for node_id in output_components:
                    value = results.get(node_id, "No output")
                    output_values.append(value)
                    # Limit the log length. Truncate the string form so that
                    # long non-string results (lists, dicts, ...) cannot raise.
                    text = str(value)
                    display_value = text[:100] + "..." if len(text) > 100 else text
                    log += f"\nOutput [{node_id}]: {display_value}\n"
                log += "\n=== Execution Completed Successfully! ===\n"
                output_values.append(log)
                return output_values
            except Exception as e:
                # UI boundary: show the failure in every output box and the log
                # instead of letting the handler crash.
                error_msg = f"❌ Error: {str(e)}"
                log += f"\n{error_msg}\n"
                log += "=== Execution Failed ===\n"
                return [error_msg] * len(output_components) + [log]

        # Connect execution
        all_inputs = list(input_components.values())
        all_outputs = list(output_components.values()) + [log_output]
        execute_btn.click(
            fn=execute_ui_workflow,
            inputs=all_inputs,
            outputs=all_outputs
        )
# ─── Event Handlers ───
# Load workflow (from text or file)
def load_workflow(json_text, file_obj):
    """Load a workflow from the JSON editor or the uploaded file.

    Returns a 4-tuple for (loaded_data, status_text, import_json_text,
    save_indicator). On failure the state is set to None and the editor and
    save indicator are left untouched via ``gr.update()``.
    """
    data, status = load_json_from_text_or_file(json_text, file_obj)
    if not data:
        return None, status, gr.update(), gr.update()
    # On success refresh the editor: show the pretty-printed file contents
    # when a file was supplied, otherwise keep the text the user typed.
    editor_text = export_pretty(data) if file_obj else json_text
    return data, status, editor_text, "💾 Auto-save: Loaded ✓"
# Every load path writes the same four targets and then negates the
# trigger_update flag, so the chain is defined once and reused.
def _toggle_trigger(current_trigger):
    """Return the negated trigger flag (follow-up step after each load)."""
    return not current_trigger


def _make_sample_loader(sample_kind, status_message):
    """Build a click handler that loads the sample workflow `sample_kind`.

    The sample is created once per click, so the state and the JSON shown in
    the editor are guaranteed to come from the same object.
    """
    def _load_sample():
        sample = create_sample_workflow(sample_kind)
        return sample, status_message, export_pretty(sample), "💾 Auto-save: Sample loaded ✓"
    return _load_sample


def _wire_load_event(register, fn, inputs=None):
    """Attach `fn` to an event and chain the trigger toggle after it.

    Args:
        register: Bound event method, e.g. ``button.click`` or ``file.change``.
        fn: Handler returning (loaded_data, status_text, import_json_text,
            save_indicator) values.
        inputs: Input components for `fn`, or None when it takes none.
    """
    register(
        fn=fn,
        inputs=inputs,
        outputs=[loaded_data, status_text, import_json_text, save_indicator]
    ).then(
        fn=_toggle_trigger,
        inputs=trigger_update,
        outputs=trigger_update
    )


# Load button, and auto-load when a file is uploaded.
_wire_load_event(btn_load.click, load_workflow, [import_json_text, file_upload])
_wire_load_event(file_upload.change, load_workflow, [import_json_text, file_upload])

# Load samples
for _sample_button, _sample_kind, _sample_message in (
    (btn_sample_basic, "basic", "✅ Basic Q&A sample loaded"),
    (btn_sample_vidraft, "vidraft", "✅ VIDraft sample loaded"),
    (btn_sample_multi, "multi_input", "✅ Multi-input sample loaded"),
    (btn_sample_chain, "chain", "✅ Chain processing sample loaded"),
):
    _wire_load_event(
        _sample_button.click,
        _make_sample_loader(_sample_kind, _sample_message)
    )
# Preview current workflow - forced refresh
def force_refresh_preview(current_data):
    """Rebuild the export preview from the current workflow state.

    Returns a 3-tuple of (preview JSON text, save-indicator text, node/edge
    summary). A falsy workflow yields fixed "no data" placeholders.
    """
    if not current_data:
        return "No workflow data available", "💾 Auto-save: No data", "📊 Empty workflow"
    nodes = current_data.get("nodes", [])
    edges = current_data.get("edges", [])
    summary = f"📊 Workflow contains {len(nodes)} nodes and {len(edges)} edges"
    return export_pretty(current_data), "💾 Auto-save: Refreshed ✓", summary
# "Force Refresh" re-reads the workflow state and rewrites the preview pane,
# the save indicator and the node/edge summary.
btn_preview.click(
    fn=force_refresh_preview,
    inputs=loaded_data,
    outputs=[export_preview, save_indicator, workflow_info]
)
# The workflow download is already handled by the loaded_data.change listener.
# Auto-update export preview when workflow changes
def update_preview_and_download(data):
    """Refresh the preview, the download target and the summary line.

    Returns a 3-tuple of (preview JSON text, exported file path or None,
    node/edge summary). A falsy workflow yields fixed placeholders.
    """
    if not data:
        return "No workflow data", None, "📊 Empty workflow"
    preview_text = export_pretty(data)
    exported_path = export_file(data)
    n_nodes = len(data.get("nodes", []))
    n_edges = len(data.get("edges", []))
    return (
        preview_text,
        exported_path,
        f"📊 Workflow contains {n_nodes} nodes and {n_edges} edges",
    )
# Whenever the workflow state changes, refresh the JSON preview, hand the
# exported file path to the download button and update the summary line.
loaded_data.change(
    fn=update_preview_and_download,
    inputs=loaded_data,
    outputs=[export_preview, btn_download, workflow_info]
)
# Generate UI execution - uses the current workflow
def prepare_ui_execution(current_data):
    """Return the current workflow for UI generation, or None if it is empty.

    An empty workflow (falsy, or without nodes) shows a warning toast and
    returns None; otherwise the workflow is passed through unchanged.
    """
    has_nodes = bool(current_data) and bool(current_data.get("nodes"))
    if has_nodes:
        return current_data
    gr.Warning("Please create a workflow first!")
    return None
# Copy the current workflow into ui_workflow_data when the button is clicked
# (prepare_ui_execution returns None, with a warning, for an empty workflow).
btn_execute_ui.click(
    fn=prepare_ui_execution,
    inputs=loaded_data,
    outputs=ui_workflow_data
)
# ─── Deploy Event Handlers ───
# Preview generated code
def preview_generated_code(workflow_data, app_name, app_description):
    """Return the source of the standalone app for the current workflow.

    Placeholder comments are returned when no workflow is loaded or when it
    has no nodes. Generation errors are reported as a comment rather than
    raised, so the preview pane always receives text.
    """
    if not workflow_data:
        return "# No workflow loaded\n# Create or load a workflow first"
    if not workflow_data.get("nodes"):
        return "# Empty workflow\n# Add some nodes to see the generated code"
    try:
        return generate_standalone_app(workflow_data, app_name, app_description)
    except Exception as exc:
        return f"# Error generating code\n# {str(exc)}"
# Update preview when inputs change, and when the workflow changes too.
# The same handler is registered on each source, in the original order:
# app name, app description, workflow state.
for _preview_source in (deploy_name, deploy_description, loaded_data):
    _preview_source.change(
        fn=preview_generated_code,
        inputs=[loaded_data, deploy_name, deploy_description],
        outputs=generated_code_preview
    )
# Deploy handler
def handle_deploy(workflow_data, app_name, app_description, hf_token, space_name,
                  friendli_token, openai_token, is_private):
    """Validate the deploy form, deploy the workflow and stream status text.

    This is a generator (it yields a progress message before the deployment
    call), so every user-facing message must be yielded: a value passed to
    ``return`` inside a generator is never emitted as an item, which would
    leave validation errors invisible to the user.

    Args:
        workflow_data: Workflow dict; must contain a non-empty "nodes" list.
        app_name: Display name forwarded to the deployment.
        app_description: Description forwarded to the deployment.
        hf_token: Hugging Face token (required).
        space_name: Target Space in ``username/space-name`` form (required).
        friendli_token: FRIENDLI_TOKEN from the form; when empty, the server
            environment variable of the same name is used instead.
        openai_token: OPENAI_API_KEY from the form; same fallback rule.
        is_private: Whether the Space should be private.

    Yields:
        Markdown status strings: a single validation error, or a progress
        message followed by the success/failure report.
    """
    # Validation: the first failing check decides the message.
    if not workflow_data:
        problem = "❌ No workflow loaded. Please create or load a workflow first."
    elif not workflow_data.get("nodes"):
        problem = "❌ Empty workflow. Please add some nodes to your workflow."
    elif not hf_token:
        problem = "❌ Hugging Face token is required. Get yours at huggingface.co/settings/tokens"
    elif not space_name:
        problem = "❌ Space name is required. Format: username/space-name"
    elif "/" not in space_name or not all(space_name.split("/", 1)):
        # Also rejects "user/" and "/space", where one side is empty.
        problem = "❌ Invalid space name format. Use: username/space-name"
    elif not HF_HUB_AVAILABLE:
        problem = "❌ huggingface-hub library not installed. Install with: pip install huggingface-hub"
    else:
        problem = None
    if problem is not None:
        yield problem
        return

    # Show deploying status
    yield "🔄 Deploying to Hugging Face Space..."

    # Prepare API keys: fall back to the server environment when a form field
    # is empty; only non-empty keys are forwarded to the deployment.
    api_keys = {}
    friendli_token = friendli_token or os.getenv("FRIENDLI_TOKEN", "")
    if friendli_token:
        api_keys["FRIENDLI_TOKEN"] = friendli_token
    openai_token = openai_token or os.getenv("OPENAI_API_KEY", "")
    if openai_token:
        api_keys["OPENAI_API_KEY"] = openai_token

    # Deploy
    result = deploy_to_huggingface(
        workflow_data=workflow_data,
        app_name=app_name,
        app_description=app_description,
        hf_token=hf_token,
        space_name=space_name,
        is_private=is_private,
        api_keys=api_keys
    )

    if not result["success"]:
        yield f"❌ **Deployment Failed**\n\nError: {result['error']}"
        return

    # Build secrets status message
    added_secrets = result.get("added_secrets") or []
    failed_secrets = result.get("failed_secrets") or []
    providers = result.get("providers_used") or []
    secrets_msg = "\n\n**🔑 API Keys Status:**"
    for secret in added_secrets:
        secrets_msg += f"\n- {secret}: ✅ Successfully added"
    for failure in failed_secrets:
        secrets_msg += f"\n- {failure}: ❌ Failed to add"
    # Check for missing required keys
    if "VIDraft" in providers and "FRIENDLI_TOKEN" not in added_secrets:
        secrets_msg += "\n- FRIENDLI_TOKEN: ⚠️ Required for VIDraft but not provided"
    if "OpenAI" in providers and "OPENAI_API_KEY" not in added_secrets:
        secrets_msg += "\n- OPENAI_API_KEY: ⚠️ Required for OpenAI but not provided"
    providers_text = ', '.join(providers) if providers else 'No LLM providers detected'

    yield f"""✅ **Deployment Successful!**
🎉 Your workflow has been deployed to:
[{result['space_url']}]({result['space_url']})
⏱️ The Space will be ready in a few minutes. Building usually takes 2-5 minutes.
{secrets_msg}
📝 **Providers Detected in Workflow:**
{providers_text}
🚀 **Default Configuration:**
The app is configured to prioritize VIDraft (Gemma-3-r1984-27B) for optimal performance.
📚 **Space Management:**
- To update secrets: Go to Space settings → Repository secrets
- To restart Space: Go to Space settings → Factory reboot
- To make changes: Edit files directly in the Space repository
"""
# Run the deploy handler on click. The input order must match the parameter
# order of handle_deploy; its status text is shown in deploy_status.
btn_deploy.click(
    fn=handle_deploy,
    inputs=[loaded_data, deploy_name, deploy_description, deploy_token, deploy_space_name,
            friendli_token_input, openai_token_input, deploy_private],
    outputs=deploy_status
)
# -------------------------------------------------------------------
# 🚀 Run
# -------------------------------------------------------------------
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    # server_name="0.0.0.0" listens on all network interfaces;
    # show_error=True surfaces handler errors in the browser UI.
    demo.launch(server_name="0.0.0.0", show_error=True)