# Hugging Face Spaces status at capture time: Running
import os | |
import subprocess | |
import random | |
import time | |
from typing import Dict, List, Tuple | |
from datetime import datetime | |
import logging | |
import gradio as gr | |
from huggingface_hub import InferenceClient | |
from safe_search import safe_search | |
from i_search import google, i_search as i_s | |
from transformers import AutoModelForCausalLM, AutoTokenizer | |
import random | |
import prompts | |
# --- Configuration ---
# When enabled, every prompt and response is written to the log.
VERBOSE: bool = True
MAX_HISTORY: int = 5
# Default generation settings; the UI sliders start from these values.
MAX_TOKENS: int = 2048
TEMPERATURE: float = 0.7
TOP_P: float = 0.8
REPETITION_PENALTY: float = 1.5
# Use CodeLlama for code-related tasks
MODEL_NAME: str = "codellama/CodeLlama-7b-Python-hf"
# Hugging Face token read from the environment; None when the variable is unset.
API_KEY = os.environ.get("HUGGINGFACE_API_KEY")
# --- Logging Setup ---
# Route INFO-and-above records to app.log, each prefixed with a timestamp
# and its level name.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    filename="app.log",
)
# --- Agents ---
# Agent identifiers offered in the UI dropdown; the first entry is the default.
agents = ["WEB_DEV", "AI_SYSTEM_PROMPT", "PYTHON_CODE_DEV", "DATA_SCIENCE", "UI_UX_DESIGN"]
# --- Prompts ---
# Each template is a str.format() pattern. Every template begins and ends with
# a newline so templates can be concatenated without running together.

# Header placed in front of every prompt sent to the model.
PREFIX = (
    "\n"
    "{date_time_str}\n"
    "Purpose: {purpose}\n"
    "Safe Search: {safe_search}\n"
)

# Wrappers used when writing prompts and responses to the log.
LOG_PROMPT = "\n" "PROMPT: {content}\n"
LOG_RESPONSE = "\n" "RESPONSE: {resp}\n"

# Asks the model to shrink the running history.
COMPRESS_HISTORY_PROMPT = (
    "\n"
    "You are a helpful AI assistant. Your task is to compress the following "
    "history into a summary that is no longer than 512 tokens.\n"
    "History: {history}\n"
)

# Asks the model for its next thought and action on a task.
ACTION_PROMPT = (
    "\n"
    "You are a helpful AI assistant. You are working on the task: {task}\n"
    "Your current history is: {history}\n"
    "What is your next thought?\n"
    "thought:\n"
    "What is your next action?\n"
    "action:\n"
)

# Asks the model to pick the next task from the history.
TASK_PROMPT = (
    "\n"
    "You are a helpful AI assistant. Your current history is: {history}\n"
    "What is the next task?\n"
    "task:\n"
)

# Asks the model what it wants to know about a set of test results.
UNDERSTAND_TEST_RESULTS_PROMPT = (
    "\n"
    "You are a helpful AI assistant. The test results are: {test_results}\n"
    "What do you want to know about the test results?\n"
    "thought:\n"
)
# --- Functions --- | |
def format_prompt(message: str, history: List[Tuple[str, str]], max_history_turns: int = 2) -> str:
    """Format the message and the most recent history turns as an [INST] prompt.

    Args:
        message: The new user message; it becomes the final ``[INST]`` section.
        history: Earlier ``(user_prompt, bot_response)`` pairs, oldest first.
            ``None`` is treated as an empty history.
        max_history_turns: How many of the most recent turns to include.
            Zero or a negative value includes no history.

    Returns:
        A prompt that starts with a single space, followed by one
        ``[INST] user [/INST] response `` section per included turn and a
        final ``[INST] message [/INST]`` section.
    """
    # history[-0:] is the whole list, so a non-positive limit needs its own
    # branch to mean "no history" rather than "all history".
    if max_history_turns > 0 and history:
        recent_turns = history[-max_history_turns:]
    else:
        recent_turns = []
    past = "".join(
        f"[INST] {user_prompt} [/INST] {bot_response} "
        for user_prompt, bot_response in recent_turns
    )
    return f" {past}[INST] {message} [/INST]"
def run_llm(
    prompt_template: str,
    stop_tokens: List[str],
    purpose: str,
    **prompt_kwargs: object,
) -> str:
    """Fill in a prompt template, send it to the hosted model, and return the text.

    The prompt is ``PREFIX`` (timestamp, purpose, safe-search value) followed
    by ``prompt_template`` formatted with ``prompt_kwargs``.

    Args:
        prompt_template: A ``str.format`` template for the body of the prompt.
        stop_tokens: Sequences at which generation should stop.
        purpose: Short description of the request, placed in the prompt header.
        **prompt_kwargs: Values for the placeholders in ``prompt_template``.

    Returns:
        The generated text.

    Raises:
        KeyError: If ``prompt_template`` has a placeholder that is missing
            from ``prompt_kwargs``.
    """
    seed = random.randint(1, 1111111111111111)
    logging.info(f"Seed: {seed}")
    # NOTE(review): `safe_search` is whatever the safe_search module exports
    # under that name; its str() form is interpolated as-is. Confirm that is
    # the value the prompt header is meant to show.
    header = PREFIX.format(
        date_time_str=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        purpose=purpose,
        safe_search=safe_search,
    )
    content = header + prompt_template.format(**prompt_kwargs)
    if VERBOSE:
        logging.info(LOG_PROMPT.format(content=content))
    client = InferenceClient(model=MODEL_NAME, token=API_KEY)
    resp = client.text_generation(
        content,
        max_new_tokens=MAX_TOKENS,
        stop_sequences=stop_tokens,
        temperature=TEMPERATURE,
        top_p=TOP_P,
        repetition_penalty=REPETITION_PENALTY,
        # Send the logged seed so a logged run can actually be reproduced.
        seed=seed,
    )
    if VERBOSE:
        logging.info(LOG_RESPONSE.format(resp=resp))
    # text_generation() returns a plain str unless details=True is requested,
    # so there is no `.text` attribute to read; the detailed output object
    # exposes the text as `.generated_text`.
    return resp if isinstance(resp, str) else resp.generated_text
def generate(
    prompt: str,
    history: List[Tuple[str, str]],
    agent_name: str = agents[0],
    sys_prompt: str = "",
    temperature: float = TEMPERATURE,
    max_new_tokens: int = MAX_TOKENS,
    top_p: float = TOP_P,
    repetition_penalty: float = REPETITION_PENALTY,
) -> str:
    """Generate a response from the hosted model for an already formatted prompt.

    The text sent to the model is ``PREFIX`` (timestamp, purpose naming the
    agent, safe-search value), then ``sys_prompt``, a newline, and ``prompt``.

    Args:
        prompt: The prompt text, already formatted by the caller.
        history: Accepted for signature compatibility; not used here because
            the caller is expected to have folded history into ``prompt``.
        agent_name: Agent identifier, mentioned in the prompt header.
        sys_prompt: Optional system prompt placed before ``prompt``.
        temperature: Sampling temperature.
        max_new_tokens: Upper bound on the number of generated tokens.
        top_p: Nucleus-sampling probability mass.
        repetition_penalty: Penalty applied to repeated tokens.

    Returns:
        The generated text.
    """
    header = PREFIX.format(
        date_time_str=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        purpose=f"Generating response as {agent_name}",
        safe_search=safe_search,
    )
    content = header + sys_prompt + "\n" + prompt
    if VERBOSE:
        logging.info(LOG_PROMPT.format(content=content))
    client = InferenceClient(model=MODEL_NAME, token=API_KEY)
    response = client.text_generation(
        content,
        max_new_tokens=max_new_tokens,
        temperature=temperature,
        top_p=top_p,
        repetition_penalty=repetition_penalty,
    )
    if VERBOSE:
        logging.info(LOG_RESPONSE.format(resp=response))
    # text_generation() returns a plain str unless details=True is requested,
    # so there is no `.text` attribute to read; the detailed output object
    # exposes the text as `.generated_text`.
    return response if isinstance(response, str) else response.generated_text
# --- Mixtral Integration --- | |
_MIXTRAL_MODEL_ID = "mistralai/Mixtral-8x7B-Instruct-v0.1"  # Use Mixtral model
# Holds the loaded (tokenizer, model) pair so the weights are read only once
# per process instead of on every chat message.
_MIXTRAL_CACHE: Dict[str, tuple] = {}


def _load_mixtral() -> tuple:
    """Return the Mixtral ``(tokenizer, model)`` pair, loading it on first use."""
    if "pair" not in _MIXTRAL_CACHE:
        _MIXTRAL_CACHE["pair"] = (
            AutoTokenizer.from_pretrained(_MIXTRAL_MODEL_ID),
            AutoModelForCausalLM.from_pretrained(_MIXTRAL_MODEL_ID),
        )
    return _MIXTRAL_CACHE["pair"]


def mixtral_generate(
    prompt: str,
    history: List[Tuple[str, str]],
    agent_name: str = agents[0],
    sys_prompt: str = "",
    temperature: float = TEMPERATURE,
    max_new_tokens: int = MAX_TOKENS,
    top_p: float = TOP_P,
    repetition_penalty: float = REPETITION_PENALTY,
) -> str:
    """Generate a response with a locally loaded Mixtral model.

    The text given to the model is ``PREFIX`` (timestamp, purpose naming the
    agent, safe-search value), then ``sys_prompt``, a newline, and ``prompt``.

    Args:
        prompt: The prompt text, already formatted by the caller.
        history: Accepted for signature compatibility; not used here because
            the caller is expected to have folded history into ``prompt``.
        agent_name: Agent identifier, mentioned in the prompt header.
        sys_prompt: Optional system prompt placed before ``prompt``.
        temperature: Sampling temperature; ``0`` selects greedy decoding.
        max_new_tokens: Upper bound on the number of generated tokens.
        top_p: Nucleus-sampling probability mass (used only when sampling).
        repetition_penalty: Penalty applied to repeated tokens.

    Returns:
        Only the newly generated text, with special tokens removed; the
        prompt is not echoed back.
    """
    tokenizer, model = _load_mixtral()
    content = PREFIX.format(
        date_time_str=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        purpose=f"Generating response as {agent_name}",
        safe_search=safe_search,
    ) + sys_prompt + "\n" + prompt
    inputs = tokenizer(content, return_tensors="pt")

    # temperature/top_p only take effect when sampling is switched on, and
    # sampling with temperature 0 is rejected, so fall back to greedy there.
    sampling = temperature > 0
    generation_kwargs = {
        "max_new_tokens": int(max_new_tokens),
        "repetition_penalty": repetition_penalty,
        "do_sample": sampling,
    }
    if sampling:
        generation_kwargs["temperature"] = temperature
        generation_kwargs["top_p"] = top_p
    outputs = model.generate(**inputs, **generation_kwargs)

    # For a causal LM the returned sequence starts with the prompt tokens;
    # drop them so only the completion is decoded.
    prompt_length = inputs["input_ids"].shape[-1]
    return tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)
def main():
    """Build the Gradio interface, wire up its event handlers, and launch it.

    The page has a chat panel with generation settings, followed by three
    tabs: Project Explorer, Code Editor, and File Management. The handlers
    are nested functions defined inside the ``gr.Blocks`` context so they can
    be attached to the components created there.
    """
    with gr.Blocks() as demo:
        gr.Markdown("## FragMixt: The No-Code Development Powerhouse")
        gr.Markdown("### Your AI-Powered Development Companion")
        with gr.Row():
            with gr.Column(scale=3):
                chatbot = gr.Chatbot(
                    show_label=False,
                    show_share_button=False,
                    show_copy_button=True,
                    likeable=True,
                    layout="panel",
                )
                message = gr.Textbox(
                    label="Enter your message", placeholder="Ask me anything!"
                )
                submit_button = gr.Button(value="Send")
            with gr.Column(scale=1):
                purpose = gr.Textbox(
                    label="Purpose", placeholder="What is the purpose of this interaction?"
                )
                agent_name = gr.Dropdown(
                    label="Agents",
                    choices=list(agents),
                    value=agents[0],
                    interactive=True,
                )
                sys_prompt = gr.Textbox(
                    label="System Prompt", max_lines=1, interactive=True
                )
                temperature = gr.Slider(
                    label="Temperature",
                    value=TEMPERATURE,
                    minimum=0.0,
                    maximum=1.0,
                    step=0.05,
                    interactive=True,
                    info="Higher values produce more diverse outputs",
                )
                max_new_tokens = gr.Slider(
                    label="Max new tokens",
                    value=MAX_TOKENS,
                    minimum=0,
                    maximum=1048 * 10,
                    step=64,
                    interactive=True,
                    info="The maximum numbers of new tokens",
                )
                top_p = gr.Slider(
                    label="Top-p (nucleus sampling)",
                    value=TOP_P,
                    minimum=0.0,
                    maximum=1,
                    step=0.05,
                    interactive=True,
                    info="Higher values sample more low-probability tokens",
                )
                repetition_penalty = gr.Slider(
                    label="Repetition penalty",
                    value=REPETITION_PENALTY,
                    minimum=1.0,
                    maximum=2.0,
                    step=0.05,
                    interactive=True,
                    info="Penalize repeated tokens",
                )
        with gr.Tabs():
            with gr.TabItem("Project Explorer"):
                project_path = gr.Textbox(
                    label="Project Path", placeholder="/home/user/app/current_project"
                )
                explore_button = gr.Button(value="Explore")
                project_output = gr.Textbox(label="File Tree", lines=20)
            with gr.TabItem("Code Editor"):
                code_editor = gr.Code(label="Code Editor", language="python")
                run_code_button = gr.Button(value="Run Code")
                code_output = gr.Textbox(label="Code Output", lines=10)
            with gr.TabItem("File Management"):
                file_list = gr.Dropdown(
                    label="Select File", choices=[], interactive=True
                )
                file_content = gr.Textbox(label="File Content", lines=20)
                # Named components instead of anonymous ones created inline in
                # the click() calls, so they sit in this tab and share one
                # status line.
                new_file_name = gr.Textbox(label="New File Name")
                save_file_button = gr.Button(value="Save File")
                create_file_button = gr.Button(value="Create New File")
                delete_file_button = gr.Button(value="Delete File")
                file_status = gr.Textbox(label="Status", interactive=False)
        history = gr.State([])

        def chat(
            purpose: str,
            message: str,
            agent_name: str,
            sys_prompt: str,
            temperature: float,
            max_new_tokens: int,
            top_p: float,
            repetition_penalty: float,
            history: List[Tuple[str, str]],
        ) -> Tuple[List[Tuple[str, str]], List[Tuple[str, str]]]:
            """Generate a reply to ``message`` and return the updated history twice.

            The first copy feeds the chatbot display and the second the
            session state. ``purpose`` is received from the UI but not
            forwarded, because ``mixtral_generate`` has no such parameter.
            """
            prompt = format_prompt(message, history)
            # Use Mixtral for generation
            response = mixtral_generate(
                prompt,
                history,
                agent_name,
                sys_prompt,
                temperature,
                max_new_tokens,
                top_p,
                repetition_penalty,
            )
            # Build a new list rather than mutating the state object in place.
            updated_history = list(history) + [(message, response)]
            return updated_history, updated_history

        submit_button.click(
            chat,
            inputs=[
                purpose,
                message,
                agent_name,
                sys_prompt,
                temperature,
                max_new_tokens,
                top_p,
                repetition_penalty,
                history,
            ],
            outputs=[chatbot, history],
        )

        def explore_project(project_path: str) -> str:
            """Return a text file tree for ``project_path``.

            Uses the external ``tree`` command when it is installed and a
            plain ``os.walk`` listing otherwise.
            """
            try:
                return subprocess.check_output(["tree", project_path]).decode("utf-8")
            except FileNotFoundError:
                # Raised when the `tree` executable itself cannot be found.
                if not os.path.isdir(project_path):
                    return f"Error exploring project: {project_path!r} is not a directory"
                listing = []
                for root, dirs, files in os.walk(project_path):
                    dirs.sort()  # makes the traversal order deterministic
                    relative = os.path.relpath(root, project_path)
                    depth = 0 if relative == "." else relative.count(os.sep) + 1
                    indent = "    " * depth
                    listing.append(f"{indent}{os.path.basename(root) or root}/")
                    listing.extend(f"{indent}    {name}" for name in sorted(files))
                return "\n".join(listing)
            except Exception as e:
                return f"Error exploring project: {e}"

        explore_button.click(
            explore_project, inputs=[project_path], outputs=[project_output]
        )

        def run_code(code: str) -> str:
            """Execute the editor's Python code and return what it printed.

            If the code raises, the output captured so far is returned
            followed by an error line.
            """
            import contextlib
            import io

            if not code:
                return ""
            # SECURITY: exec() runs arbitrary code with this process's
            # privileges. Only expose this to trusted users or inside a sandbox.
            captured = io.StringIO()
            try:
                # redirect_stdout swaps the process-wide sys.stdout, so output
                # from concurrent requests may interleave.
                with contextlib.redirect_stdout(captured):
                    exec(code, {})
            except Exception as e:
                return f"{captured.getvalue()}Error running code: {e}"
            return captured.getvalue()

        run_code_button.click(
            run_code, inputs=[code_editor], outputs=[code_output]
        )

        def load_file_list(project_path: str) -> Tuple[Dict, str]:
            """Return a dropdown update listing the files in ``project_path``, plus a status message."""
            if not project_path:
                return gr.update(choices=[], value=None), ""
            try:
                files = sorted(
                    f
                    for f in os.listdir(project_path)
                    if os.path.isfile(os.path.join(project_path, f))
                )
            except Exception as e:
                return gr.update(choices=[], value=None), f"Error loading file list: {e}"
            # Returning a bare list would set the dropdown's value; an update
            # is needed to replace its choices.
            return gr.update(choices=files, value=None), ""

        def load_file_content(project_path: str, file_name: str) -> str:
            """Return the text of the selected file, or an error message."""
            if not file_name:
                # Fired when the dropdown selection is cleared.
                return ""
            try:
                with open(os.path.join(project_path, file_name), "r", encoding="utf-8") as file:
                    return file.read()
            except Exception as e:
                return f"Error loading file content: {e}"

        def save_file(project_path: str, file_name: str, content: str) -> str:
            """Write ``content`` to the selected file and return a status message."""
            if not file_name:
                return "Error saving file: no file selected."
            try:
                with open(os.path.join(project_path, file_name), "w", encoding="utf-8") as file:
                    file.write(content)
                return f"File {file_name} saved successfully."
            except Exception as e:
                return f"Error saving file: {e}"

        def create_file(project_path: str, file_name: str) -> str:
            """Create an empty file (and any missing parent directories)."""
            if not file_name:
                return "Error creating file: no file name given."
            try:
                target = os.path.join(project_path, file_name)
                parent = os.path.dirname(target)
                # os.makedirs("") raises, so skip it for a bare file name.
                if parent:
                    os.makedirs(parent, exist_ok=True)  # Create directory if needed
                with open(target, "a"):
                    pass
                return f"File {file_name} created successfully."
            except Exception as e:
                return f"Error creating file: {e}"

        def delete_file(project_path: str, file_name: str) -> str:
            """Delete the selected file and return a status message."""
            if not file_name:
                return "Error deleting file: no file selected."
            try:
                os.remove(os.path.join(project_path, file_name))
                return f"File {file_name} deleted successfully."
            except Exception as e:
                return f"Error deleting file: {e}"

        # NOTE(review): file names are joined to the project path unchecked,
        # so a name such as "../x" reaches outside the project directory.
        project_path.change(
            load_file_list, inputs=[project_path], outputs=[file_list, file_status]
        )
        file_list.change(
            load_file_content, inputs=[project_path, file_list], outputs=[file_content]
        )
        save_file_button.click(
            save_file, inputs=[project_path, file_list, file_content], outputs=[file_status]
        )
        create_file_button.click(
            create_file,
            inputs=[project_path, new_file_name],
            outputs=[file_status],
        )
        delete_file_button.click(
            delete_file, inputs=[project_path, file_list], outputs=[file_status]
        )
    demo.launch()


if __name__ == "__main__":
    main()