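"""Single-file AI agent playground.

Defines an AIAgent class that dispatches text commands to tools (web search,
code generation, refinement, linting, file integration, prompt management,
shell and Python execution, and NLP), plus Streamlit and Gradio front ends
for interacting with it.
"""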
import os
import subprocess
import sys
import tempfile
from datetime import datetime
from io import StringIO
from typing import List, Dict, Optional

import black
import gradio as gr
import requests
import streamlit as st
from bs4 import BeautifulSoup
from pylint import lint
from pylint.reporters.text import TextReporter
from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer
# --- Custom Exceptions for Enhanced Error Handling ---
class InvalidActionError(Exception):
    """Raised when an invalid action is provided."""
    pass
class InvalidInputError(Exception):
    """Raised when invalid input is provided for an action."""
    pass
class CodeGenerationError(Exception):
    """Raised when code generation fails."""
    pass
class AppTestingError(Exception):
    """Raised when app testing fails."""
    pass
class WorkspaceExplorerError(Exception):
    """Raised when workspace exploration fails."""
    pass
class PromptManagementError(Exception):
    """Raised when prompt management fails."""
    pass
class SearchError(Exception):
    """Raised when search fails."""
    pass
class CodeRefinementError(Exception):
    """Raised when code refinement fails."""
    pass
class CodeTestingError(Exception):
    """Raised when code testing fails."""
    pass
class CodeIntegrationError(Exception):
    """Raised when code integration fails."""
    pass
# --- AI Agent Class ---
class AIAgent:
    def __init__(self):
        # --- Initialize Tools and Attributes ---
        self.tools = {
            "SEARCH": self.search,
            "CODEGEN": self.code_generation,
            "REFINE-CODE": self.refine_code,  # Use internal function
            "TEST-CODE": self.test_code,  # Use internal function
            "INTEGRATE-CODE": self.integrate_code,  # Use internal function
            "TEST-APP": self.test_app,
            "GENERATE-REPORT": self.generate_report,
            "WORKSPACE-EXPLORER": self.workspace_explorer,
            "ADD_PROMPT": self.add_prompt,
            "ACTION_PROMPT": self.action_prompt,
            "COMPRESS_HISTORY_PROMPT": self.compress_history_prompt,
            "LOG_PROMPT": self.log_prompt,
            "LOG_RESPONSE": self.log_response,
            "MODIFY_PROMPT": self.modify_prompt,
            "PREFIX": self.prefix,
            "SEARCH_QUERY": self.search_query,
            "READ_PROMPT": self.read_prompt,
            "TASK_PROMPT": self.task_prompt,
            "UNDERSTAND_TEST_RESULTS_PROMPT": self.understand_test_results_prompt,
            "EXECUTE_COMMAND": self.execute_command,  # Add command execution
            "PYTHON_INTERPRET": self.python_interpret,  # Add Python interpretation
            "NLP": self.nlp,  # Add NLP capabilities
        }
        self.task_history: List[Dict[str, str]] = []
        self.current_task: Optional[str] = None
        self.search_engine_url: str = "https://www.google.com/search?q="  # Default search engine
        self.prompts: List[str] = []  # Store prompts for future use
        self.code_generator = None  # Initialize code generator later
        self.available_models = [
            "gpt2",
            "facebook/bart-large-cnn",
            "google/flan-t5-xl",
            "bigscience/T0_3B",
        ]  # Add more as needed
        self.selected_model = "gpt2"  # Default model
        self.nlp_pipeline = None  # Initialize NLP pipeline later
    # --- Search Functionality ---
    def search(self, query: str) -> List[str]:
        """Performs a web search using the specified search engine."""
        search_url = self.search_engine_url + query
        try:
            # A timeout keeps a stalled request from hanging the agent.
            response = requests.get(search_url, timeout=10)
            response.raise_for_status()  # Raise an exception for bad status codes
            soup = BeautifulSoup(response.content, 'html.parser')
            results = soup.find_all('a', href=True)
            return [result['href'] for result in results]
        except requests.exceptions.RequestException as e:
            raise SearchError(f"Error during search: {e}")
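    # NOTE: search engines such as Google often block scripted requests that lack a
    # browser-like User-Agent header, so the hrefs returned here may be noisy or
    # empty; a dedicated search API is usually more reliable.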
    # --- Code Generation Functionality ---
    def code_generation(self, snippet: str) -> str:
        """Generates code based on the provided snippet or description."""
        try:
            if not self.code_generator:
                self.code_generator = pipeline(
                    'text-generation', model=self.selected_model
                )
            generated_text = self.code_generator(
                snippet, max_length=500, num_return_sequences=1
            )[0]['generated_text']
            return generated_text
        except Exception as e:
            raise CodeGenerationError(f"Error during code generation: {e}")
    # --- Code Refinement Functionality ---
    def refine_code(self, code: str) -> str:
        """Refines the provided code string."""
        try:
            refined_code = black.format_str(code, mode=black.FileMode())
            return refined_code
        except black.InvalidInput:
            raise CodeRefinementError("Error: Invalid code input for black formatting.")
        except Exception as e:
            raise CodeRefinementError(f"Error during code refinement: {e}")
    # --- Code Testing Functionality ---
    def test_code(self, code: str) -> str:
        """Lints the provided code string using pylint and returns the report."""
        try:
            # pylint operates on files, so write the snippet to a temporary file
            # and capture the report through a text reporter instead of stdout.
            with tempfile.NamedTemporaryFile(
                mode='w', suffix='.py', delete=False
            ) as tmp:
                tmp.write(code)
                tmp_path = tmp.name
            lint_output = StringIO()
            lint.Run([tmp_path], reporter=TextReporter(lint_output), exit=False)
            os.remove(tmp_path)
            return lint_output.getvalue()
        except Exception as e:
            raise CodeTestingError(f"Error during code testing: {e}")
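    # Example (hypothetical snippet): agent.test_code("def f():\n    return 1\n")
    # returns pylint's text report for that code.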
    # --- Code Integration Functionality ---
    def integrate_code(self, file_path: str, code_snippet: str) -> str:
        """Integrates the code snippet into the specified file."""
        # NOTE: handle_input passes tools a single string argument, so this tool
        # is easiest to call directly with both arguments.
        try:
            # For simplicity, we just append the code snippet to the file on a
            # new line. A real scenario would need more sophisticated logic.
            with open(file_path, 'a') as f:
                f.write('\n' + code_snippet)
            return f"Code snippet integrated into {file_path}"
        except Exception as e:
            raise CodeIntegrationError(f"Error during code integration: {e}")
    # --- App Testing Functionality ---
    def test_app(self) -> str:
        """Tests the functionality of the app by launching it with Streamlit."""
        try:
            # NOTE: subprocess.run blocks until the launched Streamlit process exits.
            subprocess.run(['streamlit', 'run', 'app.py'], check=True)
            return "App tested successfully."
        except subprocess.CalledProcessError as e:
            raise AppTestingError(f"Error during app testing: {e}")
    # --- Report Generation Functionality ---
    def generate_report(self) -> str:
        """Generates a report based on the task history."""
        report = f"## Task Report: {self.current_task}\n\n"
        for task in self.task_history:
            report += f"**Action:** {task['action']}\n"
            report += f"**Input:** {task['input']}\n"
            report += f"**Output:** {task['output']}\n\n"
        return report
    # --- Workspace Exploration Functionality ---
    def workspace_explorer(self) -> str:
        """Provides a workspace explorer functionality."""
        try:
            current_directory = os.getcwd()
            directories = []
            files = []
            for item in os.listdir(current_directory):
                item_path = os.path.join(current_directory, item)
                if os.path.isdir(item_path):
                    directories.append(item)
                elif os.path.isfile(item_path):
                    files.append(item)
            return f"**Directories:** {directories}\n**Files:** {files}"
        except Exception as e:
            raise WorkspaceExplorerError(f"Error during workspace exploration: {e}")
    # --- Prompt Management Functionality ---
    def add_prompt(self, prompt: str) -> str:
        """Adds a new prompt to the agent's knowledge base."""
        try:
            self.prompts.append(prompt)
            return f"Prompt '{prompt}' added successfully."
        except Exception as e:
            raise PromptManagementError(f"Error adding prompt: {e}")
    # --- Prompt Generation Functionality ---
    def action_prompt(self, action: str) -> str:
        """Provides a prompt for a specific action."""
        prompts = {
            "SEARCH": "What do you want to search for?",
            "CODEGEN": "Provide a code snippet to generate code from, or describe what you want the code to do.",
            "REFINE-CODE": "Provide the code to refine.",
            "TEST-CODE": "Provide the code to test.",
            "INTEGRATE-CODE": "Provide the file path and code snippet to integrate. For example: /path/to/your/file.py \"\"\"print('Hello, World!')\"\"\"",
            "TEST-APP": "Test the application.",
            "GENERATE-REPORT": "Generate a report based on the task history.",
            "WORKSPACE-EXPLORER": "Explore the current workspace.",
            "ADD_PROMPT": "Enter the new prompt to add.",
            "ACTION_PROMPT": "Enter the action to get a prompt for.",
            "COMPRESS_HISTORY_PROMPT": "Compress the task history.",
            "LOG_PROMPT": "Enter the event to log.",
            "LOG_RESPONSE": "Log the specified event.",
            "MODIFY_PROMPT": "Enter the prompt to modify.",
            "PREFIX": "Enter the text to add a prefix to.",
            "SEARCH_QUERY": "Enter the topic to generate a search query for.",
            "READ_PROMPT": "Enter the file path to read.",
            "TASK_PROMPT": "Enter the new task to start.",
            "UNDERSTAND_TEST_RESULTS_PROMPT": "Enter your question about the test results.",
            "EXECUTE_COMMAND": "Enter the command to execute.",
            "PYTHON_INTERPRET": "Enter the Python code to interpret.",
            "NLP": "Enter the text for NLP analysis.",
        }
        if action not in prompts:
            raise InvalidActionError("Please provide a valid action.")
        return prompts[action]
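    # Example: agent.action_prompt("SEARCH") returns "What do you want to search for?"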
    # --- Prompt Generation Functionality ---
    def compress_history_prompt(self) -> str:
        """Provides a prompt to compress the task history."""
        return "Do you want to compress the task history?"
    # --- Prompt Generation Functionality ---
    def log_prompt(self) -> str:
        """Provides a prompt to log a specific event."""
        return "What event do you want to log?"
    # --- Logging Functionality ---
    def log_response(self, event: str) -> str:
        """Logs the specified event."""
        print(f"Event logged: {event}")
        return "Event logged successfully."
    # --- Prompt Modification Functionality ---
    def modify_prompt(self, prompt: str) -> str:
        """Modifies an existing prompt."""
        try:
            # NOTE: placeholder implementation - locating and updating the stored
            # prompt in self.prompts is not implemented yet.
            return f"Prompt '{prompt}' modified successfully."
        except Exception as e:
            raise PromptManagementError(f"Error modifying prompt: {e}")
    # --- Prefix Functionality ---
    def prefix(self, text: str) -> str:
        """Adds a prefix to the provided text."""
        return f"PREFIX: {text}"
    # --- Search Query Generation Functionality ---
    def search_query(self, query: str) -> str:
        """Provides a search query for the specified topic."""
        return f"Search query: {query}"
    # --- File Reading Functionality ---
    def read_prompt(self, file_path: str) -> str:
        """Reads and returns the contents of the specified file."""
        try:
            with open(file_path, 'r') as f:
                contents = f.read()
            return contents
        except FileNotFoundError:
            raise InvalidInputError(f"Error: File not found: {file_path}")
        except Exception as e:
            raise InvalidInputError(f"Error reading file: {e}")
    # --- Task Prompt Generation Functionality ---
    def task_prompt(self) -> str:
        """Provides a prompt to start a new task."""
        return "What task do you want to start?"
    # --- Test Results Understanding Prompt Generation Functionality ---
    def understand_test_results_prompt(self) -> str:
        """Provides a prompt to understand the test results."""
        return "What do you want to know about the test results?"
    # --- Command Execution Functionality ---
    def execute_command(self, command: str) -> str:
        """Executes the provided command in the terminal."""
        try:
            # check=True makes a non-zero exit status raise CalledProcessError,
            # which the except clause below actually handles.
            process = subprocess.run(
                command.split(), capture_output=True, text=True, check=True
            )
            return f"Command output:\n{process.stdout}"
        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            return f"Error executing command: {e}"
    # --- Python Interpretation Functionality ---
    def python_interpret(self, code: str) -> str:
        """Interprets the provided Python code."""
        try:
            # WARNING: exec() runs arbitrary code in the agent's own process;
            # only use this with input you trust.
            exec(code)
            return "Python code executed successfully."
        except Exception as e:
            return f"Error interpreting Python code: {e}"
    # --- NLP Functionality ---
    def nlp(self, text: str) -> str:
        """Performs NLP analysis on the provided text."""
        try:
            if not self.nlp_pipeline:
                self.nlp_pipeline = pipeline(
                    "sentiment-analysis",
                    model="distilbert-base-uncased-finetuned-sst-2-english",
                )  # Example NLP pipeline
            analysis = self.nlp_pipeline(text)
            return f"NLP Analysis: {analysis}"
        except Exception as e:
            return f"Error performing NLP analysis: {e}"
    # --- Input Handling Functionality ---
    def handle_input(self, input_str: str):
        """Handles user input and executes the corresponding action."""
        try:
            if not input_str.strip():
                print("Error: Please enter a command.")
                return
            action, *args = input_str.split()
            if action not in self.tools:
                raise InvalidActionError(
                    "Invalid action. Please choose a valid action from the list of tools."
                )
            arg_str = " ".join(args) if args else None
            try:
                # Run the selected tool exactly once and reuse the result for
                # both the history entry and the console output.
                if arg_str is not None:
                    output = self.tools[action](arg_str)
                else:
                    output = self.tools[action]()
            except Exception as e:
                output = f"Error: {e}"
            self.task_history.append(
                {"action": action, "input": arg_str, "output": output}
            )
            print(f"Action: {action}\nInput: {arg_str}\nOutput: {output}")
        except (
            InvalidActionError,
            InvalidInputError,
            CodeGenerationError,
            CodeRefinementError,
            CodeTestingError,
            CodeIntegrationError,
            AppTestingError,
            WorkspaceExplorerError,
            PromptManagementError,
            SearchError,
        ) as e:
            print(f"Error: {e}")
    # --- Main Loop of the Agent ---
    def run(self):
        """Runs the agent continuously, waiting for user input."""
        while True:
            input_str = input("Enter a command for the AI Agent: ")
            self.handle_input(input_str)
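    # Example console session (hypothetical commands; the format is "ACTION arguments"):
    #   CODEGEN write a function that reverses a string
    #   READ_PROMPT notes.txt
    #   WORKSPACE-EXPLORER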
# --- Streamlit Integration ---
if __name__ == "__main__":
    # NOTE: Streamlit reruns this script on every interaction, so the agent and
    # its task history are recreated each time; store the agent in
    # st.session_state if state should persist across reruns.
    agent = AIAgent()
    st.set_page_config(
        page_title="AI Agent",
        page_icon="🤖",
        layout="wide",
        initial_sidebar_state="expanded",
    )
    # --- Tabbed Navigation ---
    tabs = st.tabs(["Agent Generation", "Chat App"])
    # --- Agent Generation Tab ---
    with tabs[0]:
        st.title("AI Agent Generation")
        st.sidebar.title("Agent Settings")
        # --- Command Dropdown ---
        command_options = [
            "SEARCH",
            "CODEGEN",
            "REFINE-CODE",
            "TEST-CODE",
            "INTEGRATE-CODE",
            "TEST-APP",
            "GENERATE-REPORT",
            "WORKSPACE-EXPLORER",
            "ADD_PROMPT",
            "ACTION_PROMPT",
            "COMPRESS_HISTORY_PROMPT",
            "LOG_PROMPT",
            "LOG_RESPONSE",
            "MODIFY_PROMPT",
            "PREFIX",
            "SEARCH_QUERY",
            "READ_PROMPT",
            "TASK_PROMPT",
            "UNDERSTAND_TEST_RESULTS_PROMPT",
        ]
        selected_command = st.sidebar.selectbox("Command", command_options)
        # --- Model Dropdown ---
        selected_model = st.sidebar.selectbox(
            "Model",
            agent.available_models,
            index=agent.available_models.index(agent.selected_model),
        )
        agent.selected_model = selected_model
        # --- Input Field ---
        input_str = st.text_input(f"Enter input for {selected_command}:")
        # --- Execute Command ---
        if st.button("Execute"):
            if input_str:
                agent.handle_input(f"{selected_command} {input_str}")
                # Guard against an empty history (e.g. when the command was rejected).
                if agent.task_history:
                    st.write(f"Output: {agent.task_history[-1]['output']}")
        # --- Task History ---
        st.subheader("Task History")
        for task in agent.task_history:
            st.write(f"**Action:** {task['action']}")
            st.write(f"**Input:** {task['input']}")
            st.write(f"**Output:** {task['output']}")
        # --- Workspace Explorer ---
        st.subheader("Workspace Explorer")
        with st.expander("Explore Workspace"):
            try:
                workspace_output = agent.workspace_explorer()
                st.write(workspace_output)
            except WorkspaceExplorerError as e:
                st.error(f"Error exploring workspace: {e}")
    # --- Chat App Tab ---
    with tabs[1]:
        st.title("Chat App")
        # --- Chat History ---
        chat_history = st.empty()
        chat_history.text("Chat History:")
        # --- Input Field ---
        user_input = st.text_input("Enter your message:")
        # --- Send Message ---
        if st.button("Send"):
            if user_input:
                # --- Display User Message ---
                chat_history.text(f"You: {user_input}")
                # --- Process User Input ---
                try:
                    # --- Extract Command and Arguments ---
                    action, *args = user_input.split()
                    if action in agent.tools:
                        if args:
                            output = agent.tools[action](" ".join(args))
                        else:
                            output = agent.tools[action]()
                    else:
                        # --- Treat as a regular chat message ---
                        output = agent.code_generation(user_input)
                    # --- Display Agent Response ---
                    chat_history.text(f"Agent: {output}")
                except Exception as e:
                    # --- Display Error Message ---
                    chat_history.text(f"Agent: Error: {e}")
                # --- Clear Input Field ---
                # NOTE: reassigning the local variable does not clear the Streamlit
                # text_input widget; that requires st.session_state.
                user_input = ""
    # --- Gradio Integration ---
    def gradio_interface(input_text):
        """Gradio interface function."""
        try:
            agent.handle_input(input_text)
            if agent.task_history:
                return agent.task_history[-1]["output"]  # Get the latest output
            return "No output was recorded for that command."
        except Exception as e:
            return f"Error: {e}"
    iface = gr.Interface(
        fn=gradio_interface,
        inputs=gr.Textbox(label="Enter Command"),
        outputs=gr.Textbox(label="Output"),
        title="AI Agent",
        description="Interact with the AI Agent.",
    )
    iface.launch()
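    # NOTE: this block wires up both a Streamlit UI and a Gradio interface in the
    # same script; when run via "streamlit run app.py" (the entry point that
    # test_app() assumes), iface.launch() additionally starts a Gradio server on
    # its own port.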