# Multi_agent_test / utils / langgraph_pipeline.py
# Uploaded by Rahul-8799 ("Upload langgraph_pipeline.py", commit 30d86ab, verified)
import uuid, zipfile, re, json
from pathlib import Path
from typing import TypedDict, List, Dict, Any, Tuple
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.messages.base import BaseMessage
from agents import (
product_manager_agent,
project_manager_agent,
software_architect_agent,
software_engineer_agent,
quality_assurance_agent,
ui_designer_agent,
)
# ——————————————
# 1) State definitions
# ——————————————
class InputState(TypedDict):
    """State keys supplied when the pipeline graph is invoked."""
    # Conversation history; run_pipeline_and_save seeds it with one HumanMessage.
    messages: List[BaseMessage]
    # Running transcript as a list of dicts (bridge_to_pm appends
    # {"role": ..., "content": ...} entries).
    chat_log: List[Dict[str, Any]]
    # Feedback-loop pass counter; starts at 0 and is incremented by handle_feedback.
    iteration: int
    # Feedback text carried into the next pass; empty string when there is none.
    feedback: str
class OutputState(TypedDict):
    """State keys exposed as the result of a pipeline run."""
    # Each *_output key is the output_key that the matching agent node
    # (built with wrap_agent) writes its result under.
    pm_output: str  # ProductManager
    proj_output: str  # ProjectManager
    arch_output: str  # SoftwareArchitect
    ui_design_output: str  # UIDesigner
    dev_output: str  # SoftwareEngineer; parsed into site files by run_pipeline_and_save
    qa_output: str  # QualityAssurance; read by handle_feedback
    # Running transcript as a list of dicts.
    chat_log: List[Dict[str, Any]]
    # Feedback-loop pass counter.
    iteration: int
    # Feedback text produced by handle_feedback.
    feedback: str
# ——————————————
# 2) Wrap agents so they see full history
# ——————————————
def wrap_agent(agent_run, output_key: str):
    """Adapt an agent's ``run`` callable into a graph node function.

    Args:
        agent_run: Callable that receives ``{"messages": ..., "chat_log": ...}``
            and returns a mapping containing ``"messages"``, ``"chat_log"``
            and ``output_key`` entries.
        output_key: State key under which the agent's result is published.

    Returns:
        A node function that maps the current state to a state update holding
        the extended history, the agent's chat log, the agent's output, and
        the unchanged ``iteration`` / ``feedback`` values.
    """

    def node(state: Dict[str, Any]) -> Dict[str, Any]:
        history = state["messages"]
        log = state["chat_log"]
        iteration = state.get("iteration", 0)
        feedback = state.get("feedback", "")

        # Show the feedback note to this agent only.  It is deliberately kept
        # out of the persisted history: ``feedback`` stays set for the whole
        # pass, so persisting the note would add one more copy of it at every
        # downstream node.
        prompt_history = history
        if feedback:
            prompt_history = history + [
                AIMessage(content=f"Previous feedback: {feedback}")
            ]

        result = agent_run({"messages": prompt_history, "chat_log": log})
        return {
            "messages": history + result["messages"],
            "chat_log": result["chat_log"],
            output_key: result[output_key],
            "iteration": iteration,
            "feedback": feedback,
        }

    return node
# ——————————————
# 3) Bridge → ProductManager
# ——————————————
def bridge_to_pm(state: Dict[str, Any]) -> Dict[str, Any]:
    """Turn the stakeholder's request into the product-manager spec prompt.

    Args:
        state: Graph state with ``messages`` and ``chat_log``; ``iteration``
            and ``feedback`` are optional and default to ``0`` and ``""``.

    Returns:
        A state update whose history is restarted as the stakeholder message
        followed by the generated spec prompt, with the prompt also appended
        to the chat log under the ``"System"`` role.

    Raises:
        ValueError: If the history contains no ``HumanMessage``.
    """
    history = state["messages"]
    log = state["chat_log"]
    iteration = state.get("iteration", 0)
    feedback = state.get("feedback", "")

    # Search backwards instead of insisting on the final message: when the
    # feedback loop re-enters this node the history ends with agent output,
    # not with the stakeholder's request.
    stakeholder_message = next(
        (msg for msg in reversed(history or []) if isinstance(msg, HumanMessage)),
        None,
    )
    if stakeholder_message is None:
        raise ValueError("bridge_to_pm expected a HumanMessage in history")

    prompt = stakeholder_message.content
    spec_prompt = (
        f"# Stakeholder Prompt (Iteration {iteration})\n\n"
        f"\"{prompt}\"\n\n"
    )
    if feedback:
        spec_prompt += f"Previous feedback to consider:\n{feedback}\n\n"
    spec_prompt += (
        "Generate a structured product specification including:\n"
        "- Goals\n"
        "- Key features\n"
        "- User stories\n"
        "- Success metrics\n"
    )
    return {
        # Restart the history for this pass, but keep the stakeholder message
        # so that a later pass can still locate the original request.
        "messages": [stakeholder_message, AIMessage(content=spec_prompt)],
        "chat_log": log + [{"role": "System", "content": spec_prompt}],
        "iteration": iteration,
        "feedback": feedback,
    }
# ——————————————
# 4) Feedback Loop Handler
# ——————————————
def handle_feedback(state: Dict[str, Any]) -> Dict[str, Any]:
    """Record the QA output as feedback for the next pipeline pass.

    While fewer than three feedback rounds have been used, the iteration
    counter is incremented and the QA output is stored in ``feedback``.
    Once the limit is reached the state is returned unchanged.

    A graph node has to return a state update; it cannot stop the graph by
    returning ``END``.  Deciding whether to run another pass is therefore the
    job of the graph's edges, not of this function.

    Args:
        state: Graph state containing ``qa_output``, ``messages`` and
            ``chat_log``; ``iteration`` defaults to ``0``.

    Returns:
        A state update with ``messages``, ``chat_log``, ``iteration`` and
        ``feedback``.

    Raises:
        KeyError: If ``qa_output``, ``messages`` or ``chat_log`` is missing.
    """
    qa_output = state["qa_output"]
    iteration = state.get("iteration", 0)
    max_feedback_rounds = 3

    if iteration < max_feedback_rounds:
        next_iteration = iteration + 1
        return {
            "messages": state["messages"],
            "chat_log": state["chat_log"],
            "iteration": next_iteration,
            "feedback": f"Iteration {next_iteration} feedback: {qa_output}",
        }

    # Limit reached: hand the state back untouched instead of an END sentinel,
    # which is not a valid node return value.
    return {
        "messages": state["messages"],
        "chat_log": state["chat_log"],
        "iteration": iteration,
        "feedback": state.get("feedback", ""),
    }
# β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
# 5) Build & compile the LangGraph
# β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
# Highest ``iteration`` value that is still followed by another feedback pass.
# Mirrors the ``iteration < 3`` check inside handle_feedback; keep in sync.
MAX_FEEDBACK_ITERATIONS = 3

# Nodes executed per pass (BridgePM, six agents, FeedbackHandler).  Used only
# to size the recursion limit below.
_NODES_PER_PASS = 8


class PipelineState(InputState, OutputState):
    """Full internal state: every input key plus every agent output key.

    Passed as the explicit state schema so that nodes can read keys written
    by earlier agents (handle_feedback reads ``qa_output``).  Without it the
    graph falls back to ``InputState`` as the schema nodes read from.
    """


def _route_after_qa(state: Dict[str, Any]) -> str:
    """Choose the step after QualityAssurance: another feedback pass or END."""
    if state.get("iteration", 0) < MAX_FEEDBACK_ITERATIONS:
        return "FeedbackHandler"
    return END


graph = StateGraph(PipelineState, input=InputState, output=OutputState)

# Nodes
graph.add_node("BridgePM", bridge_to_pm)
graph.add_node("ProductManager", wrap_agent(product_manager_agent.run, "pm_output"))
graph.add_node("ProjectManager", wrap_agent(project_manager_agent.run, "proj_output"))
graph.add_node("SoftwareArchitect", wrap_agent(software_architect_agent.run, "arch_output"))
graph.add_node("UIDesigner", wrap_agent(ui_designer_agent.run, "ui_design_output"))
graph.add_node("SoftwareEngineer", wrap_agent(software_engineer_agent.run, "dev_output"))
graph.add_node("QualityAssurance", wrap_agent(quality_assurance_agent.run, "qa_output"))
graph.add_node("FeedbackHandler", handle_feedback)

# Linear hand-off from the stakeholder prompt through to QA
graph.set_entry_point("BridgePM")
graph.add_edge("BridgePM", "ProductManager")
graph.add_edge("ProductManager", "ProjectManager")
graph.add_edge("ProjectManager", "SoftwareArchitect")
graph.add_edge("SoftwareArchitect", "UIDesigner")
graph.add_edge("UIDesigner", "SoftwareEngineer")
graph.add_edge("SoftwareEngineer", "QualityAssurance")

# Feedback loop.  The exit has to be a conditional edge: an unconditional
# FeedbackHandler -> BridgePM edge alone leaves the graph with no path to END.
graph.add_conditional_edges(
    "QualityAssurance",
    _route_after_qa,
    {"FeedbackHandler": "FeedbackHandler", END: END},
)
graph.add_edge("FeedbackHandler", "BridgePM")  # Feedback loop back to start

# Every node execution counts as one step and the default limit is 25, which
# the later passes of the loop would exceed; allow all passes plus a margin.
compiled_graph = graph.compile().with_config(
    recursion_limit=(MAX_FEEDBACK_ITERATIONS + 1) * _NODES_PER_PASS + 2
)
# ——————————————
# 6) Parse spec into sections
# ——————————————
def parse_spec(spec: str) -> Dict[str, List[str]]:
    """Split a markdown spec into ``{heading: [bullet text, ...]}``.

    A section is a ``##`` heading line immediately followed by one or more
    ``- `` bullet lines.  Headings without a directly following bullet list
    are ignored, and a repeated heading keeps only its last bullet list.
    """
    section_pattern = re.compile(r"##\s*(.+?)\n((?:- .+\n?)+)")
    parsed: Dict[str, List[str]] = {}
    for match in section_pattern.finditer(spec):
        heading, bullet_block = match.groups()
        bullets: List[str] = []
        for raw_line in bullet_block.splitlines():
            # splitlines() breaks on more characters than the regex "." stops
            # at, so fragments that are not bullets can appear here.
            if not raw_line.startswith("- "):
                continue
            bullets.append(raw_line[2:].strip())
        parsed[heading.strip()] = bullets
    return parsed
# ——————————————
# 7) Run pipeline, generate site, zip, return (chat_log, zip_path)
# ——————————————
def run_pipeline_and_save(prompt: str) -> Tuple[List[Dict[str, Any]], str]:
    """Run the agent graph on ``prompt`` and package the generated site.

    The developer agent's output is split into named sections, which are
    written as site files into a fresh ``output/site_<id>`` directory together
    with a ``package.json`` and a ``README.md``.  The directory's files are
    then zipped into ``output/site_<id>.zip``.

    Args:
        prompt: Stakeholder request used as the initial human message.

    Returns:
        ``(chat_log, zip_path)``: the final chat log and the archive path as
        a string.
    """
    # a) run the agent graph
    final_state = compiled_graph.invoke(
        {
            "messages": [HumanMessage(content=prompt)],
            "chat_log": [],
            "iteration": 0,
            "feedback": "",
        }
    )
    chat_log = final_state["chat_log"]
    dev_output = final_state["dev_output"]

    # b) split the developer output into named code sections
    code_by_heading = parse_code_sections(dev_output)

    # c) write the site files and zip them
    site_name = f"site_{uuid.uuid4().hex}"
    output_root = Path("output")
    site_dir = output_root / site_name
    site_dir.mkdir(parents=True, exist_ok=True)

    # Target file name -> section heading expected in the developer output;
    # a missing section produces an empty file.
    section_files = (
        ("index.html", "HTML Structure"),
        ("styles.css", "CSS Styles"),
        ("script.js", "JavaScript"),
        ("tailwind.config.js", "Tailwind Config"),
    )
    for file_name, heading in section_files:
        target = site_dir / file_name
        target.write_text(code_by_heading.get(heading, ""), encoding="utf-8")

    # package.json describing the build scripts and dependencies
    manifest = {
        "name": site_name,
        "version": "1.0.0",
        "description": "Generated responsive website",
        "scripts": {
            "build": "tailwindcss -i ./styles.css -o ./dist/output.css",
            "watch": "tailwindcss -i ./styles.css -o ./dist/output.css --watch",
        },
        "dependencies": {
            "tailwindcss": "^3.4.1",
            "alpinejs": "^3.13.3",
        },
    }
    manifest_text = json.dumps(manifest, indent=2)
    (site_dir / "package.json").write_text(manifest_text, encoding="utf-8")

    # README with setup instructions
    readme_text = """# Generated Website
This is a responsive website generated by the Multi-Agent UI Generator.
## Setup
1. Install dependencies:
```bash
npm install
```
2. Build the CSS:
```bash
npm run build
```
3. For development with live reload:
```bash
npm run watch
```
## Features
- Responsive design using Tailwind CSS
- Interactive elements with JavaScript
- Modern animations and transitions
- Mobile-first approach
"""
    (site_dir / "README.md").write_text(readme_text, encoding="utf-8")

    # Archive every entry of the site directory under its bare file name
    zip_path = output_root / f"{site_name}.zip"
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for entry in site_dir.iterdir():
            archive.write(entry, arcname=entry.name)

    return chat_log, str(zip_path)
def parse_code_sections(output: str) -> Dict[str, str]:
    """Parse code sections from the developer output.

    The output is expected to consist of ``## <name>`` headings, each followed
    by that section's content.  Fence lines (starting with three backticks)
    are dropped, and everything else under a heading is kept verbatim.  A line
    starting with ``## `` inside a fenced block is treated as code rather than
    as a new heading.

    Args:
        output: Raw markdown-style text produced by the developer agent.

    Returns:
        Mapping of section name to its content joined with newlines.  Text
        before the first heading is discarded; a repeated heading keeps only
        its last content.
    """
    sections: Dict[str, str] = {}
    current_section = None
    current_code: List[str] = []
    in_fence = False

    for line in output.split("\n"):
        if line.startswith("```"):
            # Fence markers delimit code but are not part of it.
            in_fence = not in_fence
            continue
        if not in_fence and line.startswith("## "):
            # A new heading closes the section collected so far.
            if current_section:
                sections[current_section] = "\n".join(current_code)
            current_section = line[3:].strip()
            current_code = []
        elif current_section:
            current_code.append(line)

    if current_section:
        sections[current_section] = "\n".join(current_code)
    return sections