LeroyDyer's picture
Rename WorkflowDesigner.py to app.py
e2bfd97 verified
from dataclasses import asdict, dataclass, field
import os
import pickle
from typing import Dict, List, Optional, Any
import gradio as gr
import json
import tempfile
import asyncio
import uuid
from dataclasses import dataclass, asdict
from typing import List, Dict, Optional, Any
from openai import AsyncOpenAI
import base64
# Complete Hierarchical Component definitions with Implementation Details
COMPONENT_INFO = {
"SYSTEM": {
"description": "Top-level system architecture containing all components",
"color": "#333333",
"icon": "🌐",
"shape": "folder",
"sub_components": ["AGENT", "USER", "TOOL", "DATA", "PROCESSOR", "ROUTER", "INFRASTRUCTURE", "CONFIG"]
},
# ===================================
# AGENT: Autonomous reasoning units
# ===================================
"AGENT": {
"description": "Autonomous reasoning and decision-making units",
"color": "#4CAF50",
"icon": "🤖",
"shape": "rect",
"sub_components": ["REASONING_AGENT", "ACTION_AGENT", "PLANNER_AGENT", "REACT_AGENT", "MULTI_AGENT"]
},
"REASONING_AGENT": {
"shape": "rect",
"color": "#4CAF50",
"icon": "🧠",
"description": [
"• Performs complex reasoning tasks",
"• Uses chain-of-thought or tree-of-thought",
"• Can break down complex problems",
"• Maintains reasoning traces"
],
"implementation": {
"python_snippet": """
class ReasoningAgent:
def __init__(self, model, tools=None):
self.model = model
self.tools = tools or []
self.reasoning_history = []
async def process(self, query, context=None):
# Chain-of-thought reasoning
reasoning_steps = await self.generate_reasoning_steps(query, context)
self.reasoning_history.extend(reasoning_steps)
# Final answer generation
answer = await self.synthesize_answer(reasoning_steps)
return answer
async def generate_reasoning_steps(self, query, context):
prompt = f\"\"\"Analyze this problem step by step:
Query: {query}
Context: {context}
Break down your reasoning:\"\"\"
return await self.model.generate(prompt)
""",
"prompt_template": """
You are a reasoning agent. Analyze the user's query step by step:
Query: {user_input}
Context: {context}
Please:
1. Break down the problem into logical steps
2. Consider different perspectives
3. Evaluate evidence and constraints
4. Synthesize a comprehensive answer
Reasoning steps:
""",
"dependencies": ["openai", "langchain", "pydantic"],
"config": {
"model": "gpt-4",
"temperature": 0.1,
"max_tokens": 2000
}
}
},
"ACTION_AGENT": {
"shape": "rect",
"color": "#4CAF50",
"icon": "⚡",
"description": [
"• Executes actions using available tools",
"• Monitors action outcomes",
"• Handles errors and retries",
"• Updates state after actions"
],
"implementation": {
"python_snippet": """
class ActionAgent:
def __init__(self, tools, model):
self.tools = {tool.name: tool for tool in tools}
self.model = model
async def execute_action(self, action_request):
tool_name, parameters = self.parse_action(action_request)
if tool_name in self.tools:
return await self.tools[tool_name].execute(parameters)
else:
raise ValueError(f"Unknown tool: {tool_name}")
def parse_action(self, action_request):
# Parse action from model response
return action_request['tool'], action_request['parameters']
""",
"dependencies": ["pydantic", "asyncio"],
"config": {
"retry_attempts": 3,
"timeout_seconds": 30
}
}
},
"PLANNER_AGENT": {
"shape": "rect",
"color": "#4CAF50",
"icon": "📋",
"description": [
"• Creates multi-step plans to achieve goals",
"• Decomposes complex tasks",
"• Optimizes execution order",
"• Monitors plan progress"
],
"implementation": {
"python_snippet": """
class PlannerAgent:
def __init__(self, model):
self.model = model
self.plans = []
async def create_plan(self, goal, context):
plan_prompt = f\"\"\"Create a step-by-step plan to achieve:
Goal: {goal}
Context: {context}
Return a list of actionable steps:\"\"\"
plan_steps = await self.model.generate(plan_prompt)
plan = Plan(steps=plan_steps, goal=goal)
self.plans.append(plan)
return plan
""",
"dependencies": ["pydantic"],
"config": {
"max_steps": 20,
"planning_temperature": 0.3
}
}
},
"REACT_AGENT": {
"shape": "rect",
"color": "#4CAF50",
"icon": "🔄",
"description": [
"• Implements ReAct (Reason + Act) framework",
"• Alternates reasoning and action steps",
"• Maintains conversation history",
"• Handles tool interactions"
],
"implementation": {
"python_snippet": """
class ReActAgent:
def __init__(self, model, tools):
self.model = model
self.tools = tools
self.conversation_history = []
async def step(self, input_text):
# Generate thought
thought = await self.generate_thought(input_text)
# Decide on action
action = await self.decide_action(thought)
# Execute action if needed
if action:
observation = await self.execute_action(action)
return {"thought": thought, "action": action, "observation": observation}
else:
return {"thought": thought, "answer": await self.generate_answer(thought)}
""",
"dependencies": ["asyncio", "langchain"],
"config": {
"max_iterations": 10,
"react_temperature": 0.7
}
}
},
"MULTI_AGENT": {
"shape": "rect",
"color": "#4CAF50",
"icon": "👥",
"description": [
"• Coordinates multiple specialized agents",
"• Manages agent communication",
"• Distributes tasks among agents",
"• Aggregates results from agents"
],
"implementation": {
"python_snippet": """
class MultiAgentSystem:
def __init__(self, agents, orchestrator):
self.agents = {agent.name: agent for agent in agents}
self.orchestrator = orchestrator
async def coordinate(self, task):
# Assign task to appropriate agents
agent_assignments = await self.orchestrator.assign(task)
# Execute in parallel
results = await asyncio.gather(*[
self.agents[agent_name].process(subtask)
for agent_name, subtask in agent_assignments.items()
])
return self.orchestrator.aggregate(results)
""",
"dependencies": ["asyncio", "concurrent.futures"],
"config": {
"max_concurrent_agents": 10,
"communication_protocol": "message_queue"
}
}
},
# ===================================
# USER: Interaction interfaces
# ===================================
"USER": {
"description": "User interaction points and interfaces",
"color": "#9C27B0",
"icon": "👤",
"shape": "ellipse",
"sub_components": ["USER_INPUT", "USER_OUTPUT", "MULTIMODAL_INTERFACE"]
},
"USER_INPUT": {
"shape": "ellipse",
"color": "#9C27B0",
"icon": "⌨️",
"description": [
"• Accepts text, voice, or gesture input",
"• Validates and sanitizes input",
"• Converts to structured format",
"• Handles multiple input channels"
],
"implementation": {
"python_snippet": """
class UserInputHandler:
def __init__(self):
self.input_validators = {
'text': self.validate_text,
'voice': self.validate_voice,
'gesture': self.validate_gesture
}
async def process_input(self, input_type, raw_input):
validator = self.input_validators.get(input_type)
if validator:
return await validator(raw_input)
else:
raise ValueError(f"Unsupported input type: {input_type}")
async def validate_text(self, text):
# Sanitize and structure text input
return {"type": "text", "content": text.strip()}
""",
"dependencies": ["validators", "pydantic"],
"config": {
"max_input_length": 10000,
"allowed_input_types": ["text", "voice", "gesture"]
}
}
},
"USER_OUTPUT": {
"shape": "ellipse",
"color": "#9C27B0",
"icon": "🔊",
"description": [
"• Formats responses for user consumption",
"• Supports multiple output formats",
"• Handles accessibility features",
"• Manages response timing"
],
"implementation": {
"python_snippet": """
class UserOutputHandler:
def __init__(self):
self.formatters = {
'text': self.format_text,
'audio': self.format_audio,
'visual': self.format_visual
}
async def deliver_response(self, response_data, output_format):
formatter = self.formatters.get(output_format)
if formatter:
formatted_response = await formatter(response_data)
return await self.send_to_user(formatted_response)
async def format_text(self, data):
# Format response as structured text
return {"format": "text", "content": data}
""",
"dependencies": ["jinja2", "markdown"],
"config": {
"default_format": "text",
"max_response_length": 5000
}
}
},
"MULTIMODAL_INTERFACE": {
"shape": "ellipse",
"color": "#9C27B0",
"icon": "🖼️",
"description": [
"• Handles multiple input/output modalities",
"• Integrates text, image, audio, video",
"• Manages modality conversion",
"• Supports rich media responses"
],
"implementation": {
"python_snippet": """
class MultimodalInterface:
def __init__(self):
self.input_processors = {
'image': ImageProcessor(),
'audio': AudioProcessor(),
'text': TextProcessor()
}
self.output_formatters = {
'rich_text': RichTextFormatter(),
'multimedia': MultimediaFormatter()
}
async def process_multimodal_input(self, inputs):
processed_inputs = {}
for input_type, input_data in inputs.items():
processor = self.input_processors.get(input_type)
if processor:
processed_inputs[input_type] = await processor.process(input_data)
return processed_inputs
""",
"dependencies": ["pillow", "pyaudio", "opencv-python"],
"config": {
"supported_modalities": ["text", "image", "audio", "video"],
"max_file_size_mb": 50
}
}
},
# ===================================
# TOOL: External functions and capabilities
# ===================================
"TOOL": {
"description": "External functions and capabilities",
"color": "#795548",
"icon": "🔧",
"shape": "hexagon",
"sub_components": ["MCP_TOOL", "API_TOOL", "LOCAL_TOOL", "AGENT_TOOL", "FUNCTION_TOOL"]
},
"MCP_TOOL": {
"shape": "hexagon",
"color": "#795548",
"icon": "🔌",
"description": [
"• Model Context Protocol server",
"• Standardized tool interface",
"• Dynamic tool discovery",
"• Secure resource access"
],
"implementation": {
"python_snippet": """
# MCP Server implementation
from mcp import MCPServer, Tool
class FileSystemTool:
@Tool
async def read_file(self, path: str) -> str:
\"\"\"Read content from a file\"\"\"
with open(path, 'r') as f:
return f.read()
@Tool
async def write_file(self, path: str, content: str) -> str:
\"\"\"Write content to a file\"\"\"
with open(path, 'w') as f:
f.write(content)
return f"Written to {path}"
# MCP Client usage
async def use_mcp_tool(agent, tool_name, parameters):
result = await agent.use_tool(tool_name, parameters)
return result
""",
"protocol_spec": {
"version": "1.0",
"transport": ["stdio", "sse"],
"authentication": ["none", "bearer"]
},
"example_tools": ["filesystem", "calculator", "web_search", "database"]
}
},
"API_TOOL": {
"shape": "hexagon",
"color": "#795548",
"icon": "🔗",
"description": [
"• Wraps external REST/gRPC APIs",
"• Handles authentication and rate limits",
"• Manages request/response mapping",
"• Provides error handling and retries"
],
"implementation": {
"python_snippet": """
class APITool:
def __init__(self, base_url, auth_token=None, rate_limit=10):
self.base_url = base_url
self.auth_token = auth_token
self.rate_limit = rate_limit
self.session = aiohttp.ClientSession()
async def call(self, endpoint, method='GET', data=None):
headers = {"Authorization": f"Bearer {self.auth_token}"} if self.auth_token else {}
url = f"{self.base_url}/{endpoint}"
async with self.session.request(method, url, json=data, headers=headers) as response:
return await response.json()
""",
"dependencies": ["aiohttp", "requests"],
"config": {
"timeout": 30,
"max_retries": 3,
"retry_delay": 1.0
}
}
},
"LOCAL_TOOL": {
"shape": "hexagon",
"color": "#795548",
"icon": "💻",
"description": [
"• Locally executed utility functions",
"• File operations, math calculations",
"• System utilities and helpers",
"• Fast execution without network calls"
],
"implementation": {
"python_snippet": """
class LocalTool:
@staticmethod
async def file_operations(action, **kwargs):
if action == 'read':
with open(kwargs['path'], 'r') as f:
return f.read()
elif action == 'write':
with open(kwargs['path'], 'w') as f:
f.write(kwargs['content'])
return f"File written to {kwargs['path']}"
@staticmethod
async def math_operations(operation, **kwargs):
if operation == 'add':
return kwargs['a'] + kwargs['b']
elif operation == 'multiply':
return kwargs['a'] * kwargs['b']
""",
"dependencies": ["os", "math"],
"config": {
"max_execution_time": 5.0,
"allowed_operations": ["file", "math", "system"]
}
}
},
"AGENT_TOOL": {
"shape": "hexagon",
"color": "#795548",
"icon": "🛠️",
"description": [
"• Allows one agent to act as a tool for another",
"• Wraps agent functionality for external use",
"• Handles agent-to-agent communication",
"• Manages agent state and context"
],
"implementation": {
"python_snippet": """
class AgentTool:
def __init__(self, agent):
self.agent = agent
async def execute(self, query, context=None):
# Wrap agent execution as a tool call
result = await self.agent.process(query, context)
return {
"result": result,
"agent_name": self.agent.name,
"execution_time": time.time()
}
""",
"dependencies": ["asyncio", "time"],
"config": {
"max_concurrent_calls": 5,
"timeout_seconds": 60
}
}
},
"FUNCTION_TOOL": {
"shape": "hexagon",
"color": "#795548",
"icon": "🧮",
"description": [
"• Generic callable function exposed to agents",
"• Wraps Python functions for tool use",
"• Handles parameter validation",
"• Provides type safety and documentation"
],
"implementation": {
"python_snippet": """
from pydantic import BaseModel, create_model
class FunctionTool:
def __init__(self, func, description, param_schema=None):
self.func = func
self.description = description
self.param_schema = param_schema or self._infer_schema(func)
async def execute(self, **kwargs):
validated_params = self.param_schema(**kwargs)
return await self.func(**validated_params.dict())
def _infer_schema(self, func):
# Infer schema from function signature
sig = inspect.signature(func)
fields = {}
for name, param in sig.parameters.items():
fields[name] = (param.annotation, param.default if param.default != param.empty else ...)
return create_model(f"{func.__name__}Params", **fields)
""",
"dependencies": ["pydantic", "inspect"],
"config": {
"max_params": 10,
"validation_enabled": True
}
}
},
# ===================================
# DATA: Storage and knowledge systems
# ===================================
"DATA": {
"description": "Data sources and storage systems",
"color": "#009688",
"icon": "💾",
"shape": "cylinder",
"sub_components": ["KNOWLEDGE_BASE", "VECTOR_DB", "DOCUMENT_STORE", "CACHE", "MEMORY"]
},
"KNOWLEDGE_BASE": {
"shape": "cylinder",
"color": "#009688",
"icon": "📘",
"description": [
"• Curated domain-specific facts and rules",
"• Structured knowledge representation",
"• Supports inference and reasoning",
"• Maintains consistency and accuracy"
],
"implementation": {
"python_snippet": """
class KnowledgeBase:
def __init__(self, storage_backend):
self.storage = storage_backend
self.index = {}
async def query(self, query_text, context=None):
# Query knowledge base with optional context
results = await self.storage.search(query_text)
return self._format_results(results)
async def update(self, fact, metadata=None):
# Add or update knowledge fact
await self.storage.insert(fact, metadata)
self._update_index(fact)
""",
"dependencies": ["sqlite3", "nltk"],
"config": {
"max_facts": 100000,
"update_frequency": "daily"
}
}
},
"VECTOR_DB": {
"shape": "cylinder",
"color": "#009688",
"icon": "🔍",
"description": [
"• Embedding-based database for semantic search",
"• Stores vector representations of text",
"• Enables similarity-based retrieval",
"• Supports semantic understanding"
],
"implementation": {
"python_snippet": """
import numpy as np
from sentence_transformers import SentenceTransformer
class VectorDB:
def __init__(self, embedding_model="all-MiniLM-L6-v2"):
self.model = SentenceTransformer(embedding_model)
self.vectors = {}
self.metadata = {}
async def add_document(self, doc_id, text, metadata=None):
embedding = self.model.encode(text)
self.vectors[doc_id] = embedding
self.metadata[doc_id] = metadata or {}
async def search(self, query, top_k=5):
query_embedding = self.model.encode(query)
similarities = []
for doc_id, vector in self.vectors.items():
similarity = np.dot(query_embedding, vector) / (
np.linalg.norm(query_embedding) * np.linalg.norm(vector)
)
similarities.append((doc_id, similarity))
return sorted(similarities, key=lambda x: x[1], reverse=True)[:top_k]
""",
"dependencies": ["sentence-transformers", "numpy"],
"config": {
"embedding_model": "all-MiniLM-L6-v2",
"max_documents": 10000
}
}
},
"DOCUMENT_STORE": {
"shape": "cylinder",
"color": "#009688",
"icon": "🗂️",
"description": [
"• Raw document repository (PDFs, web pages, etc.)",
"• Handles various document formats",
"• Provides document parsing and extraction",
"• Manages document lifecycle and metadata"
],
"implementation": {
"python_snippet": """
class DocumentStore:
def __init__(self, storage_path):
self.storage_path = storage_path
self.parsers = {
'.pdf': self._parse_pdf,
'.txt': self._parse_text,
'.docx': self._parse_docx
}
async def store_document(self, filename, content):
# Parse and store document with metadata
ext = os.path.splitext(filename)[1].lower()
parser = self.parsers.get(ext)
if parser:
parsed_content = await parser(content)
# Store in database with metadata
return await self._save_to_db(filename, parsed_content)
async def _parse_pdf(self, content):
# Extract text from PDF
import PyPDF2
pdf_reader = PyPDF2.PdfReader(content)
text = ""
for page in pdf_reader.pages:
text += page.extract_text()
return text
""",
"dependencies": ["PyPDF2", "python-docx"],
"config": {
"supported_formats": [".pdf", ".txt", ".docx", ".html"],
"max_file_size_mb": 100
}
}
},
"CACHE": {
"shape": "cylinder",
"color": "#009688",
"icon": "⏱️",
"description": [
"• Temporary fast-access storage for responses or embeddings",
"• Implements LRU or TTL eviction policies",
"• Reduces computation and API costs",
"• Improves response times"
],
"implementation": {
"python_snippet": """
import time
from collections import OrderedDict
class Cache:
def __init__(self, max_size=1000, ttl_seconds=3600):
self.cache = OrderedDict()
self.max_size = max_size
self.ttl = ttl_seconds
async def get(self, key):
if key in self.cache:
value, timestamp = self.cache[key]
if time.time() - timestamp < self.ttl:
return value
else:
del self.cache[key]
return None
async def set(self, key, value):
if len(self.cache) >= self.max_size:
self.cache.popitem(last=False)
self.cache[key] = (value, time.time())
""",
"dependencies": ["time", "collections"],
"config": {
"max_size": 1000,
"ttl_seconds": 3600,
"eviction_policy": "lru"
}
}
},
"MEMORY": {
"shape": "cylinder",
"color": "#009688",
"icon": "🧠",
"description": [
"• Short-term context memory (conversation history, scratchpad)",
"• Maintains session state and context",
"• Supports conversation continuity",
"• Manages memory lifecycle"
],
"implementation": {
"python_snippet": """
class Memory:
def __init__(self, max_context_length=2000):
self.conversation_history = []
self.scratchpad = {}
self.max_context_length = max_context_length
async def add_interaction(self, user_input, agent_response):
interaction = {
"timestamp": time.time(),
"user": user_input,
"agent": agent_response
}
self.conversation_history.append(interaction)
self._trim_history()
def _trim_history(self):
# Trim history to maintain context length
total_length = sum(len(str(item)) for item in self.conversation_history)
while total_length > self.max_context_length and len(self.conversation_history) > 1:
removed = self.conversation_history.pop(0)
total_length -= len(str(removed))
""",
"dependencies": ["time"],
"config": {
"max_context_length": 2000,
"history_retention_hours": 24
}
}
},
# ===================================
# PROCESSOR: Data processing units
# ===================================
"PROCESSOR": {
"description": "Data processing and transformation units",
"color": "#2196F3",
"icon": "⚙️",
"shape": "rect",
"sub_components": ["QUERY_PROCESSOR", "CONTENT_RETRIEVAL", "PROMPT_TEMPLATE", "RESPONSE_FORMATTER"]
},
"QUERY_PROCESSOR": {
"shape": "rect",
"color": "#2196F3",
"icon": "🔎",
"description": [
"• Parses and enriches incoming queries",
"• Extracts intent and entities",
"• Normalizes query structure",
"• Handles query validation"
],
"implementation": {
"python_snippet": """
class QueryProcessor:
def __init__(self):
self.intent_classifier = IntentClassifier()
self.entity_extractor = EntityExtractor()
async def process_query(self, query_text):
# Classify intent and extract entities
intent = await self.intent_classifier.classify(query_text)
entities = await self.entity_extractor.extract(query_text)
return {
"original_query": query_text,
"intent": intent,
"entities": entities,
"processed_query": self._normalize_query(query_text, entities)
}
def _normalize_query(self, query, entities):
# Normalize query for downstream processing
normalized = query
for entity, value in entities.items():
normalized = normalized.replace(value, f"[{entity}]")
return normalized
""",
"dependencies": ["spacy", "transformers"],
"config": {
"max_query_length": 1000,
"confidence_threshold": 0.7
}
}
},
"CONTENT_RETRIEVAL": {
"shape": "rect",
"color": "#2196F3",
"icon": "📤",
"description": [
"• Fetches relevant content from data stores",
"• Implements semantic and keyword search",
"• Ranks and filters retrieved content",
"• Handles multi-source retrieval"
],
"implementation": {
"python_snippet": """
class ContentRetrieval:
def __init__(self, data_sources):
self.data_sources = data_sources
async def retrieve(self, query, top_k=5, sources=None):
all_results = []
for source_name, source in self.data_sources.items():
if sources is None or source_name in sources:
results = await source.search(query, top_k)
all_results.extend(results)
# Rank and deduplicate results
ranked_results = self._rank_results(all_results, query)
return ranked_results[:top_k]
def _rank_results(self, results, query):
# Implement ranking algorithm
return sorted(results, key=lambda x: x.get('relevance_score', 0), reverse=True)
""",
"dependencies": ["numpy", "scikit-learn"],
"config": {
"top_k": 5,
"max_sources": 10,
"relevance_threshold": 0.5
}
}
},
"PROMPT_TEMPLATE": {
"shape": "rect",
"color": "#2196F3",
"icon": "📝",
"description": [
"• Template-based prompt construction",
"• Supports variable substitution",
"• Handles different prompt formats",
"• Manages prompt versioning"
],
"implementation": {
"python_snippet": """
from jinja2 import Template
class PromptTemplate:
def __init__(self, template_string):
self.template = Template(template_string)
async def format(self, **kwargs):
return self.template.render(**kwargs)
@classmethod
def load_from_file(cls, file_path):
with open(file_path, 'r') as f:
template_string = f.read()
return cls(template_string)
def validate_variables(self, required_vars):
# Validate that all required variables are provided
pass
""",
"dependencies": ["jinja2"],
"config": {
"default_template": "You are a helpful assistant. User: {query}",
"max_template_length": 5000
}
}
},
"RESPONSE_FORMATTER": {
"shape": "rect",
"color": "#2196F3",
"icon": "📄",
"description": [
"• Structures final output (JSON, XML, markdown, etc.)",
"• Applies formatting rules and styles",
"• Validates response structure",
"• Supports multiple output formats"
],
"implementation": {
"python_snippet": """
class ResponseFormatter:
def __init__(self):
self.formatters = {
'json': self._format_json,
'xml': self._format_xml,
'markdown': self._format_markdown,
'text': self._format_text
}
async def format(self, data, format_type='json'):
formatter = self.formatters.get(format_type)
if formatter:
return formatter(data)
else:
raise ValueError(f"Unsupported format: {format_type}")
def _format_json(self, data):
import json
return json.dumps(data, indent=2)
""",
"dependencies": ["json", "xml.etree.ElementTree"],
"config": {
"default_format": "json",
"max_output_length": 10000
}
}
},
# ===================================
# ROUTER: Decision points and workflow routing
# ===================================
"ROUTER": {
"description": "Decision points and workflow routing",
"color": "#FF9800",
"icon": "🎯",
"shape": "diamond",
"sub_components": ["INTENT_DISCOVERY", "MODEL_SELECTOR", "WORKFLOW_ROUTER", "VALIDATOR"]
},
"INTENT_DISCOVERY": {
"shape": "diamond",
"color": "#FF9800",
"icon": "🎯",
"description": [
"• Identifies user intent from input",
"• Uses machine learning classification",
"• Handles intent confidence scoring",
"• Supports intent hierarchy"
],
"implementation": {
"python_snippet": """
class IntentDiscovery:
def __init__(self, model_path):
self.model = self.load_model(model_path)
async def discover_intent(self, text):
# Classify intent using trained model
predictions = await self.model.predict(text)
top_intent = max(predictions, key=predictions.get)
confidence = predictions[top_intent]
return {
"intent": top_intent,
"confidence": confidence,
"all_predictions": predictions
}
""",
"dependencies": ["transformers", "torch"],
"config": {
"confidence_threshold": 0.8,
"fallback_intent": "unknown"
}
}
},
"MODEL_SELECTOR": {
"shape": "diamond",
"color": "#FF9800",
"icon": "🧠",
"description": [
"• Selects appropriate model based on task",
"• Considers task complexity and cost",
"• Handles model availability and load",
"• Supports A/B testing of models"
],
"implementation": {
"python_snippet": """
class ModelSelector:
def __init__(self, models):
self.models = models
self.model_performance = {}
async def select_model(self, task_description, context=None):
# Select best model based on task requirements
suitable_models = self._filter_suitable_models(task_description)
# Choose based on performance metrics and availability
best_model = self._select_best_model(suitable_models)
return best_model
def _filter_suitable_models(self, task_description):
# Filter models based on task compatibility
return [model for model in self.models if model.can_handle(task_description)]
""",
"dependencies": ["numpy"],
"config": {
"selection_strategy": "performance_weighted",
"max_model_candidates": 5
}
}
},
"WORKFLOW_ROUTER": {
"shape": "diamond",
"color": "#FF9800",
"icon": "🔄",
"description": [
"• Routes requests through appropriate workflows",
"• Manages workflow state and transitions",
"• Handles parallel and sequential execution",
"• Supports workflow versioning"
],
"implementation": {
"python_snippet": """
class WorkflowRouter:
def __init__(self, workflows):
self.workflows = workflows
self.current_executions = {}
async def route(self, request, workflow_name=None):
if workflow_name:
workflow = self.workflows.get(workflow_name)
else:
workflow = await self._auto_select_workflow(request)
execution_id = str(uuid.uuid4())
self.current_executions[execution_id] = workflow
result = await workflow.execute(request)
del self.current_executions[execution_id]
return result
""",
"dependencies": ["uuid", "asyncio"],
"config": {
"max_concurrent_workflows": 100,
"workflow_timeout": 300
}
}
},
"VALIDATOR": {
"shape": "diamond",
"color": "#FF9800",
"icon": "✅",
"description": [
"• Validates inputs, outputs, and intermediate results",
"• Implements schema and business rule validation",
"• Handles data quality checks",
"• Provides validation feedback"
],
"implementation": {
"python_snippet": """
from pydantic import BaseModel, ValidationError
class Validator:
def __init__(self, schema_class: BaseModel):
self.schema_class = schema_class
async def validate(self, data):
try:
validated_data = self.schema_class(**data)
return {
"valid": True,
"data": validated_data.dict(),
"errors": []
}
except ValidationError as e:
return {
"valid": False,
"data": None,
"errors": e.errors()
}
""",
"dependencies": ["pydantic"],
"config": {
"strict_validation": True,
"validation_timeout": 10
}
}
},
# ===================================
# INFRASTRUCTURE: System services
# ===================================
"INFRASTRUCTURE": {
"description": "System infrastructure and services",
"color": "#FF5722",
"icon": "🌐",
"shape": "rect",
"sub_components": ["PROVIDER", "MONITOR", "FALLBACK", "ORCHESTRATOR"]
},
"PROVIDER": {
"shape": "rect",
"color": "#FF5722",
"icon": "🌐",
"description": [
"• API connection to LLM service",
"• Manages authentication and rate limits",
"• Handles retries and error recovery",
"• Tracks usage and costs"
],
"implementation": {
"python_snippet": """
class LLMProvider:
def __init__(self, base_url: str, api_key: str = None, model: str = "default"):
self.base_url = base_url
self.api_key = api_key
self.model = model
self.client = AsyncOpenAI(base_url=base_url, api_key=api_key)
self.usage_tracker = UsageTracker()
async def generate(self, prompt: str, **kwargs) -> str:
try:
response = await self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
**kwargs
)
self.usage_tracker.record_usage(response.usage)
return response.choices[0].message.content
except Exception as e:
raise ProviderError(f"Generation failed: {e}")
def get_cost_estimate(self) -> float:
return self.usage_tracker.calculate_cost()
""",
"supported_providers": {
"openai": {"base_url": "https://api.openai.com/v1", "models": ["gpt-4", "gpt-3.5-turbo"]},
"anthropic": {"base_url": "https://api.anthropic.com/v1", "models": ["claude-3", "claude-2"]},
"local": {"base_url": "http://localhost:1234/v1", "models": ["local-model"]},
"azure": {"base_url": "https://your-resource.openai.azure.com/", "models": ["gpt-4", "gpt-35-turbo"]}
},
"config_template": {
"base_url": "https://api.openai.com/v1",
"api_key": "your-api-key-here",
"model": "gpt-4",
"max_retries": 3,
"timeout": 30
}
}
},
"MONITOR": {
"shape": "rect",
"color": "#FF5722",
"icon": "📊",
"description": [
"• Tracks system performance and metrics",
"• Monitors resource usage and errors",
"• Provides health checks and alerts",
"• Supports logging and analytics"
],
"implementation": {
"python_snippet": """
import time
import logging
from collections import defaultdict
class Monitor:
def __init__(self):
self.metrics = defaultdict(list)
self.logger = logging.getLogger(__name__)
async def record_metric(self, name, value, timestamp=None):
if timestamp is None:
timestamp = time.time()
self.metrics[name].append((timestamp, value))
async def get_health_status(self):
recent_errors = [m for m in self.metrics['errors'] if time.time() - m[0] < 300]
avg_response_time = self._calculate_avg_time('response_time', 300)
return {
"status": "healthy" if len(recent_errors) == 0 else "degraded",
"recent_errors": len(recent_errors),
"avg_response_time": avg_response_time
}
""",
"dependencies": ["logging", "time"],
"config": {
"metrics_retention_hours": 24,
"alert_thresholds": {"error_rate": 0.05, "response_time": 5.0}
}
}
},
"FALLBACK": {
"shape": "rect",
"color": "#FF5722",
"icon": "🔄",
"description": [
"• Provides alternative execution paths",
"• Handles primary system failures",
"• Implements graceful degradation",
"• Maintains service availability"
],
"implementation": {
"python_snippet": """
class FallbackHandler:
def __init__(self, primary_handler, fallback_handlers):
self.primary = primary_handler
self.fallbacks = fallback_handlers
async def execute_with_fallback(self, *args, **kwargs):
try:
return await self.primary(*args, **kwargs)
except PrimaryError as e:
self.logger.warning(f"Primary failed: {e}, trying fallbacks")
for fallback in self.fallbacks:
try:
return await fallback(*args, **kwargs)
except FallbackError:
continue
raise ServiceUnavailableError("All fallbacks exhausted")
""",
"dependencies": ["logging"],
"config": {
"max_fallback_attempts": 3,
"fallback_timeout": 10
}
}
},
"ORCHESTRATOR": {
"shape": "rect",
"color": "#FF5722",
"icon": "🎬",
"description": [
"• Coordinates complex multi-step processes",
"• Manages component interactions",
"• Handles state and error propagation",
"• Supports distributed execution"
],
"implementation": {
"python_snippet": """
class Orchestrator:
def __init__(self, components):
self.components = components
self.state = {}
async def orchestrate(self, workflow_definition, input_data):
current_state = input_data.copy()
for step in workflow_definition.steps:
component = self.components[step.component]
step_result = await component.execute(current_state, step.config)
current_state.update(step_result)
return current_state
""",
"dependencies": ["asyncio"],
"config": {
"max_workflow_steps": 100,
"step_timeout": 60
}
}
}
}
# Hierarchical Component definitions
COMPONENT_HIERARCHY = {
"HIGH_LEVEL": {
"AGENT": {
"description": "Autonomous reasoning and decision-making units",
"color": "#4CAF50",
"icon": "🤖",
"shape": "rect",
"sub_components": ["REASONING_AGENT", "ACTION_AGENT", "PLANNER_AGENT", "REACT_AGENT", "MULTI_AGENT"]
},
"USER": {
"description": "User interaction points and interfaces",
"color": "#9C27B0",
"icon": "👤",
"shape": "ellipse",
"sub_components": ["USER_INPUT", "USER_OUTPUT", "MULTIMODAL_INTERFACE"]
},
"TOOL": {
"description": "External functions and capabilities",
"color": "#795548",
"icon": "🔧",
"shape": "hexagon",
"sub_components": ["MCP_TOOL", "API_TOOL", "LOCAL_TOOL", "AGENT_TOOL", "FUNCTION_TOOL"]
},
"DATA": {
"description": "Data sources and storage systems",
"color": "#009688",
"icon": "💾",
"shape": "cylinder",
"sub_components": ["KNOWLEDGE_BASE", "VECTOR_DB", "DOCUMENT_STORE", "CACHE", "MEMORY"]
},
"PROCESSOR": {
"description": "Data processing and transformation units",
"color": "#2196F3",
"icon": "⚙️",
"shape": "rect",
"sub_components": ["QUERY_PROCESSOR", "CONTENT_RETRIEVAL", "PROMPT_TEMPLATE", "RESPONSE_FORMATTER"]
},
"ROUTER": {
"description": "Decision points and workflow routing",
"color": "#FF9800",
"icon": "🎯",
"shape": "diamond",
"sub_components": ["INTENT_DISCOVERY", "MODEL_SELECTOR", "WORKFLOW_ROUTER", "VALIDATOR"]
},
"INFRASTRUCTURE": {
"description": "System infrastructure and services",
"color": "#FF5722",
"icon": "🌐",
"shape": "rect",
"sub_components": ["PROVIDER", "MONITOR", "FALLBACK", "ORCHESTRATOR"]
}
}
}
# Enhanced Example workflows
EXAMPLE_WORKFLOWS = {
"Simple Chat Agent": {
"description": "Basic conversational agent with single LLM call",
"nodes": [
{"id": "user_1", "type": "USER_INPUT", "x": 150, "y": 200},
{"id": "agent_1", "type": "REASONING_AGENT", "x": 400, "y": 200},
{"id": "provider_1", "type": "PROVIDER", "x": 650, "y": 200},
{"id": "output_1", "type": "USER_OUTPUT", "x": 900, "y": 200}
],
"connections": [
{"from": "user_1", "to": "agent_1"},
{"from": "agent_1", "to": "provider_1"},
{"from": "provider_1", "to": "output_1"}
]
},
"Intent-Driven Routing": {
"description": "Routes to specialized agents based on user intent",
"nodes": [
{"id": "user_1", "type": "USER_INPUT", "x": 150, "y": 300},
{"id": "intent_1", "type": "INTENT_DISCOVERY", "x": 400, "y": 300},
{"id": "agent_1", "type": "REASONING_AGENT", "x": 650, "y": 150},
{"id": "agent_2", "type": "ACTION_AGENT", "x": 650, "y": 450},
{"id": "output_1", "type": "USER_OUTPUT", "x": 900, "y": 300}
],
"connections": [
{"from": "user_1", "to": "intent_1"},
{"from": "intent_1", "to": "agent_1"},
{"from": "intent_1", "to": "agent_2"},
{"from": "agent_1", "to": "output_1"},
{"from": "agent_2", "to": "output_1"}
]
},
"RAG Pipeline": {
"description": "Retrieval-Augmented Generation with context",
"nodes": [
{"id": "user_1", "type": "USER_INPUT", "x": 100, "y": 250},
{"id": "query_1", "type": "QUERY_PROCESSOR", "x": 250, "y": 250},
{"id": "content_1", "type": "CONTENT_RETRIEVAL", "x": 400, "y": 250},
{"id": "prompt_1", "type": "PROMPT_TEMPLATE", "x": 550, "y": 250},
{"id": "agent_1", "type": "REASONING_AGENT", "x": 700, "y": 250},
{"id": "output_1", "type": "USER_OUTPUT", "x": 900, "y": 250}
],
"connections": [
{"from": "user_1", "to": "query_1"},
{"from": "query_1", "to": "content_1"},
{"from": "content_1", "to": "prompt_1"},
{"from": "prompt_1", "to": "agent_1"},
{"from": "agent_1", "to": "output_1"}
]
},
"Multi-Agent with Tools": {
"description": "Coordinated agents with tool access and validation",
"nodes": [
{"id": "user_1", "type": "USER_INPUT", "x": 100, "y": 300},
{"id": "intent_1", "type": "INTENT_DISCOVERY", "x": 280, "y": 300},
{"id": "agent_1", "type": "REASONING_AGENT", "x": 460, "y": 150},
{"id": "agent_2", "type": "ACTION_AGENT", "x": 460, "y": 450},
{"id": "tool_1", "type": "MCP_TOOL", "x": 640, "y": 150},
{"id": "tool_2", "type": "API_TOOL", "x": 640, "y": 450},
{"id": "validator_1", "type": "VALIDATOR", "x": 820, "y": 300},
{"id": "output_1", "type": "USER_OUTPUT", "x": 980, "y": 300}
],
"connections": [
{"from": "user_1", "to": "intent_1"},
{"from": "intent_1", "to": "agent_1"},
{"from": "intent_1", "to": "agent_2"},
{"from": "agent_1", "to": "tool_1"},
{"from": "agent_2", "to": "tool_2"},
{"from": "tool_1", "to": "validator_1"},
{"from": "tool_2", "to": "validator_1"},
{"from": "validator_1", "to": "output_1"}
]
},
"Advanced RAG with Cache": {
"description": "Enhanced RAG with caching and monitoring",
"nodes": [
{"id": "user_1", "type": "USER_INPUT", "x": 100, "y": 200},
{"id": "query_1", "type": "QUERY_PROCESSOR", "x": 250, "y": 200},
{"id": "cache_1", "type": "CACHE", "x": 400, "y": 100},
{"id": "knowledge_1", "type": "KNOWLEDGE_BASE", "x": 400, "y": 300},
{"id": "prompt_1", "type": "PROMPT_TEMPLATE", "x": 550, "y": 200},
{"id": "agent_1", "type": "REASONING_AGENT", "x": 700, "y": 200},
{"id": "monitor_1", "type": "MONITOR", "x": 850, "y": 100},
{"id": "output_1", "type": "USER_OUTPUT", "x": 850, "y": 300}
],
"connections": [
{"from": "user_1", "to": "query_1"},
{"from": "query_1", "to": "cache_1"},
{"from": "query_1", "to": "knowledge_1"},
{"from": "cache_1", "to": "prompt_1"},
{"from": "knowledge_1", "to": "prompt_1"},
{"from": "prompt_1", "to": "agent_1"},
{"from": "agent_1", "to": "monitor_1"},
{"from": "agent_1", "to": "output_1"}
]
},
"MCP Tool Agent": {
"description": "Agent using MCP tools for extended capabilities",
"nodes": [
{"id": "user_1", "type": "USER_INPUT", "x": 100, "y": 250},
{"id": "agent_1", "type": "REACT_AGENT", "x": 300, "y": 250},
{"id": "mcp_tool_1", "type": "MCP_TOOL", "x": 500, "y": 150},
{"id": "mcp_tool_2", "type": "MCP_TOOL", "x": 500, "y": 350},
{"id": "memory_1", "type": "MEMORY", "x": 700, "y": 250},
{"id": "output_1", "type": "USER_OUTPUT", "x": 900, "y": 250}
],
"connections": [
{"from": "user_1", "to": "agent_1"},
{"from": "agent_1", "to": "mcp_tool_1"},
{"from": "agent_1", "to": "mcp_tool_2"},
{"from": "mcp_tool_1", "to": "agent_1"},
{"from": "mcp_tool_2", "to": "agent_1"},
{"from": "agent_1", "to": "memory_1"},
{"from": "agent_1", "to": "output_1"}
]
}
}
@dataclass
class ComponentData:
"""Complete component information"""
type: str
shape: str
color: str
icon: str
description: List[str]
category: Optional[str] = None
sub_category: Optional[str] = None
@dataclass
class AgentNode:
id: str
type: str
x: int
y: int
component_data: ComponentData = field(default_factory=lambda: ComponentData("", "", "", "", []))
@dataclass
class Connection:
from_node: str
to_node: str
class CustomNodeManager:
def __init__(self, storage_path: str = "custom_nodes.pkl"):
self.storage_path = storage_path
self.custom_nodes: Dict[str, Dict[str, Any]] = {}
self.load_custom_nodes()
def load_custom_nodes(self):
"""Load custom nodes from storage"""
if os.path.exists(self.storage_path):
try:
with open(self.storage_path, 'rb') as f:
self.custom_nodes = pickle.load(f)
except Exception as e:
print(f"Error loading custom nodes: {e}")
self.custom_nodes = {}
def save_custom_nodes(self):
"""Save custom nodes to storage"""
try:
with open(self.storage_path, 'wb') as f:
pickle.dump(self.custom_nodes, f)
except Exception as e:
print(f"Error saving custom nodes: {e}")
def create_custom_node(self, name: str, config: Dict[str, Any]):
"""Create a new custom node"""
node_id = f"custom_{name.lower().replace(' ', '_')}"
self.custom_nodes[node_id] = {
"id": node_id,
"name": name,
"type": "CUSTOM",
"config": config,
"created_at": __import__('datetime').datetime.now().isoformat()
}
self.save_custom_nodes()
return node_id
def get_custom_node_info(self, node_id: str) -> Dict[str, Any]:
"""Get information for a custom node"""
return self.custom_nodes.get(node_id, {})
def delete_custom_node(self, node_id: str):
"""Delete a custom node"""
if node_id in self.custom_nodes:
del self.custom_nodes[node_id]
self.save_custom_nodes()
# Initialize custom node manager
custom_node_manager = CustomNodeManager()
class WorkflowDesigner:
def __init__(self):
self.nodes: Dict[str, AgentNode] = {}
self.connections: List[Connection] = []
self.node_counter = 0
self.selected_node: Optional[str] = None
def select_node(self, node_id: str) -> None:
"""Select a node and deselect others"""
self.selected_node = node_id if node_id in self.nodes else None
def move_selected_node(self, dx: int, dy: int) -> None:
"""Move selected node by delta"""
if self.selected_node and self.selected_node in self.nodes:
node = self.nodes[self.selected_node]
node.x = max(0, node.x + dx)
node.y = max(0, node.y + dy)
def add_custom_node(self, custom_config: Dict[str, Any]) -> AgentNode:
"""Add a custom node to the workflow"""
self.node_counter += 1
node_id = f"custom_{self.node_counter}"
# Create custom node configuration
custom_node_config = {
"shape": custom_config.get("shape", "rect"),
"color": custom_config.get("color", "#666666"),
"icon": custom_config.get("icon", "🔧"),
"description": custom_config.get("description", ["Custom node"]),
"implementation": custom_config.get("implementation", {})
}
# Add to COMPONENT_INFO for rendering
COMPONENT_INFO[node_id] = custom_node_config
col = len(self.nodes) % 3
row = len(self.nodes) // 3
x_pos = 200 + (col * 350)
y_pos = 150 + (row * 200)
node = AgentNode(
id=node_id,
type=node_id, # Use node_id as type for custom nodes
x=x_pos,
y=y_pos
)
self.nodes[node_id] = node
self.selected_node = node_id
return node
def get_workflow_json(self) -> Dict[str, Any]:
"""Get complete workflow data including component implementations"""
nodes_data = []
for node in self.nodes.values():
node_info = COMPONENT_INFO.get(node.type, {})
nodes_data.append({
"id": node.id,
"type": node.type,
"x": node.x,
"y": node.y,
"component_info": node_info,
"implementation": node_info.get("implementation", {})
})
return {
"nodes": nodes_data,
"connections": [asdict(c) for c in self.connections],
"selected_node": self.selected_node,
"metadata": {
"total_nodes": len(self.nodes),
"total_connections": len(self.connections),
"generated_at": __import__('datetime').datetime.now().isoformat()
}
}
def add_node(self, node_type: str) -> AgentNode:
self.node_counter += 1
node_id = f"{node_type}_{self.node_counter}"
col = len(self.nodes) % 3
row = len(self.nodes) // 3
x_pos = 200 + (col * 350)
y_pos = 150 + (row * 200)
# Get complete component information
component_info = COMPONENT_INFO.get(node_type, {
"shape": "rect",
"color": "#666666",
"icon": "❓",
"description": ["Unknown component type"]
})
# Create component data with full information
component_data = ComponentData(
type=node_type,
shape=component_info["shape"],
color=component_info["color"],
icon=component_info["icon"],
description=component_info["description"],
category=self._find_component_category(node_type),
sub_category=self._find_component_sub_category(node_type)
)
node = AgentNode(
id=node_id,
type=node_type,
x=x_pos,
y=y_pos,
component_data=component_data
)
self.nodes[node_id] = node
self.selected_node = node_id
return node
def _find_component_category(self, node_type: str) -> Optional[str]:
"""Find which high-level category this component belongs to"""
for category, components in COMPONENT_HIERARCHY["HIGH_LEVEL"].items():
if node_type == category or node_type in components.get('sub_components', []):
return category
return None
def _find_component_sub_category(self, node_type: str) -> Optional[str]:
"""Determine if this is a high-level or sub-component"""
for category, components in COMPONENT_HIERARCHY["HIGH_LEVEL"].items():
if node_type == category:
return "HIGH_LEVEL"
elif node_type in components.get('sub_components', []):
return "SUB_COMPONENT"
return None
def load_example(self, example_name: str):
if example_name not in EXAMPLE_WORKFLOWS:
return
example = EXAMPLE_WORKFLOWS[example_name]
self.nodes.clear()
self.connections.clear()
for node_data in example["nodes"]:
node_type = node_data["type"]
# Get complete component information for example nodes too
component_info = COMPONENT_INFO.get(node_type, {
"shape": "rect",
"color": "#666666",
"icon": "❓",
"description": ["Unknown component type"]
})
component_data = ComponentData(
type=node_type,
shape=component_info["shape"],
color=component_info["color"],
icon=component_info["icon"],
description=component_info["description"],
category=self._find_component_category(node_type),
sub_category=self._find_component_sub_category(node_type)
)
node = AgentNode(
id=node_data["id"],
type=node_type,
x=node_data["x"],
y=node_data["y"],
component_data=component_data
)
self.nodes[node.id] = node
for conn_data in example["connections"]:
conn = Connection(
from_node=conn_data["from"],
to_node=conn_data["to"]
)
self.connections.append(conn)
if self.nodes:
self.selected_node = list(self.nodes.keys())[0]
def get_workflow_json(self) -> Dict[str, Any]:
"""Get complete workflow data including full component information"""
return {
"metadata": {
"total_nodes": len(self.nodes),
"total_connections": len(self.connections),
"selected_node": self.selected_node,
"generated_with": "Agent Workflow Designer"
},
"nodes": [
{
"id": node.id,
"type": node.type,
"position": {"x": node.x, "y": node.y},
"component_data": {
"type": node.component_data.type,
"shape": node.component_data.shape,
"color": node.component_data.color,
"icon": node.component_data.icon,
"description": node.component_data.description,
"category": node.component_data.category,
"sub_category": node.component_data.sub_category
}
}
for node in self.nodes.values()
],
"connections": [
{
"from": conn.from_node,
"to": conn.to_node
}
for conn in self.connections
]
}
def render_svg(self) -> str:
"""Render workflow as beautiful SVG with selection support"""
if not self.nodes:
return '''
<svg width="1200" height="600" style="border-radius: 12px;">
<defs>
<linearGradient id="bg" x1="0%" y1="0%" x2="100%" y2="100%">
<stop offset="0%" style="stop-color:#667eea;stop-opacity:1" />
<stop offset="100%" style="stop-color:#764ba2;stop-opacity:1" />
</linearGradient>
</defs>
<rect width="1200" height="600" fill="url(#bg)"/>
<text x="600" y="280" text-anchor="middle" fill="white" font-size="32" font-weight="bold">🚀 Start Building Your Workflow</text>
<text x="600" y="320" text-anchor="middle" fill="white" font-size="18" opacity="0.9">Add components from the library on the left</text>
</svg>
'''
width = 1200
height = max(600, max([n.y for n in self.nodes.values()], default=0) + 200)
svg_parts = [
f'<svg width="{width}" height="{height}" style="border-radius: 12px; cursor: pointer;">',
'<defs>',
'<linearGradient id="bg" x1="0%" y1="0%" x2="100%" y2="100%">',
'<stop offset="0%" style="stop-color:#667eea;stop-opacity:1" />',
'<stop offset="100%" style="stop-color:#764ba2;stop-opacity:1" />',
'</linearGradient>',
'<marker id="arrowhead" markerWidth="12" markerHeight="12" refX="11" refY="3" orient="auto">',
'<polygon points="0 0, 12 3, 0 6" fill="white" opacity="0.9"/>',
'</marker>',
'<filter id="glow">',
'<feGaussianBlur stdDeviation="3" result="coloredBlur"/>',
'<feMerge><feMergeNode in="coloredBlur"/><feMergeNode in="SourceGraphic"/></feMerge>',
'</filter>',
'<filter id="shadow">',
'<feDropShadow dx="0" dy="4" stdDeviation="4" flood-opacity="0.3"/>',
'</filter>',
'<filter id="selected-glow">',
'<feGaussianBlur stdDeviation="5" result="coloredBlur"/>',
'<feMerge><feMergeNode in="coloredBlur"/><feMergeNode in="SourceGraphic"/></feMerge>',
'</filter>',
'</defs>',
'<rect width="100%" height="100%" fill="url(#bg)"/>'
]
# Draw connections with glow
for conn in self.connections:
if conn.from_node in self.nodes and conn.to_node in self.nodes:
from_node = self.nodes[conn.from_node]
to_node = self.nodes[conn.to_node]
from_x = from_node.x + 85
from_y = from_node.y + 60
to_x = to_node.x + 15
to_y = to_node.y + 60
mid_x = (from_x + to_x) / 2
# Glow path
svg_parts.append(
f'<path d="M {from_x} {from_y} C {mid_x} {from_y}, {mid_x} {to_y}, {to_x} {to_y}" '
f'stroke="white" stroke-width="8" fill="none" opacity="0.3" filter="url(#glow)"/>'
)
# Main path
svg_parts.append(
f'<path d="M {from_x} {from_y} C {mid_x} {from_y}, {mid_x} {to_y}, {to_x} {to_y}" '
f'stroke="white" stroke-width="3" fill="none" opacity="0.8" marker-end="url(#arrowhead)"/>'
)
# Draw nodes with selection support
for node in self.nodes.values():
# Use the stored component data instead of looking it up
shape = node.component_data.shape
color = node.component_data.color
icon = node.component_data.icon
cx = node.x + 85
cy = node.y + 60
label = node.id.replace("_", " ").title()
is_selected = (node.id == self.selected_node)
selection_glow = 'filter="url(#selected-glow)"' if is_selected else 'filter="url(#shadow)"'
selection_stroke = "6" if is_selected else "4"
# Node background with selection highlight
if shape == "ellipse":
svg_parts.append(
f'<ellipse cx="{cx}" cy="{cy}" rx="80" ry="50" '
f'fill="white" stroke="{color}" stroke-width="{selection_stroke}" {selection_glow} '
f'class="node" id="node_{node.id}" style="cursor: move;"/>'
)
elif shape == "diamond":
size = 70
points = f"{cx},{cy-size} {cx+size},{cy} {cx},{cy+size} {cx-size},{cy}"
svg_parts.append(
f'<polygon points="{points}" '
f'fill="white" stroke="{color}" stroke-width="{selection_stroke}" {selection_glow} '
f'class="node" id="node_{node.id}" style="cursor: move;"/>'
)
elif shape == "hexagon":
w, h = 70, 50
points = f"{cx-w},{cy-h/2} {cx-w/2},{cy-h} {cx+w/2},{cy-h} {cx+w},{cy-h/2} {cx+w},{cy+h/2} {cx+w/2},{cy+h} {cx-w/2},{cy+h} {cx-w},{cy+h/2}"
svg_parts.append(
f'<polygon points="{points}" '
f'fill="white" stroke="{color}" stroke-width="{selection_stroke}" {selection_glow} '
f'class="node" id="node_{node.id}" style="cursor: move;"/>'
)
elif shape == "cylinder":
svg_parts.append(
f'<ellipse cx="{cx}" cy="{cy-35}" rx="70" ry="18" '
f'fill="white" stroke="{color}" stroke-width="3"/>'
)
svg_parts.append(
f'<rect x="{cx-70}" y="{cy-35}" width="140" height="70" '
f'fill="white" stroke="none"/>'
)
svg_parts.append(
f'<line x1="{cx-70}" y1="{cy-35}" x2="{cx-70}" y2="{cy+35}" '
f'stroke="{color}" stroke-width="3"/>'
)
svg_parts.append(
f'<line x1="{cx+70}" y1="{cy-35}" x2="{cx+70}" y2="{cy+35}" '
f'stroke="{color}" stroke-width="3"/>'
)
svg_parts.append(
f'<ellipse cx="{cx}" cy="{cy+35}" rx="70" ry="18" '
f'fill="white" stroke="{color}" stroke-width="3" {selection_glow} '
f'class="node" id="node_{node.id}" style="cursor: move;"/>'
)
else: # rect
svg_parts.append(
f'<rect x="{cx-80}" y="{cy-45}" width="160" height="90" rx="12" '
f'fill="white" stroke="{color}" stroke-width="{selection_stroke}" {selection_glow} '
f'class="node" id="node_{node.id}" style="cursor: move;"/>'
)
# Icon
svg_parts.append(
f'<text x="{cx}" y="{cy-10}" text-anchor="middle" font-size="36">{icon}</text>'
)
# Label
svg_parts.append(
f'<text x="{cx}" y="{cy+25}" text-anchor="middle" '
f'fill="#333" font-size="13" font-weight="600">{label}</text>'
)
# Add JavaScript for drag and drop
svg_parts.append('''
<script>
// Node selection and drag functionality
let selectedNode = null;
let isDragging = false;
let startX, startY;
let originalX, originalY;
// Add click handlers for all nodes
document.querySelectorAll('.node').forEach(node => {
node.addEventListener('click', (e) => {
e.stopPropagation();
const nodeId = node.id.replace('node_', '');
selectNode(nodeId);
});
node.addEventListener('mousedown', startDrag);
});
// Click on canvas to deselect
document.querySelector('svg').addEventListener('click', (e) => {
if (e.target.tagName === 'svg') {
selectedNode = null;
updateSelection();
}
});
function selectNode(nodeId) {
selectedNode = nodeId;
updateSelection();
// Notify Gradio about selection
if (window.gradio_api) {
window.gradio_api('select_node', nodeId);
}
}
function updateSelection() {
// Visual feedback handled by server-side re-render
// This will trigger when we call back to Python
}
function startDrag(e) {
if (!selectedNode) return;
isDragging = true;
startX = e.clientX;
startY = e.clientY;
const node = document.getElementById('node_' + selectedNode);
const transform = node.getAttribute('transform') || '';
const match = transform.match(/translate\\(([^,]+),([^)]+)\\)/);
originalX = match ? parseFloat(match[1]) : 0;
originalY = match ? parseFloat(match[2]) : 0;
document.addEventListener('mousemove', doDrag);
document.addEventListener('mouseup', stopDrag);
e.preventDefault();
}
function doDrag(e) {
if (!isDragging || !selectedNode) return;
const dx = e.clientX - startX;
const dy = e.clientY - startY;
const node = document.getElementById('node_' + selectedNode);
node.setAttribute('transform', `translate(${originalX + dx}, ${originalY + dy})`);
}
function stopDrag(e) {
if (!isDragging || !selectedNode) return;
const dx = e.clientX - startX;
const dy = e.clientY - startY;
// Final position update to Gradio
if (window.gradio_api && (Math.abs(dx) > 5 || Math.abs(dy) > 5)) {
window.gradio_api('move_node', {
node_id: selectedNode,
dx: Math.round(dx),
dy: Math.round(dy)
});
}
isDragging = false;
document.removeEventListener('mousemove', doDrag);
document.removeEventListener('mouseup', stopDrag);
}
</script>
''')
svg_parts.append('</svg>')
return '\n'.join(svg_parts)
workflow = WorkflowDesigner()
# Report generation class
class WorkflowReporter:
def __init__(self):
try:
self.client = AsyncOpenAI(base_url="http://localhost:1234/v1", api_key="lm-studio")
except Exception as e:
print("LM Studio client init failed:", e)
async def generate_report(self, workflow_json: str) -> str:
prompt = f"""
Generate a comprehensive system design report based on the following workflow:
{workflow_json}
The report should include a detailed repost and system breif with full examples and implimentations where possible and explanaion of requirement in cases where the workflow is complexed and need further deconstruction, as well as example usages :
1. A high-level system overview
2. User stories for each component or connection expetation
3. Use case briefs for each component interaction and component relationship
4. Pseudocode for the implementation for each component and for the overall workflow
5. Component responsibilities and interfaces
6. Data flow description and example use-cases
"""
try:
response = await self.client.chat.completions.create(
model="leroydyer/qwen/qwen3-0.6b-q4_k_m.gguf",
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=2048
)
return response.choices[0].message.content
except Exception as e:
return f"Error generating report: {str(e)}"
# Initialize reporter
reporter = WorkflowReporter()
def create_workflow_ui():
with gr.Blocks(title="Agent Workflow Designer", theme=gr.themes.Soft()) as demo:
gr.Markdown("# 🎓 Agentic System Workflow Designer")
gr.Markdown("**Educational tool for planning and understanding agent architectures**")
# Hidden components for JavaScript communication
select_node_trigger = gr.Textbox(visible=False)
move_node_trigger = gr.Textbox(visible=False)
# Define all UI components first
with gr.Row():
# Left Sidebar - Component Library
with gr.Column(scale=1):
gr.Markdown("## 📚 Component Library")
# Store component buttons for later connection
component_buttons = []
# High-level component accordions
for category, components in COMPONENT_HIERARCHY["HIGH_LEVEL"].items():
with gr.Accordion(f"{components['icon']} {category}", open=False):
# High-level component button
high_level_btn = gr.Button(
f"{components['icon']} {category}",
size="sm",
variant="primary"
)
component_buttons.append((high_level_btn, category))
# Sub-components
if components['sub_components']:
gr.Markdown("**Sub-components:**")
for sub_comp in components['sub_components']:
sub_info = COMPONENT_INFO[sub_comp]
sub_btn = gr.Button(
f"{sub_info['icon']} {sub_comp.replace('_', ' ').title()}",
size="sm"
)
component_buttons.append((sub_btn, sub_comp))
gr.Markdown("---")
gr.Markdown("## 🔗 Connect Nodes")
from_node = gr.Dropdown(label="From", choices=[], interactive=True)
to_node = gr.Dropdown(label="To", choices=[], interactive=True)
connect_btn = gr.Button("➡️ Connect", variant="secondary")
gr.Markdown("---")
gr.Markdown("## 📋 Examples")
example_dropdown = gr.Dropdown(
choices=list(EXAMPLE_WORKFLOWS.keys()),
label="Load Example Workflow",
interactive=True
)
load_example_btn = gr.Button("📥 Load Example")
gr.Markdown("---")
with gr.Row():
download_json_btn = gr.Button("💾 Download JSON", variant="primary", size="sm")
download_svg_btn = gr.Button("🖼️ Download SVG", variant="primary", size="sm")
clear_btn = gr.Button("🗑️ Clear All", variant="stop", size="sm")
# Output for multiple downloadable files
download_files = gr.Files(label="📥 Download Files", visible=True)
# Center - Canvas
with gr.Column(scale=3):
gr.Markdown("## 🎨 Workflow Canvas")
gr.Markdown("**💡 Tip:** Click nodes to select, then drag or use arrow keys")
canvas = gr.HTML()
gr.Markdown("## 📖 Component Information")
component_info = gr.Markdown("Select a component to see its description")
# Right Sidebar - Movement Controls
with gr.Column(scale=1):
gr.Markdown("## 🎯 Selection & Movement")
gr.Markdown("**Navigation:**")
with gr.Row():
select_prev_btn = gr.Button("⬅️ Prev", size="sm")
select_next_btn = gr.Button("➡️ Next", size="sm")
deselect_btn = gr.Button("❌ Deselect", size="sm")
gr.Markdown("**Selected Node:**")
selected_node_info = gr.Markdown("No node selected")
gr.Markdown("**Move Selected:**")
with gr.Row():
move_left_btn = gr.Button("⬅️", size="sm")
move_up_btn = gr.Button("⬆️", size="sm")
move_down_btn = gr.Button("⬇️", size="sm")
move_right_btn = gr.Button("➡️", size="sm")
gr.Markdown("**Movement Modes:**")
with gr.Row():
move_fine_btn = gr.Button("🎯 Fine (5px)", size="sm")
move_coarse_btn = gr.Button("🚀 Coarse (50px)", size="sm")
gr.Markdown("---")
with gr.Accordion("🗑️ Delete Selected", open=False):
delete_selected_btn = gr.Button("❌ Delete Selected Node", variant="stop", size="sm")
gr.Markdown("---")
with gr.Accordion("📊 Workflow Data", open=False):
json_output = gr.Code(language="json", label="Workflow JSON", lines=10)
gr.Markdown("---")
with gr.Accordion("📋 Generate Report", open=False):
report_btn = gr.Button("📄 Generate System Report", variant="primary")
report_output = gr.Textbox(label="System Design Report", lines=15, interactive=False)
download_report_btn = gr.Button("📝 Download Report", variant="secondary", size="sm")
# Now define all the handler functions after UI components are defined
def get_full_state():
svg = workflow.render_svg()
node_choices = list(workflow.nodes.keys())
workflow_json = json.dumps({
"nodes": [asdict(n) for n in workflow.nodes.values()],
"connections": [asdict(c) for c in workflow.connections],
"selected_node": workflow.selected_node
}, indent=2)
selected_info = "**No node selected**"
comp_info = "Select a component to see its description"
if workflow.selected_node and workflow.selected_node in workflow.nodes:
node = workflow.nodes[workflow.selected_node]
info = COMPONENT_INFO[node.type]
selected_info = f"**Selected:** `{node.id}` ({info['icon']} {node.type.replace('_', ' ').title()}) at position ({node.x}, {node.y})"
comp_info = f"### {node.type.replace('_', ' ').title()} {info['icon']}\n\n" + "\n".join(info["description"])
return svg, workflow_json, node_choices, selected_info, comp_info
def add_node_handler(node_type):
node = workflow.add_node(node_type)
svg, wf_json, choices, selected_info, comp_info = get_full_state()
return svg, wf_json, gr.Dropdown(choices=choices), gr.Dropdown(choices=choices), selected_info, comp_info
# Connect all component buttons
for btn, comp_type in component_buttons:
btn.click(
lambda ct=comp_type: add_node_handler(ct),
outputs=[canvas, json_output, from_node, to_node, selected_node_info, component_info]
)
# Selection handlers
def select_node_handler(node_id):
if node_id:
workflow.select_node(node_id)
svg, wf_json, choices, selected_info, comp_info = get_full_state()
return svg, wf_json, selected_info, comp_info
return workflow.render_svg(), "", "No node selected", "Select a component to see its description"
def select_next_node():
if workflow.nodes:
node_ids = list(workflow.nodes.keys())
if not workflow.selected_node:
workflow.selected_node = node_ids[0]
else:
current_idx = node_ids.index(workflow.selected_node)
next_idx = (current_idx + 1) % len(node_ids)
workflow.selected_node = node_ids[next_idx]
svg, wf_json, choices, selected_info, comp_info = get_full_state()
return svg, wf_json, selected_info, comp_info
def select_prev_node():
if workflow.nodes:
node_ids = list(workflow.nodes.keys())
if not workflow.selected_node:
workflow.selected_node = node_ids[-1]
else:
current_idx = node_ids.index(workflow.selected_node)
prev_idx = (current_idx - 1) % len(node_ids)
workflow.selected_node = node_ids[prev_idx]
svg, wf_json, choices, selected_info, comp_info = get_full_state()
return svg, wf_json, selected_info, comp_info
def deselect_all():
workflow.selected_node = None
svg, wf_json, choices, selected_info, comp_info = get_full_state()
return svg, wf_json, selected_info, comp_info
# Delete selected node
def delete_selected_node():
if workflow.selected_node and workflow.selected_node in workflow.nodes:
workflow.connections = [
c for c in workflow.connections
if c.from_node != workflow.selected_node and c.to_node != workflow.selected_node
]
del workflow.nodes[workflow.selected_node]
workflow.selected_node = None
svg, wf_json, choices, selected_info, comp_info = get_full_state()
return svg, wf_json, gr.Dropdown(choices=choices), gr.Dropdown(choices=choices), selected_info, comp_info
# Movement handlers
def move_selected_node(dx, dy):
if workflow.selected_node:
workflow.move_selected_node(dx, dy)
svg, wf_json, choices, selected_info, comp_info = get_full_state()
return svg, wf_json, selected_info, comp_info
return workflow.render_svg(), "", "No node selected", component_info.value
# Connect selection events
select_node_trigger.change(
select_node_handler,
inputs=[select_node_trigger],
outputs=[canvas, json_output, selected_node_info, component_info]
)
select_next_btn.click(select_next_node, outputs=[canvas, json_output, selected_node_info, component_info])
select_prev_btn.click(select_prev_node, outputs=[canvas, json_output, selected_node_info, component_info])
deselect_btn.click(deselect_all, outputs=[canvas, json_output, selected_node_info, component_info])
# Movement buttons
move_left_btn.click(lambda: move_selected_node(-20, 0), outputs=[canvas, json_output, selected_node_info, component_info])
move_right_btn.click(lambda: move_selected_node(20, 0), outputs=[canvas, json_output, selected_node_info, component_info])
move_up_btn.click(lambda: move_selected_node(0, -20), outputs=[canvas, json_output, selected_node_info, component_info])
move_down_btn.click(lambda: move_selected_node(0, 20), outputs=[canvas, json_output, selected_node_info, component_info])
move_fine_btn.click(lambda: move_selected_node(-5, 0), outputs=[canvas, json_output, selected_node_info, component_info])
move_coarse_btn.click(lambda: move_selected_node(-50, 0), outputs=[canvas, json_output, selected_node_info, component_info])
# Delete button
delete_selected_btn.click(
delete_selected_node,
outputs=[canvas, json_output, from_node, to_node, selected_node_info, component_info]
)
# Drag handler
def handle_node_drag(move_data):
try:
data = json.loads(move_data)
node_id = data.get('node_id')
dx = data.get('dx', 0)
dy = data.get('dy', 0)
if node_id and node_id in workflow.nodes:
workflow.select_node(node_id)
workflow.move_selected_node(dx, dy)
svg, wf_json, choices, selected_info, comp_info = get_full_state()
return svg, wf_json, selected_info, comp_info
except Exception as e:
print("Drag error:", e)
return workflow.render_svg(), "", "Drag completed", component_info.value
move_node_trigger.change(
handle_node_drag,
inputs=[move_node_trigger],
outputs=[canvas, json_output, selected_node_info, component_info]
)
# Connection handler
def connect_nodes(from_n, to_n):
if from_n and to_n and from_n != to_n:
existing = [c for c in workflow.connections if c.from_node == from_n and c.to_node == to_n]
if not existing:
workflow.connections.append(Connection(from_node=from_n, to_node=to_n))
svg, wf_json, choices, selected_info, comp_info = get_full_state()
return svg, wf_json, selected_info
return workflow.render_svg(), "", selected_node_info.value
connect_btn.click(connect_nodes, inputs=[from_node, to_node], outputs=[canvas, json_output, selected_node_info])
# Example loading
def load_example_handler(example_name):
if example_name:
workflow.load_example(example_name)
svg, wf_json, choices, selected_info, comp_info = get_full_state()
desc = EXAMPLE_WORKFLOWS[example_name]["description"]
info_text = f"### {example_name}\n\n{desc}"
return svg, wf_json, gr.Dropdown(choices=choices), gr.Dropdown(choices=choices), selected_info, info_text
return workflow.render_svg(), "", gr.Dropdown(choices=[]), gr.Dropdown(choices=[]), "No node selected", "Select a component to see its description"
load_example_btn.click(
load_example_handler,
inputs=[example_dropdown],
outputs=[canvas, json_output, from_node, to_node, selected_node_info, component_info]
)
# Unified download handler (returns list of files)
def download_all_files():
file_list = []
fid = str(uuid.uuid4())
# JSON
json_data = { ... }
json_path = tempfile.mktemp(suffix=f"_{fid}.json")
with open(json_path, "w", encoding="utf-8") as f:
json.dump(json_data, f, indent=2)
file_list.append(json_path)
# SVG
svg_path = tempfile.mktemp(suffix=f"_{fid}.svg")
with open(svg_path, "w", encoding="utf-8") as f:
f.write(workflow.render_svg())
file_list.append(svg_path)
return file_list
# In your download_json function, replace:
def download_json():
fid = str(uuid.uuid4())
# Use the new method that includes full component data
json_data = workflow.get_workflow_json()
json_path = tempfile.mktemp(suffix=f"_{fid}.json")
with open(json_path, "w", encoding="utf-8") as f:
json.dump(json_data, f, indent=2)
return [json_path]
# Download SVG only
def download_svg():
fid = str(uuid.uuid4())
svg_content = workflow.render_svg()
svg_path = tempfile.mktemp(suffix=f"_{fid}.svg")
with open(svg_path, "w", encoding="utf-8") as f: # ←← KEY CHANGE: encoding="utf-8"
f.write(svg_content)
return [svg_path]
# Report generation (sync wrapper)
def sync_generate_report():
workflow_data = {
"nodes": [asdict(n) for n in workflow.nodes.values()],
"connections": [asdict(c) for c in workflow.connections],
"selected_node": workflow.selected_node
}
json_str = json.dumps(workflow_data, indent=2)
try:
report = asyncio.run(reporter.generate_report(json_str))
except Exception as e:
report = f"Failed to generate report: {e}"
return report
def download_report():
report_text = sync_generate_report()
fid = str(uuid.uuid4())
txt_path = tempfile.mktemp(suffix=f"_report_{fid}.txt")
with open(txt_path, "w", encoding="utf-8") as f: # ←←
f.write(f"Agentic Workflow Design Report\nGenerated on: {str(__import__('datetime').datetime.now())}\n\n")
f.write(report_text)
return [txt_path]
# Attach handlers
download_json_btn.click(download_json, outputs=[download_files])
download_svg_btn.click(download_svg, outputs=[download_files])
report_btn.click(sync_generate_report, outputs=[report_output])
download_report_btn.click(download_report, outputs=[download_files])
# Clear handler
def clear_all():
workflow.nodes.clear()
workflow.connections.clear()
workflow.node_counter = 0
workflow.selected_node = None
svg = workflow.render_svg()
return (
svg,
"{}",
gr.Dropdown(choices=[]),
gr.Dropdown(choices=[]),
"No node selected",
"Canvas cleared. Ready to build!"
)
clear_btn.click(clear_all, outputs=[canvas, json_output, from_node, to_node, selected_node_info, component_info])
# Initialize with JavaScript support
def init_app():
svg = workflow.render_svg()
js_code = '''
<script>
window.gradio_api = function(type, data) {
if (type === 'select_node') {
const triggers = document.querySelectorAll('input[type="hidden"]');
const selectTrigger = triggers[0];
if (selectTrigger) {
selectTrigger.value = data;
selectTrigger.dispatchEvent(new Event('change'));
}
} else if (type === 'move_node') {
const triggers = document.querySelectorAll('input[type="hidden"]');
const moveTrigger = triggers[1];
if (moveTrigger) {
moveTrigger.value = JSON.stringify(data);
moveTrigger.dispatchEvent(new Event('change'));
}
}
};
</script>
'''
return svg + js_code
demo.load(init_app, outputs=[canvas])
return demo
if __name__ == "__main__":
demo = create_workflow_ui()
demo.launch()