DevToolKit / app.py
acecalisto3's picture
Update app.py
350eab5 verified
raw
history blame
8.89 kB
import os
import subprocess
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
from agent.prompts import (
ACTION_PROMPT,
ADD_PROMPT,
COMPRESS_HISTORY_PROMPT,
LOG_PROMPT,
LOG_RESPONSE,
MODIFY_PROMPT,
PREFIX,
READ_PROMPT,
TASK_PROMPT,
UNDERSTAND_TEST_RESULTS_PROMPT,
)
from agent.utils import parse_action, parse_file_content, read_python_module_structure
# Initialize Hugging Face model and tokenizer
TOKENIZER = AutoTokenizer.from_pretrained("typefully/rag-tokenbert-3B")
MODEL = AutoModelForSeq2SeqLM.from_pretrained("typefully/rag-tokenbert-3B")
PIPELINE = pipeline('text-generation', model=MODEL, tokenizer=TOKENIZER, device=-1)
VERBOSE = False
MAX_HISTORY = 100
def hf_run_gpt(prompt_template, stop_tokens, max_length, module_summary, purpose, **prompt_kwargs):
content = PREFIX.format(module_summary=module_summary, purpose=purpose) + prompt_template.format(**prompt_kwargs)
if VERBOSE:
print(LOG_PROMPT.format(content))
input_seq = TOKENIZER(content, return_tensors='pt', truncation=True, padding='longest')['input_ids']
output_sequences = PIPELINE(input_seq, max_length=max_length, num_return_sequences=1, do_sample=False)
resp = TOKENIZER.decode(output_sequences[0]['generated_text'], skip_special_tokens=True)
if VERBOSE:
print(LOG_RESPONSE.format(resp))
return resp
def compress_history(purpose, task, history, directory):
module_summary, _, _ = read_python_module_structure(directory)
resp = hf_run_gpt(
COMPRESS_HISTORY_PROMPT,
stop_tokens=["observation:", "task:", "action:", "thought:"],
max_length=512,
module_summary=module_summary,
purpose=purpose,
task=task,
history=history,
)
history = "observation: {}\n".format(resp)
return history
def call_main(purpose, task, history, directory, action_input):
module_summary, _, _ = read_python_module_structure(directory)
resp = hf_run_gpt(
ACTION_PROMPT,
stop_tokens=["observation:", "task:"],
max_length=256,
module_summary=module_summary,
purpose=purpose,
task=task,
history=history,
)
lines = resp.strip().split("\n")
for line in lines:
if line.startswith("thought: "):
history += "{}\n".format(line)
elif line.startswith("action: "):
action_name, action_input = parse_action(line)
history += "{}\n".format(line)
return action_name, action_input, history, task
else:
assert False, "unknown action: {}".format(line)
return "MAIN", None, history, task
def call_test(purpose, task, history, directory, action_input):
result = subprocess.run(
["python", "-m", "pytest", "--collect-only", directory],
capture_output=True,
text=True,
)
if result.returncode != 0:
history += "observation: there are no tests! Test should be written in a test folder under {}\n".format(directory)
return "MAIN", None, history, task
result = subprocess.run(
["python", "-m", "pytest", directory], capture_output=True, text=True
)
if result.returncode == 0:
history += "observation: tests pass\n"
return "MAIN", None, history, task
module_summary, content, _ = read_python_module_structure(directory)
resp = hf_run_gpt(
UNDERSTAND_TEST_RESULTS_PROMPT,
stop_tokens=[],
max_length=256,
module_summary=module_summary,
purpose=purpose,
task=task,
history=history,
stdout=result.stdout[:5000], # limit amount of text
stderr=result.stderr[:5000], # limit amount of text
)
history += "observation: tests failed: {}\n".format(resp)
return "MAIN", None, history, task
def call_set_task(purpose, task, history, directory, action_input):
module_summary, content, _ = read_python_module_structure(directory)
task = hf_run_gpt(
TASK_PROMPT,
stop_tokens=[],
max_length=64,
module_summary=module_summary,
purpose=purpose,
task=task,
history=history,
).strip("\n")
history += "observation: task has been updated to: {}\n".format(task)
return "MAIN", None, history, task
def call_read(purpose, task, history, directory, action_input):
if not os.path.exists(action_input):
history += "observation: file does not exist\n"
return "MAIN", None, history, task
module_summary, content, _ = read_python_module_structure(directory)
f_content = content.get(action_input, "< document is empty >")
resp = hf_run_gpt(
READ_PROMPT,
stop_tokens=[],
max_length=256,
module_summary=module_summary,
purpose=purpose,
task=task,
history=history,
file_path=action_input,
file_contents=f_content,
).strip("\n")
history += "observation: {}\n".format(resp)
return "MAIN", None, history, task
def call_modify(purpose, task, history, directory, action_input):
if not os.path.exists(action_input):
history += "observation: file does not exist\n"
return "MAIN", None, history, task
module_summary, content, _ = read_python_module_structure(directory)
f_content = content.get(action_input, "< document is empty >")
resp = hf_run_gpt(
MODIFY_PROMPT,
stop_tokens=["action:", "thought:", "observation:"],
max_length=2048,
module_summary=module_summary,
purpose=purpose,
task=task,
history=history,
file_path=action_input,
file_contents=f_content,
)
new_contents, description = parse_file_content(resp)
if new_contents is None:
history += "observation: failed to modify file\n"
return "MAIN", None, history, task
with open(action_input, "w") as f:
f.write(new_contents)
history += "observation: file successfully modified\n"
history += "observation: {}\n".format(description)
return "MAIN", None, history, task
def call_add(purpose, task, history, directory, action_input):
d = os.path.dirname(action_input)
if not d.startswith(directory):
history += "observation: files must be under directory {}\n".format(directory)
elif not action_input.endswith(".py"):
history += "observation: can only write .py files\n"
else:
if d and not os.path.exists(d):
os.makedirs(d)
if not os.path.exists(action_input):
module_summary, _, _ = read_python_module_structure(directory)
resp = hf_run_gpt(
ADD_PROMPT,
stop_tokens=["action:", "thought:", "observation:"],
max_length=2048,
module_summary=module_summary,
purpose=purpose,
task=task,
history=history,
file_path=action_input,
)
new_contents, description = parse_file_content(resp)
if new_contents is None:
history += "observation: failed to write file\n"
return "MAIN", None, history, task
with open(action_input, "w") as f:
f.write(new_contents)
history += "observation: file successfully written\n"
history += "observation: {}\n".format(description)
else:
history += "observation: file already exists\n"
return "MAIN", None, history, task
NAME_TO_FUNC = {
"MAIN": call_main,
"UPDATE-TASK": call_set_task,
"MODIFY-FILE": call_modify,
"READ-FILE": call_read,
"ADD-FILE": call_add,
"TEST": call_test,
}
def run_action(purpose, task, history, directory, action_name, action_input):
if action_name == "COMPLETE":
exit(0)
# compress the history when it is long
if len(history.split("\n")) > MAX_HISTORY:
if VERBOSE:
print("COMPRESSING HISTORY")
history = compress_history(purpose, task, history, directory)
assert action_name in NAME_TO_FUNC
print("RUN: ", action_name, action_input)
return NAME_TO_FUNC[action_name](purpose, task, history, directory, action_input)
def run(purpose, directory, task=None):
history = ""
action_name = "UPDATE-TASK" if task is None else "MAIN"
action_input = None
while True:
print("")
print("")
print("---")
print("purpose:", purpose)
print("task:", task)
print("---")
print(history)
print("---")
action_name, action_input, history, task = run_action(
purpose,
task,
history,
directory,
action_name,
action_input,
)
if __name__ == "__main__":
# Example usage
run("Your purpose here", "path/to/your/directory")