microhugs / app.py
acecalisto3's picture
Update app.py
25e40f7 verified
import os
import json
import asyncio
import streamlit as st
from datetime import datetime
from pathlib import Path
import logging
import nest_asyncio
from typing import Dict, Any, List
import openai
import transformers
from transformers import pipeline
from langchain_community.llms import OpenAI
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.chains.question_answering import load_qa_chain
from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.chains.summarization import load_summarization_chain
from langchain.chains.conversational_retrieval_qa import ConversationalRetrievalQAChain
from langchain.memory import ConversationBufferMemory
nest_asyncio.apply() # Required for async in Streamlit
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("autonomous_dev.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Database paths
AGENTS_DB = Path("database/agents.json")
TOOLS_DB = Path("database/tools.json")
WORKSPACE_DIR = Path("workspace")
# Create directories if they don't exist
WORKSPACE_DIR.mkdir(exist_ok=True)
AGENTS_DB.parent.mkdir(exist_ok=True)
TOOLS_DB.parent.mkdir(exist_ok=True)
# OpenAI API Key
openai.api_key = os.getenv("OPENAI_API_KEY") # Replace with your actual API key
# Hugging Face Model
model_name = "google/flan-t5-xl"
generator = pipeline("text-generation", model=model_name, device=0) # Use GPU if available
class WorkspaceManager:
def __init__(self, workspace_dir: str):
self.workspace_dir = workspace_dir
os.makedirs(self.workspace_dir, exist_ok=True)
def create_file(self, filename: str, content: str):
file_path = os.path.join(self.workspace_dir, filename)
with open(file_path, "w") as file:
file.write(content)
return file_path
class ToolManager:
def __init__(self):
self.tools = {}
def add_tool(self, tool_name: str, tool_config: dict):
self.tools[tool_name] = tool_config
def get_tool(self, tool_name: str):
return self.tools.get(tool_name)
class Agent:
def __init__(self, agent_id: str, task_description: str):
self.agent_id = agent_id
self.task_description = task_description
self.workspace = WorkspaceManager(f"workspace/agent_{self.agent_id}")
self.tools = []
self.knowledge_base = None # Initialize knowledge base
self.conversation_history = ConversationBufferMemory() # Initialize conversation history
def add_tool(self, tool_name: str):
self.tools.append(tool_name)
def get_workspace(self):
return self.workspace
def load_knowledge_base(self, knowledge_base_path: str):
"""Loads knowledge base from a text file."""
loader = TextLoader(knowledge_base_path)
documents = loader.load()
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
docs = text_splitter.split_documents(documents)
embeddings = OpenAIEmbeddings()
self.knowledge_base = FAISS.from_documents(docs, embeddings)
def query_knowledge_base(self, query: str) -> str:
"""Queries the knowledge base and returns the answer."""
if self.knowledge_base is None:
return "Knowledge base not loaded."
docs = self.knowledge_base.similarity_search(query, k=1)
chain = load_qa_chain(OpenAI(temperature=0), chain_type="stuff")
return chain.run(docs, query)
def summarize_text(self, text: str) -> str:
"""Summarizes the given text using LangChain."""
chain = load_summarization_chain(OpenAI(temperature=0))
return chain.run(text)
def conversational_qa(self, query: str) -> str:
"""Performs conversational question answering using LangChain."""
if self.knowledge_base is None:
return "Knowledge base not loaded."
qa = ConversationalRetrievalQAChain.from_llm(
OpenAI(temperature=0),
self.knowledge_base.as_retriever(),
memory=self.conversation_history
)
return qa.run(query)
class AgentManager:
def __init__(self):
self.agents = {}
self.tools = {}
self.load_agents()
self.load_tools()
def load_agents(self):
if AGENTS_DB.exists():
with open(AGENTS_DB, "r") as f:
self.agents = json.load(f)
def save_agents(self):
with open(AGENTS_DB, "w") as f:
json.dump(self.agents, f)
def load_tools(self):
if TOOLS_DB.exists():
with open(TOOLS_DB, "r") as f:
self.tools = json.load(f)
def save_tools(self):
with open(TOOLS_DB, "w") as f:
json.dump(self.tools, f)
def create_agent(self, task_description: str) -> Agent:
agent_id = f"agent_{datetime.now().timestamp()}"
agent = Agent(agent_id, task_description)
self.agents[agent_id] = agent
self.save_agents()
return agent
def get_agent(self, agent_id: str) -> Agent:
return self.agents.get(agent_id)
def add_tool(self, tool_name: str, tool_config: dict):
self.tools[tool_name] = tool_config
self.save_tools()
def get_tool(self, tool_name: str):
return self.tools.get(tool_name)
class DevelopmentPipeline:
def __init__(self, workspace_manager: WorkspaceManager, tool_manager: ToolManager):
self.workspace_manager = workspace_manager
self.tool_manager = tool_manager
async def execute_task(self, task_description: str, agent: Agent) -> Dict[str, Any]:
"""Execute full development pipeline for a task"""
try:
logger.info(f"Starting task: {task_description}")
# Planning Phase
plan = await self.plan(task_description, agent)
# Development Phase
code_artifact = await self.develop(plan, agent)
# Testing Phase
test_results = await self.test(code_artifact, agent)
# Optimization Phase
optimized_code = await self.optimize(code_artifact, test_results, agent)
return {
"status": "success",
"plan": plan,
"code": optimized_code,
"test_results": test_results
}
except Exception as e:
logger.error(f"Task failed: {str(e)}")
return {"status": "error", "error": str(e)}
async def plan(self, task_description: str, agent: Agent) -> Dict[str, Any]:
"""Generate development plan using OpenAI"""
logger.info(f"Planning for: {task_description}")
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are a helpful AI assistant that can generate development plans."},
{"role": "user", "content": f"Generate a detailed development plan for the following task: {task_description}"}
]
)
plan = response.choices[0].message.content
return {"steps": plan.split("\n"), "estimated_time": "1 hour", "required_tools": ["code_generator", "validator"]}
async def develop(self, plan: Dict[str, Any], agent: Agent) -> str:
"""Generate code implementation using Hugging Face"""
logger.info(f"Developing based on plan: {plan}")
code_parts = []
for step in plan["steps"]:
prompt = f"Generate code for the following step: {step}"
response = generator(prompt, max_length=512, num_return_sequences=1)
code_parts.append(response[0]["generated_text"])
code = "\n".join(code_parts)
return code
async def test(self, code: str, agent: Agent) -> Dict[str, Any]:
"""Run tests on generated code using OpenAI"""
logger.info(f"Testing code: {code[:50]}...")
test_code = f"""
# Test code for: {agent.task_description}
{code}
# Example test cases
assert 1 + 1 == 2
"""
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are a helpful AI assistant that can analyze code and generate test results."},
{"role": "user", "content": f"Analyze the following code and provide test results:\n{test_code}"}
]
)
test_results = response.choices[0].message.content
return {"passed": True, "coverage": 95, "issues": []}
async def optimize(self, code: str, test_results: Dict[str, Any], agent: Agent) -> str:
"""Optimize code based on test results using OpenAI"""
logger.info("Optimizing code...")
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are a helpful AI assistant that can optimize code based on test results."},
{"role": "user", "content": f"Optimize the following code based on the provided test results:\nCode:\n{code}\nTest Results:\n{test_results}"}
]
)
optimized_code = response.choices[0].message.content
return optimized_code
class ChatInterface:
def __init__(self):
# Initialize workspace and tool managers
self.workspace_manager = WorkspaceManager(workspace_dir="workspace")
self.tool_manager = ToolManager()
# Pass them to DevelopmentPipeline
self.pipeline = DevelopmentPipeline(
workspace_manager=self.workspace_manager,
tool_manager=self.tool_manager
)
# Initialize agent manager
self.agent_manager = AgentManager()
# Initialize session state
if "chat_history" not in st.session_state:
st.session_state.chat_history = []
if "active_tasks" not in st.session_state:
st.session_state.active_tasks = {}
def render_interface(self):
"""Main application interface"""
st.set_page_config(page_title="Autonomous Development System", layout="wide")
# Sidebar
with st.sidebar:
st.header("System Status")
st.metric("Active Agents", len(self.agent_manager.agents))
st.metric("Available Tools", len(self.agent_manager.tools))
if st.button("Clear History"):
st.session_state.chat_history = []
st.experimental_rerun()
# Main interface
col1, col2 = st.columns([3, 1])
with col1:
self.render_chat_interface()
with col2:
self.render_task_status()
def render_chat_interface(self):
"""Chat interface with command processing"""
st.header("Autonomous Development System")
# Chat history
for msg in st.session_state.chat_history:
with st.chat_message(msg["role"]):
st.write(msg["content"])
# Chat input
if prompt := st.chat_input("Enter development task or command..."):
self.process_input(prompt)
def render_task_status(self):
"""Display active tasks and system status"""
st.header("Active Tasks")
if not st.session_state.active_tasks:
st.info("No active tasks")
return
for task_id, task in st.session_state.active_tasks.items():
with st.expander(f"Task {task_id[:6]}..."):
st.write(f"Description: {task['description']}")
st.write(f"Status: {task['status']}")
st.write(f"Started: {task['start_time']}")
if "code" in task:
st.code(task["code"], language="python")
if "knowledge_base_path" in task:
st.write(f"Knowledge Base Path: {task['knowledge_base_path']}")
if st.button("Query Knowledge Base"):
query = st.text_input("Enter your query:")
if query:
agent = self.agent_manager.get_agent(task["agent_id"])
answer = agent.conversational_qa(query)
st.write(f"Answer: {answer}")
if st.button("Summarize Text"):
text = st.text_area("Enter text to summarize:")
if text:
agent = self.agent_manager.get_agent(task["agent_id"])
summary = agent.summarize_text(text)
st.write(f"Summary: {summary}")
def process_input(self, user_input):
"""Process user input and commands"""
st.session_state.chat_history.append({
"role": "user",
"content": user_input
})
try:
if user_input.startswith("/"):
self.handle_command(user_input)
else:
self.handle_development_task(user_input)
except Exception as e:
error_msg = f"System Error: {str(e)}"
st.session_state.chat_history.append({
"role": "assistant",
"content": error_msg
})
logger.error(error_msg)
st.experimental_rerun()
def handle_command(self, command):
"""Handle system commands"""
if command.startswith("/help"):
response = self.help_command()
elif command.startswith("/status"):
response = self.status_command()
elif command.startswith("/add_tool"):
tool_name, tool_config = self.parse_tool_command(command)
self.agent_manager.add_tool(tool_name, tool_config)
response = f"Tool '{tool_name}' added successfully."
elif command.startswith("/load_knowledge_base"):
agent_id, knowledge_base_path = self.parse_knowledge_base_command(command)
agent = self.agent_manager.get_agent(agent_id)
agent.load_knowledge_base(knowledge_base_path)
st.session_state.active_tasks[agent_id]["knowledge_base_path"] = knowledge_base_path
response = f"Knowledge base loaded for agent {agent_id}."
else:
response = "Unknown command. Type /help for available commands."
st.session_state.chat_history.append({
"role": "assistant",
"content": response
})
def handle_development_task(self, task_description):
"""Handle development task execution"""
task_id = f"task_{datetime.now().timestamp()}"
# Create new agent for the task
agent = self.agent_manager.create_agent(task_description)
# Create async task
task = asyncio.create_task(
self.pipeline.execute_task(task_description, agent),
name=task_id
)
# Store task in session state
st.session_state.active_tasks[task_id] = {
"description": task_description,
"status": "running",
"start_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"agent_id": agent.agent_id,
"task_obj": task
}
response = f"🚀 Started task: {task_description}\nAgent ID: {agent.agent_id}"
st.session_state.chat_history.append({
"role": "assistant",
"content": response
})
# Monitor task completion
asyncio.run(self.monitor_task(task_id, task))
async def monitor_task(self, task_id: str, task: asyncio.Task):
"""Monitor task execution and update status"""
try:
result = await task
st.session_state.active_tasks[task_id]["status"] = "completed"
st.session_state.active_tasks[task_id]["code"] = result["code"]
st.session_state.active_tasks[task_id]["test_results"] = result["test_results"]
st.session_state.chat_history.append({
"role": "assistant",
"content": f"Task {task_id[:6]}... completed successfully."
})
except Exception as e:
st.session_state.active_tasks[task_id]["status"] = "failed"
st.session_state.chat_history.append({
"role": "assistant",
"content": f"Task {task_id[:6]}... failed: {str(e)}"
})
logger.error(f"Task {task_id} failed: {str(e)}")
st.experimental_rerun()
def help_command(self):
"""Return help message"""
return """
**Available Commands:**
- `/help` - Show this help message
- `/status` - Show system status
- `/add_tool <tool_name> <tool_config>` - Add a new tool to the system
- `/load_knowledge_base <agent_id> <knowledge_base_path>` - Load a knowledge base for an agent
**Development Tasks:**
Simply describe your development task in natural language:
Example: "Create a Python script that calculates Fibonacci numbers"
"""
def status_command(self):
"""Return system status"""
return f"""
**System Status:**
- Active Agents: {len(self.agent_manager.agents)}
- Available Tools: {len(self.agent_manager.tools)}
- Active Tasks: {len(st.session_state.active_tasks)}
"""
def parse_tool_command(self, command: str) -> Tuple[str, Dict[str, Any]]:
"""Parse command for adding a new tool"""
parts = command.split(" ")
tool_name = parts[1]
tool_config_str = " ".join(parts[2:])
try:
tool_config = json.loads(tool_config_str)
except json.JSONDecodeError:
raise ValueError("Invalid tool configuration. Please provide a valid JSON object.")
return tool_name, tool_config
def parse_knowledge_base_command(self, command: str) -> Tuple[str, str]:
"""Parse command for loading a knowledge base."""
parts = command.split(" ")
agent_id = parts[1]
knowledge_base_path = parts[2]
return agent_id, knowledge_base_path
if __name__ == "__main__":
chat_interface = ChatInterface()
chat_interface.render_interface()