|
import os |
|
import re |
|
import shutil |
|
import signal |
|
import subprocess |
|
import time |
|
from typing import Dict |
|
|
|
import openai |
|
import requests |
|
|
|
from chatdev.codes import Codes |
|
from chatdev.documents import Documents |
|
from chatdev.roster import Roster |
|
from chatdev.utils import log_and_print_online |
|
|
|
|
|
class ChatEnvConfig: |
|
def __init__(self, clear_structure, |
|
brainstorming, |
|
gui_design, |
|
git_management): |
|
self.clear_structure = clear_structure |
|
self.brainstorming = brainstorming |
|
self.gui_design = gui_design |
|
self.git_management = git_management |
|
|
|
def __str__(self): |
|
string = "" |
|
string += "ChatEnvConfig.clear_structure: {}\n".format(self.clear_structure) |
|
string += "ChatEnvConfig.brainstorming: {}\n".format(self.brainstorming) |
|
return string |
|
|
|
|
|
class ChatEnv: |
|
def __init__(self, chat_env_config: ChatEnvConfig): |
|
self.config = chat_env_config |
|
self.roster: Roster = Roster() |
|
self.codes: Codes = Codes() |
|
self.proposed_images: Dict[str, str] = {} |
|
self.incorporated_images: Dict[str, str] = {} |
|
self.requirements: Documents = Documents() |
|
self.manuals: Documents = Documents() |
|
self.env_dict = { |
|
"directory": "", |
|
"task_prompt": "", |
|
"modality": "", |
|
"ideas": "", |
|
"language": "", |
|
"review_comments": "", |
|
"error_summary": "", |
|
"test_reports": "" |
|
} |
|
|
|
@staticmethod |
|
def fix_module_not_found_error(test_reports): |
|
if "ModuleNotFoundError" in test_reports: |
|
for match in re.finditer(r"No module named '(\S+)'", test_reports, re.DOTALL): |
|
module = match.group(1) |
|
subprocess.Popen("pip install {}".format(module), shell=True).wait() |
|
log_and_print_online("**[CMD Execute]**\n\n[CMD] pip install {}".format(module)) |
|
|
|
def set_directory(self, directory): |
|
assert len(self.env_dict['directory']) == 0 |
|
self.env_dict['directory'] = directory |
|
self.codes.directory = directory |
|
self.requirements.directory = directory |
|
self.manuals.directory = directory |
|
|
|
if os.path.exists(self.env_dict['directory']) and len(os.listdir(directory)) > 0: |
|
new_directory = "{}.{}".format(directory, time.strftime("%Y%m%d%H%M%S", time.localtime())) |
|
shutil.copytree(directory, new_directory) |
|
print("{} Copied to {}".format(directory, new_directory)) |
|
if self.config.clear_structure: |
|
if os.path.exists(self.env_dict['directory']): |
|
shutil.rmtree(self.env_dict['directory']) |
|
os.mkdir(self.env_dict['directory']) |
|
print("{} Created".format(directory)) |
|
else: |
|
os.mkdir(self.env_dict['directory']) |
|
|
|
def exist_bugs(self) -> tuple[bool, str]: |
|
directory = self.env_dict['directory'] |
|
|
|
success_info = "The software run successfully without errors." |
|
try: |
|
command = "cd {}; ls -l; python3 main.py;".format(directory) |
|
process = subprocess.Popen(command, shell=True, preexec_fn=os.setsid, |
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
|
time.sleep(3) |
|
return_code = process.returncode |
|
|
|
if process.poll() is None: |
|
os.killpg(os.getpgid(process.pid), signal.SIGTERM) |
|
if return_code == 0: |
|
return False, success_info |
|
else: |
|
error_output = process.stderr.read().decode('utf-8') |
|
if error_output: |
|
if "Traceback".lower() in error_output.lower(): |
|
errs = error_output.replace(directory + "/", "") |
|
return True, errs |
|
else: |
|
return False, success_info |
|
except subprocess.CalledProcessError as e: |
|
return True, f"Error: {e}" |
|
except Exception as ex: |
|
return True, f"An error occurred: {ex}" |
|
|
|
return False, success_info |
|
|
|
def recruit(self, agent_name: str): |
|
self.roster._recruit(agent_name) |
|
|
|
def exist_employee(self, agent_name: str) -> bool: |
|
return self.roster._exist_employee(agent_name) |
|
|
|
def print_employees(self): |
|
self.roster._print_employees() |
|
|
|
def update_codes(self, generated_content): |
|
self.codes._update_codes(generated_content) |
|
|
|
def rewrite_codes(self) -> None: |
|
self.codes._rewrite_codes(self.config.git_management) |
|
|
|
def get_codes(self) -> str: |
|
return self.codes._get_codes() |
|
|
|
def _load_from_hardware(self, directory) -> None: |
|
self.codes._load_from_hardware(directory) |
|
|
|
def _update_requirements(self, generated_content): |
|
self.requirements._update_docs(generated_content) |
|
|
|
def rewrite_requirements(self): |
|
self.requirements._rewrite_docs() |
|
|
|
def get_requirements(self) -> str: |
|
return self.requirements._get_docs() |
|
|
|
def _update_manuals(self, generated_content): |
|
self.manuals._update_docs(generated_content, parse=False, predifined_filename="manual.md") |
|
|
|
def rewrite_manuals(self): |
|
self.manuals._rewrite_docs() |
|
|
|
def write_meta(self) -> None: |
|
directory = self.env_dict['directory'] |
|
|
|
if not os.path.exists(directory): |
|
os.mkdir(directory) |
|
print("{} Created.".format(directory)) |
|
|
|
meta_filename = "meta.txt" |
|
with open(os.path.join(directory, meta_filename), "w", encoding="utf-8") as writer: |
|
writer.write("{}:\n{}\n\n".format("Task", self.env_dict['task_prompt'])) |
|
writer.write("{}:\n{}\n\n".format("Config", self.config.__str__())) |
|
writer.write("{}:\n{}\n\n".format("Roster", ", ".join(self.roster.agents))) |
|
writer.write("{}:\n{}\n\n".format("Modality", self.env_dict['modality'])) |
|
writer.write("{}:\n{}\n\n".format("Ideas", self.env_dict['ideas'])) |
|
writer.write("{}:\n{}\n\n".format("Language", self.env_dict['language'])) |
|
writer.write("{}:\n{}\n\n".format("Code_Version", self.codes.version)) |
|
writer.write("{}:\n{}\n\n".format("Proposed_images", len(self.proposed_images.keys()))) |
|
writer.write("{}:\n{}\n\n".format("Incorporated_images", len(self.incorporated_images.keys()))) |
|
print(os.path.join(directory, meta_filename), "Wrote") |
|
|
|
def generate_images_from_codes(self): |
|
def download(img_url, file_name): |
|
r = requests.get(img_url) |
|
filepath = os.path.join(self.env_dict['directory'], file_name) |
|
if os.path.exists(filepath): |
|
os.remove(filepath) |
|
with open(filepath, "wb") as f: |
|
f.write(r.content) |
|
print("{} Downloaded".format(filepath)) |
|
|
|
regex = r"(\w+.png)" |
|
joined_codes = self.get_codes() |
|
matches = re.finditer(regex, joined_codes, re.DOTALL) |
|
|
|
for match in matches: |
|
filename = match.group(1).strip() |
|
if filename in self.proposed_images.keys(): |
|
self.incorporated_images[filename] = self.proposed_images[filename] |
|
else: |
|
self.incorporated_images[filename] = filename.replace("_", " ") |
|
|
|
for filename in self.incorporated_images.keys(): |
|
if not os.path.exists(os.path.join(self.env_dict['directory'], filename)): |
|
desc = self.incorporated_images[filename] |
|
if desc.endswith(".png"): |
|
desc = desc.replace(".png", "") |
|
print("{}: {}".format(filename, desc)) |
|
response = openai.Image.create( |
|
prompt=desc, |
|
n=1, |
|
size="256x256" |
|
) |
|
image_url = response['data'][0]['url'] |
|
download(image_url, filename) |
|
|
|
def get_proposed_images_from_message(self, messages): |
|
def download(img_url, file_name): |
|
r = requests.get(img_url) |
|
filepath = os.path.join(self.env_dict['directory'], file_name) |
|
if os.path.exists(filepath): |
|
os.remove(filepath) |
|
with open(filepath, "wb") as f: |
|
f.write(r.content) |
|
print("{} Downloaded".format(filepath)) |
|
|
|
regex = r"(\w+.png):(.*?)\n" |
|
matches = re.finditer(regex, messages, re.DOTALL) |
|
images = {} |
|
for match in matches: |
|
filename = match.group(1).strip() |
|
desc = match.group(2).strip() |
|
images[filename] = desc |
|
|
|
if len(images.keys()) == 0: |
|
regex = r"(\w+.png)" |
|
matches = re.finditer(regex, messages, re.DOTALL) |
|
images = {} |
|
for match in matches: |
|
filename = match.group(1).strip() |
|
desc = " ".join(filename.replace(".png", "").split("_")) |
|
images[filename] = desc |
|
print("{}: {}".format(filename, images[filename])) |
|
|
|
for filename in images.keys(): |
|
if not os.path.exists(os.path.join(self.env_dict['directory'], filename)): |
|
desc = images[filename] |
|
if desc.endswith(".png"): |
|
desc = desc.replace(".png", "") |
|
print("{}: {}".format(filename, desc)) |
|
response = openai.Image.create( |
|
prompt=desc, |
|
n=1, |
|
size="256x256" |
|
) |
|
image_url = response['data'][0]['url'] |
|
download(image_url, filename) |
|
|
|
return images |
|
|