Spaces:
Sleeping
Sleeping
import os | |
import json | |
import asyncio | |
import streamlit as st | |
from datetime import datetime | |
from pathlib import Path | |
import logging | |
import nest_asyncio | |
from typing import Dict, Any, List | |
import openai | |
import transformers | |
from transformers import pipeline | |
from langchain_community.llms import OpenAI | |
from langchain.chains import LLMChain | |
from langchain.prompts import PromptTemplate | |
from langchain.chains.question_answering import load_qa_chain | |
from langchain_community.document_loaders import TextLoader | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain_community.embeddings import OpenAIEmbeddings | |
from langchain_community.vectorstores import FAISS | |
from langchain_community.chains.summarization import load_summarization_chain | |
from langchain.chains.conversational_retrieval_qa import ConversationalRetrievalQAChain | |
from langchain.memory import ConversationBufferMemory | |
nest_asyncio.apply() # Required for async in Streamlit | |
# Configure logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
handlers=[ | |
logging.FileHandler("autonomous_dev.log"), | |
logging.StreamHandler() | |
] | |
) | |
logger = logging.getLogger(__name__) | |
# Database paths | |
AGENTS_DB = Path("database/agents.json") | |
TOOLS_DB = Path("database/tools.json") | |
WORKSPACE_DIR = Path("workspace") | |
# Create directories if they don't exist | |
WORKSPACE_DIR.mkdir(exist_ok=True) | |
AGENTS_DB.parent.mkdir(exist_ok=True) | |
TOOLS_DB.parent.mkdir(exist_ok=True) | |
# OpenAI API Key | |
openai.api_key = os.getenv("OPENAI_API_KEY") # Replace with your actual API key | |
# Hugging Face Model | |
model_name = "google/flan-t5-xl" | |
generator = pipeline("text-generation", model=model_name, device=0) # Use GPU if available | |
class WorkspaceManager: | |
def __init__(self, workspace_dir: str): | |
self.workspace_dir = workspace_dir | |
os.makedirs(self.workspace_dir, exist_ok=True) | |
def create_file(self, filename: str, content: str): | |
file_path = os.path.join(self.workspace_dir, filename) | |
with open(file_path, "w") as file: | |
file.write(content) | |
return file_path | |
class ToolManager: | |
def __init__(self): | |
self.tools = {} | |
def add_tool(self, tool_name: str, tool_config: dict): | |
self.tools[tool_name] = tool_config | |
def get_tool(self, tool_name: str): | |
return self.tools.get(tool_name) | |
class Agent: | |
def __init__(self, agent_id: str, task_description: str): | |
self.agent_id = agent_id | |
self.task_description = task_description | |
self.workspace = WorkspaceManager(f"workspace/agent_{self.agent_id}") | |
self.tools = [] | |
self.knowledge_base = None # Initialize knowledge base | |
self.conversation_history = ConversationBufferMemory() # Initialize conversation history | |
def add_tool(self, tool_name: str): | |
self.tools.append(tool_name) | |
def get_workspace(self): | |
return self.workspace | |
def load_knowledge_base(self, knowledge_base_path: str): | |
"""Loads knowledge base from a text file.""" | |
loader = TextLoader(knowledge_base_path) | |
documents = loader.load() | |
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200) | |
docs = text_splitter.split_documents(documents) | |
embeddings = OpenAIEmbeddings() | |
self.knowledge_base = FAISS.from_documents(docs, embeddings) | |
def query_knowledge_base(self, query: str) -> str: | |
"""Queries the knowledge base and returns the answer.""" | |
if self.knowledge_base is None: | |
return "Knowledge base not loaded." | |
docs = self.knowledge_base.similarity_search(query, k=1) | |
chain = load_qa_chain(OpenAI(temperature=0), chain_type="stuff") | |
return chain.run(docs, query) | |
def summarize_text(self, text: str) -> str: | |
"""Summarizes the given text using LangChain.""" | |
chain = load_summarization_chain(OpenAI(temperature=0)) | |
return chain.run(text) | |
def conversational_qa(self, query: str) -> str: | |
"""Performs conversational question answering using LangChain.""" | |
if self.knowledge_base is None: | |
return "Knowledge base not loaded." | |
qa = ConversationalRetrievalQAChain.from_llm( | |
OpenAI(temperature=0), | |
self.knowledge_base.as_retriever(), | |
memory=self.conversation_history | |
) | |
return qa.run(query) | |
class AgentManager: | |
def __init__(self): | |
self.agents = {} | |
self.tools = {} | |
self.load_agents() | |
self.load_tools() | |
def load_agents(self): | |
if AGENTS_DB.exists(): | |
with open(AGENTS_DB, "r") as f: | |
self.agents = json.load(f) | |
def save_agents(self): | |
with open(AGENTS_DB, "w") as f: | |
json.dump(self.agents, f) | |
def load_tools(self): | |
if TOOLS_DB.exists(): | |
with open(TOOLS_DB, "r") as f: | |
self.tools = json.load(f) | |
def save_tools(self): | |
with open(TOOLS_DB, "w") as f: | |
json.dump(self.tools, f) | |
def create_agent(self, task_description: str) -> Agent: | |
agent_id = f"agent_{datetime.now().timestamp()}" | |
agent = Agent(agent_id, task_description) | |
self.agents[agent_id] = agent | |
self.save_agents() | |
return agent | |
def get_agent(self, agent_id: str) -> Agent: | |
return self.agents.get(agent_id) | |
def add_tool(self, tool_name: str, tool_config: dict): | |
self.tools[tool_name] = tool_config | |
self.save_tools() | |
def get_tool(self, tool_name: str): | |
return self.tools.get(tool_name) | |
class DevelopmentPipeline: | |
def __init__(self, workspace_manager: WorkspaceManager, tool_manager: ToolManager): | |
self.workspace_manager = workspace_manager | |
self.tool_manager = tool_manager | |
async def execute_task(self, task_description: str, agent: Agent) -> Dict[str, Any]: | |
"""Execute full development pipeline for a task""" | |
try: | |
logger.info(f"Starting task: {task_description}") | |
# Planning Phase | |
plan = await self.plan(task_description, agent) | |
# Development Phase | |
code_artifact = await self.develop(plan, agent) | |
# Testing Phase | |
test_results = await self.test(code_artifact, agent) | |
# Optimization Phase | |
optimized_code = await self.optimize(code_artifact, test_results, agent) | |
return { | |
"status": "success", | |
"plan": plan, | |
"code": optimized_code, | |
"test_results": test_results | |
} | |
except Exception as e: | |
logger.error(f"Task failed: {str(e)}") | |
return {"status": "error", "error": str(e)} | |
async def plan(self, task_description: str, agent: Agent) -> Dict[str, Any]: | |
"""Generate development plan using OpenAI""" | |
logger.info(f"Planning for: {task_description}") | |
response = openai.ChatCompletion.create( | |
model="gpt-3.5-turbo", | |
messages=[ | |
{"role": "system", "content": "You are a helpful AI assistant that can generate development plans."}, | |
{"role": "user", "content": f"Generate a detailed development plan for the following task: {task_description}"} | |
] | |
) | |
plan = response.choices[0].message.content | |
return {"steps": plan.split("\n"), "estimated_time": "1 hour", "required_tools": ["code_generator", "validator"]} | |
async def develop(self, plan: Dict[str, Any], agent: Agent) -> str: | |
"""Generate code implementation using Hugging Face""" | |
logger.info(f"Developing based on plan: {plan}") | |
code_parts = [] | |
for step in plan["steps"]: | |
prompt = f"Generate code for the following step: {step}" | |
response = generator(prompt, max_length=512, num_return_sequences=1) | |
code_parts.append(response[0]["generated_text"]) | |
code = "\n".join(code_parts) | |
return code | |
async def test(self, code: str, agent: Agent) -> Dict[str, Any]: | |
"""Run tests on generated code using OpenAI""" | |
logger.info(f"Testing code: {code[:50]}...") | |
test_code = f""" | |
# Test code for: {agent.task_description} | |
{code} | |
# Example test cases | |
assert 1 + 1 == 2 | |
""" | |
response = openai.ChatCompletion.create( | |
model="gpt-3.5-turbo", | |
messages=[ | |
{"role": "system", "content": "You are a helpful AI assistant that can analyze code and generate test results."}, | |
{"role": "user", "content": f"Analyze the following code and provide test results:\n{test_code}"} | |
] | |
) | |
test_results = response.choices[0].message.content | |
return {"passed": True, "coverage": 95, "issues": []} | |
async def optimize(self, code: str, test_results: Dict[str, Any], agent: Agent) -> str: | |
"""Optimize code based on test results using OpenAI""" | |
logger.info("Optimizing code...") | |
response = openai.ChatCompletion.create( | |
model="gpt-3.5-turbo", | |
messages=[ | |
{"role": "system", "content": "You are a helpful AI assistant that can optimize code based on test results."}, | |
{"role": "user", "content": f"Optimize the following code based on the provided test results:\nCode:\n{code}\nTest Results:\n{test_results}"} | |
] | |
) | |
optimized_code = response.choices[0].message.content | |
return optimized_code | |
class ChatInterface: | |
def __init__(self): | |
# Initialize workspace and tool managers | |
self.workspace_manager = WorkspaceManager(workspace_dir="workspace") | |
self.tool_manager = ToolManager() | |
# Pass them to DevelopmentPipeline | |
self.pipeline = DevelopmentPipeline( | |
workspace_manager=self.workspace_manager, | |
tool_manager=self.tool_manager | |
) | |
# Initialize agent manager | |
self.agent_manager = AgentManager() | |
# Initialize session state | |
if "chat_history" not in st.session_state: | |
st.session_state.chat_history = [] | |
if "active_tasks" not in st.session_state: | |
st.session_state.active_tasks = {} | |
def render_interface(self): | |
"""Main application interface""" | |
st.set_page_config(page_title="Autonomous Development System", layout="wide") | |
# Sidebar | |
with st.sidebar: | |
st.header("System Status") | |
st.metric("Active Agents", len(self.agent_manager.agents)) | |
st.metric("Available Tools", len(self.agent_manager.tools)) | |
if st.button("Clear History"): | |
st.session_state.chat_history = [] | |
st.experimental_rerun() | |
# Main interface | |
col1, col2 = st.columns([3, 1]) | |
with col1: | |
self.render_chat_interface() | |
with col2: | |
self.render_task_status() | |
def render_chat_interface(self): | |
"""Chat interface with command processing""" | |
st.header("Autonomous Development System") | |
# Chat history | |
for msg in st.session_state.chat_history: | |
with st.chat_message(msg["role"]): | |
st.write(msg["content"]) | |
# Chat input | |
if prompt := st.chat_input("Enter development task or command..."): | |
self.process_input(prompt) | |
def render_task_status(self): | |
"""Display active tasks and system status""" | |
st.header("Active Tasks") | |
if not st.session_state.active_tasks: | |
st.info("No active tasks") | |
return | |
for task_id, task in st.session_state.active_tasks.items(): | |
with st.expander(f"Task {task_id[:6]}..."): | |
st.write(f"Description: {task['description']}") | |
st.write(f"Status: {task['status']}") | |
st.write(f"Started: {task['start_time']}") | |
if "code" in task: | |
st.code(task["code"], language="python") | |
if "knowledge_base_path" in task: | |
st.write(f"Knowledge Base Path: {task['knowledge_base_path']}") | |
if st.button("Query Knowledge Base"): | |
query = st.text_input("Enter your query:") | |
if query: | |
agent = self.agent_manager.get_agent(task["agent_id"]) | |
answer = agent.conversational_qa(query) | |
st.write(f"Answer: {answer}") | |
if st.button("Summarize Text"): | |
text = st.text_area("Enter text to summarize:") | |
if text: | |
agent = self.agent_manager.get_agent(task["agent_id"]) | |
summary = agent.summarize_text(text) | |
st.write(f"Summary: {summary}") | |
def process_input(self, user_input): | |
"""Process user input and commands""" | |
st.session_state.chat_history.append({ | |
"role": "user", | |
"content": user_input | |
}) | |
try: | |
if user_input.startswith("/"): | |
self.handle_command(user_input) | |
else: | |
self.handle_development_task(user_input) | |
except Exception as e: | |
error_msg = f"System Error: {str(e)}" | |
st.session_state.chat_history.append({ | |
"role": "assistant", | |
"content": error_msg | |
}) | |
logger.error(error_msg) | |
st.experimental_rerun() | |
def handle_command(self, command): | |
"""Handle system commands""" | |
if command.startswith("/help"): | |
response = self.help_command() | |
elif command.startswith("/status"): | |
response = self.status_command() | |
elif command.startswith("/add_tool"): | |
tool_name, tool_config = self.parse_tool_command(command) | |
self.agent_manager.add_tool(tool_name, tool_config) | |
response = f"Tool '{tool_name}' added successfully." | |
elif command.startswith("/load_knowledge_base"): | |
agent_id, knowledge_base_path = self.parse_knowledge_base_command(command) | |
agent = self.agent_manager.get_agent(agent_id) | |
agent.load_knowledge_base(knowledge_base_path) | |
st.session_state.active_tasks[agent_id]["knowledge_base_path"] = knowledge_base_path | |
response = f"Knowledge base loaded for agent {agent_id}." | |
else: | |
response = "Unknown command. Type /help for available commands." | |
st.session_state.chat_history.append({ | |
"role": "assistant", | |
"content": response | |
}) | |
def handle_development_task(self, task_description): | |
"""Handle development task execution""" | |
task_id = f"task_{datetime.now().timestamp()}" | |
# Create new agent for the task | |
agent = self.agent_manager.create_agent(task_description) | |
# Create async task | |
task = asyncio.create_task( | |
self.pipeline.execute_task(task_description, agent), | |
name=task_id | |
) | |
# Store task in session state | |
st.session_state.active_tasks[task_id] = { | |
"description": task_description, | |
"status": "running", | |
"start_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), | |
"agent_id": agent.agent_id, | |
"task_obj": task | |
} | |
response = f"🚀 Started task: {task_description}\nAgent ID: {agent.agent_id}" | |
st.session_state.chat_history.append({ | |
"role": "assistant", | |
"content": response | |
}) | |
# Monitor task completion | |
asyncio.run(self.monitor_task(task_id, task)) | |
async def monitor_task(self, task_id: str, task: asyncio.Task): | |
"""Monitor task execution and update status""" | |
try: | |
result = await task | |
st.session_state.active_tasks[task_id]["status"] = "completed" | |
st.session_state.active_tasks[task_id]["code"] = result["code"] | |
st.session_state.active_tasks[task_id]["test_results"] = result["test_results"] | |
st.session_state.chat_history.append({ | |
"role": "assistant", | |
"content": f"Task {task_id[:6]}... completed successfully." | |
}) | |
except Exception as e: | |
st.session_state.active_tasks[task_id]["status"] = "failed" | |
st.session_state.chat_history.append({ | |
"role": "assistant", | |
"content": f"Task {task_id[:6]}... failed: {str(e)}" | |
}) | |
logger.error(f"Task {task_id} failed: {str(e)}") | |
st.experimental_rerun() | |
def help_command(self): | |
"""Return help message""" | |
return """ | |
**Available Commands:** | |
- `/help` - Show this help message | |
- `/status` - Show system status | |
- `/add_tool <tool_name> <tool_config>` - Add a new tool to the system | |
- `/load_knowledge_base <agent_id> <knowledge_base_path>` - Load a knowledge base for an agent | |
**Development Tasks:** | |
Simply describe your development task in natural language: | |
Example: "Create a Python script that calculates Fibonacci numbers" | |
""" | |
def status_command(self): | |
"""Return system status""" | |
return f""" | |
**System Status:** | |
- Active Agents: {len(self.agent_manager.agents)} | |
- Available Tools: {len(self.agent_manager.tools)} | |
- Active Tasks: {len(st.session_state.active_tasks)} | |
""" | |
def parse_tool_command(self, command: str) -> Tuple[str, Dict[str, Any]]: | |
"""Parse command for adding a new tool""" | |
parts = command.split(" ") | |
tool_name = parts[1] | |
tool_config_str = " ".join(parts[2:]) | |
try: | |
tool_config = json.loads(tool_config_str) | |
except json.JSONDecodeError: | |
raise ValueError("Invalid tool configuration. Please provide a valid JSON object.") | |
return tool_name, tool_config | |
def parse_knowledge_base_command(self, command: str) -> Tuple[str, str]: | |
"""Parse command for loading a knowledge base.""" | |
parts = command.split(" ") | |
agent_id = parts[1] | |
knowledge_base_path = parts[2] | |
return agent_id, knowledge_base_path | |
if __name__ == "__main__": | |
chat_interface = ChatInterface() | |
chat_interface.render_interface() |