auto-analyst-backend / docs /development /development_workflow.md
GitHub Actions
Merge branch 'FireBird-Technologies:main' into main
7643a03

Auto-Analyst Backend Development Workflow

🎯 Development Philosophy

The Auto-Analyst backend follows modern Python development practices with emphasis on:

  • Modularity: Clear separation of concerns across components
  • Async-First: Non-blocking operations for scalability
  • Type Safety: Comprehensive type hints and validation
  • Documentation: Self-documenting code and comprehensive docs
  • Testing: Robust testing at multiple levels
  • Performance: Optimized for real-world usage patterns

πŸ—οΈ Code Organization Principles

1. Directory Structure Standards

src/
β”œβ”€β”€ agents/           # AI agent implementations
β”‚   β”œβ”€β”€ agents.py    # Core agent definitions
β”‚   β”œβ”€β”€ deep_agents.py # Deep analysis system
β”‚   └── retrievers/  # Information retrieval components
β”œβ”€β”€ db/              # Database layer
β”‚   β”œβ”€β”€ init_db.py   # Database initialization
β”‚   └── schemas/     # SQLAlchemy models
β”œβ”€β”€ managers/        # Business logic layer
β”‚   β”œβ”€β”€ chat_manager.py    # Chat operations
β”‚   β”œβ”€β”€ ai_manager.py      # AI model management
β”‚   └── session_manager.py # Session lifecycle
β”œβ”€β”€ routes/          # FastAPI route handlers
β”‚   β”œβ”€β”€ session_routes.py     # Core functionality
β”‚   β”œβ”€β”€ chat_routes.py     # Chat endpoints
β”‚   └── [feature]_routes.py # Feature-specific routes
β”œβ”€β”€ utils/           # Shared utilities
β”‚   β”œβ”€β”€ logger.py    # Centralized logging
β”‚   └── helpers.py   # Common functions
└── schemas/         # Pydantic models
    β”œβ”€β”€ chat_schemas.py    # Chat data models
    └── [feature]_schemas.py # Feature schemas

2. Import Organization

# Standard library imports
import asyncio
import json
from datetime import datetime
from typing import List, Optional, Dict, Any

# Third-party imports
import dspy
import pandas as pd
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session

# Local imports
from src.db.init_db import session_factory
from src.db.schemas.models import User, Chat
from src.utils.logger import Logger
from src.managers.chat_manager import ChatManager

πŸ› οΈ Development Patterns

1. Agent Development Pattern

# 1. Define DSPy Signature
class new_analysis_agent(dspy.Signature):
    """
    Comprehensive docstring explaining:
    - Agent purpose and capabilities
    - Input requirements and formats
    - Expected output format
    - Usage examples
    """
    goal = dspy.InputField(desc="Clear description of analysis objective")
    dataset = dspy.InputField(desc="Dataset structure and content description")
    plan_instructions = dspy.InputField(desc="Execution plan from planner")
    
    summary = dspy.OutputField(desc="Natural language summary of analysis")
    code = dspy.OutputField(desc="Executable Python code for analysis")

# 2. Add to Agent Configuration
# In agents_config.json:
{
  "template_name": "new_analysis_agent",
  "description": "Performs specialized analysis on datasets",
  "variant_type": "both",  # individual, planner, or both
  "is_premium": false, # Will be active by default
  "usage_count": 0,
  "icon_url": "analysis.svg"
}

# 3. Register in Agent System
# In agents.py, add to the appropriate loading functions

2. Route Development Pattern

# 1. Create route file: src/routes/feature_routes.py
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from typing import List, Optional
from src.db.init_db import session_factory
from src.db.schemas.models import FeatureModel
from src.utils.logger import Logger

logger = Logger("feature_routes", see_time=True, console_log=False)
router = APIRouter(prefix="/feature", tags=["feature"])

# 2. Define Pydantic schemas
class FeatureCreate(BaseModel):
    name: str
    description: Optional[str] = None
    
class FeatureResponse(BaseModel):
    id: int
    name: str
    description: Optional[str]
    created_at: datetime

# 3. Implement endpoints with proper error handling
@router.post("/", response_model=FeatureResponse)
async def create_feature(feature: FeatureCreate):
    try:
        session = session_factory()
        try:
            new_feature = FeatureModel(
                name=feature.name,
                description=feature.description
            )
            session.add(new_feature)
            session.commit()
            session.refresh(new_feature)
            
            return FeatureResponse(
                id=new_feature.id,
                name=new_feature.name,
                description=new_feature.description,
                created_at=new_feature.created_at
            )
            
        except Exception as e:
            session.rollback()
            logger.log_message(f"Error creating feature: {str(e)}", level=logging.ERROR)
            raise HTTPException(status_code=500, detail=f"Failed to create feature: {str(e)}")
        finally:
            session.close()
            
    except Exception as e:
        logger.log_message(f"Error in create_feature: {str(e)}", level=logging.ERROR)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")

# 4. Register in app.py
from src.routes.feature_routes import router as feature_router
app.include_router(feature_router)

3. Database Model Pattern

# In src/db/schemas/models.py
from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime, timezone

Base = declarative_base()

class NewModel(Base):
    __tablename__ = "new_models"
    
    # Primary key
    id = Column(Integer, primary_key=True, autoincrement=True)
    
    # Required fields
    name = Column(String(255), nullable=False, unique=True)
    
    # Optional fields
    description = Column(Text, nullable=True)
    is_active = Column(Boolean, default=True, nullable=False)
    
    # Timestamps
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), nullable=False)
    updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc), nullable=False)
    
    # Foreign keys
    user_id = Column(Integer, ForeignKey("users.user_id"), nullable=True)
    
    # Relationships
    user = relationship("User", back_populates="new_models")
    
    def __repr__(self):
        return f"<NewModel(id={self.id}, name='{self.name}')>"

# Update User model to include back reference
class User(Base):
    # ... existing fields ...
    new_models = relationship("NewModel", back_populates="user")

4. Manager Pattern

# In src/managers/feature_manager.py
from typing import List, Optional, Dict, Any
from sqlalchemy.orm import Session
from src.db.schemas.models import FeatureModel
from src.utils.logger import Logger

logger = Logger("feature_manager", see_time=True, console_log=False)

class FeatureManager:
    """
    Manages business logic for feature operations.
    Separates complex business logic from route handlers.
    """
    
    def __init__(self, session: Session):
        self.session = session
    
    async def create_feature(self, name: str, description: Optional[str] = None) -> FeatureModel:
        """Create a new feature with validation and business logic."""
        try:
            # Validation
            if not name or len(name.strip()) == 0:
                raise ValueError("Feature name cannot be empty")
            
            # Check for duplicates
            existing = self.session.query(FeatureModel).filter_by(name=name).first()
            if existing:
                raise ValueError(f"Feature with name '{name}' already exists")
            
            # Create feature
            feature = FeatureModel(name=name, description=description)
            self.session.add(feature)
            self.session.commit()
            self.session.refresh(feature)
            
            logger.log_message(f"Created feature: {name}", level=logging.INFO)
            return feature
            
        except Exception as e:
            self.session.rollback()
            logger.log_message(f"Error creating feature: {str(e)}", level=logging.ERROR)
            raise
    
    async def get_features(self, active_only: bool = True) -> List[FeatureModel]:
        """Retrieve features with optional filtering."""
        try:
            query = self.session.query(FeatureModel)
            if active_only:
                query = query.filter(FeatureModel.is_active == True)
            
            features = query.order_by(FeatureModel.created_at.desc()).all()
            return features
            
        except Exception as e:
            logger.log_message(f"Error retrieving features: {str(e)}", level=logging.ERROR)
            raise

πŸ“‹ Code Quality Standards

1. Type Hints and Documentation

from typing import List, Optional, Dict, Any, Union
from datetime import datetime

async def process_analysis_data(
    data: pd.DataFrame,
    analysis_type: str,
    user_id: Optional[int] = None,
    options: Dict[str, Any] = None
) -> Dict[str, Union[str, List[Any], bool]]:
    """
    Process analysis data with specified parameters.
    
    Args:
        data: Input DataFrame containing the data to analyze
        analysis_type: Type of analysis to perform ("statistical", "ml", "viz")
        user_id: Optional user ID for tracking and personalization
        options: Additional options for analysis configuration
        
    Returns:
        Dictionary containing:
        - status: "success" or "error"
        - result: Analysis results or error message
        - metadata: Additional information about the analysis
        
    Raises:
        ValueError: If analysis_type is not supported
        DataError: If data format is invalid
        
    Example:
        >>> data = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
        >>> result = await process_analysis_data(data, "statistical")
        >>> print(result["status"])
        "success"
    """
    if options is None:
        options = {}
    
    # Implementation...
    return {"status": "success", "result": [], "metadata": {}}

2. Error Handling Patterns

# Comprehensive error handling with logging and user-friendly messages
async def safe_operation(data: Any) -> Dict[str, Any]:
    """
    Template for safe operations with comprehensive error handling.
    """
    try:
        # Validation
        if not data:
            raise ValueError("Data cannot be empty")
        
        # Main operation
        result = await perform_operation(data)
        
        # Success logging
        logger.log_message("Operation completed successfully", level=logging.INFO)
        return {"success": True, "data": result}
        
    except ValueError as e:
        # Input validation errors
        logger.log_message(f"Validation error: {str(e)}", level=logging.WARNING)
        return {"success": False, "error": "Invalid input", "details": str(e)}
        
    except ConnectionError as e:
        # External service errors
        logger.log_message(f"Connection error: {str(e)}", level=logging.ERROR)
        return {"success": False, "error": "Service unavailable", "details": "Please try again later"}
        
    except Exception as e:
        # Unexpected errors
        logger.log_message(f"Unexpected error in safe_operation: {str(e)}", level=logging.ERROR)
        return {"success": False, "error": "Internal error", "details": "Please contact support"}

3. Async/Await Best Practices

import asyncio
from typing import List, Coroutine

# Proper async function definition
async def async_agent_execution(agents: List[str], query: str) -> List[Dict[str, Any]]:
    """Execute multiple agents concurrently."""
    
    # Create coroutines
    tasks = [
        execute_single_agent(agent, query) 
        for agent in agents
    ]
    
    # Execute concurrently with error handling
    results = []
    for task in asyncio.as_completed(tasks):
        try:
            result = await task
            results.append(result)
        except Exception as e:
            logger.log_message(f"Agent execution failed: {e}", level=logging.ERROR)
            results.append({"error": str(e)})
    
    return results

# Database operations with proper session management
async def async_database_operation(session: Session) -> Any:
    """Template for async database operations."""
    try:
        # Use asyncio.to_thread for CPU-bound database operations
        result = await asyncio.to_thread(
            lambda: session.query(Model).filter(...).all()
        )
        return result
    except Exception as e:
        session.rollback()
        raise
    finally:
        session.close()

πŸ”§ Development Workflow

1. Feature Development Process

  1. Plan the Feature:

    # Create feature branch
    git checkout -b feature/new-analysis-agent
    
    # Document requirements
    echo "## New Analysis Agent" >> docs/feature_plan.md
    
  2. Implement Core Logic:

    # Create agent signature
    # Add to agents_config.json
    # Implement business logic in managers/
    # Create route handlers
    
  3. Add Database Changes:

    # Modify models if needed
    alembic revision --autogenerate -m "Add new analysis tables"
    alembic upgrade head
    

3. Release Process

  1. Pre-release Testing:

    # Run full test suite
    pytest tests/
    
    # Test database migrations
    alembic upgrade head
    
    # Test with sample data
    python scripts/test_with_sample_data.py
    
  2. Documentation Updates:

    # Update API documentation
    # Update troubleshooting guide
    # Update changelog
    
  3. Deployment Preparation:

    # Update requirements.txt
    pip freeze > requirements.txt
    
    # Test container build
    docker build -t auto-analyst-backend .
    

πŸ“Š Performance Considerations

1. Database Optimization

# Use query optimization
from sqlalchemy.orm import joinedload

# Bad: N+1 query problem
users = session.query(User).all()
for user in users:
    print(user.chats)  # Separate query for each user

# Good: Eager loading
users = session.query(User).options(joinedload(User.chats)).all()
for user in users:
    print(user.chats)  # No additional queries

# Use pagination for large datasets
def get_paginated_results(session, model, page=1, per_page=20):
    offset = (page - 1) * per_page
    return session.query(model).offset(offset).limit(per_page).all()

2. Async Optimization

# Use connection pooling
from sqlalchemy.pool import QueuePool

engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=20,
    max_overflow=30
)

# Batch operations
async def batch_process_agents(agents: List[str], queries: List[str]):
    semaphore = asyncio.Semaphore(5)  # Limit concurrent operations
    
    async def process_with_limit(agent, query):
        async with semaphore:
            return await process_agent(agent, query)
    
    tasks = [
        process_with_limit(agent, query) 
        for agent, query in zip(agents, queries)
    ]
    
    return await asyncio.gather(*tasks, return_exceptions=True)

This development workflow guide provides a comprehensive framework for maintaining code quality, consistency, and performance in the Auto-Analyst backend system. Following these patterns ensures that new features integrate seamlessly with the existing architecture while maintaining the high standards of the codebase.