|
from typing import Dict, Any |
|
from aiflows.base_flows.atomic import AtomicFlow |
|
import time |
|
import subprocess |
|
import os |
|
|
|
|
|
class ParseFeedbackAtomicFlow(AtomicFlow): |
|
"""This flow parses the feedback from the user. The flow is supposed to be called after the CodeFileEdit or PlanFileEdit flow. |
|
The flow will open the file in VSCode and wait for the user to edit the file. Once the user closes the file, the flow will parse the file and return the content of the file. |
|
|
|
*Input Interface*: |
|
- `temp_code_file_location`: Notice that if we are parsing the plan file, this should be changed to `temp_plan_file_location`. |
|
|
|
*Output Interface*: |
|
- `code`: The content of the code file. |
|
- `feedback`: The feedback from the user. |
|
""" |
|
def _read_content(self, file_location, file_type): |
|
if file_type == "code": |
|
header_string = "# Code:\n" |
|
elif file_type == "plan": |
|
header_string = "Plan:\n" |
|
else: |
|
raise NotImplemented |
|
|
|
with open(file_location, "r") as file: |
|
content = file.read() |
|
|
|
start_index = content.find(header_string) |
|
if start_index == -1: |
|
raise ValueError(f"Format error: {header_string} tag not found.") |
|
|
|
end_index = content.find("\n############\n") |
|
if end_index == -1 or end_index < start_index: |
|
raise ValueError(f"Format error: {file_type} section is malformed.") |
|
|
|
ret = content[start_index + len(header_string):end_index].strip() |
|
|
|
return ret |
|
|
|
def _parse_user_thoughts(self, file_location): |
|
with open(file_location, "r") as file: |
|
content = file.read() |
|
|
|
delimiter = "\n############\n" |
|
parts = content.split(delimiter) |
|
if len(parts) < 2: |
|
raise ValueError("Format error: Unable to find the thoughts section.") |
|
|
|
thoughts_section = parts[1].strip() |
|
|
|
thoughts_index = thoughts_section.find("Thoughts:") |
|
if thoughts_index == -1: |
|
raise ValueError("Format error: 'Thoughts:' tag not found.") |
|
|
|
thoughts = thoughts_section[thoughts_index + len("Thoughts:"):].strip() |
|
|
|
if thoughts == '': |
|
thoughts = 'Looks good, go on.' |
|
|
|
return thoughts |
|
|
|
def _check_input(self, input_data: Dict[str, Any]): |
|
code_file_exists = "temp_code_file_location" in input_data |
|
plan_file_exists = "temp_plan_file_location" in input_data |
|
if code_file_exists == False and plan_file_exists == False: |
|
raise AssertionError("Neither code file nor plan file is passed to ParseFeedbackAtomicFlow") |
|
elif code_file_exists and plan_file_exists: |
|
raise AssertionError("Both code file and plan file are passed to ParseFeedbackAtomic, which one to parse?") |
|
|
|
def _open_file_and_wait_for_file_update(self, file_location, check_interval=1): |
|
process = subprocess.Popen(["code", "--wait", file_location]) |
|
while True: |
|
if process.poll() is not None: |
|
break |
|
time.sleep(check_interval) |
|
|
|
def run( |
|
self, |
|
input_data: Dict[str, Any] |
|
): |
|
response = {} |
|
self._check_input(input_data) |
|
if "temp_code_file_location" in input_data: |
|
temp_code_file_location = input_data['temp_code_file_location'] |
|
self._open_file_and_wait_for_file_update(temp_code_file_location) |
|
code_content = self._read_content(temp_code_file_location, "code") |
|
user_thoughts = self._parse_user_thoughts(temp_code_file_location) |
|
response = {"code": code_content, "feedback": user_thoughts} |
|
|
|
elif "temp_plan_file_location" in input_data: |
|
plan_file_location = input_data["temp_plan_file_location"] |
|
self._open_file_and_wait_for_file_update(plan_file_location) |
|
plan_content = self._read_content(plan_file_location, "plan") |
|
user_thoughts = self._parse_user_thoughts(plan_file_location) |
|
response = {"plan": plan_content, "feedback": user_thoughts} |
|
|
|
return response |
|
|