#!/usr/bin/env python # -*- coding: utf-8 -*- """ @Time : 2023/5/11 14:43 @Author : alexanderwu @File : engineer.py @Modified By: mashenquan, 2023-11-1. In accordance with Chapter 2.2.1 and 2.2.2 of RFC 116: 1. Modify the data type of the `cause_by` value in the `Message` to a string, and utilize the new message distribution feature for message filtering. 2. Consolidate message reception and processing logic within `_observe`. 3. Fix bug: Add logic for handling asynchronous message processing when messages are not ready. 4. Supplemented the external transmission of internal messages. @Modified By: mashenquan, 2023-11-27. 1. According to Section 2.2.3.1 of RFC 135, replace file data in the message with the file name. 2. According to the design in Section 2.2.3.5.5 of RFC 135, add incremental iteration functionality. @Modified By: mashenquan, 2023-12-5. Enhance the workflow to navigate to WriteCode or QaEngineer based on the results of SummarizeCode. """ from __future__ import annotations import json from collections import defaultdict from pathlib import Path from typing import Optional, Set from metagpt.actions import Action, WriteCode, WriteCodeReview, WriteTasks from metagpt.actions.fix_bug import FixBug from metagpt.actions.project_management_an import REFINED_TASK_LIST, TASK_LIST from metagpt.actions.summarize_code import SummarizeCode from metagpt.actions.write_code_plan_and_change_an import WriteCodePlanAndChange from metagpt.const import ( BUGFIX_FILENAME, CODE_PLAN_AND_CHANGE_FILE_REPO, REQUIREMENT_FILENAME, SYSTEM_DESIGN_FILE_REPO, TASK_FILE_REPO, ) from metagpt.logs import logger from metagpt.roles import Role from metagpt.schema import ( CodePlanAndChangeContext, CodeSummarizeContext, CodingContext, Document, Documents, Message, ) from metagpt.utils.common import any_to_name, any_to_str, any_to_str_set IS_PASS_PROMPT = """ {context} ---- Does the above log indicate anything that needs to be done? If there are any tasks to be completed, please answer 'NO' along with the to-do list in JSON format; otherwise, answer 'YES' in JSON format. """ class Engineer(Role): """ Represents an Engineer role responsible for writing and possibly reviewing code. Attributes: name (str): Name of the engineer. profile (str): Role profile, default is 'Engineer'. goal (str): Goal of the engineer. constraints (str): Constraints for the engineer. n_borg (int): Number of borgs. use_code_review (bool): Whether to use code review. """ name: str = "Alex" profile: str = "Engineer" goal: str = "write elegant, readable, extensible, efficient code" constraints: str = ( "the code should conform to standards like google-style and be modular and maintainable. " "Use same language as user requirement" ) n_borg: int = 1 use_code_review: bool = False code_todos: list = [] summarize_todos: list = [] next_todo_action: str = "" n_summarize: int = 0 def __init__(self, **kwargs) -> None: super().__init__(**kwargs) self.set_actions([WriteCode]) self._watch([WriteTasks, SummarizeCode, WriteCode, WriteCodeReview, FixBug, WriteCodePlanAndChange]) self.code_todos = [] self.summarize_todos = [] self.next_todo_action = any_to_name(WriteCode) @staticmethod def _parse_tasks(task_msg: Document) -> list[str]: m = json.loads(task_msg.content) return m.get(TASK_LIST.key) or m.get(REFINED_TASK_LIST.key) async def _act_sp_with_cr(self, review=False) -> Set[str]: changed_files = set() for todo in self.code_todos: """ # Select essential information from the historical data to reduce the length of the prompt (summarized from human experience): 1. All from Architect 2. All from ProjectManager 3. Do we need other codes (currently needed)? TODO: The goal is not to need it. After clear task decomposition, based on the design idea, you should be able to write a single file without needing other codes. If you can't, it means you need a clearer definition. This is the key to writing longer code. """ coding_context = await todo.run() # Code review if review: action = WriteCodeReview(i_context=coding_context, context=self.context, llm=self.llm) self._init_action(action) coding_context = await action.run() dependencies = {coding_context.design_doc.root_relative_path, coding_context.task_doc.root_relative_path} if self.config.inc: dependencies.add(coding_context.code_plan_and_change_doc.root_relative_path) await self.project_repo.srcs.save( filename=coding_context.filename, dependencies=list(dependencies), content=coding_context.code_doc.content, ) msg = Message( content=coding_context.model_dump_json(), instruct_content=coding_context, role=self.profile, cause_by=WriteCode, ) self.rc.memory.add(msg) changed_files.add(coding_context.code_doc.filename) if not changed_files: logger.info("Nothing has changed.") return changed_files async def _act(self) -> Message | None: """Determines the mode of action based on whether code review is used.""" if self.rc.todo is None: return None if isinstance(self.rc.todo, WriteCodePlanAndChange): self.next_todo_action = any_to_name(WriteCode) return await self._act_code_plan_and_change() if isinstance(self.rc.todo, WriteCode): self.next_todo_action = any_to_name(SummarizeCode) return await self._act_write_code() if isinstance(self.rc.todo, SummarizeCode): self.next_todo_action = any_to_name(WriteCode) return await self._act_summarize() return None async def _act_write_code(self): changed_files = await self._act_sp_with_cr(review=self.use_code_review) return Message( content="\n".join(changed_files), role=self.profile, cause_by=WriteCodeReview if self.use_code_review else WriteCode, send_to=self, sent_from=self, ) async def _act_summarize(self): tasks = [] for todo in self.summarize_todos: summary = await todo.run() summary_filename = Path(todo.i_context.design_filename).with_suffix(".md").name dependencies = {todo.i_context.design_filename, todo.i_context.task_filename} for filename in todo.i_context.codes_filenames: rpath = self.project_repo.src_relative_path / filename dependencies.add(str(rpath)) await self.project_repo.resources.code_summary.save( filename=summary_filename, content=summary, dependencies=dependencies ) is_pass, reason = await self._is_pass(summary) if not is_pass: todo.i_context.reason = reason tasks.append(todo.i_context.model_dump()) await self.project_repo.docs.code_summary.save( filename=Path(todo.i_context.design_filename).name, content=todo.i_context.model_dump_json(), dependencies=dependencies, ) else: await self.project_repo.docs.code_summary.delete(filename=Path(todo.i_context.design_filename).name) logger.info(f"--max-auto-summarize-code={self.config.max_auto_summarize_code}") if not tasks or self.config.max_auto_summarize_code == 0: return Message( content="", role=self.profile, cause_by=SummarizeCode, sent_from=self, send_to="Edward", # The name of QaEngineer ) # The maximum number of times the 'SummarizeCode' action is automatically invoked, with -1 indicating unlimited. # This parameter is used for debugging the workflow. self.n_summarize += 1 if self.config.max_auto_summarize_code > self.n_summarize else 0 return Message( content=json.dumps(tasks), role=self.profile, cause_by=SummarizeCode, send_to=self, sent_from=self ) async def _act_code_plan_and_change(self): """Write code plan and change that guides subsequent WriteCode and WriteCodeReview""" node = await self.rc.todo.run() code_plan_and_change = node.instruct_content.model_dump_json() dependencies = { REQUIREMENT_FILENAME, str(self.project_repo.docs.prd.root_path / self.rc.todo.i_context.prd_filename), str(self.project_repo.docs.system_design.root_path / self.rc.todo.i_context.design_filename), str(self.project_repo.docs.task.root_path / self.rc.todo.i_context.task_filename), } code_plan_and_change_filepath = Path(self.rc.todo.i_context.design_filename) await self.project_repo.docs.code_plan_and_change.save( filename=code_plan_and_change_filepath.name, content=code_plan_and_change, dependencies=dependencies ) await self.project_repo.resources.code_plan_and_change.save( filename=code_plan_and_change_filepath.with_suffix(".md").name, content=node.content, dependencies=dependencies, ) return Message( content=code_plan_and_change, role=self.profile, cause_by=WriteCodePlanAndChange, send_to=self, sent_from=self, ) async def _is_pass(self, summary) -> (str, str): rsp = await self.llm.aask(msg=IS_PASS_PROMPT.format(context=summary), stream=False) logger.info(rsp) if "YES" in rsp: return True, rsp return False, rsp async def _think(self) -> Action | None: if not self.src_workspace: self.src_workspace = self.git_repo.workdir / self.git_repo.workdir.name write_plan_and_change_filters = any_to_str_set([WriteTasks, FixBug]) write_code_filters = any_to_str_set([WriteTasks, WriteCodePlanAndChange, SummarizeCode]) summarize_code_filters = any_to_str_set([WriteCode, WriteCodeReview]) if not self.rc.news: return None msg = self.rc.news[0] if self.config.inc and msg.cause_by in write_plan_and_change_filters: logger.debug(f"TODO WriteCodePlanAndChange:{msg.model_dump_json()}") await self._new_code_plan_and_change_action(cause_by=msg.cause_by) return self.rc.todo if msg.cause_by in write_code_filters: logger.debug(f"TODO WriteCode:{msg.model_dump_json()}") await self._new_code_actions() return self.rc.todo if msg.cause_by in summarize_code_filters and msg.sent_from == any_to_str(self): logger.debug(f"TODO SummarizeCode:{msg.model_dump_json()}") await self._new_summarize_actions() return self.rc.todo return None async def _new_coding_context(self, filename, dependency) -> CodingContext: old_code_doc = await self.project_repo.srcs.get(filename) if not old_code_doc: old_code_doc = Document(root_path=str(self.project_repo.src_relative_path), filename=filename, content="") dependencies = {Path(i) for i in await dependency.get(old_code_doc.root_relative_path)} task_doc = None design_doc = None code_plan_and_change_doc = await self._get_any_code_plan_and_change() if await self._is_fixbug() else None for i in dependencies: if str(i.parent.as_posix()) == TASK_FILE_REPO: task_doc = await self.project_repo.docs.task.get(i.name) elif str(i.parent.as_posix()) == SYSTEM_DESIGN_FILE_REPO: design_doc = await self.project_repo.docs.system_design.get(i.name) elif str(i.parent.as_posix()) == CODE_PLAN_AND_CHANGE_FILE_REPO: code_plan_and_change_doc = await self.project_repo.docs.code_plan_and_change.get(i.name) if not task_doc or not design_doc: logger.error(f'Detected source code "{filename}" from an unknown origin.') raise ValueError(f'Detected source code "{filename}" from an unknown origin.') context = CodingContext( filename=filename, design_doc=design_doc, task_doc=task_doc, code_doc=old_code_doc, code_plan_and_change_doc=code_plan_and_change_doc, ) return context async def _new_coding_doc(self, filename, dependency): context = await self._new_coding_context(filename, dependency) coding_doc = Document( root_path=str(self.project_repo.src_relative_path), filename=filename, content=context.model_dump_json() ) return coding_doc async def _new_code_actions(self): bug_fix = await self._is_fixbug() # Prepare file repos changed_src_files = self.project_repo.srcs.all_files if bug_fix else self.project_repo.srcs.changed_files changed_task_files = self.project_repo.docs.task.changed_files changed_files = Documents() # Recode caused by upstream changes. for filename in changed_task_files: design_doc = await self.project_repo.docs.system_design.get(filename) task_doc = await self.project_repo.docs.task.get(filename) code_plan_and_change_doc = await self.project_repo.docs.code_plan_and_change.get(filename) task_list = self._parse_tasks(task_doc) for task_filename in task_list: old_code_doc = await self.project_repo.srcs.get(task_filename) if not old_code_doc: old_code_doc = Document( root_path=str(self.project_repo.src_relative_path), filename=task_filename, content="" ) if not code_plan_and_change_doc: context = CodingContext( filename=task_filename, design_doc=design_doc, task_doc=task_doc, code_doc=old_code_doc ) else: context = CodingContext( filename=task_filename, design_doc=design_doc, task_doc=task_doc, code_doc=old_code_doc, code_plan_and_change_doc=code_plan_and_change_doc, ) coding_doc = Document( root_path=str(self.project_repo.src_relative_path), filename=task_filename, content=context.model_dump_json(), ) if task_filename in changed_files.docs: logger.warning( f"Log to expose potential conflicts: {coding_doc.model_dump_json()} & " f"{changed_files.docs[task_filename].model_dump_json()}" ) changed_files.docs[task_filename] = coding_doc self.code_todos = [ WriteCode(i_context=i, context=self.context, llm=self.llm) for i in changed_files.docs.values() ] # Code directly modified by the user. dependency = await self.git_repo.get_dependency() for filename in changed_src_files: if filename in changed_files.docs: continue coding_doc = await self._new_coding_doc(filename=filename, dependency=dependency) changed_files.docs[filename] = coding_doc self.code_todos.append(WriteCode(i_context=coding_doc, context=self.context, llm=self.llm)) if self.code_todos: self.set_todo(self.code_todos[0]) async def _new_summarize_actions(self): src_files = self.project_repo.srcs.all_files # Generate a SummarizeCode action for each pair of (system_design_doc, task_doc). summarizations = defaultdict(list) for filename in src_files: dependencies = await self.project_repo.srcs.get_dependency(filename=filename) ctx = CodeSummarizeContext.loads(filenames=list(dependencies)) summarizations[ctx].append(filename) for ctx, filenames in summarizations.items(): ctx.codes_filenames = filenames new_summarize = SummarizeCode(i_context=ctx, context=self.context, llm=self.llm) for i, act in enumerate(self.summarize_todos): if act.i_context.task_filename == new_summarize.i_context.task_filename: self.summarize_todos[i] = new_summarize new_summarize = None break if new_summarize: self.summarize_todos.append(new_summarize) if self.summarize_todos: self.set_todo(self.summarize_todos[0]) self.summarize_todos.pop(0) async def _new_code_plan_and_change_action(self, cause_by: str): """Create a WriteCodePlanAndChange action for subsequent to-do actions.""" files = self.project_repo.all_files options = {} if cause_by != any_to_str(FixBug): requirement_doc = await self.project_repo.docs.get(REQUIREMENT_FILENAME) options["requirement"] = requirement_doc.content else: fixbug_doc = await self.project_repo.docs.get(BUGFIX_FILENAME) options["issue"] = fixbug_doc.content code_plan_and_change_ctx = CodePlanAndChangeContext.loads(files, **options) self.rc.todo = WriteCodePlanAndChange(i_context=code_plan_and_change_ctx, context=self.context, llm=self.llm) @property def action_description(self) -> str: """AgentStore uses this attribute to display to the user what actions the current role should take.""" return self.next_todo_action async def _is_fixbug(self) -> bool: fixbug_doc = await self.project_repo.docs.get(BUGFIX_FILENAME) return bool(fixbug_doc and fixbug_doc.content) async def _get_any_code_plan_and_change(self) -> Optional[Document]: changed_files = self.project_repo.docs.code_plan_and_change.changed_files for filename in changed_files.keys(): doc = await self.project_repo.docs.code_plan_and_change.get(filename) if doc and doc.content: return doc return None