acecalisto3 committed on
Commit
350eab5
1 Parent(s): d7d83b9

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +238 -264
app.py CHANGED
@@ -1,280 +1,254 @@
1
  import os
2
  import subprocess
3
- import random
4
- import time
5
- from typing import Dict, List, Tuple
6
- from datetime import datetime
7
- import logging
8
-
9
- import gradio as gr
10
- from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
11
- from huggingface_hub import InferenceClient, cached_download, Repository, HfApi
12
- from IPython.display import display, HTML
13
-
14
- # --- Configuration ---
15
- VERBOSE = True
16
- MAX_HISTORY = 5
17
- MAX_TOKENS = 2048
18
- TEMPERATURE = 0.7
19
- TOP_P = 0.8
20
- REPETITION_PENALTY = 1.5
21
- DEFAULT_PROJECT_PATH = "./my-hf-project" # Default project directory
22
-
23
- # --- Logging Setup ---
24
- logging.basicConfig(
25
- filename="app.log",
26
- level=logging.INFO,
27
- format="%(asctime)s - %(levelname)s - %(message)s",
28
  )
 
29
 
30
- # --- Global Variables ---
31
- current_model = None # Store the currently loaded model
32
- repo = None # Store the Hugging Face Repository object
33
- model_descriptions = {} # Store model descriptions
34
-
35
- # --- Functions ---
36
- def format_prompt(message: str, history: List[Tuple[str, str]], max_history_turns: int = 2) -> str:
37
- prompt = ""
38
- for user_prompt, bot_response in history[-max_history_turns:]:
39
- prompt += f"Human: {user_prompt}\nAssistant: {bot_response}\n"
40
- prompt += f"Human: {message}\nAssistant:"
41
- return prompt
42
-
43
- def generate_response(
44
- prompt: str,
45
- history: List[Tuple[str, str]],
46
- agent_name: str = "Generic Agent",
47
- sys_prompt: str = "",
48
- temperature: float = TEMPERATURE,
49
- max_new_tokens: int = MAX_TOKENS,
50
- top_p: float = TOP_P,
51
- repetition_penalty: float = REPETITION_PENALTY,
52
- ) -> str:
53
- global current_model
54
- if current_model is None:
55
- return "Error: Please load a model first."
56
 
57
- date_time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
58
- full_prompt = PREFIX.format(
59
- date_time_str=date_time_str,
60
- purpose=sys_prompt,
61
- agent_name=agent_name
62
- ) + format_prompt(prompt, history)
63
 
 
 
64
  if VERBOSE:
65
- logging.info(LOG_PROMPT.format(content=full_prompt))
66
 
67
- response = current_model(
68
- full_prompt,
69
- max_new_tokens=max_new_tokens,
70
- temperature=temperature,
71
- top_p=top_p,
72
- repetition_penalty=repetition_penalty,
73
- do_sample=True
74
- )[0]['generated_text']
75
-
76
- assistant_response = response.split("Assistant:")[-1].strip()
77
 
78
  if VERBOSE:
79
- logging.info(LOG_RESPONSE.format(resp=assistant_response))
80
-
81
- return assistant_response
82
-
83
- def load_hf_model(model_name: str):
84
- """Loads a language model and fetches its description."""
85
- global current_model, model_descriptions
86
- try:
87
- tokenizer = AutoTokenizer.from_pretrained(model_name)
88
- current_model = pipeline(
89
- "text-generation",
90
- model=model_name,
91
- tokenizer=tokenizer,
92
- model_kwargs={"load_in_8bit": True}
93
- )
94
-
95
- # Fetch and store the model description
96
- api = HfApi()
97
- model_info = api.model_info(model_name)
98
- model_descriptions[model_name] = model_info.pipeline_tag
99
- return f"Successfully loaded model: {model_name}"
100
- except Exception as e:
101
- return f"Error loading model: {str(e)}"
102
-
103
- def execute_command(command: str, project_path: str = None) -> str:
104
- """Executes a shell command and returns the output."""
105
- try:
106
- if project_path:
107
- process = subprocess.Popen(command, shell=True, cwd=project_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
108
- else:
109
- process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
110
- output, error = process.communicate()
111
- if error:
112
- return f"Error: {error.decode('utf-8')}"
113
- return output.decode("utf-8")
114
- except Exception as e:
115
- return f"Error executing command: {str(e)}"
116
-
117
- def create_hf_project(project_name: str, project_path: str = DEFAULT_PROJECT_PATH):
118
- """Creates a new Hugging Face project."""
119
- global repo
120
- try:
121
- if os.path.exists(project_path):
122
- return f"Error: Directory '{project_path}' already exists!"
123
- # Create the repository
124
- repo = Repository(local_dir=project_path, clone_from=None)
125
- repo.git_init()
126
-
127
- # Add basic files (optional, you can customize this)
128
- with open(os.path.join(project_path, "README.md"), "w") as f:
129
- f.write(f"# {project_name}\n\nA new Hugging Face project.")
130
-
131
- # Stage all changes
132
- repo.git_add(pattern="*")
133
- repo.git_commit(commit_message="Initial commit")
134
-
135
- return f"Hugging Face project '{project_name}' created successfully at '{project_path}'"
136
- except Exception as e:
137
- return f"Error creating Hugging Face project: {str(e)}"
138
-
139
- def list_project_files(project_path: str = DEFAULT_PROJECT_PATH) -> str:
140
- """Lists files in the project directory."""
141
- try:
142
- files = os.listdir(project_path)
143
- if not files:
144
- return "Project directory is empty."
145
- return "\n".join(files)
146
- except Exception as e:
147
- return f"Error listing project files: {str(e)}"
148
-
149
- def read_file_content(file_path: str, project_path: str = DEFAULT_PROJECT_PATH) -> str:
150
- """Reads and returns the content of a file in the project."""
151
- try:
152
- full_path = os.path.join(project_path, file_path)
153
- with open(full_path, "r") as f:
154
- content = f.read()
155
- return content
156
- except Exception as e:
157
- return f"Error reading file: {str(e)}"
158
-
159
- def write_to_file(file_path: str, content: str, project_path: str = DEFAULT_PROJECT_PATH) -> str:
160
- """Writes content to a file in the project."""
161
- try:
162
- full_path = os.path.join(project_path, file_path)
163
- with open(full_path, "w") as f:
164
- f.write(content)
165
- return f"Successfully wrote to '{file_path}'"
166
- except Exception as e:
167
- return f"Error writing to file: {str(e)}"
168
-
169
- def preview_project(project_path: str = DEFAULT_PROJECT_PATH):
170
- """Provides a preview of the project, if applicable."""
171
- # Assuming a simple HTML preview for now
172
- try:
173
- index_html_path = os.path.join(project_path, "index.html")
174
- if os.path.exists(index_html_path):
175
- with open(index_html_path, "r") as f:
176
- html_content = f.read()
177
- display(HTML(html_content))
178
- return "Previewing 'index.html'"
179
  else:
180
- return "No 'index.html' found for preview."
181
- except Exception as e:
182
- return f"Error previewing project: {str(e)}"
183
-
184
- def main():
185
- with gr.Blocks() as demo:
186
- gr.Markdown("## FragMixt: Your Hugging Face No-Code App Builder")
187
-
188
- # --- Model Selection ---
189
- with gr.Tab("Model"):
190
- # --- Model Dropdown with Categories ---
191
- model_categories = gr.Dropdown(
192
- choices=["Text Generation", "Text Summarization", "Code Generation", "Translation", "Question Answering"],
193
- label="Model Category",
194
- value="Text Generation"
195
- )
196
- model_name = gr.Dropdown(
197
- choices=[], # Initially empty, will be populated based on category
198
- label="Hugging Face Model Name",
199
- )
200
- load_button = gr.Button("Load Model")
201
- load_output = gr.Textbox(label="Output")
202
- model_description = gr.Markdown(label="Model Description")
203
-
204
- # --- Function to populate model names based on category ---
205
- def update_model_dropdown(category):
206
- models = []
207
- api = HfApi()
208
- for model in api.list_models():
209
- if model.pipeline_tag == category:
210
- models.append(model.modelId)
211
- return gr.Dropdown.update(choices=models)
212
-
213
- # --- Event handler for category dropdown ---
214
- model_categories.change(
215
- fn=update_model_dropdown,
216
- inputs=model_categories,
217
- outputs=model_name,
218
- )
219
-
220
- # --- Event handler to display model description ---
221
- def display_model_description(model_name):
222
- global model_descriptions
223
- if model_name in model_descriptions:
224
- return model_descriptions[model_name]
225
- else:
226
- return "Model description not available."
227
-
228
- model_name.change(
229
- fn=display_model_description,
230
- inputs=model_name,
231
- outputs=model_description,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
232
  )
 
 
 
 
233
 
234
- load_button.click(load_hf_model, inputs=model_name, outputs=load_output)
235
-
236
- # --- Chat Interface ---
237
- with gr.Tab("Chat"):
238
- chatbot = gr.Chatbot(show_label=False, show_share_button=False, show_copy_button=True, likeable=True)
239
- message = gr.Textbox(label="Enter your message", placeholder="Ask me anything!")
240
- purpose = gr.Textbox(label="Purpose", placeholder="What is the purpose of this interaction?")
241
- agent_name = gr.Dropdown(label="Agents", choices=["Generic Agent"], value="Generic Agent", interactive=True)
242
- sys_prompt = gr.Textbox(label="System Prompt", max_lines=1, interactive=True)
243
- temperature = gr.Slider(label="Temperature", value=TEMPERATURE, minimum=0.0, maximum=1.0, step=0.05, interactive=True, info="Higher values produce more diverse outputs")
244
- max_new_tokens = gr.Slider(label="Max new tokens", value=MAX_TOKENS, minimum=0, maximum=1048 * 10, step=64, interactive=True, info="The maximum numbers of new tokens")
245
- top_p = gr.Slider(label="Top-p (nucleus sampling)", value=TOP_P, minimum=0.0, maximum=1, step=0.05, interactive=True, info="Higher values sample more low-probability tokens")
246
- repetition_penalty = gr.Slider(label="Repetition penalty", value=REPETITION_PENALTY, minimum=1.0, maximum=2.0, step=0.05, interactive=True, info="Penalize repeated tokens")
247
- submit_button = gr.Button(value="Send")
248
- history = gr.State([])
249
-
250
- def run_chat(purpose: str, message: str, agent_name: str, sys_prompt: str, temperature: float, max_new_tokens: int, top_p: float, repetition_penalty: float, history: List[Tuple[str, str]]) -> Tuple[List[Tuple[str, str]], List[Tuple[str, str]]]:
251
- response = generate_response(message, history, agent_name, sys_prompt, temperature, max_new_tokens, top_p, repetition_penalty)
252
- history.append((message, response))
253
- return history, history
254
-
255
- submit_button.click(run_chat, inputs=[purpose, message, agent_name, sys_prompt, temperature, max_new_tokens, top_p, repetition_penalty, history], outputs=[chatbot, history])
256
 
257
- # --- Project Management ---
258
- with gr.Tab("Project"):
259
- project_name = gr.Textbox(label="Project Name", placeholder="MyHuggingFaceApp")
260
- create_project_button = gr.Button("Create Hugging Face Project")
261
- project_output = gr.Textbox(label="Output", lines=5)
262
- file_content = gr.Code(label="File Content", language="python", lines=20)
263
- file_path = gr.Textbox(label="File Path (relative to project)", placeholder="src/main.py")
264
- read_button = gr.Button("Read File")
265
- write_button = gr.Button("Write to File")
266
- command_input = gr.Textbox(label="Terminal Command", placeholder="pip install -r requirements.txt")
267
- command_output = gr.Textbox(label="Command Output", lines=5)
268
- run_command_button = gr.Button("Run Command")
269
- preview_button = gr.Button("Preview Project")
270
-
271
- create_project_button.click(create_hf_project, inputs=[project_name], outputs=project_output)
272
- read_button.click(read_file_content, inputs=file_path, outputs=file_content)
273
- write_button.click(write_to_file, inputs=[file_path, file_content], outputs=project_output)
274
- run_command_button.click(execute_command, inputs=command_input, outputs=command_output)
275
- preview_button.click(preview_project, outputs=project_output)
276
-
277
- demo.launch(server_port=8080)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
278
 
279
  if __name__ == "__main__":
280
- main()
 
 
1
  import os
2
  import subprocess
3
+ import torch
4
+ from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
5
+ from agent.prompts import (
6
+ ACTION_PROMPT,
7
+ ADD_PROMPT,
8
+ COMPRESS_HISTORY_PROMPT,
9
+ LOG_PROMPT,
10
+ LOG_RESPONSE,
11
+ MODIFY_PROMPT,
12
+ PREFIX,
13
+ READ_PROMPT,
14
+ TASK_PROMPT,
15
+ UNDERSTAND_TEST_RESULTS_PROMPT,
 
 
 
 
 
 
 
 
 
 
 
 
16
  )
17
+ from agent.utils import parse_action, parse_file_content, read_python_module_structure
18
 
19
+ # Initialize Hugging Face model and tokenizer
20
+ TOKENIZER = AutoTokenizer.from_pretrained("typefully/rag-tokenbert-3B")
21
+ MODEL = AutoModelForSeq2SeqLM.from_pretrained("typefully/rag-tokenbert-3B")
22
+ PIPELINE = pipeline('text-generation', model=MODEL, tokenizer=TOKENIZER, device=-1)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
23
 
24
+ VERBOSE = False
25
+ MAX_HISTORY = 100
 
 
 
 
26
 
27
+ def hf_run_gpt(prompt_template, stop_tokens, max_length, module_summary, purpose, **prompt_kwargs):
28
+ content = PREFIX.format(module_summary=module_summary, purpose=purpose) + prompt_template.format(**prompt_kwargs)
29
  if VERBOSE:
30
+ print(LOG_PROMPT.format(content))
31
 
32
+ input_seq = TOKENIZER(content, return_tensors='pt', truncation=True, padding='longest')['input_ids']
33
+ output_sequences = PIPELINE(input_seq, max_length=max_length, num_return_sequences=1, do_sample=False)
34
+ resp = TOKENIZER.decode(output_sequences[0]['generated_text'], skip_special_tokens=True)
 
 
 
 
 
 
 
35
 
36
  if VERBOSE:
37
+ print(LOG_RESPONSE.format(resp))
38
+ return resp
39
+
40
+ def compress_history(purpose, task, history, directory):
41
+ module_summary, _, _ = read_python_module_structure(directory)
42
+ resp = hf_run_gpt(
43
+ COMPRESS_HISTORY_PROMPT,
44
+ stop_tokens=["observation:", "task:", "action:", "thought:"],
45
+ max_length=512,
46
+ module_summary=module_summary,
47
+ purpose=purpose,
48
+ task=task,
49
+ history=history,
50
+ )
51
+ history = "observation: {}\n".format(resp)
52
+ return history
53
+
54
+ def call_main(purpose, task, history, directory, action_input):
55
+ module_summary, _, _ = read_python_module_structure(directory)
56
+ resp = hf_run_gpt(
57
+ ACTION_PROMPT,
58
+ stop_tokens=["observation:", "task:"],
59
+ max_length=256,
60
+ module_summary=module_summary,
61
+ purpose=purpose,
62
+ task=task,
63
+ history=history,
64
+ )
65
+ lines = resp.strip().split("\n")
66
+ for line in lines:
67
+ if line.startswith("thought: "):
68
+ history += "{}\n".format(line)
69
+ elif line.startswith("action: "):
70
+ action_name, action_input = parse_action(line)
71
+ history += "{}\n".format(line)
72
+ return action_name, action_input, history, task
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73
  else:
74
+ assert False, "unknown action: {}".format(line)
75
+ return "MAIN", None, history, task
76
+
77
+ def call_test(purpose, task, history, directory, action_input):
78
+ result = subprocess.run(
79
+ ["python", "-m", "pytest", "--collect-only", directory],
80
+ capture_output=True,
81
+ text=True,
82
+ )
83
+ if result.returncode != 0:
84
+ history += "observation: there are no tests! Test should be written in a test folder under {}\n".format(directory)
85
+ return "MAIN", None, history, task
86
+ result = subprocess.run(
87
+ ["python", "-m", "pytest", directory], capture_output=True, text=True
88
+ )
89
+ if result.returncode == 0:
90
+ history += "observation: tests pass\n"
91
+ return "MAIN", None, history, task
92
+ module_summary, content, _ = read_python_module_structure(directory)
93
+ resp = hf_run_gpt(
94
+ UNDERSTAND_TEST_RESULTS_PROMPT,
95
+ stop_tokens=[],
96
+ max_length=256,
97
+ module_summary=module_summary,
98
+ purpose=purpose,
99
+ task=task,
100
+ history=history,
101
+ stdout=result.stdout[:5000], # limit amount of text
102
+ stderr=result.stderr[:5000], # limit amount of text
103
+ )
104
+ history += "observation: tests failed: {}\n".format(resp)
105
+ return "MAIN", None, history, task
106
+
107
+ def call_set_task(purpose, task, history, directory, action_input):
108
+ module_summary, content, _ = read_python_module_structure(directory)
109
+ task = hf_run_gpt(
110
+ TASK_PROMPT,
111
+ stop_tokens=[],
112
+ max_length=64,
113
+ module_summary=module_summary,
114
+ purpose=purpose,
115
+ task=task,
116
+ history=history,
117
+ ).strip("\n")
118
+ history += "observation: task has been updated to: {}\n".format(task)
119
+ return "MAIN", None, history, task
120
+
121
+ def call_read(purpose, task, history, directory, action_input):
122
+ if not os.path.exists(action_input):
123
+ history += "observation: file does not exist\n"
124
+ return "MAIN", None, history, task
125
+ module_summary, content, _ = read_python_module_structure(directory)
126
+ f_content = content.get(action_input, "< document is empty >")
127
+ resp = hf_run_gpt(
128
+ READ_PROMPT,
129
+ stop_tokens=[],
130
+ max_length=256,
131
+ module_summary=module_summary,
132
+ purpose=purpose,
133
+ task=task,
134
+ history=history,
135
+ file_path=action_input,
136
+ file_contents=f_content,
137
+ ).strip("\n")
138
+ history += "observation: {}\n".format(resp)
139
+ return "MAIN", None, history, task
140
+
141
+ def call_modify(purpose, task, history, directory, action_input):
142
+ if not os.path.exists(action_input):
143
+ history += "observation: file does not exist\n"
144
+ return "MAIN", None, history, task
145
+ module_summary, content, _ = read_python_module_structure(directory)
146
+ f_content = content.get(action_input, "< document is empty >")
147
+ resp = hf_run_gpt(
148
+ MODIFY_PROMPT,
149
+ stop_tokens=["action:", "thought:", "observation:"],
150
+ max_length=2048,
151
+ module_summary=module_summary,
152
+ purpose=purpose,
153
+ task=task,
154
+ history=history,
155
+ file_path=action_input,
156
+ file_contents=f_content,
157
+ )
158
+ new_contents, description = parse_file_content(resp)
159
+ if new_contents is None:
160
+ history += "observation: failed to modify file\n"
161
+ return "MAIN", None, history, task
162
+
163
+ with open(action_input, "w") as f:
164
+ f.write(new_contents)
165
+
166
+ history += "observation: file successfully modified\n"
167
+ history += "observation: {}\n".format(description)
168
+ return "MAIN", None, history, task
169
+
170
+ def call_add(purpose, task, history, directory, action_input):
171
+ d = os.path.dirname(action_input)
172
+ if not d.startswith(directory):
173
+ history += "observation: files must be under directory {}\n".format(directory)
174
+ elif not action_input.endswith(".py"):
175
+ history += "observation: can only write .py files\n"
176
+ else:
177
+ if d and not os.path.exists(d):
178
+ os.makedirs(d)
179
+ if not os.path.exists(action_input):
180
+ module_summary, _, _ = read_python_module_structure(directory)
181
+ resp = hf_run_gpt(
182
+ ADD_PROMPT,
183
+ stop_tokens=["action:", "thought:", "observation:"],
184
+ max_length=2048,
185
+ module_summary=module_summary,
186
+ purpose=purpose,
187
+ task=task,
188
+ history=history,
189
+ file_path=action_input,
190
  )
191
+ new_contents, description = parse_file_content(resp)
192
+ if new_contents is None:
193
+ history += "observation: failed to write file\n"
194
+ return "MAIN", None, history, task
195
 
196
+ with open(action_input, "w") as f:
197
+ f.write(new_contents)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
198
 
199
+ history += "observation: file successfully written\n"
200
+ history += "observation: {}\n".format(description)
201
+ else:
202
+ history += "observation: file already exists\n"
203
+ return "MAIN", None, history, task
204
+
205
+ NAME_TO_FUNC = {
206
+ "MAIN": call_main,
207
+ "UPDATE-TASK": call_set_task,
208
+ "MODIFY-FILE": call_modify,
209
+ "READ-FILE": call_read,
210
+ "ADD-FILE": call_add,
211
+ "TEST": call_test,
212
+ }
213
+
214
+ def run_action(purpose, task, history, directory, action_name, action_input):
215
+ if action_name == "COMPLETE":
216
+ exit(0)
217
+
218
+ # compress the history when it is long
219
+ if len(history.split("\n")) > MAX_HISTORY:
220
+ if VERBOSE:
221
+ print("COMPRESSING HISTORY")
222
+ history = compress_history(purpose, task, history, directory)
223
+
224
+ assert action_name in NAME_TO_FUNC
225
+
226
+ print("RUN: ", action_name, action_input)
227
+ return NAME_TO_FUNC[action_name](purpose, task, history, directory, action_input)
228
+
229
+ def run(purpose, directory, task=None):
230
+ history = ""
231
+ action_name = "UPDATE-TASK" if task is None else "MAIN"
232
+ action_input = None
233
+ while True:
234
+ print("")
235
+ print("")
236
+ print("---")
237
+ print("purpose:", purpose)
238
+ print("task:", task)
239
+ print("---")
240
+ print(history)
241
+ print("---")
242
+
243
+ action_name, action_input, history, task = run_action(
244
+ purpose,
245
+ task,
246
+ history,
247
+ directory,
248
+ action_name,
249
+ action_input,
250
+ )
251
 
252
  if __name__ == "__main__":
253
+ # Example usage
254
+ run("Your purpose here", "path/to/your/directory")