<!-- Hugging Face Spaces page banner ("Running on CPU Upgrade") removed: site chrome, not document content -->
# Auto-Analyst Backend Development Workflow | |
## 🎯 Development Philosophy
The Auto-Analyst backend follows modern Python development practices with emphasis on: | |
- **Modularity**: Clear separation of concerns across components | |
- **Async-First**: Non-blocking operations for scalability | |
- **Type Safety**: Comprehensive type hints and validation | |
- **Documentation**: Self-documenting code and comprehensive docs | |
- **Testing**: Robust testing at multiple levels | |
- **Performance**: Optimized for real-world usage patterns | |
## 🏗️ Code Organization Principles
### 1. **Directory Structure Standards** | |
``` | |
src/
├── agents/                  # AI agent implementations
│   ├── agents.py            # Core agent definitions
│   ├── deep_agents.py       # Deep analysis system
│   └── retrievers/          # Information retrieval components
├── db/                      # Database layer
│   ├── init_db.py           # Database initialization
│   └── schemas/             # SQLAlchemy models
├── managers/                # Business logic layer
│   ├── chat_manager.py      # Chat operations
│   ├── ai_manager.py        # AI model management
│   └── session_manager.py   # Session lifecycle
├── routes/                  # FastAPI route handlers
│   ├── core_routes.py       # Core functionality
│   ├── chat_routes.py       # Chat endpoints
│   └── [feature]_routes.py  # Feature-specific routes
├── utils/                   # Shared utilities
│   ├── logger.py            # Centralized logging
│   └── helpers.py           # Common functions
└── schemas/                 # Pydantic models
    ├── chat_schemas.py      # Chat data models
    └── [feature]_schemas.py # Feature schemas
``` | |
### 2. **Import Organization** | |
```python | |
# Standard library imports | |
import asyncio | |
import json | |
from datetime import datetime | |
from typing import List, Optional, Dict, Any | |
# Third-party imports | |
import dspy | |
import pandas as pd | |
from fastapi import APIRouter, Depends, HTTPException | |
from pydantic import BaseModel | |
from sqlalchemy.orm import Session | |
# Local imports | |
from src.db.init_db import session_factory | |
from src.db.schemas.models import User, Chat | |
from src.utils.logger import Logger | |
from src.managers.chat_manager import ChatManager | |
``` | |
## 🛠️ Development Patterns
### 1. **Agent Development Pattern** | |
```python | |
# 1. Define DSPy Signature | |
class new_analysis_agent(dspy.Signature): | |
""" | |
Comprehensive docstring explaining: | |
- Agent purpose and capabilities | |
- Input requirements and formats | |
- Expected output format | |
- Usage examples | |
""" | |
goal = dspy.InputField(desc="Clear description of analysis objective") | |
dataset = dspy.InputField(desc="Dataset structure and content description") | |
plan_instructions = dspy.InputField(desc="Execution plan from planner") | |
summary = dspy.OutputField(desc="Natural language summary of analysis") | |
code = dspy.OutputField(desc="Executable Python code for analysis") | |
# 2. Add to Agent Configuration | |
# In agents_config.json ("variant_type" is one of: "individual", "planner", "both"):
{
"template_name": "new_analysis_agent",
"description": "Performs specialized analysis on datasets",
"variant_type": "both",
"is_premium": false, | |
"usage_count": 0, | |
"icon_url": "analysis.svg" | |
} | |
# 3. Register in Agent System | |
# In agents.py, add to the appropriate loading functions | |
``` | |
### 2. **Route Development Pattern** | |
```python | |
# 1. Create route file: src/routes/feature_routes.py
import logging
from datetime import datetime
from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from src.db.init_db import session_factory | |
from src.db.schemas.models import FeatureModel | |
from src.utils.logger import Logger | |
logger = Logger("feature_routes", see_time=True, console_log=False) | |
router = APIRouter(prefix="/feature", tags=["feature"]) | |
# 2. Define Pydantic schemas | |
class FeatureCreate(BaseModel): | |
name: str | |
description: Optional[str] = None | |
class FeatureResponse(BaseModel): | |
id: int | |
name: str | |
description: Optional[str] | |
created_at: datetime | |
# 3. Implement endpoints with proper error handling | |
@router.post("/", response_model=FeatureResponse) | |
async def create_feature(feature: FeatureCreate): | |
try: | |
session = session_factory() | |
try: | |
new_feature = FeatureModel( | |
name=feature.name, | |
description=feature.description | |
) | |
session.add(new_feature) | |
session.commit() | |
session.refresh(new_feature) | |
return FeatureResponse( | |
id=new_feature.id, | |
name=new_feature.name, | |
description=new_feature.description, | |
created_at=new_feature.created_at | |
) | |
except Exception as e: | |
session.rollback() | |
logger.log_message(f"Error creating feature: {str(e)}", level=logging.ERROR) | |
raise HTTPException(status_code=500, detail=f"Failed to create feature: {str(e)}") | |
finally: | |
session.close() | |
except Exception as e: | |
logger.log_message(f"Error in create_feature: {str(e)}", level=logging.ERROR) | |
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") | |
# 4. Register in app.py | |
from src.routes.feature_routes import router as feature_router | |
app.include_router(feature_router) | |
``` | |
### 3. **Database Model Pattern** | |
```python | |
# In src/db/schemas/models.py | |
from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text, ForeignKey | |
from sqlalchemy.orm import relationship | |
from sqlalchemy.ext.declarative import declarative_base | |
from datetime import datetime, timezone | |
Base = declarative_base() | |
class NewModel(Base): | |
__tablename__ = "new_models" | |
# Primary key | |
id = Column(Integer, primary_key=True, autoincrement=True) | |
# Required fields | |
name = Column(String(255), nullable=False, unique=True) | |
# Optional fields | |
description = Column(Text, nullable=True) | |
is_active = Column(Boolean, default=True, nullable=False) | |
# Timestamps | |
created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), nullable=False) | |
updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc), nullable=False) | |
# Foreign keys | |
user_id = Column(Integer, ForeignKey("users.user_id"), nullable=True) | |
# Relationships | |
user = relationship("User", back_populates="new_models") | |
def __repr__(self): | |
return f"<NewModel(id={self.id}, name='{self.name}')>" | |
# Update User model to include back reference | |
class User(Base): | |
# ... existing fields ... | |
new_models = relationship("NewModel", back_populates="user") | |
``` | |
### 4. **Manager Pattern** | |
```python | |
# In src/managers/feature_manager.py
import logging
from typing import List, Optional, Dict, Any
from sqlalchemy.orm import Session | |
from src.db.schemas.models import FeatureModel | |
from src.utils.logger import Logger | |
logger = Logger("feature_manager", see_time=True, console_log=False) | |
class FeatureManager: | |
""" | |
Manages business logic for feature operations. | |
Separates complex business logic from route handlers. | |
""" | |
def __init__(self, session: Session): | |
self.session = session | |
async def create_feature(self, name: str, description: Optional[str] = None) -> FeatureModel: | |
"""Create a new feature with validation and business logic.""" | |
try: | |
# Validation | |
if not name or len(name.strip()) == 0: | |
raise ValueError("Feature name cannot be empty") | |
# Check for duplicates | |
existing = self.session.query(FeatureModel).filter_by(name=name).first() | |
if existing: | |
raise ValueError(f"Feature with name '{name}' already exists") | |
# Create feature | |
feature = FeatureModel(name=name, description=description) | |
self.session.add(feature) | |
self.session.commit() | |
self.session.refresh(feature) | |
logger.log_message(f"Created feature: {name}", level=logging.INFO) | |
return feature | |
except Exception as e: | |
self.session.rollback() | |
logger.log_message(f"Error creating feature: {str(e)}", level=logging.ERROR) | |
raise | |
async def get_features(self, active_only: bool = True) -> List[FeatureModel]: | |
"""Retrieve features with optional filtering.""" | |
try: | |
query = self.session.query(FeatureModel) | |
if active_only: | |
query = query.filter(FeatureModel.is_active == True) | |
features = query.order_by(FeatureModel.created_at.desc()).all() | |
return features | |
except Exception as e: | |
logger.log_message(f"Error retrieving features: {str(e)}", level=logging.ERROR) | |
raise | |
``` | |
## 📋 Code Quality Standards
### 1. **Type Hints and Documentation** | |
```python | |
from typing import List, Optional, Dict, Any, Union | |
from datetime import datetime | |
async def process_analysis_data( | |
data: pd.DataFrame, | |
analysis_type: str, | |
user_id: Optional[int] = None, | |
options: Dict[str, Any] = None | |
) -> Dict[str, Union[str, List[Any], bool]]: | |
""" | |
Process analysis data with specified parameters. | |
Args: | |
data: Input DataFrame containing the data to analyze | |
analysis_type: Type of analysis to perform ("statistical", "ml", "viz") | |
user_id: Optional user ID for tracking and personalization | |
options: Additional options for analysis configuration | |
Returns: | |
Dictionary containing: | |
- status: "success" or "error" | |
- result: Analysis results or error message | |
- metadata: Additional information about the analysis | |
Raises: | |
ValueError: If analysis_type is not supported | |
DataError: If data format is invalid | |
Example: | |
>>> data = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) | |
>>> result = await process_analysis_data(data, "statistical") | |
>>> print(result["status"]) | |
"success" | |
""" | |
if options is None: | |
options = {} | |
# Implementation... | |
return {"status": "success", "result": [], "metadata": {}} | |
``` | |
### 2. **Error Handling Patterns** | |
```python | |
# Comprehensive error handling with logging and user-friendly messages | |
async def safe_operation(data: Any) -> Dict[str, Any]: | |
""" | |
Template for safe operations with comprehensive error handling. | |
""" | |
try: | |
# Validation | |
if not data: | |
raise ValueError("Data cannot be empty") | |
# Main operation | |
result = await perform_operation(data) | |
# Success logging | |
logger.log_message("Operation completed successfully", level=logging.INFO) | |
return {"success": True, "data": result} | |
except ValueError as e: | |
# Input validation errors | |
logger.log_message(f"Validation error: {str(e)}", level=logging.WARNING) | |
return {"success": False, "error": "Invalid input", "details": str(e)} | |
except ConnectionError as e: | |
# External service errors | |
logger.log_message(f"Connection error: {str(e)}", level=logging.ERROR) | |
return {"success": False, "error": "Service unavailable", "details": "Please try again later"} | |
except Exception as e: | |
# Unexpected errors | |
logger.log_message(f"Unexpected error in safe_operation: {str(e)}", level=logging.ERROR) | |
return {"success": False, "error": "Internal error", "details": "Please contact support"} | |
``` | |
### 3. **Async/Await Best Practices** | |
```python | |
import asyncio | |
from typing import List, Coroutine | |
# Proper async function definition | |
async def async_agent_execution(agents: List[str], query: str) -> List[Dict[str, Any]]: | |
"""Execute multiple agents concurrently.""" | |
# Create coroutines | |
tasks = [ | |
execute_single_agent(agent, query) | |
for agent in agents | |
] | |
# Execute concurrently with error handling | |
results = [] | |
for task in asyncio.as_completed(tasks): | |
try: | |
result = await task | |
results.append(result) | |
except Exception as e: | |
logger.log_message(f"Agent execution failed: {e}", level=logging.ERROR) | |
results.append({"error": str(e)}) | |
return results | |
# Database operations with proper session management | |
async def async_database_operation(session: Session) -> Any: | |
"""Template for async database operations.""" | |
try: | |
# Use asyncio.to_thread for CPU-bound database operations | |
result = await asyncio.to_thread( | |
lambda: session.query(Model).filter(...).all() | |
) | |
return result | |
except Exception as e: | |
session.rollback() | |
raise | |
finally: | |
session.close() | |
``` | |
## 🧪 Testing Patterns
### 1. **Unit Testing Structure** | |
```python | |
# tests/test_feature_manager.py | |
import pytest | |
from unittest.mock import Mock, patch | |
from src.managers.feature_manager import FeatureManager | |
from src.db.schemas.models import FeatureModel | |
class TestFeatureManager: | |
"""Test cases for FeatureManager class.""" | |
@pytest.fixture | |
def mock_session(self): | |
"""Create a mock database session.""" | |
return Mock() | |
@pytest.fixture | |
def feature_manager(self, mock_session): | |
"""Create FeatureManager instance with mock session.""" | |
return FeatureManager(mock_session) | |
async def test_create_feature_success(self, feature_manager, mock_session): | |
"""Test successful feature creation.""" | |
# Arrange | |
mock_session.query.return_value.filter_by.return_value.first.return_value = None | |
# Act | |
result = await feature_manager.create_feature("test_feature", "Test description") | |
# Assert | |
assert isinstance(result, FeatureModel) | |
mock_session.add.assert_called_once() | |
mock_session.commit.assert_called_once() | |
async def test_create_feature_duplicate_name(self, feature_manager, mock_session): | |
"""Test feature creation with duplicate name.""" | |
# Arrange | |
existing_feature = FeatureModel(name="test_feature") | |
mock_session.query.return_value.filter_by.return_value.first.return_value = existing_feature | |
# Act & Assert | |
with pytest.raises(ValueError, match="already exists"): | |
await feature_manager.create_feature("test_feature") | |
``` | |
### 2. **Integration Testing** | |
```python | |
# tests/integration/test_api_endpoints.py | |
import pytest | |
from fastapi.testclient import TestClient | |
from src.app import app | |
from src.db.init_db import session_factory | |
client = TestClient(app) | |
class TestFeatureAPI: | |
"""Integration tests for feature API endpoints.""" | |
def test_create_feature_endpoint(self): | |
"""Test feature creation through API.""" | |
response = client.post( | |
"/feature/", | |
json={"name": "test_feature", "description": "Test description"} | |
) | |
assert response.status_code == 200 | |
data = response.json() | |
assert data["name"] == "test_feature" | |
assert "id" in data | |
def test_get_features_endpoint(self): | |
"""Test feature retrieval through API.""" | |
response = client.get("/feature/") | |
assert response.status_code == 200 | |
data = response.json() | |
assert isinstance(data, list) | |
``` | |
## 🔧 Development Workflow
### 1. **Feature Development Process** | |
1. **Plan the Feature**: | |
```bash | |
# Create feature branch | |
git checkout -b feature/new-analysis-agent | |
# Document requirements | |
echo "## New Analysis Agent" >> docs/feature_plan.md | |
``` | |
2. **Implement Core Logic**: | |
```bash | |
# Create agent signature | |
# Add to agents_config.json | |
# Implement business logic in managers/ | |
# Create route handlers | |
``` | |
3. **Add Database Changes**: | |
```bash | |
# Modify models if needed | |
alembic revision --autogenerate -m "Add new analysis tables" | |
alembic upgrade head | |
``` | |
4. **Testing**: | |
```bash | |
# Run unit tests | |
pytest tests/unit/ | |
# Run integration tests | |
pytest tests/integration/ | |
# Test API endpoints | |
curl -X POST "http://localhost:8000/feature/test" | |
``` | |
5. **Documentation**: | |
```bash | |
# Update API documentation | |
# Add to troubleshooting guide if needed | |
# Update getting started guide | |
``` | |
### 2. **Code Review Checklist** | |
- [ ] **Type Hints**: All functions have proper type annotations | |
- [ ] **Documentation**: Docstrings for all public functions and classes | |
- [ ] **Error Handling**: Comprehensive exception handling with logging | |
- [ ] **Testing**: Unit tests cover all new functionality | |
- [ ] **Database**: Proper session management and transaction handling | |
- [ ] **Async**: Appropriate use of async/await patterns | |
- [ ] **Logging**: Meaningful log messages at appropriate levels | |
- [ ] **Security**: Input validation and authorization checks | |
- [ ] **Performance**: Efficient database queries and memory usage | |
### 3. **Release Process** | |
1. **Pre-release Testing**: | |
```bash | |
# Run full test suite | |
pytest tests/ | |
# Test database migrations | |
alembic upgrade head | |
# Test with sample data | |
python scripts/test_with_sample_data.py | |
``` | |
2. **Documentation Updates**: | |
```bash | |
# Update API documentation | |
# Update troubleshooting guide | |
# Update changelog | |
``` | |
3. **Deployment Preparation**: | |
```bash | |
# Update requirements.txt | |
pip freeze > requirements.txt | |
# Test container build | |
docker build -t auto-analyst-backend . | |
# Verify environment variables | |
python scripts/verify_env.py | |
``` | |
## 📈 Performance Considerations
### 1. **Database Optimization** | |
```python | |
# Use query optimization | |
from sqlalchemy.orm import joinedload | |
# Bad: N+1 query problem | |
users = session.query(User).all() | |
for user in users: | |
print(user.chats) # Separate query for each user | |
# Good: Eager loading | |
users = session.query(User).options(joinedload(User.chats)).all() | |
for user in users: | |
print(user.chats) # No additional queries | |
# Use pagination for large datasets | |
def get_paginated_results(session, model, page=1, per_page=20): | |
offset = (page - 1) * per_page | |
return session.query(model).offset(offset).limit(per_page).all() | |
``` | |
### 2. **Memory Management** | |
```python | |
# Efficient data processing | |
def process_large_dataset(file_path: str): | |
# Bad: Load entire file into memory | |
# df = pd.read_csv(file_path) | |
# Good: Process in chunks | |
chunk_size = 1000 | |
for chunk in pd.read_csv(file_path, chunksize=chunk_size): | |
process_chunk(chunk) | |
# Memory is freed after each chunk | |
# Clean up resources
import gc

def cleanup_session_data(session_id: str):
if session_id in global_session_data: | |
del global_session_data[session_id] | |
gc.collect() # Force garbage collection | |
``` | |
### 3. **Async Optimization** | |
```python | |
# Use connection pooling | |
from sqlalchemy.pool import QueuePool | |
engine = create_engine( | |
DATABASE_URL, | |
poolclass=QueuePool, | |
pool_size=20, | |
max_overflow=30 | |
) | |
# Batch operations | |
async def batch_process_agents(agents: List[str], queries: List[str]): | |
semaphore = asyncio.Semaphore(5) # Limit concurrent operations | |
async def process_with_limit(agent, query): | |
async with semaphore: | |
return await process_agent(agent, query) | |
tasks = [ | |
process_with_limit(agent, query) | |
for agent, query in zip(agents, queries) | |
] | |
return await asyncio.gather(*tasks, return_exceptions=True) | |
``` | |
This development workflow guide provides a comprehensive framework for maintaining code quality, consistency, and performance in the Auto-Analyst backend system. Following these patterns ensures that new features integrate seamlessly with the existing architecture while maintaining the high standards of the codebase. |