| |
| import datetime |
| from random import randint |
| from datetime import time |
| import random |
| from typing import Optional |
| from crewai import Crew |
| from crewai.flow.flow import router, or_ |
| from pydantic import BaseModel |
| from crewai.flow import Flow, listen, start |
| from engineering_team_using_flow.crews.engineering_crew.engineering_crew import ( |
| CodeReviewFeedback, |
| EngineeringCrew, |
| ) |
| from .shared_queue import TaskInfo, add_to_queue |
|
|
|
|
| class EngineeringState(BaseModel): |
| module_name: str = "" |
| business_requirement: str = "" |
| technical_design: Optional[str] = "" |
| backend_code: Optional[str] = "" |
| frontend_code: Optional[str] = "" |
| unit_test_code: Optional[str] = "" |
| backend_code_review_feedbacks: list[CodeReviewFeedback] = [] |
| frontend_code_review_feedbacks: list[CodeReviewFeedback] = [] |
|
|
|
|
| MAX_REVIEW_ITERATIONS = 3 |
| BUSINESS_REQUIREMENTS: list[EngineeringState] = [ |
| EngineeringState( |
| module_name="mod_series_eval", |
| business_requirement="Create a web application that allows the user to evaluate the first N terms of a given series and \ |
| multiply the total by X (default to 4). for instance, the series could be something like :\ |
| 1 + 1/3 - 1/5 + 1/7 - 1/9 ... ", |
| ) |
| ] |
|
|
|
|
| class EngineeringFlow(Flow[EngineeringState]): |
|
|
| def __init__(self, module_name:str, business_requirement:str): |
| super().__init__() |
| self.module_name = module_name |
| self.business_requirement = business_requirement |
|
|
| @start() |
| def generate_business_requirement(self): |
| print("Generating business requirement") |
| self.state.business_requirement = self.business_requirement |
| self.state.module_name = self.module_name |
|
|
| add_to_queue( |
| TaskInfo( |
| name="Gathering Business Requirements", |
| type="markdown", |
| output=f"Gathering Business Requirements for {self.state.module_name}...", |
| ) |
| ) |
|
|
| |
| |
| |
| add_to_queue( |
| TaskInfo( |
| name="Gather Business Requirements", |
| type="markdown", |
| output=f"#### Business Requirements for {self.state.module_name}: \n{self.state.business_requirement}", |
| ) |
| ) |
|
|
| @listen(generate_business_requirement) |
| def design_product(self): |
| print("Designing product for requirement: ", self.state) |
| add_to_queue( |
| TaskInfo( |
| name="Generating Design", |
| type="markdown", |
| output=f"Designing the product ...", |
| ) |
| ) |
|
|
| engineeringCrew = EngineeringCrew() |
|
|
| mycrew = Crew( |
| agents=[engineeringCrew.development_lead()], |
| tasks=[engineeringCrew.design_task()], |
| ) |
| result = mycrew.kickoff( |
| inputs={ |
| "id": self.state.id, |
| "requirement": self.state.business_requirement, |
| "module_name": self.state.module_name, |
| "review_comments": "", |
| } |
| ) |
|
|
| print("Product Design Created", result.raw) |
| self.state.technical_design = result.raw |
| add_to_queue( |
| TaskInfo( |
| name="Generate Design", |
| type="markdown", |
| output=f"{self.state.technical_design}", |
| ) |
| ) |
|
|
| @listen(or_("design_product", "REWRITE_BACKEND_CODE")) |
| def develop_backend(self): |
| print("Developing backend : ", self.state) |
| add_to_queue( |
| TaskInfo( |
| name="Generating Backend Code", |
| type="markdown", |
| output=f"Generating Backend Code ...", |
| ) |
| ) |
| engineeringCrew = EngineeringCrew() |
|
|
| mycrew = Crew( |
| agents=[engineeringCrew.backend_engineer()], |
| tasks=[engineeringCrew.backend_coding_task()], |
| ) |
| result = mycrew.kickoff( |
| inputs={ |
| "id": self.state.id, |
| "requirement": self.state.business_requirement, |
| "module_name": self.state.module_name, |
| "review_comments": ( |
| self.state.backend_code_review_feedbacks[ |
| -1 |
| ].review_comments_markdown |
| if self.state.backend_code_review_feedbacks |
| else "" |
| ), |
| } |
| ) |
|
|
| print("Backend Code Created", result.raw) |
| self.state.backend_code = result.raw |
| add_to_queue( |
| TaskInfo( |
| name="Generate Backend Code", |
| type="markdown", |
| output=f"#### Backend Code for {self.state.module_name}: \n ```{self.state.backend_code}\n```", |
| ) |
| ) |
| return "BACKEND_CODE_CREATED" |
|
|
| @router(develop_backend) |
| def review_backend_code(self): |
| print("Reviewing backend code : ", self.state) |
| if len(self.state.backend_code_review_feedbacks) >= MAX_REVIEW_ITERATIONS: |
| add_to_queue( |
| TaskInfo( |
| name="Maximum Iterations Exceeded!", |
| type="markdown", |
| output=f"#### Maximum Iterations Exceeded!", |
| ) |
| ) |
| return "MAX_REVIEW_ITERATIONS_EXCEEDED" |
| engineeringCrew = EngineeringCrew() |
| add_to_queue( |
| TaskInfo( |
| name="Reviewing Backend Code", |
| type="markdown", |
| output=f"Reviewing Backend Code ...", |
| ) |
| ) |
|
|
| mycrew = Crew( |
| agents=[engineeringCrew.code_reviewer()], |
| tasks=[engineeringCrew.code_review_task()], |
| ) |
| if self.state.backend_code_review_feedbacks is None: |
| self.state.backend_code_review_feedbacks = [] |
|
|
| result = mycrew.kickoff( |
| inputs={ |
| "id": self.state.id, |
| "requirement": self.state.business_requirement, |
| "module_name": self.state.module_name, |
| "backend_code": self.state.backend_code, |
| "iteration": len(self.state.backend_code_review_feedbacks), |
| } |
| ) |
|
|
| codeReviewFeedback: CodeReviewFeedback = result.tasks_output[0].pydantic |
| print(codeReviewFeedback) |
| add_to_queue( |
| TaskInfo( |
| name="Generate Backend Code Review", |
| type="markdown", |
| output=f"#### Backend Code Review Iteration {len(self.state.backend_code_review_feedbacks)} for {self.state.module_name}: \n{codeReviewFeedback.review_comments_markdown}", |
| ) |
| ) |
| self.state.backend_code_review_feedbacks.append(codeReviewFeedback) |
|
|
| if codeReviewFeedback.passed_review: |
| return "BACKEND_CODE_REVIEWED" |
| else: |
| return "REWRITE_BACKEND_CODE" |
|
|
| @listen(or_("BACKEND_CODE_REVIEWED", "REWRITE_FRONTEND_CODE")) |
| def develop_frontend(self): |
| print("Developing frontend : ", self.state) |
| add_to_queue( |
| TaskInfo( |
| name="Developing Frontend Code", |
| type="markdown", |
| output=f"Developing Frontend Code ...", |
| ) |
| ) |
| engineeringCrew = EngineeringCrew() |
|
|
| mycrew = Crew( |
| agents=[engineeringCrew.frontend_engineer()], |
| tasks=[engineeringCrew.frontend_coding_task()], |
| ) |
| result = mycrew.kickoff( |
| inputs={ |
| "id": self.state.id, |
| "requirement": self.state.business_requirement, |
| "module_name": self.state.module_name, |
| "review_comments": ( |
| self.state.frontend_code_review_feedbacks[ |
| -1 |
| ].review_comments_markdown |
| if self.state.frontend_code_review_feedbacks |
| else "" |
| ), |
| } |
| ) |
|
|
| print("Frontend Code Created", result.raw) |
| self.state.frontend_code = result.raw |
| add_to_queue( |
| TaskInfo( |
| name="Generate Frontend Code", |
| type="markdown", |
| output=f"#### Frontend Code for {self.state.module_name}:\n ```{self.state.frontend_code}```", |
| ) |
| ) |
| return "FRONTEND_CODE_CREATED" |
|
|
| @router(develop_frontend) |
| def review_frontend_code(self): |
| print("Reviewing frontkend code : ", self.state) |
| if len(self.state.frontend_code_review_feedbacks) >= MAX_REVIEW_ITERATIONS: |
| add_to_queue( |
| TaskInfo( |
| name="Maximum Iterations Exceeded!", |
| type="markdown", |
| output=f"#### Maximum Iterations Exceeded!", |
| ) |
| ) |
| return "MAX_REVIEW_ITERATIONS_EXCEEDED" |
| engineeringCrew = EngineeringCrew() |
| add_to_queue( |
| TaskInfo( |
| name="Reviewing Frontend Code", |
| type="markdown", |
| output=f"Reviewing Frontend Code ...", |
| ) |
| ) |
|
|
| mycrew = Crew( |
| agents=[engineeringCrew.code_reviewer()], |
| tasks=[engineeringCrew.frontend_code_review_task()], |
| ) |
| if self.state.frontend_code_review_feedbacks is None: |
| self.state.frontend_code_review_feedbacks = [] |
|
|
| result = mycrew.kickoff( |
| inputs={ |
| "id": self.state.id, |
| "requirement": self.state.business_requirement, |
| "module_name": self.state.module_name, |
| "frontend_code": self.state.frontend_code, |
| "iteration": len(self.state.frontend_code_review_feedbacks), |
| } |
| ) |
|
|
| codeReviewFeedback: CodeReviewFeedback = result.tasks_output[0].pydantic |
| print(codeReviewFeedback) |
| add_to_queue( |
| TaskInfo( |
| name="Generate Frontend Code Review", |
| type="markdown", |
| output=f"#### Frontend Code Review Iteration {len(self.state.frontend_code_review_feedbacks)} for {self.state.module_name}: \n{codeReviewFeedback.review_comments_markdown}", |
| ) |
| ) |
| self.state.frontend_code_review_feedbacks.append(codeReviewFeedback) |
|
|
| if codeReviewFeedback.passed_review: |
| return "FRONTEND_CODE_REVIEWED" |
| else: |
| return "REWRITE_FRONTEND_CODE" |
|
|
| @listen("FRONTEND_CODE_REVIEWED") |
| def write_test_cases(self): |
| print("Writing test cases : ", self.state) |
| add_to_queue( |
| TaskInfo( |
| name="Writing Test Cases", |
| type="markdown", |
| output=f"Writing Test Cases ...", |
| ) |
| ) |
| engineeringCrew = EngineeringCrew() |
|
|
| mycrew = Crew( |
| agents=[engineeringCrew.test_engineer()], |
| tasks=[engineeringCrew.test_preparation_task()], |
| ) |
| result = mycrew.kickoff( |
| inputs={ |
| "id": self.state.id, |
| "requirement": self.state.business_requirement, |
| "module_name": self.state.module_name, |
| "backend_code": self.state.backend_code, |
| "frontend_code": self.state.frontend_code, |
| } |
| ) |
|
|
| self.state.unit_test_code = result.raw |
| add_to_queue( |
| TaskInfo( |
| name="Generate Test Cases", |
| type="markdown", |
| output=f"#### Test Cases for {self.state.module_name}: \n```{self.state.unit_test_code}```", |
| ) |
| ) |
|
|
| return "TEST_CASES_PREPARED" |
|
|
|
|
| def kickoff(): |
| engineering_flow = EngineeringFlow() |
| engineering_flow.kickoff() |
|
|
|
|
| def plot(): |
| engineering_flow = EngineeringFlow() |
| engineering_flow.plot() |
|
|
|
|
| if __name__ == "__main__": |
| kickoff() |
|
|