"""Streamlit front-end for an experimental "autonomous development" assistant.

The module wires together:

* ``WorkspaceManager`` / ``ToolManager`` - small bookkeeping helpers,
* ``Agent`` - per-task state plus LangChain-backed knowledge-base helpers,
* ``AgentManager`` - JSON-persisted registry of agents and tools,
* ``DevelopmentPipeline`` - plan -> develop -> test -> optimize, using the
  OpenAI chat API and a Hugging Face generation pipeline,
* ``ChatInterface`` - the Streamlit UI and slash-command handling.
"""

import asyncio
import json
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import nest_asyncio
import openai
import streamlit as st
import transformers
from langchain.chains import ConversationalRetrievalChain, LLMChain
from langchain.chains.question_answering import load_qa_chain
from langchain.chains.summarize import load_summarize_chain
from langchain.memory import ConversationBufferMemory
from langchain.prompts import PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import TextLoader
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.llms import OpenAI
from langchain_community.vectorstores import FAISS
from transformers import pipeline

nest_asyncio.apply()  # Required for async in Streamlit

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("autonomous_dev.log"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# Database paths
AGENTS_DB = Path("database/agents.json")
TOOLS_DB = Path("database/tools.json")
WORKSPACE_DIR = Path("workspace")

# Create directories if they don't exist
WORKSPACE_DIR.mkdir(parents=True, exist_ok=True)
AGENTS_DB.parent.mkdir(parents=True, exist_ok=True)
TOOLS_DB.parent.mkdir(parents=True, exist_ok=True)

# The OpenAI key is read from the environment; nothing is hard-coded here.
openai.api_key = os.getenv("OPENAI_API_KEY")
if not openai.api_key:
    logger.warning("OPENAI_API_KEY is not set; OpenAI-backed features will fail.")

# Hugging Face model
model_name = "google/flan-t5-xl"


def _pick_device() -> int:
    """Return the pipeline device index: 0 when CUDA is usable, else -1 (CPU)."""
    try:
        import torch
    except ImportError:
        return -1
    return 0 if torch.cuda.is_available() else -1


@st.cache_resource(show_spinner=False)
def _load_generator(name: str):
    """Build the Hugging Face pipeline once and reuse it across Streamlit reruns.

    FLAN-T5 is an encoder-decoder model, so it is served through the
    ``text2text-generation`` task rather than ``text-generation``.
    """
    return pipeline("text2text-generation", model=name, device=_pick_device())


generator = _load_generator(model_name)


def _rerun() -> None:
    """Trigger a Streamlit rerun using whichever API this Streamlit version has."""
    rerun = getattr(st, "rerun", None) or st.experimental_rerun
    rerun()


class WorkspaceManager:
    """Owns a directory on disk and writes files into it."""

    def __init__(self, workspace_dir: str):
        self.workspace_dir = workspace_dir
        os.makedirs(self.workspace_dir, exist_ok=True)

    def create_file(self, filename: str, content: str):
        """Write ``content`` to ``filename`` inside the workspace; return the path."""
        file_path = os.path.join(self.workspace_dir, filename)
        with open(file_path, "w", encoding="utf-8") as file:
            file.write(content)
        return file_path


class ToolManager:
    """In-memory mapping of tool name -> tool configuration."""

    def __init__(self):
        self.tools = {}

    def add_tool(self, tool_name: str, tool_config: dict):
        """Register (or overwrite) the configuration stored under ``tool_name``."""
        self.tools[tool_name] = tool_config

    def get_tool(self, tool_name: str):
        """Return the configuration for ``tool_name`` or ``None`` if unknown."""
        return self.tools.get(tool_name)


class Agent:
    """State attached to one development task.

    Holds the task description, a per-agent workspace, a list of tool names,
    an optional FAISS knowledge base and a conversation memory.
    """

    def __init__(self, agent_id: str, task_description: str):
        self.agent_id = agent_id
        self.task_description = task_description
        self.workspace = WorkspaceManager(f"workspace/agent_{self.agent_id}")
        self.tools = []
        self.knowledge_base = None  # populated by load_knowledge_base()
        # The retrieval chain reads its history from the "chat_history" key
        # and expects message objects, hence the explicit arguments.
        self.conversation_history = ConversationBufferMemory(
            memory_key="chat_history", return_messages=True
        )

    def add_tool(self, tool_name: str):
        """Record that this agent may use the tool called ``tool_name``."""
        self.tools.append(tool_name)

    def get_workspace(self):
        """Return this agent's ``WorkspaceManager``."""
        return self.workspace

    def load_knowledge_base(self, knowledge_base_path: str):
        """Load a text file, chunk it, embed it and index it in FAISS."""
        loader = TextLoader(knowledge_base_path)
        documents = loader.load()
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
        docs = text_splitter.split_documents(documents)
        embeddings = OpenAIEmbeddings()
        self.knowledge_base = FAISS.from_documents(docs, embeddings)

    def query_knowledge_base(self, query: str) -> str:
        """Answer ``query`` from the single most similar knowledge-base chunk."""
        if self.knowledge_base is None:
            return "Knowledge base not loaded."
        docs = self.knowledge_base.similarity_search(query, k=1)
        chain = load_qa_chain(OpenAI(temperature=0), chain_type="stuff")
        # The QA chain takes its inputs by keyword, not positionally.
        return chain.run(input_documents=docs, question=query)

    def summarize_text(self, text: str) -> str:
        """Summarize ``text`` with a LangChain summarize chain."""
        # The summarize chain operates on documents, so wrap the raw text.
        splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
        documents = splitter.create_documents([text])
        chain = load_summarize_chain(OpenAI(temperature=0))
        return chain.run(documents)

    def conversational_qa(self, query: str) -> str:
        """Answer ``query`` against the knowledge base, keeping chat history."""
        if self.knowledge_base is None:
            return "Knowledge base not loaded."
        qa = ConversationalRetrievalChain.from_llm(
            OpenAI(temperature=0),
            self.knowledge_base.as_retriever(),
            memory=self.conversation_history,
        )
        return qa.run(query)


class AgentManager:
    """Registry of agents and tools, persisted as JSON under ``database/``.

    Only agent metadata (task description and tool names) is persisted;
    knowledge bases and conversation memory live in process memory only.
    """

    def __init__(self):
        self.agents = {}
        self.tools = {}
        self.load_agents()
        self.load_tools()

    def load_agents(self):
        """Rebuild ``Agent`` objects from the agents database, if it exists."""
        if not AGENTS_DB.exists():
            return
        try:
            with open(AGENTS_DB, "r", encoding="utf-8") as f:
                raw = json.load(f)
        except json.JSONDecodeError:
            logger.exception("Ignoring unreadable agents database %s", AGENTS_DB)
            return
        if not isinstance(raw, dict):
            logger.warning("Ignoring agents database %s: expected a JSON object", AGENTS_DB)
            return
        for agent_id, record in raw.items():
            if not isinstance(record, dict):
                continue
            agent = Agent(agent_id, record.get("task_description", ""))
            agent.tools = list(record.get("tools", []))
            self.agents[agent_id] = agent

    def save_agents(self):
        """Persist the serializable part of every agent."""
        records = {
            agent_id: {
                "task_description": agent.task_description,
                "tools": list(agent.tools),
            }
            for agent_id, agent in self.agents.items()
        }
        # Serialize first so a failure cannot leave a truncated file behind.
        payload = json.dumps(records, indent=2)
        AGENTS_DB.write_text(payload, encoding="utf-8")

    def load_tools(self):
        """Load the tool registry from disk, if it exists."""
        if not TOOLS_DB.exists():
            return
        try:
            with open(TOOLS_DB, "r", encoding="utf-8") as f:
                self.tools = json.load(f)
        except json.JSONDecodeError:
            logger.exception("Ignoring unreadable tools database %s", TOOLS_DB)

    def save_tools(self):
        """Persist the tool registry."""
        payload = json.dumps(self.tools, indent=2)
        TOOLS_DB.write_text(payload, encoding="utf-8")

    def create_agent(self, task_description: str) -> Agent:
        """Create, register and persist a new agent for ``task_description``."""
        agent_id = f"agent_{datetime.now().timestamp()}"
        agent = Agent(agent_id, task_description)
        self.agents[agent_id] = agent
        self.save_agents()
        return agent

    def get_agent(self, agent_id: str) -> Optional[Agent]:
        """Return the agent registered under ``agent_id`` or ``None``."""
        return self.agents.get(agent_id)

    def add_tool(self, tool_name: str, tool_config: dict):
        """Register a tool configuration and persist the registry."""
        self.tools[tool_name] = tool_config
        self.save_tools()

    def get_tool(self, tool_name: str):
        """Return the configuration for ``tool_name`` or ``None`` if unknown."""
        return self.tools.get(tool_name)


class DevelopmentPipeline:
    """Runs the plan -> develop -> test -> optimize sequence for a task."""

    def __init__(self, workspace_manager: WorkspaceManager, tool_manager: ToolManager):
        self.workspace_manager = workspace_manager
        self.tool_manager = tool_manager

    @staticmethod
    def _chat(system_prompt: str, user_prompt: str) -> str:
        """Send one system+user exchange to the chat model and return the reply text."""
        # NOTE(review): ChatCompletion is the pre-1.0 openai SDK interface;
        # confirm it matches the SDK version this project pins.
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
        )
        return response.choices[0].message.content

    async def execute_task(self, task_description: str, agent: Agent) -> Dict[str, Any]:
        """Execute the full development pipeline for a task.

        Returns a dict with ``status == "success"`` plus ``plan``, ``code`` and
        ``test_results``, or ``status == "error"`` plus ``error`` on failure.
        """
        try:
            logger.info(f"Starting task: {task_description}")
            plan = await self.plan(task_description, agent)
            code_artifact = await self.develop(plan, agent)
            test_results = await self.test(code_artifact, agent)
            optimized_code = await self.optimize(code_artifact, test_results, agent)
            return {
                "status": "success",
                "plan": plan,
                "code": optimized_code,
                "test_results": test_results,
            }
        except Exception as e:
            # Pipeline boundary: report the failure to the caller as data.
            logger.exception(f"Task failed: {str(e)}")
            return {"status": "error", "error": str(e)}

    async def plan(self, task_description: str, agent: Agent) -> Dict[str, Any]:
        """Generate a development plan using OpenAI."""
        logger.info(f"Planning for: {task_description}")
        plan = self._chat(
            "You are a helpful AI assistant that can generate development plans.",
            f"Generate a detailed development plan for the following task: {task_description}",
        )
        # Blank lines are formatting, not steps; skip them so develop() does
        # not generate code for empty prompts.
        steps = [line for line in plan.split("\n") if line.strip()]
        return {
            "steps": steps,
            "estimated_time": "1 hour",
            "required_tools": ["code_generator", "validator"],
        }

    async def develop(self, plan: Dict[str, Any], agent: Agent) -> str:
        """Generate code for each plan step with the Hugging Face pipeline."""
        logger.info(f"Developing based on plan: {plan}")
        code_parts = []
        for step in plan["steps"]:
            prompt = f"Generate code for the following step: {step}"
            response = generator(prompt, max_length=512, num_return_sequences=1)
            code_parts.append(response[0]["generated_text"])
        return "\n".join(code_parts)

    async def test(self, code: str, agent: Agent) -> Dict[str, Any]:
        """Ask OpenAI to analyze the generated code.

        The model's reply is returned under ``"analysis"``. ``passed``,
        ``coverage`` and ``issues`` are fixed placeholder values kept for
        compatibility; nothing here actually executes the code.
        """
        logger.info(f"Testing code: {code[:50]}...")
        test_code = (
            f"\n# Test code for: {agent.task_description}\n"
            f"{code}\n"
            "# Example test cases\n"
            "assert 1 + 1 == 2\n"
        )
        analysis = self._chat(
            "You are a helpful AI assistant that can analyze code and generate test results.",
            f"Analyze the following code and provide test results:\n{test_code}",
        )
        return {"passed": True, "coverage": 95, "issues": [], "analysis": analysis}

    async def optimize(self, code: str, test_results: Dict[str, Any], agent: Agent) -> str:
        """Ask OpenAI to optimize ``code`` in light of ``test_results``."""
        logger.info("Optimizing code...")
        return self._chat(
            "You are a helpful AI assistant that can optimize code based on test results.",
            "Optimize the following code based on the provided test results:\n"
            f"Code:\n{code}\nTest Results:\n{test_results}",
        )


class ChatInterface:
    """Streamlit UI: chat history, slash commands and task status panel."""

    def __init__(self):
        self.workspace_manager = WorkspaceManager(workspace_dir="workspace")
        self.tool_manager = ToolManager()
        self.pipeline = DevelopmentPipeline(
            workspace_manager=self.workspace_manager,
            tool_manager=self.tool_manager,
        )

        # Streamlit re-executes the script on every interaction. Keeping the
        # agent manager in session state preserves in-memory agent data
        # (knowledge bases, conversation memory) across those reruns.
        if "agent_manager" not in st.session_state:
            st.session_state.agent_manager = AgentManager()
        self.agent_manager = st.session_state.agent_manager

        if "chat_history" not in st.session_state:
            st.session_state.chat_history = []
        if "active_tasks" not in st.session_state:
            st.session_state.active_tasks = {}

    def render_interface(self):
        """Render the page: sidebar metrics, chat column and task column."""
        st.set_page_config(page_title="Autonomous Development System", layout="wide")

        with st.sidebar:
            st.header("System Status")
            st.metric("Active Agents", len(self.agent_manager.agents))
            st.metric("Available Tools", len(self.agent_manager.tools))
            if st.button("Clear History"):
                st.session_state.chat_history = []
                _rerun()

        col1, col2 = st.columns([3, 1])
        with col1:
            self.render_chat_interface()
        with col2:
            self.render_task_status()

    def render_chat_interface(self):
        """Show the chat history and forward new input to ``process_input``."""
        st.header("Autonomous Development System")

        for msg in st.session_state.chat_history:
            with st.chat_message(msg["role"]):
                st.write(msg["content"])

        if prompt := st.chat_input("Enter development task or command..."):
            self.process_input(prompt)

    def render_task_status(self):
        """Show every tracked task and, where available, knowledge-base tools."""
        st.header("Active Tasks")
        if not st.session_state.active_tasks:
            st.info("No active tasks")
            return

        for task_id, task in st.session_state.active_tasks.items():
            with st.expander(f"Task {task_id}"):
                st.write(f"Description: {task['description']}")
                st.write(f"Status: {task['status']}")
                st.write(f"Started: {task['start_time']}")
                if "code" in task:
                    st.code(task["code"], language="python")
                if "knowledge_base_path" not in task:
                    continue

                st.write(f"Knowledge Base Path: {task['knowledge_base_path']}")
                agent = self.agent_manager.get_agent(task["agent_id"])
                if agent is None:
                    st.warning("The agent for this task is no longer available.")
                    continue

                # Inputs are rendered unconditionally and keyed per task: a
                # widget created only inside `if st.button(...)` disappears on
                # the next rerun, and un-keyed widgets in a loop collide.
                query = st.text_input("Enter your query:", key=f"kb_query_{task_id}")
                if st.button("Query Knowledge Base", key=f"kb_button_{task_id}") and query:
                    answer = agent.conversational_qa(query)
                    st.write(f"Answer: {answer}")

                text = st.text_area("Enter text to summarize:", key=f"sum_text_{task_id}")
                if st.button("Summarize Text", key=f"sum_button_{task_id}") and text:
                    summary = agent.summarize_text(text)
                    st.write(f"Summary: {summary}")

    def process_input(self, user_input):
        """Record the user's message, dispatch it, then rerun to refresh the UI."""
        st.session_state.chat_history.append({"role": "user", "content": user_input})

        try:
            if user_input.startswith("/"):
                self.handle_command(user_input)
            else:
                self.handle_development_task(user_input)
        except Exception as e:
            # UI boundary: surface the failure in the chat instead of crashing.
            error_msg = f"System Error: {str(e)}"
            st.session_state.chat_history.append({"role": "assistant", "content": error_msg})
            logger.exception(error_msg)

        _rerun()

    def handle_command(self, command):
        """Execute a slash command and append its response to the chat."""
        name = command.split(maxsplit=1)[0]

        if name == "/help":
            response = self.help_command()
        elif name == "/status":
            response = self.status_command()
        elif name == "/add_tool":
            tool_name, tool_config = self.parse_tool_command(command)
            self.agent_manager.add_tool(tool_name, tool_config)
            response = f"Tool '{tool_name}' added successfully."
        elif name == "/load_knowledge_base":
            agent_id, knowledge_base_path = self.parse_knowledge_base_command(command)
            agent = self.agent_manager.get_agent(agent_id)
            if agent is None:
                response = f"Unknown agent '{agent_id}'."
            else:
                agent.load_knowledge_base(knowledge_base_path)
                # active_tasks is keyed by task id, so locate this agent's
                # task(s) through the stored "agent_id" field.
                for task in st.session_state.active_tasks.values():
                    if task.get("agent_id") == agent_id:
                        task["knowledge_base_path"] = knowledge_base_path
                response = f"Knowledge base loaded for agent {agent_id}."
        else:
            response = "Unknown command. Type /help for available commands."

        st.session_state.chat_history.append({"role": "assistant", "content": response})

    def handle_development_task(self, task_description):
        """Create an agent for the task and run the pipeline to completion."""
        task_id = f"task_{datetime.now().timestamp()}"
        agent = self.agent_manager.create_agent(task_description)

        st.session_state.active_tasks[task_id] = {
            "description": task_description,
            "status": "running",
            "start_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "agent_id": agent.agent_id,
        }

        response = f"🚀 Started task: {task_description}\nAgent ID: {agent.agent_id}"
        st.session_state.chat_history.append({"role": "assistant", "content": response})

        async def _run_and_monitor():
            # create_task needs a running loop, so it is called from inside
            # the coroutine that asyncio.run() drives.
            task = asyncio.create_task(
                self.pipeline.execute_task(task_description, agent),
                name=task_id,
            )
            await self.monitor_task(task_id, task)

        asyncio.run(_run_and_monitor())

    async def monitor_task(self, task_id: str, task: asyncio.Task):
        """Await ``task`` and record its outcome in session state and chat.

        The UI refresh is left to the caller (``process_input`` reruns once
        input handling finishes).
        """
        entry = st.session_state.active_tasks[task_id]
        try:
            result = await task
        except Exception as e:
            entry["status"] = "failed"
            st.session_state.chat_history.append({
                "role": "assistant",
                "content": f"Task {task_id} failed: {str(e)}",
            })
            logger.exception(f"Task {task_id} failed: {str(e)}")
            return

        # execute_task reports pipeline failures as data, not exceptions.
        if result.get("status") != "success":
            error = result.get("error", "unknown error")
            entry["status"] = "failed"
            st.session_state.chat_history.append({
                "role": "assistant",
                "content": f"Task {task_id} failed: {error}",
            })
            logger.error(f"Task {task_id} failed: {error}")
            return

        entry["status"] = "completed"
        entry["code"] = result["code"]
        entry["test_results"] = result["test_results"]
        st.session_state.chat_history.append({
            "role": "assistant",
            "content": f"Task {task_id} completed successfully.",
        })

    def help_command(self):
        """Return the Markdown help message."""
        return "\n".join([
            "**Available Commands:**",
            "- `/help` - Show this help message",
            "- `/status` - Show system status",
            "- `/add_tool TOOL_NAME JSON_CONFIG` - Add a new tool to the system",
            "- `/load_knowledge_base AGENT_ID PATH` - Load a knowledge base for an agent",
            "",
            "**Development Tasks:**",
            "Simply describe your development task in natural language:",
            'Example: "Create a Python script that calculates Fibonacci numbers"',
        ])

    def status_command(self):
        """Return a Markdown summary of agent, tool and task counts."""
        return "\n".join([
            "**System Status:**",
            f"- Active Agents: {len(self.agent_manager.agents)}",
            f"- Available Tools: {len(self.agent_manager.tools)}",
            f"- Active Tasks: {len(st.session_state.active_tasks)}",
        ])

    def parse_tool_command(self, command: str) -> Tuple[str, Dict[str, Any]]:
        """Parse ``/add_tool TOOL_NAME JSON_CONFIG``.

        Raises:
            ValueError: if arguments are missing or the configuration is not
                a valid JSON object.
        """
        parts = command.split(maxsplit=2)
        if len(parts) < 3:
            raise ValueError("Usage: /add_tool TOOL_NAME JSON_CONFIG")
        _, tool_name, tool_config_str = parts
        try:
            tool_config = json.loads(tool_config_str)
        except json.JSONDecodeError as err:
            raise ValueError(
                "Invalid tool configuration. Please provide a valid JSON object."
            ) from err
        if not isinstance(tool_config, dict):
            raise ValueError("Invalid tool configuration. Please provide a valid JSON object.")
        return tool_name, tool_config

    def parse_knowledge_base_command(self, command: str) -> Tuple[str, str]:
        """Parse ``/load_knowledge_base AGENT_ID PATH`` (the path may contain spaces).

        Raises:
            ValueError: if the agent id or the path is missing.
        """
        parts = command.split(maxsplit=2)
        if len(parts) < 3:
            raise ValueError("Usage: /load_knowledge_base AGENT_ID PATH")
        _, agent_id, knowledge_base_path = parts
        return agent_id, knowledge_base_path.strip()


def main() -> None:
    """Build the chat interface and render the page."""
    chat_interface = ChatInterface()
    chat_interface.render_interface()


if __name__ == "__main__":
    main()