"""
Developer Productivity Agent
RAG-based system using Pinecone for vector storage and GPT-4o-mini.

Features:
- Pinecone vector database (2GB free tier)
- Divided LLM Architecture for cost optimization
- Real-time cost tracking and analytics
- OpenAI embeddings (text-embedding-3-small)
"""
| |
|
| | import os |
| | import json |
| | import time |
| | from pathlib import Path |
| | from typing import List, Dict, Any, Optional |
| | import hashlib |
| | from datetime import datetime |
| |
|
| | |
| | from fastapi import FastAPI, HTTPException |
| | from fastapi.middleware.cors import CORSMiddleware |
| | from pydantic import BaseModel |
| | import uvicorn |
| |
|
| | |
| | from pinecone import Pinecone, ServerlessSpec |
| |
|
| | |
| | from openai import OpenAI |
| |
|
| | |
| | import ast |
| | import re |
| | from dataclasses import dataclass, field |
| |
|
| | |
| | |
| | |
| |
|
class Config:
    """Application configuration, read from environment variables where available."""

    # OpenAI credentials; empty string means "not configured yet"
    # (clients are built lazily, so startup succeeds without a key).
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")

    # Pinecone serverless settings. aws/us-east-1 matches the free tier
    # mentioned in the module docstring.
    PINECONE_API_KEY = os.getenv("PINECONE_API_KEY", "")
    PINECONE_INDEX_NAME = os.getenv("PINECONE_INDEX_NAME", "codebase-index")
    PINECONE_CLOUD = "aws"
    PINECONE_REGION = "us-east-1"

    # Both LLM roles use gpt-4o-mini; the split exists so either side can be
    # upgraded independently. EMBEDDING_DIM must match the embedding model.
    ARCHITECT_MODEL = "gpt-4o-mini"
    DEVELOPER_MODEL = "gpt-4o-mini"
    EMBEDDING_MODEL = "text-embedding-3-small"
    EMBEDDING_DIM = 1536

    # Chunking is character-based (converted to lines in CodebaseIndexer).
    CHUNK_SIZE = 1500
    CHUNK_OVERLAP = 200
    TOP_K_RESULTS = 10

    # Prices per 1M tokens (presumably USD — CostTracker divides token
    # counts by 1_000_000 before multiplying by these rates).
    COST_GPT4O_MINI_INPUT = 0.15
    COST_GPT4O_MINI_OUTPUT = 0.60
    COST_EMBEDDING = 0.02
    COST_GPT4_INPUT = 30.0
    COST_GPT4_OUTPUT = 60.0
| |
|
| |
|
| | |
| | |
| | |
| |
|
class CostTracker:
    """Tracks API token usage and dollar cost, versus a GPT-4 baseline.

    Token counts are accumulated per role (embedding / architect / developer)
    and converted to dollars on demand using the per-1M-token rates on Config.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Reset all counters and start a new tracking session."""
        self.embedding_tokens = 0
        self.architect_input_tokens = 0
        self.architect_output_tokens = 0
        self.developer_input_tokens = 0
        self.developer_output_tokens = 0
        self.api_calls = 0
        self.tickets_processed = 0
        self.questions_answered = 0
        self.start_time = datetime.now()
        self.history = []

    def add_embedding(self, tokens: int):
        """Track tokens spent on one embedding API call."""
        self.embedding_tokens += tokens
        self.api_calls += 1

    def add_architect_call(self, input_tokens: int, output_tokens: int):
        """Track one architect-LLM chat completion."""
        self.architect_input_tokens += input_tokens
        self.architect_output_tokens += output_tokens
        self.api_calls += 1

    def add_developer_call(self, input_tokens: int, output_tokens: int):
        """Track one developer-LLM chat completion."""
        self.developer_input_tokens += input_tokens
        self.developer_output_tokens += output_tokens
        self.api_calls += 1

    def record_ticket(self):
        """Record a fully processed ticket."""
        self.tickets_processed += 1
        self._add_to_history("ticket")

    def record_question(self):
        """Record an answered code question."""
        self.questions_answered += 1
        self._add_to_history("question")

    def _add_to_history(self, event_type: str):
        """Append a timestamped cumulative cost/savings snapshot."""
        self.history.append({
            "timestamp": datetime.now().isoformat(),
            "type": event_type,
            "cumulative_cost": self.get_actual_cost(),
            "cumulative_savings": self.get_savings()
        })

    def get_actual_cost(self) -> float:
        """Dollar cost actually incurred (gpt-4o-mini + embeddings)."""
        # Rates are class attributes; reference Config directly instead of
        # instantiating a throwaway Config() on every call.
        embedding_cost = (self.embedding_tokens / 1_000_000) * Config.COST_EMBEDDING
        architect_cost = (
            (self.architect_input_tokens / 1_000_000) * Config.COST_GPT4O_MINI_INPUT +
            (self.architect_output_tokens / 1_000_000) * Config.COST_GPT4O_MINI_OUTPUT
        )
        developer_cost = (
            (self.developer_input_tokens / 1_000_000) * Config.COST_GPT4O_MINI_INPUT +
            (self.developer_output_tokens / 1_000_000) * Config.COST_GPT4O_MINI_OUTPUT
        )
        return embedding_cost + architect_cost + developer_cost

    def get_traditional_cost(self) -> float:
        """Hypothetical cost had all LLM tokens gone through GPT-4 instead."""
        total_input = self.architect_input_tokens + self.developer_input_tokens
        total_output = self.architect_output_tokens + self.developer_output_tokens

        return (
            (total_input / 1_000_000) * Config.COST_GPT4_INPUT +
            (total_output / 1_000_000) * Config.COST_GPT4_OUTPUT
        )

    def get_savings(self) -> float:
        """Dollar savings versus the GPT-4 baseline."""
        return self.get_traditional_cost() - self.get_actual_cost()

    def get_savings_percentage(self) -> float:
        """Savings as a percentage of the baseline; 0.0 when nothing tracked."""
        traditional = self.get_traditional_cost()
        if traditional == 0:
            return 0.0
        return ((traditional - self.get_actual_cost()) / traditional) * 100

    def get_stats(self) -> Dict[str, Any]:
        """Comprehensive statistics snapshot for the analytics endpoint."""
        return {
            "actual_cost": round(self.get_actual_cost(), 6),
            "traditional_cost": round(self.get_traditional_cost(), 6),
            "savings": round(self.get_savings(), 6),
            "savings_percentage": round(self.get_savings_percentage(), 2),
            "total_tokens": {
                "embedding": self.embedding_tokens,
                "architect_input": self.architect_input_tokens,
                "architect_output": self.architect_output_tokens,
                "developer_input": self.developer_input_tokens,
                "developer_output": self.developer_output_tokens,
                "total": (self.embedding_tokens + self.architect_input_tokens +
                          self.architect_output_tokens + self.developer_input_tokens +
                          self.developer_output_tokens)
            },
            "api_calls": self.api_calls,
            "tickets_processed": self.tickets_processed,
            "questions_answered": self.questions_answered,
            # BUG FIX: timedelta.seconds discards whole days, so sessions
            # longer than 24h under-reported; total_seconds() is correct.
            "session_duration_minutes": round((datetime.now() - self.start_time).total_seconds() / 60, 2),
            "cost_per_ticket": round(self.get_actual_cost() / max(self.tickets_processed, 1), 6),
            "history": self.history[-50:]  # cap response payload size
        }
| |
|
| |
|
| | |
# Module-level singleton shared by the indexer and both LLM wrappers.
cost_tracker = CostTracker()
| |
|
| |
|
| | |
| | |
| | |
| |
|
class JiraTicket(BaseModel):
    """Incoming Jira ticket payload for /process-ticket."""
    ticket_id: str
    title: str
    description: str
    # Free-form acceptance criteria text; may be absent.
    acceptance_criteria: Optional[str] = None
    labels: Optional[List[str]] = None
| |
|
class ImplementationPlan(BaseModel):
    """Response model: the generated implementation plan for one ticket."""
    ticket_summary: str
    key_entities: List[str]
    # Each entry has keys 'path', 'relevance', 'preview' (built in
    # DevProductivityAgent.process_ticket).
    relevant_files: List[Dict[str, str]]
    implementation_steps: List[str]
    prerequisites: List[str]
    # Mapping of target file path -> generated boilerplate source text.
    boilerplate_code: Dict[str, str]
    architecture_notes: str
    estimated_complexity: str
| |
|
| |
|
| | |
| | |
| | |
| |
|
class CodebaseIndexer:
    """Indexes codebase chunks into a Pinecone vector index.

    Clients are created lazily so the object can be constructed before API
    keys are configured (see DevProductivityAgent.set_api_keys).
    """

    def __init__(self, config: "Config"):
        self.config = config
        self._openai_client = None    # lazy OpenAI client
        self._pinecone_client = None  # unused placeholder, kept for compatibility
        self._index = None            # lazy Pinecone index handle

    @property
    def openai_client(self):
        """Lazily construct the OpenAI client; raises ValueError without a key."""
        if self._openai_client is None:
            if not self.config.OPENAI_API_KEY:
                raise ValueError("OpenAI API key required")
            self._openai_client = OpenAI(api_key=self.config.OPENAI_API_KEY)
        return self._openai_client

    @property
    def index(self):
        """Lazily connect to (and if missing, create) the Pinecone index."""
        if self._index is None:
            if not self.config.PINECONE_API_KEY:
                raise ValueError("Pinecone API key required")

            try:
                pc = Pinecone(api_key=self.config.PINECONE_API_KEY)

                existing_indexes = pc.list_indexes()
                index_names = [idx.name for idx in existing_indexes] if hasattr(existing_indexes, '__iter__') else []

                if self.config.PINECONE_INDEX_NAME not in index_names:
                    pc.create_index(
                        name=self.config.PINECONE_INDEX_NAME,
                        dimension=self.config.EMBEDDING_DIM,
                        metric="cosine",
                        spec=ServerlessSpec(
                            cloud=self.config.PINECONE_CLOUD,
                            region=self.config.PINECONE_REGION
                        )
                    )
                    # A new serverless index is not queryable immediately; a
                    # fixed wait is a pragmatic placeholder.
                    # TODO(review): poll describe_index().status['ready'] instead.
                    print("⏳ Waiting for index to be ready...")
                    time.sleep(10)

                self._index = pc.Index(self.config.PINECONE_INDEX_NAME)
                print(f"📌 Pinecone index ready: {self.config.PINECONE_INDEX_NAME}")
            except Exception as e:
                print(f"❌ Pinecone initialization error: {str(e)}")
                raise ValueError(f"Failed to initialize Pinecone: {str(e)}")

        return self._index

    def _get_embedding(self, text: str) -> List[float]:
        """Embed one string, recording an approximate token cost."""
        # ~4 characters per token is a rough heuristic; actual billing may differ.
        tokens = len(text) // 4
        cost_tracker.add_embedding(tokens)

        response = self.openai_client.embeddings.create(
            model=self.config.EMBEDDING_MODEL,
            input=text
        )
        return response.data[0].embedding

    def _get_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
        """Embed many strings in one API call, with cost tracking."""
        if not texts:
            return []

        tokens = sum(len(t) // 4 for t in texts)  # same ~4 chars/token heuristic
        cost_tracker.add_embedding(tokens)

        response = self.openai_client.embeddings.create(
            model=self.config.EMBEDDING_MODEL,
            input=texts
        )
        return [item.embedding for item in response.data]

    def _detect_language(self, file_path: str) -> str:
        """Map a file extension to a language tag ('unknown' if unmapped)."""
        ext_map = {
            '.py': 'python', '.js': 'javascript', '.jsx': 'javascript',
            '.ts': 'typescript', '.tsx': 'typescript', '.java': 'java',
            '.go': 'go', '.rs': 'rust', '.cpp': 'cpp', '.c': 'c',
        }
        return ext_map.get(Path(file_path).suffix.lower(), 'unknown')

    def _chunk_content(self, content: str, file_path: str) -> List[Dict[str, Any]]:
        """Split content into overlapping line-based chunks.

        CHUNK_SIZE/CHUNK_OVERLAP are character budgets; dividing by an
        assumed ~50 chars per line converts them to line counts.
        """
        chunks = []
        lines = content.split('\n')
        chunk_lines = self.config.CHUNK_SIZE // 50
        overlap_lines = self.config.CHUNK_OVERLAP // 50

        i = 0
        chunk_idx = 0
        while i < len(lines):
            end = min(i + chunk_lines, len(lines))
            chunk_content = '\n'.join(lines[i:end])

            if chunk_content.strip():  # skip whitespace-only chunks
                chunks.append({
                    'content': chunk_content,
                    'file_path': file_path,
                    'chunk_index': chunk_idx,
                    'line_start': i + 1,  # 1-based, inclusive
                    'line_end': end,
                    'language': self._detect_language(file_path)
                })

            # Step back by the overlap unless we just consumed the tail.
            i = end - overlap_lines if end < len(lines) else end
            chunk_idx += 1

        return chunks

    def index_file(self, file_path: str, content: str) -> int:
        """Index one file into Pinecone; returns the number of chunks upserted."""
        chunks = self._chunk_content(content, file_path)

        if not chunks:
            return 0

        texts = [c['content'] for c in chunks]
        embeddings = self._get_embeddings_batch(texts)

        vectors = []
        for i, chunk in enumerate(chunks):
            # Deterministic id so re-indexing a file overwrites its old chunks.
            vector_id = hashlib.md5(
                f"{file_path}_{chunk['chunk_index']}".encode()
            ).hexdigest()

            vectors.append({
                "id": vector_id,
                "values": embeddings[i],
                "metadata": {
                    "file_path": file_path,
                    "chunk_index": chunk['chunk_index'],
                    "language": chunk['language'],
                    "line_start": chunk['line_start'],
                    "line_end": chunk['line_end'],
                    "content": chunk['content'][:1000]  # metadata size cap
                }
            })

        self.index.upsert(vectors=vectors)

        return len(chunks)

    def index_directory(self, directory_path: str, extensions: List[str] = None) -> Dict[str, int]:
        """Index all matching files under a directory tree.

        Returns a mapping of file path -> chunk count (or an "Error: ..."
        string for files that failed to read or index).
        """
        if extensions is None:
            extensions = ['.py', '.js', '.jsx', '.ts', '.tsx', '.java', '.go']

        results = {}
        directory = Path(directory_path)

        for ext in extensions:
            for file_path in directory.rglob(f"*{ext}"):
                # Skip vendored / generated trees.
                if any(skip in str(file_path) for skip in ['node_modules', '__pycache__', '.git', 'venv']):
                    continue

                try:
                    content = file_path.read_text(encoding='utf-8')
                    chunks = self.index_file(str(file_path), content)
                    results[str(file_path)] = chunks
                    print(f"  ✅ {file_path.name}: {chunks} chunks")
                except Exception as e:
                    results[str(file_path)] = f"Error: {e}"

        return results

    def search(self, query: str, top_k: int = None) -> List[Dict[str, Any]]:
        """Semantic search; returns content/metadata/score per match."""
        if top_k is None:
            top_k = self.config.TOP_K_RESULTS

        query_embedding = self._get_embedding(query)

        results = self.index.query(
            vector=query_embedding,
            top_k=top_k,
            include_metadata=True
        )

        formatted = []
        for match in results.matches:
            formatted.append({
                'content': match.metadata.get('content', ''),
                'metadata': {
                    'file_path': match.metadata.get('file_path', ''),
                    'line_start': match.metadata.get('line_start', 0),
                    'line_end': match.metadata.get('line_end', 0),
                    'language': match.metadata.get('language', '')
                },
                'score': match.score
            })

        return formatted

    def get_stats(self) -> Dict[str, Any]:
        """Best-effort index statistics (zeros when the index is unreachable)."""
        try:
            stats = self.index.describe_index_stats()
            return {
                'total_chunks': stats.total_vector_count,
                'index_name': self.config.PINECONE_INDEX_NAME,
                'dimension': stats.dimension
            }
        except Exception:  # narrowed from a bare except; still best-effort
            return {'total_chunks': 0, 'index_name': self.config.PINECONE_INDEX_NAME}

    def clear_index(self):
        """Delete all vectors (best-effort; failures are deliberately ignored)."""
        try:
            self.index.delete(delete_all=True)
            print("⚠️ Index cleared!")
        except Exception:
            pass
| |
|
| |
|
| | |
| | |
| | |
| |
|
class ArchitectLLM:
    """LLM #1: Architect — ticket analysis and implementation planning."""

    def __init__(self, config: "Config"):
        self.config = config
        self._client = None  # lazy OpenAI client
        self.model = config.ARCHITECT_MODEL

    @property
    def client(self):
        """Lazily construct the OpenAI client; raises ValueError without a key."""
        if self._client is None:
            if not self.config.OPENAI_API_KEY:
                raise ValueError("OpenAI API key not set!")
            self._client = OpenAI(api_key=self.config.OPENAI_API_KEY)
        return self._client

    def reset_client(self):
        """Drop the cached client (e.g. after the API key changes)."""
        self._client = None

    @staticmethod
    def _extract_json(content: str):
        """Parse a model response as JSON, stripping an optional ``` fence.

        Returns the parsed object, or None when it is not valid JSON.
        BUG FIX: the previous pattern '^```json?' required the literal
        characters "jso", so a plain ``` fence was never stripped and the
        response fell through to the fallback; '(?:json)?' makes the whole
        word optional.
        """
        stripped = re.sub(r'^```(?:json)?\s*', '', content.strip())
        stripped = re.sub(r'\s*```$', '', stripped)
        try:
            return json.loads(stripped)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return None

    def analyze_ticket(self, ticket: "JiraTicket") -> Dict[str, Any]:
        """Analyze a ticket into summary/entities/keywords/complexity.

        Falls back to a minimal dict carrying the raw response as 'summary'
        when the model does not return valid JSON.
        """
        prompt = f"""Analyze this Jira ticket for implementation:

ID: {ticket.ticket_id}
Title: {ticket.title}
Description: {ticket.description}
Acceptance Criteria: {ticket.acceptance_criteria or 'Not specified'}

Provide JSON:
{{
    "summary": "2-3 sentence summary",
    "key_entities": ["entity1", "entity2"],
    "technical_keywords": ["keyword1", "keyword2"],
    "prerequisites": ["prereq1"],
    "complexity": "Low/Medium/High",
    "complexity_reason": "why",
    "risks": ["risk1"]
}}"""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )

        usage = response.usage
        cost_tracker.add_architect_call(usage.prompt_tokens, usage.completion_tokens)

        content = response.choices[0].message.content
        parsed = self._extract_json(content)
        if parsed is not None:
            return parsed
        return {"summary": content, "key_entities": [], "technical_keywords": [],
                "prerequisites": [], "complexity": "Unknown", "complexity_reason": "", "risks": []}

    def create_implementation_strategy(self, ticket_analysis: Dict, code_context: List[Dict]) -> Dict:
        """Turn the analysis plus retrieved code context into a strategy dict."""
        context_str = "\n".join([
            f"File: {c['metadata'].get('file_path', '?')}\n{c['content'][:500]}"
            for c in code_context[:5]  # cap prompt size
        ])

        prompt = f"""Create implementation strategy:

Analysis: {json.dumps(ticket_analysis)}

Code Context:
{context_str}

Provide JSON:
{{
    "architecture_notes": "how it fits",
    "implementation_steps": ["step1", "step2"],
    "files_to_modify": [{{"path": "file", "action": "modify/create", "reason": "why"}}],
    "patterns_to_follow": ["pattern1"],
    "integration_points": ["point1"]
}}"""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )

        usage = response.usage
        cost_tracker.add_architect_call(usage.prompt_tokens, usage.completion_tokens)

        content = response.choices[0].message.content
        parsed = self._extract_json(content)
        if parsed is not None:
            return parsed
        return {"architecture_notes": content, "implementation_steps": [],
                "files_to_modify": [], "patterns_to_follow": [], "integration_points": []}
| |
|
| |
|
class DeveloperLLM:
    """LLM #2: Developer — boilerplate code generation and code explanation."""

    def __init__(self, config: "Config"):
        self.config = config
        self._client = None  # lazy OpenAI client
        self.model = config.DEVELOPER_MODEL

    @property
    def client(self):
        """Lazily construct the OpenAI client; raises ValueError without a key."""
        if self._client is None:
            if not self.config.OPENAI_API_KEY:
                raise ValueError("OpenAI API key not set!")
            self._client = OpenAI(api_key=self.config.OPENAI_API_KEY)
        return self._client

    def reset_client(self):
        """Drop the cached client (e.g. after the API key changes)."""
        self._client = None

    @staticmethod
    def _extract_json(content: str):
        """Parse a model response as JSON, stripping an optional ``` fence.

        Returns the parsed object, or None when it is not valid JSON.
        BUG FIX: the previous pattern '^```json?' required the literal
        characters "jso" and failed on a plain ``` fence; '(?:json)?'
        makes the whole word optional.
        """
        stripped = re.sub(r'^```(?:json)?\s*', '', content.strip())
        stripped = re.sub(r'\s*```$', '', stripped)
        try:
            return json.loads(stripped)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return None

    def generate_boilerplate(self, ticket_analysis: Dict, strategy: Dict, code_context: List[Dict]) -> Dict[str, str]:
        """Generate {file path: code} boilerplate for the planned change.

        Falls back to {"generated_code.txt": raw response} when the model
        does not return valid JSON.
        """
        context_str = "\n".join([f"// {c['metadata'].get('file_path', '?')}\n{c['content'][:400]}"
                                 for c in code_context[:3]])  # cap prompt size

        prompt = f"""Generate boilerplate code:

Summary: {ticket_analysis.get('summary', '')}
Entities: {ticket_analysis.get('key_entities', [])}
Steps: {strategy.get('implementation_steps', [])}

Existing patterns:
{context_str}

Respond with JSON where keys are file paths:
{{"path/file.py": "# code with TODOs"}}"""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.2
        )

        usage = response.usage
        cost_tracker.add_developer_call(usage.prompt_tokens, usage.completion_tokens)

        content = response.choices[0].message.content
        parsed = self._extract_json(content)
        if parsed is not None:
            return parsed
        return {"generated_code.txt": content}

    def explain_code_context(self, code_context: List[Dict], question: str) -> str:
        """Answer a question about the retrieved code chunks as plain text."""
        context_str = "\n".join([f"File: {c['metadata'].get('file_path', '?')}\n{c['content']}"
                                 for c in code_context[:5]])  # cap prompt size

        prompt = f"""Explain this code:

{context_str}

Question: {question}

Be concise and helpful."""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )

        usage = response.usage
        cost_tracker.add_developer_call(usage.prompt_tokens, usage.completion_tokens)

        return response.choices[0].message.content
| |
|
| |
|
| | |
| | |
| | |
| |
|
class DevProductivityAgent:
    """Orchestrates the indexer, architect LLM, and developer LLM.

    Holds one shared Config and wires it into every collaborator so that
    key changes propagate through set_api_keys.
    """

    def __init__(self, config: Config = None):
        self.config = config or Config()
        self.indexer = CodebaseIndexer(self.config)
        self.architect = ArchitectLLM(self.config)
        self.developer = DeveloperLLM(self.config)

    def set_api_keys(self, openai_key: str = None, pinecone_key: str = None):
        """Install new API keys and invalidate any cached clients."""
        if openai_key:
            self.config.OPENAI_API_KEY = openai_key
            self.architect.reset_client()
            self.developer.reset_client()
            self.indexer._openai_client = None
        if pinecone_key:
            self.config.PINECONE_API_KEY = pinecone_key
            self.indexer._index = None

    def index_codebase(self, directory: str, extensions: List[str] = None) -> Dict:
        """Index every matching file under *directory* and summarize the run."""
        print(f"π Indexing: {directory}")
        per_file = self.indexer.index_directory(directory, extensions)
        successful = sum(1 for outcome in per_file.values() if isinstance(outcome, int))
        return {
            "files_indexed": successful,
            "total_chunks": self.indexer.get_stats()['total_chunks'],
            "details": per_file
        }

    def process_ticket(self, ticket: JiraTicket) -> ImplementationPlan:
        """Full pipeline: analyze -> retrieve context -> plan -> boilerplate."""
        print("π Analyzing...")
        analysis = self.architect.analyze_ticket(ticket)

        print("π Searching...")
        search_terms = analysis.get('technical_keywords', []) + analysis.get('key_entities', [])

        # Collect context hits, deduplicated by file path, order preserved.
        context_hits = []
        seen_paths = set()
        for term in search_terms[:5]:
            for hit in self.indexer.search(term, top_k=5):
                path = hit['metadata'].get('file_path', '')
                if path in seen_paths:
                    continue
                seen_paths.add(path)
                context_hits.append(hit)

        print("π Planning...")
        strategy = self.architect.create_implementation_strategy(analysis, context_hits)

        print("π» Generating...")
        boilerplate = self.developer.generate_boilerplate(analysis, strategy, context_hits)

        cost_tracker.record_ticket()

        return ImplementationPlan(
            ticket_summary=analysis.get('summary', ''),
            key_entities=analysis.get('key_entities', []),
            relevant_files=[{
                'path': hit['metadata'].get('file_path', ''),
                'relevance': f"Lines {hit['metadata'].get('line_start', '?')}-{hit['metadata'].get('line_end', '?')}",
                'preview': hit['content'][:200]
            } for hit in context_hits[:10]],
            implementation_steps=strategy.get('implementation_steps', []),
            prerequisites=analysis.get('prerequisites', []),
            boilerplate_code=boilerplate,
            architecture_notes=strategy.get('architecture_notes', ''),
            estimated_complexity=analysis.get('complexity', 'Unknown')
        )

    def ask_about_code(self, question: str) -> str:
        """Answer a question grounded in retrieved code chunks."""
        matches = self.indexer.search(question)
        if not matches:
            return "No relevant code found. Index your codebase first."
        explanation = self.developer.explain_code_context(matches, question)
        cost_tracker.record_question()
        return explanation

    def get_cost_stats(self) -> Dict:
        """Expose the shared cost tracker's statistics."""
        return cost_tracker.get_stats()

    def reset_cost_tracking(self):
        """Zero out the shared cost tracker."""
        cost_tracker.reset()
| |
|
| |
|
| | |
| | |
| | |
| |
|
# FastAPI application plus permissive CORS so a browser dashboard can call it.
app = FastAPI(title="Developer Productivity Agent", version="2.0.0")
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# wide open — fine for local development, confirm before shared deployment.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True,
                   allow_methods=["*"], allow_headers=["*"])

# Module-level singleton agent used by every endpoint below.
agent = DevProductivityAgent()
| |
|
@app.get("/")
async def root():
    """Health check reporting the backing store and current chunk count."""
    index_stats = agent.indexer.get_stats()
    return {
        "status": "healthy",
        "vector_db": "Pinecone",
        "chunks": index_stats['total_chunks'],
    }
| |
|
@app.get("/stats")
async def get_stats():
    """Return Pinecone index statistics."""
    index_stats = agent.indexer.get_stats()
    return index_stats
| |
|
@app.get("/cost-analytics")
async def get_cost_analytics():
    """Return cumulative cost, savings, and per-role token statistics."""
    analytics = agent.get_cost_stats()
    return analytics
| |
|
@app.post("/reset-costs")
async def reset_costs():
    """Zero out all cost counters and history."""
    agent.reset_cost_tracking()
    return {"status": "reset"}
| |
|
@app.post("/index")
async def index_codebase(directory: str, extensions: List[str] = None):
    """Index a directory of source files into Pinecone."""
    try:
        outcome = agent.index_codebase(directory, extensions)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return {"status": "success", "results": outcome}
| |
|
@app.post("/process-ticket", response_model=ImplementationPlan)
async def process_ticket(ticket: JiraTicket):
    """Run the full architect/developer pipeline for one ticket."""
    try:
        plan = agent.process_ticket(ticket)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return plan
| |
|
@app.post("/ask")
async def ask(question: str):
    """Answer a natural-language question about the indexed codebase."""
    try:
        answer = agent.ask_about_code(question)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return {"answer": answer}
| |
|
@app.post("/search")
async def search(query: str, top_k: int = 10):
    """Raw vector search over the indexed codebase."""
    try:
        matches = agent.indexer.search(query, top_k)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return {"results": matches}
| |
|
@app.delete("/clear")
async def clear():
    """Remove every vector from the Pinecone index."""
    agent.indexer.clear_index()
    return {"status": "cleared"}
| |
|
if __name__ == "__main__":
    import argparse

    # CLI: optionally index a directory, optionally start the API server.
    cli = argparse.ArgumentParser()
    cli.add_argument("--index", type=str)
    cli.add_argument("--serve", action="store_true")
    cli.add_argument("--port", type=int, default=8000)
    options = cli.parse_args()

    if options.index:
        agent.index_codebase(options.index)
    if options.serve:
        uvicorn.run(app, host="0.0.0.0", port=options.port)