from core.base_agent import BaseAgent from core.database import db from core.workflow import DevelopmentWorkflow from typing import Dict, Any class ProjectManagerAgent(BaseAgent): def __init__(self): super().__init__("Project Manager") self.workflow = DevelopmentWorkflow() self.create_chain(""" You are a Project Manager. Your task is to oversee the development process and provide status updates. Current Status: {input} Please provide a comprehensive status report including: 1. Project progress 2. Any blockers or issues 3. Next steps 4. Quality metrics 5. Recommendations Please provide clear, actionable insights. """) async def start_project(self, requirements: str) -> Dict[str, Any]: """Start a new project with the given requirements""" # Run the complete workflow result = await self.workflow.run(requirements) # Store all artifacts in the database self._store_artifacts(result) return { "status": "success", "user_stories": result["user_stories"], "design": result["design"], "code": result["code"], "test_results": result["test_results"], "messages": result["messages"], "message": "Project completed successfully" } def _store_artifacts(self, result: Dict[str, Any]): """Store all project artifacts in the database""" # Store user stories db.store_artifact( "user_stories", result["user_stories"], { "type": "user_story", "source": "business_analyst", "status": "created" } ) # Store design db.store_artifact( "designs", result["design"], { "type": "design", "source": "designer", "status": "created" } ) # Store code db.store_artifact( "code", result["code"], { "type": "code", "source": "developer", "status": "created", "version": "1.0.0" } ) # Store test results db.store_artifact( "test_results", result["test_results"], { "type": "test_result", "source": "tester", "status": "executed" } ) async def get_status(self) -> Dict[str, Any]: """Get the current project status""" # Query all artifacts from the database user_stories = db.query_artifacts("user_stories", "status:created") designs = db.query_artifacts("designs", "status:created") code = db.query_artifacts("code", "status:created") test_results = db.query_artifacts("test_results", "status:executed") status = { "user_stories": len(user_stories["ids"]), "designs": len(designs["ids"]), "code": len(code["ids"]), "test_results": len(test_results["ids"]) } return { "status": "success", "data": status, "message": "Status retrieved successfully" } async def check_quality(self) -> Dict[str, Any]: """Check the quality of project artifacts""" result = await self.process({ "input": "Checking quality of all project artifacts" }) return { "status": "success", "quality_report": result, "message": "Quality check completed successfully" }