import os import re import shutil import signal import subprocess import time from typing import Dict import openai import requests from chatdev.codes import Codes from chatdev.documents import Documents from chatdev.roster import Roster from chatdev.utils import log_and_print_online class ChatEnvConfig: def __init__(self, clear_structure, brainstorming, gui_design, git_management): self.clear_structure = clear_structure self.brainstorming = brainstorming self.gui_design = gui_design self.git_management = git_management def __str__(self): string = "" string += "ChatEnvConfig.clear_structure: {}\n".format(self.clear_structure) string += "ChatEnvConfig.brainstorming: {}\n".format(self.brainstorming) return string class ChatEnv: def __init__(self, chat_env_config: ChatEnvConfig): self.config = chat_env_config self.roster: Roster = Roster() self.codes: Codes = Codes() self.proposed_images: Dict[str, str] = {} self.incorporated_images: Dict[str, str] = {} self.requirements: Documents = Documents() self.manuals: Documents = Documents() self.env_dict = { "directory": "", "task_prompt": "", "modality": "", "ideas": "", "language": "", "review_comments": "", "error_summary": "", "test_reports": "" } @staticmethod def fix_module_not_found_error(test_reports): if "ModuleNotFoundError" in test_reports: for match in re.finditer(r"No module named '(\S+)'", test_reports, re.DOTALL): module = match.group(1) subprocess.Popen("pip install {}".format(module), shell=True).wait() log_and_print_online("**[CMD Execute]**\n\n[CMD] pip install {}".format(module)) def set_directory(self, directory): assert len(self.env_dict['directory']) == 0 self.env_dict['directory'] = directory self.codes.directory = directory self.requirements.directory = directory self.manuals.directory = directory if os.path.exists(self.env_dict['directory']) and len(os.listdir(directory)) > 0: new_directory = "{}.{}".format(directory, time.strftime("%Y%m%d%H%M%S", time.localtime())) shutil.copytree(directory, new_directory) print("{} Copied to {}".format(directory, new_directory)) if self.config.clear_structure: if os.path.exists(self.env_dict['directory']): shutil.rmtree(self.env_dict['directory']) os.mkdir(self.env_dict['directory']) print("{} Created".format(directory)) else: os.mkdir(self.env_dict['directory']) def exist_bugs(self) -> tuple[bool, str]: directory = self.env_dict['directory'] success_info = "The software run successfully without errors." try: command = "cd {}; ls -l; python3 main.py;".format(directory) process = subprocess.Popen(command, shell=True, preexec_fn=os.setsid, stdout=subprocess.PIPE, stderr=subprocess.PIPE) time.sleep(3) return_code = process.returncode # Check if the software is still running if process.poll() is None: os.killpg(os.getpgid(process.pid), signal.SIGTERM) if return_code == 0: return False, success_info else: error_output = process.stderr.read().decode('utf-8') if error_output: if "Traceback".lower() in error_output.lower(): errs = error_output.replace(directory + "/", "") return True, errs else: return False, success_info except subprocess.CalledProcessError as e: return True, f"Error: {e}" except Exception as ex: return True, f"An error occurred: {ex}" return False, success_info def recruit(self, agent_name: str): self.roster._recruit(agent_name) def exist_employee(self, agent_name: str) -> bool: return self.roster._exist_employee(agent_name) def print_employees(self): self.roster._print_employees() def update_codes(self, generated_content): self.codes._update_codes(generated_content) def rewrite_codes(self) -> None: self.codes._rewrite_codes(self.config.git_management) def get_codes(self) -> str: return self.codes._get_codes() def _load_from_hardware(self, directory) -> None: self.codes._load_from_hardware(directory) def _update_requirements(self, generated_content): self.requirements._update_docs(generated_content) def rewrite_requirements(self): self.requirements._rewrite_docs() def get_requirements(self) -> str: return self.requirements._get_docs() def _update_manuals(self, generated_content): self.manuals._update_docs(generated_content, parse=False, predifined_filename="manual.md") def rewrite_manuals(self): self.manuals._rewrite_docs() def write_meta(self) -> None: directory = self.env_dict['directory'] if not os.path.exists(directory): os.mkdir(directory) print("{} Created.".format(directory)) meta_filename = "meta.txt" with open(os.path.join(directory, meta_filename), "w", encoding="utf-8") as writer: writer.write("{}:\n{}\n\n".format("Task", self.env_dict['task_prompt'])) writer.write("{}:\n{}\n\n".format("Config", self.config.__str__())) writer.write("{}:\n{}\n\n".format("Roster", ", ".join(self.roster.agents))) writer.write("{}:\n{}\n\n".format("Modality", self.env_dict['modality'])) writer.write("{}:\n{}\n\n".format("Ideas", self.env_dict['ideas'])) writer.write("{}:\n{}\n\n".format("Language", self.env_dict['language'])) writer.write("{}:\n{}\n\n".format("Code_Version", self.codes.version)) writer.write("{}:\n{}\n\n".format("Proposed_images", len(self.proposed_images.keys()))) writer.write("{}:\n{}\n\n".format("Incorporated_images", len(self.incorporated_images.keys()))) print(os.path.join(directory, meta_filename), "Wrote") def generate_images_from_codes(self): def download(img_url, file_name): r = requests.get(img_url) filepath = os.path.join(self.env_dict['directory'], file_name) if os.path.exists(filepath): os.remove(filepath) with open(filepath, "wb") as f: f.write(r.content) print("{} Downloaded".format(filepath)) regex = r"(\w+.png)" joined_codes = self.get_codes() matches = re.finditer(regex, joined_codes, re.DOTALL) # matched_images = {} for match in matches: filename = match.group(1).strip() if filename in self.proposed_images.keys(): self.incorporated_images[filename] = self.proposed_images[filename] else: self.incorporated_images[filename] = filename.replace("_", " ") for filename in self.incorporated_images.keys(): if not os.path.exists(os.path.join(self.env_dict['directory'], filename)): desc = self.incorporated_images[filename] if desc.endswith(".png"): desc = desc.replace(".png", "") print("{}: {}".format(filename, desc)) response = openai.Image.create( prompt=desc, n=1, size="256x256" ) image_url = response['data'][0]['url'] download(image_url, filename) def get_proposed_images_from_message(self, messages): def download(img_url, file_name): r = requests.get(img_url) filepath = os.path.join(self.env_dict['directory'], file_name) if os.path.exists(filepath): os.remove(filepath) with open(filepath, "wb") as f: f.write(r.content) print("{} Downloaded".format(filepath)) regex = r"(\w+.png):(.*?)\n" matches = re.finditer(regex, messages, re.DOTALL) images = {} for match in matches: filename = match.group(1).strip() desc = match.group(2).strip() images[filename] = desc if len(images.keys()) == 0: regex = r"(\w+.png)" matches = re.finditer(regex, messages, re.DOTALL) images = {} for match in matches: filename = match.group(1).strip() desc = " ".join(filename.replace(".png", "").split("_")) images[filename] = desc print("{}: {}".format(filename, images[filename])) for filename in images.keys(): if not os.path.exists(os.path.join(self.env_dict['directory'], filename)): desc = images[filename] if desc.endswith(".png"): desc = desc.replace(".png", "") print("{}: {}".format(filename, desc)) response = openai.Image.create( prompt=desc, n=1, size="256x256" ) image_url = response['data'][0]['url'] download(image_url, filename) return images