# NOTE: page banner captured with this file from a Hugging Face Space ("Spaces: Sleeping"); not part of the program.
import os | |
#import agi | |
import gradio as gr | |
directory = "./app" | |
#pa=os.environ['PASS'] | |
import os | |
import subprocess | |
from hf_file import hf_file as hfwrite | |
from huggingface_hub import InferenceClient,HfApi | |
#import openai | |
from text_generation import Client | |
from pathlib import Path | |
from prompts import ( | |
ACTION_PROMPT, | |
ADD_PROMPT, | |
COMPRESS_HISTORY_PROMPT, | |
LOG_PROMPT, | |
LOG_RESPONSE, | |
MODIFY_PROMPT, | |
PREFIX, | |
READ_PROMPT, | |
TASK_PROMPT, | |
UNDERSTAND_TEST_RESULTS_PROMPT, | |
) | |
from utils import parse_action, parse_file_content, read_python_module_structure | |
#API_TOKEN = os.getenv("HF_AUTH_TOKEN") | |
# When true, run_gpt prints every prompt and every model response.
VERBOSE = True
# run_action compresses the history once it has more lines than this.
MAX_HISTORY = 100
#MODEL = "gpt-3.5-turbo" # "gpt-4"
# Hosted inference endpoint for each IDEFICS model id.
_INFERENCE_API_ROOT = "https://api-inference.huggingface.co/models"
API_PATHS = {
    model_id: f"{_INFERENCE_API_ROOT}/{model_id}"
    for model_id in (
        "HuggingFaceM4/idefics-9b-instruct",
        "HuggingFaceM4/idefics-80b-instruct",
    )
}
def run_gpt(
    prompt_template,
    stop_tokens,
    max_tokens,
    module_summary,
    purpose,
    **prompt_kwargs,
):
    """Send one prompt to the hosted Mixtral model and return its reply.

    The prompt is ``PREFIX`` (filled with ``module_summary`` and ``purpose``)
    followed by ``prompt_template`` filled with ``prompt_kwargs``.

    Args:
        prompt_template: Format string for the task-specific part of the prompt.
        stop_tokens: Sequences passed to the API as ``stop_sequences``.
        max_tokens: Value passed to the API as ``max_new_tokens``.
        module_summary: Text substituted into ``PREFIX``.
        purpose: Text substituted into ``PREFIX``.
        **prompt_kwargs: Fields substituted into ``prompt_template``.

    Returns:
        The concatenation of the token texts streamed back by the API.
    """
    client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1")

    header = PREFIX.format(module_summary=module_summary, purpose=purpose)
    content = header + prompt_template.format(**prompt_kwargs)
    if VERBOSE:
        print(LOG_PROMPT.format(content))

    token_stream = client.text_generation(
        content,
        max_new_tokens=max_tokens,
        repetition_penalty=1.0,
        stop_sequences=stop_tokens,
        do_sample=True,
        stream=True,
        details=True,
        return_full_text=False,
    )
    # Each streamed item exposes the generated piece as ``.token.text``.
    resp = "".join(chunk.token.text for chunk in token_stream)

    if VERBOSE:
        print(LOG_RESPONSE.format(resp))
    return resp
''' | |
def run_gpt( | |
prompt_template, | |
stop_tokens, | |
max_tokens, | |
module_summary, | |
purpose, | |
**prompt_kwargs, | |
): | |
content = PREFIX.format( | |
module_summary=module_summary, | |
purpose=purpose, | |
) + prompt_template.format(**prompt_kwargs) | |
if VERBOSE: | |
print(LOG_PROMPT.format(content)) | |
resp = openai.ChatCompletion.create( | |
model=MODEL, | |
messages=[ | |
{"role": "system", "content": content}, | |
], | |
temperature=0.0, | |
max_tokens=max_tokens, | |
stop=stop_tokens if stop_tokens else None, | |
)["choices"][0]["message"]["content"] | |
if VERBOSE: | |
print(LOG_RESPONSE.format(resp)) | |
return resp | |
''' | |
def compress_history(purpose, task, history, directory):
    """Replace a long history with a single model-written observation.

    Args:
        purpose: Overall goal, forwarded to the prompt.
        task: Current task, forwarded to the prompt.
        history: The accumulated history text to be condensed.
        directory: Directory handed to ``read_python_module_structure``.

    Returns:
        A new history string of the form ``"observation: <summary>\\n"``.
    """
    summary, _unused_content, _unused_extra = read_python_module_structure(directory)
    condensed = run_gpt(
        COMPRESS_HISTORY_PROMPT,
        stop_tokens=["observation:", "task:", "action:", "thought:"],
        max_tokens=512,
        module_summary=summary,
        purpose=purpose,
        task=task,
        history=history,
    )
    return f"observation: {condensed}\n"
def call_main(purpose, task, history, directory, action_input, repo_name, hf_token):
    """Ask the model for the next thought/action and record it in the history.

    The response is scanned line by line: ``thought: `` lines are appended to
    the history, and the first ``action: `` line is parsed with
    ``parse_action`` and returned as the next step. Any other non-empty line
    is recorded as an unknown action.

    Args:
        purpose: Overall goal, forwarded to the prompt.
        task: Current task, forwarded to the prompt and returned unchanged.
        history: Accumulated history text.
        directory: Directory handed to ``read_python_module_structure``.
        action_input: Unused on entry; overwritten when an action is parsed.
        repo_name: Unused here; part of the shared handler signature.
        hf_token: Unused here; part of the shared handler signature.

    Returns:
        ``(action_name, action_input, history, task)``. When no action line is
        found the result is ``("MAIN", None, history, task)``.
    """
    print("RUNNING MAIN")
    module_summary, _, _ = read_python_module_structure(directory)
    resp = run_gpt(
        ACTION_PROMPT,
        stop_tokens=["observation:", "task:"],
        max_tokens=256,
        module_summary=module_summary,
        purpose=purpose,
        task=task,
        history=history,
    )
    for line in resp.strip().split("\n"):
        if line == "":
            continue
        if line.startswith("thought: "):
            history += "{}\n".format(line)
        elif line.startswith("action: "):
            action_name, action_input = parse_action(line)
            history += "{}\n".format(line)
            return action_name, action_input, history, task
        else:
            # Terminate the entry with a newline like every other history
            # line; otherwise it fuses with the next entry and the line-based
            # history length check in run_action undercounts.
            history += "unknown action: {}\n".format(line)
    return "MAIN", None, history, task
def call_test(purpose, task, history, directory, action_input, repo_name, hf_token):
    """Run pytest on ``directory`` and record the outcome as an observation.

    A ``--collect-only`` pass runs first; a non-zero exit code there is
    reported as "no tests". Otherwise the suite is executed. On failure the
    (truncated) stdout/stderr are handed to the model, and its explanation is
    appended to the history.

    Args:
        purpose: Overall goal, forwarded to the prompt.
        task: Current task, forwarded to the prompt and returned unchanged.
        history: Accumulated history text.
        directory: Directory passed to pytest.
        action_input: Unused; part of the shared handler signature.
        repo_name: Unused; part of the shared handler signature.
        hf_token: Unused; part of the shared handler signature.

    Returns:
        ``("MAIN", None, history, task)`` with one observation appended.
    """
    import sys

    # Use the running interpreter so pytest comes from the same environment
    # as this app, rather than whichever "python" happens to be first on PATH.
    pytest_cmd = [sys.executable, "-m", "pytest"]

    collected = subprocess.run(
        pytest_cmd + ["--collect-only", directory],
        capture_output=True,
        text=True,
    )
    if collected.returncode != 0:
        history += "observation: there are no tests! Test should be written in a test folder under {}\n".format(
            directory
        )
        return "MAIN", None, history, task

    result = subprocess.run(
        pytest_cmd + [directory], capture_output=True, text=True
    )
    if result.returncode == 0:
        history += "observation: tests pass\n"
        return "MAIN", None, history, task

    module_summary, _, _ = read_python_module_structure(directory)
    resp = run_gpt(
        UNDERSTAND_TEST_RESULTS_PROMPT,
        stop_tokens=[],
        max_tokens=256,
        module_summary=module_summary,
        purpose=purpose,
        task=task,
        history=history,
        stdout=result.stdout[:5000],  # limit amount of text
        stderr=result.stderr[:5000],  # limit amount of text
    )
    history += "observation: tests failed: {}\n".format(resp)
    return "MAIN", None, history, task
def call_set_task(purpose, task, history, directory, action_input, repo_name, hf_token):
    """Ask the model for a new task and note the change in the history.

    Args:
        purpose: Overall goal, forwarded to the prompt.
        task: Current task, forwarded to the prompt; replaced by the reply.
        history: Accumulated history text.
        directory: Directory handed to ``read_python_module_structure``.
        action_input: Unused; part of the shared handler signature.
        repo_name: Unused; part of the shared handler signature.
        hf_token: Unused; part of the shared handler signature.

    Returns:
        ``("MAIN", None, history, new_task)``.
    """
    summary, _content, _extra = read_python_module_structure(directory)
    reply = run_gpt(
        TASK_PROMPT,
        stop_tokens=[],
        max_tokens=64,
        module_summary=summary,
        purpose=purpose,
        task=task,
        history=history,
    )
    new_task = reply.strip("\n")
    updated_history = history + f"observation: task has been updated to: {new_task}\n"
    return "MAIN", None, updated_history, new_task
def call_read(purpose, task, history, directory, action_input, repo_name, hf_token):
    """Have the model read one file and record what it learned.

    Args:
        purpose: Overall goal, forwarded to the prompt.
        task: Current task, forwarded to the prompt and returned unchanged.
        history: Accumulated history text.
        directory: Directory handed to ``read_python_module_structure``.
        action_input: Path of the file to read; surrounding single quotes are
            removed. ``None`` or an empty value is treated as a missing file.
        repo_name: Unused; part of the shared handler signature.
        hf_token: Unused; part of the shared handler signature.

    Returns:
        ``("MAIN", None, history, task)`` with one observation appended.
    """
    print(f"ACTION INPUT: {action_input}")
    # str.strip returns a new string. The original discarded the result, so a
    # quoted path was never unquoted and always failed the existence check.
    path = action_input.strip("'") if action_input else ""
    if not path or not os.path.exists(path):
        history += "observation: file does not exist\n"
        return "MAIN", None, history, task

    module_summary, content, _ = read_python_module_structure(directory)
    try:
        f_content = content[path]
    except KeyError:
        # The file exists on disk but is not a key of ``content``; read it
        # directly rather than crashing the agent loop.
        with open(path, encoding="utf-8", errors="replace") as f:
            f_content = f.read()
    if not f_content:
        f_content = "< document is empty >"

    resp = run_gpt(
        READ_PROMPT,
        stop_tokens=[],
        max_tokens=256,
        module_summary=module_summary,
        purpose=purpose,
        task=task,
        history=history,
        file_path=path,
        file_contents=f_content,
    ).strip("\n")
    history += "observation: {}\n".format(resp)
    return "MAIN", None, history, task
def call_modify(purpose, task, history, directory, action_input, repo_name, hf_token):
    """Have the model rewrite an existing file and save the result.

    The new contents are uploaded with ``hfwrite`` and then written to the
    local file.

    Args:
        purpose: Overall goal, forwarded to the prompt.
        task: Current task, forwarded to the prompt and returned unchanged.
        history: Accumulated history text.
        directory: Directory handed to ``read_python_module_structure``.
        action_input: Path of the file to modify; surrounding single quotes
            are removed. ``None`` or an empty value is treated as a missing
            file.
        repo_name: Forwarded to ``hfwrite``.
        hf_token: Forwarded to ``hfwrite``.

    Returns:
        ``("MAIN", None, history, task)`` with observations appended.
    """
    # str.strip returns a new string. The original discarded the result, so a
    # quoted path was never unquoted and always failed the existence check.
    path = action_input.strip("'") if action_input else ""
    print(f"ACTION INPUT: {path}")
    if not path or not os.path.exists(path):
        history += "observation: file does not exist\n"
        return "MAIN", None, history, task

    module_summary, content, _ = read_python_module_structure(directory)
    try:
        f_content = content[path]
    except KeyError:
        # The file exists on disk but is not a key of ``content``; read it
        # directly rather than crashing the agent loop.
        with open(path, encoding="utf-8", errors="replace") as f:
            f_content = f.read()
    if not f_content:
        f_content = "< document is empty >"

    resp = run_gpt(
        MODIFY_PROMPT,
        stop_tokens=["action:", "thought:", "observation:"],
        max_tokens=2048,
        module_summary=module_summary,
        purpose=purpose,
        task=task,
        history=history,
        file_path=path,
        file_contents=f_content,
    )
    new_contents, description = parse_file_content(resp)
    if new_contents is None:
        history += "observation: failed to modify file\n"
        return "MAIN", None, history, task

    hfwrite(new_contents, path, repo_name, hf_token)
    with open(path, "w") as f:
        f.write(new_contents)

    history += "observation: file successfully modified\n"
    # Was misspelled "obsertation:", which did not match the "observation:"
    # marker used everywhere else in the history and in the stop tokens.
    history += "observation: {}\n".format(description)
    return "MAIN", None, history, task
def call_add(purpose, task, history, directory, action_input, repo_name, hf_token):
    """Have the model write a new file under ``directory``.

    The file is only created when the path is inside ``directory``, has a
    ``.py`` or ``.txt`` extension, and does not exist yet. New contents are
    uploaded with ``hfwrite`` and then written to the local file.

    Args:
        purpose: Overall goal, forwarded to the prompt.
        task: Current task, forwarded to the prompt and returned unchanged.
        history: Accumulated history text.
        directory: Root directory that new files must live under.
        action_input: Path of the file to create; surrounding single quotes
            are removed and ``None`` is treated as an empty path.
        repo_name: Forwarded to ``hfwrite``.
        hf_token: Forwarded to ``hfwrite``.

    Returns:
        ``("MAIN", None, history, task)`` with observations appended.
    """
    path = (action_input or "").strip("'")
    d = os.path.dirname(path)

    # The textual prefix check alone accepts paths such as "./app/../../x.py";
    # also require the resolved path to sit inside the resolved root.
    root = os.path.abspath(directory)
    inside_root = os.path.abspath(path).startswith(root + os.sep)

    if not d.startswith(directory) or not inside_root:
        history += "observation: files must be under directory {}\n".format(directory)
        return "MAIN", None, history, task

    # The message has always promised .py and .txt; the check only allowed .py.
    if not path.endswith((".py", ".txt")):
        history += "observation: can only write .py and .txt files\n"
        return "MAIN", None, history, task

    if d and not os.path.exists(d):
        os.makedirs(d)

    if os.path.exists(path):
        history += "observation: file already exists\n"
        return "MAIN", None, history, task

    module_summary, _, _ = read_python_module_structure(directory)
    resp = run_gpt(
        ADD_PROMPT,
        stop_tokens=["action:", "thought:", "observation:"],
        max_tokens=2048,
        module_summary=module_summary,
        purpose=purpose,
        task=task,
        history=history,
        file_path=path,
    )
    new_contents, description = parse_file_content(resp)
    if new_contents is None:
        history += "observation: failed to write file\n"
        return "MAIN", None, history, task

    hfwrite(new_contents, path, repo_name, hf_token)
    with open(path, "w") as f:
        f.write(new_contents)
    print(f'filepath: {path}')
    print(f'filepath: {Path(path)}')

    # Both entries now end with a newline and use the correctly spelled
    # "observation:" marker (previously fused together as
    # "...successfully writtenobsertation: ...").
    history += "observation: file successfully written\n"
    history += "observation: {}\n".format(description)
    return "MAIN", None, history, task
def call_research(purpose, task, history, directory, action_input, repo_name, hf_token):
    """Record that internet research is unavailable and hand control back to MAIN.

    Only ``history`` and ``task`` are used; the other parameters exist so the
    function matches the shared handler signature.

    Returns:
        ``("MAIN", None, history, task)`` with one observation appended.
    """
    updated_history = history + "observation: cannot perform research on internet\n"
    return "MAIN", None, updated_history, task
def call_find(purpose, task, history, directory, action_input, repo_name, hf_token):
    """Record that external lookups are unavailable and hand control back to MAIN.

    Only ``history`` and ``task`` are used; the other parameters exist so the
    function matches the shared handler signature.

    Returns:
        ``("MAIN", None, history, task)`` with one observation appended.
    """
    updated_history = history + "observation: have no ability to find external information\n"
    return "MAIN", None, updated_history, task
def call_run(purpose, task, history, directory, action_input, repo_name, hf_token):
    """Record that packages cannot be installed and hand control back to MAIN.

    Only ``history`` and ``task`` are used; the other parameters exist so the
    function matches the shared handler signature.

    Returns:
        ``("MAIN", None, history, task)`` with one observation appended.
    """
    # The entry needs its trailing newline like every other observation;
    # without it the next history entry is glued onto the same line.
    history += "observation: cannot install packages directly\n"
    return "MAIN", None, history, task
# Dispatch table used by run_action: action name emitted by the model -> the
# handler that executes it. Every handler takes
# (purpose, task, history, directory, action_input, repo_name, hf_token).
NAME_TO_FUNC = {
    "MAIN": call_main,
    "UPDATE-TASK": call_set_task,
    "MODIFY-FILE": call_modify,
    "READ-FILE": call_read,
    "ADD-FILE": call_add,
    "TEST": call_test,
    "RESEARCH": call_research,
    "FIND": call_find,
    "RUN": call_run,
}
def run_action(purpose, task, history, directory, action_name, action_input, repo_name, hf_token):
    """Execute one agent step by dispatching ``action_name`` to its handler.

    Args:
        purpose: Overall goal, forwarded to the handler.
        task: Current task, forwarded to the handler.
        history: Accumulated history text; compressed first when it has more
            than ``MAX_HISTORY`` lines.
        directory: Working directory, forwarded to the handler.
        action_name: Key into ``NAME_TO_FUNC``, or ``"COMPLETE"``.
        action_input: Argument for the handler (may be ``None``).
        repo_name: Forwarded to the handler.
        hf_token: Forwarded to the handler.

    Returns:
        The handler's ``(action_name, action_input, history, task)`` tuple, or
        ``("UPDATE-TASK", None, history, task)`` for an unknown action name.

    Raises:
        SystemExit: With code 0 when ``action_name`` is ``"COMPLETE"``.
    """
    if action_name == "COMPLETE":
        # Same effect as the previous ``exit(0)``, without depending on the
        # ``exit`` helper that only exists when the ``site`` module is loaded.
        raise SystemExit(0)

    # compress the history when it is long
    if len(history.split("\n")) > MAX_HISTORY:
        if VERBOSE:
            print("COMPRESSING HISTORY")
        history = compress_history(purpose, task, history, directory)

    handler = NAME_TO_FUNC.get(action_name)
    if handler is None:
        history += "observation: The TOOL I tried to use returned an error\n"
        return "UPDATE-TASK", None, history, task

    # The second field is the action's input (it was mislabelled ACTION_NAME).
    print(f"RUN: {action_name} ACTION_INPUT: {action_input}")
    return handler(purpose, task, history, directory, action_input, repo_name, hf_token)
def run(purpose, directory, repo_name, hf_token, task=""):
    """Drive the agent loop, yielding the history after every step.

    The loop starts with ``UPDATE-TASK`` when no task is given, otherwise with
    ``MAIN``, and repeatedly calls ``run_action`` with the action it returned
    last time.

    Args:
        purpose: Overall goal of the agent.
        directory: Working directory handed to every action.
        repo_name: Forwarded to ``run_action``.
        hf_token: Forwarded to ``run_action``.
        task: Optional initial task.

    Yields:
        The history string after each executed action.
    """
    history = ""
    action_name = "UPDATE-TASK" if task == "" else "MAIN"
    action_input = None
    # Finish the generator once the model reports COMPLETE. Previously the
    # loop was unconditional and could only end via the SystemExit raised
    # inside run_action, taking the consumer of this generator down with it.
    while action_name != "COMPLETE":
        print("")
        print("")
        print("---")
        print("purpose:", purpose)
        print("task:", task)
        print("---")
        print(history)
        print("---")
        action_name, action_input, history, task = run_action(
            purpose,
            task,
            history,
            directory,
            action_name,
            action_input,
            repo_name,
            hf_token,
        )
        yield f'{history}'
# Token taken from the process environment; preferred over the UI field.
hf_tok = os.environ.get('HF_TOKEN')


def run_ma(purpose, repo_name, hf_token):
    """Gradio click handler: run the agent and stream its history to the UI.

    Args:
        purpose: Description of the program to build (from the text area).
        repo_name: Value of the "Repo/Name" field, forwarded to ``run``.
        hf_token: Value of the "Write Token" field; used only when the
            ``HF_TOKEN`` environment variable is not set.

    Yields:
        Each history string produced by ``run``.
    """
    # The environment token still wins, as before; the UI token is a fallback
    # so the app remains usable when HF_TOKEN is not configured.
    token = hf_tok or hf_token
    # The original built this generator and dropped it, so nothing ever ran
    # and the output box stayed empty. Delegating makes the handler stream.
    yield from run(f'{purpose}', directory, repo_name=repo_name, hf_token=token)


def checkp(inp):
    """Toggle the password row and the main panel based on the entered text.

    Args:
        inp: Text entered in the password box.

    Returns:
        Two ``gr.update`` objects: visibility for the password row and for the
        main panel, in that order.
    """
    # ``pa`` was only defined in a commented-out line, so this function raised
    # NameError when called. Read the expected value from the same PASS
    # environment variable here; an unset variable never matches.
    expected = os.environ.get('PASS')
    unlocked = expected is not None and inp == expected
    return gr.update(visible=not unlocked), gr.update(visible=unlocked)


# NOTE(review): indentation was lost in the captured source; the nesting below
# is reconstructed (three token fields in one row, the rest stacked under it).
with gr.Blocks() as app:
    # Password gate, currently hidden and not wired up.
    with gr.Row(visible = False) as no:
        enterp = gr.Textbox()
        checkb=gr.Button()
    # NOTE(review): gr.Box appears to be unavailable in Gradio 4+ (gr.Group is
    # the usual replacement) -- confirm against the pinned Gradio version.
    with gr.Box(visible=True) as go:
        with gr.Row():
            box1=gr.Textbox(label="Repo/Name")
            box2=gr.Textbox(label="Write Token")
            # Not passed to any handler.
            box3=gr.Textbox(label="OpenAI API Token")
        go_btn=gr.Button()
        purp=gr.Textbox(label="Describe Python Program",lines=10)
        outp=gr.Textbox()
    #checkb.click(checkp,enterp,[no,go])
    go_btn.click(run_ma,[purp,box1,box2],outp)
# Generator handlers need the queue on older Gradio releases; enabling it is
# harmless where it is already the default.
app.queue().launch(show_api=False)