# MCP Server Implementation Guide

## Overview

RewardPilot implements a multi-agent MCP (Model Context Protocol) architecture with 4 independent microservices that work together to provide intelligent credit card recommendations.

## Architecture Diagram

```
┌─────────────────────────────────────────┐
│             User Interface              │
│            (Gradio 6.0 App)             │
└────────────────────┬────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────┐
│           Orchestrator Agent            │
│           (Claude 3.5 Sonnet)           │
│  ┌───────────────────────────────────┐  │
│  │ Phase 1: Planning                 │  │
│  │ - Analyze transaction context     │  │
│  │ - Determine required MCP servers  │  │
│  │ - Create execution strategy       │  │
│  └───────────────────────────────────┘  │
└────────────────────┬────────────────────┘
                     │
        ┌────────────┼────────────┐
        ▼            ▼            ▼
┌──────────────┐ ┌──────────┐ ┌────────────┐
│ Smart Wallet │ │   RAG    │ │  Forecast  │
│  MCP Server  │ │   MCP    │ │ MCP Server │
└──────┬───────┘ └────┬─────┘ └─────┬──────┘
       │              │             │
       ▼              ▼             ▼
┌─────────────────────────────────────────┐
│            Gemini 2.0 Flash             │
│         (Reasoning & Synthesis)         │
└────────────────────┬────────────────────┘
                     │
                     ▼
            ┌────────────────┐
            │ Final Response │
            └────────────────┘
```


---

## MCP Server 1: Orchestrator

### Purpose
Coordinates all MCP servers and manages the agent workflow.

### Deployment
- **URL:** https://mcp-1st-birthday-rewardpilot-orchestrator.hf.space
- **Stack:** FastAPI + Claude 3.5 Sonnet
- **Hosting:** Hugging Face Spaces

### API Endpoints

#### POST `/recommend`
Get a card recommendation for a transaction.

**Request:**
```json
{
  "user_id": "u_alice",
  "merchant": "Whole Foods",
  "mcc": "5411",
  "amount_usd": 127.50,
  "category": "Groceries"
}
```

**Response:**

```json
{
  "recommended_card": {
    "card_id": "c_amex_gold",
    "card_name": "American Express Gold",
    "issuer": "American Express"
  },
  "rewards": {
    "points_earned": 510,
    "cash_value": 5.10,
    "earn_rate": "4x points"
  },
  "reasoning": "Amex Gold offers 4x points on U.S. supermarkets...",
  "confidence": 0.95,
  "alternatives": [
    {
      "card_name": "Citi Custom Cash",
      "rewards": 3.82,
      "reason": "5% but monthly cap already hit"
    }
  ],
  "warnings": [
    "You're at $450/$1500 monthly cap. 3 more grocery trips available."
  ]
}
```
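
For reference, a minimal client call against this endpoint (the Space URL comes from the Deployment section above; this is a sketch, not part of the server code):

```python
# Minimal client sketch for POST /recommend -- request fields match the
# example payload above.
import httpx

ORCHESTRATOR_URL = "https://mcp-1st-birthday-rewardpilot-orchestrator.hf.space"

resp = httpx.post(
    f"{ORCHESTRATOR_URL}/recommend",
    json={
        "user_id": "u_alice",
        "merchant": "Whole Foods",
        "mcc": "5411",
        "amount_usd": 127.50,
        "category": "Groceries",
    },
    timeout=30.0,
)
resp.raise_for_status()
print(resp.json()["recommended_card"]["card_name"])
```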

### Implementation

```python
# orchestrator_server.py
import os
import json
import asyncio
from typing import Optional

import httpx
from anthropic import Anthropic
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="RewardPilot Orchestrator")
anthropic = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

class TransactionRequest(BaseModel):
    user_id: str
    merchant: str
    mcc: Optional[str] = None
    amount_usd: float
    category: Optional[str] = None

@app.post("/recommend")
async def recommend_card(request: TransactionRequest):
    # Phase 1: Planning with Claude
    plan = await create_execution_plan(request)

    # Phase 2: Parallel MCP calls
    mcp_results = await execute_mcp_calls(plan, request)

    # Phase 3: Reasoning with Gemini
    explanation = await synthesize_reasoning(request, mcp_results)

    # Phase 4: Format response
    return format_recommendation(mcp_results, explanation)

async def create_execution_plan(request: TransactionRequest):
    """Claude analyzes the transaction and plans which MCP servers to call."""
    prompt = f"""
    Analyze this transaction and determine which MCP servers to call:

    Transaction:
    - Merchant: {request.merchant}
    - Category: {request.category}
    - Amount: ${request.amount_usd}

    Available MCP servers:
    1. smart_wallet - Card recommendations and reward calculations
    2. rewards_rag - Semantic search of card benefits
    3. spend_forecast - Spending predictions and cap warnings

    Return a JSON plan with:
    - strategy: optimization approach
    - mcp_calls: list of servers to call (priority order)
    - confidence_threshold: minimum confidence for recommendation
    """

    response = anthropic.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )

    return json.loads(response.content[0].text)

async def execute_mcp_calls(plan: dict, request: TransactionRequest):
    """Call the MCP servers selected in the plan, in parallel."""
    tasks = []

    for mcp_call in plan["mcp_calls"]:
        if mcp_call["service"] == "smart_wallet":
            tasks.append(call_smart_wallet(request))
        elif mcp_call["service"] == "rewards_rag":
            tasks.append(call_rewards_rag(request))
        elif mcp_call["service"] == "spend_forecast":
            tasks.append(call_forecast(request))

    results = await asyncio.gather(*tasks)
    return dict(zip([c["service"] for c in plan["mcp_calls"]], results))
```
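
The `call_smart_wallet`, `call_rewards_rag`, and `call_forecast` helpers referenced above are thin HTTP clients around the downstream Spaces. A minimal sketch of one of them, assuming the Smart Wallet URL from the endpoint configuration below and the `/analyze` contract documented in the next section:

```python
# Hypothetical helper sketch -- the URL and the /analyze payload shape are
# taken from this document, not from a shipped client module.
import httpx

SMART_WALLET_URL = "https://mcp-1st-birthday-rewardpilot-smart-wallet.hf.space"

async def call_smart_wallet(request: "TransactionRequest") -> dict:
    """POST the transaction to the Smart Wallet MCP and return its JSON result."""
    async with httpx.AsyncClient(timeout=10.0) as client:
        response = await client.post(
            f"{SMART_WALLET_URL}/analyze",
            json={
                "user_id": request.user_id,
                "merchant": request.merchant,
                "mcc": request.mcc,
                "amount_usd": request.amount_usd,
            },
        )
        response.raise_for_status()
        return response.json()
```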

## MCP Server 2: Smart Wallet

### Purpose

Analyzes the user's credit cards and calculates optimal rewards.

### Deployment

- **URL:** https://mcp-1st-birthday-rewardpilot-smart-wallet.hf.space
- **Stack:** FastAPI
- **Hosting:** Hugging Face Spaces

### API Endpoints

#### POST `/analyze`

Analyze a transaction against the user's wallet.

**Request:**

```json
{
  "user_id": "u_alice",
  "merchant": "Whole Foods",
  "mcc": "5411",
  "amount_usd": 127.50
}
```

**Response:**

```json
{
  "recommended_card": {
    "card_id": "c_amex_gold",
    "card_name": "American Express Gold",
    "rewards_earned": 5.10,
    "earn_rate": "4x points",
    "points_earned": 510
  },
  "all_cards_comparison": [
    {
      "card_name": "Amex Gold",
      "rewards": 5.10,
      "rank": 1
    },
    {
      "card_name": "Citi Custom Cash",
      "rewards": 3.82,
      "rank": 2,
      "note": "Cap already hit this month"
    }
  ]
}
```

### Implementation

```python
# smart_wallet_server.py
from fastapi import FastAPI
from sqlalchemy import create_engine
from typing import List

app = FastAPI(title="Smart Wallet MCP")

class CardAnalyzer:
    def __init__(self, user_id: str):
        self.user_id = user_id
        self.cards = self.load_user_cards()
    
    def analyze_transaction(self, merchant: str, mcc: str, amount: float):
        """Calculate rewards for all cards"""
        results = []
        
        for card in self.cards:
            # Get reward rate for this MCC
            reward_rate = self.get_reward_rate(card, mcc)
            
            # Check spending caps
            current_spending = self.get_monthly_spending(card, mcc)
            cap_remaining = card.monthly_cap - current_spending
            
            # Calculate rewards
            if cap_remaining >= amount:
                rewards = amount * reward_rate
            else:
                # Partial cap scenario
                rewards = (
                    (cap_remaining * reward_rate)
                    + ((amount - cap_remaining) * card.base_rate)
                )
            
            results.append({
                "card": card,
                "rewards": rewards,
                "effective_rate": rewards / amount,
                "cap_status": {
                    "current": current_spending,
                    "limit": card.monthly_cap,
                    "remaining": cap_remaining
                }
            })
        
        # Sort by rewards (descending)
        results.sort(key=lambda x: x["rewards"], reverse=True)
        
        return results[0]  # Return the best card
```
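
The class above does the scoring; wiring it to the documented `/analyze` endpoint is a short FastAPI route. A minimal sketch, assuming the request fields from the example payload and a `name` attribute on the card model (both assumptions, not the shipped code):

```python
# Hypothetical route wiring for POST /analyze -- AnalyzeRequest and the
# card attribute names are assumptions based on the examples above.
from pydantic import BaseModel

class AnalyzeRequest(BaseModel):
    user_id: str
    merchant: str
    mcc: str
    amount_usd: float

@app.post("/analyze")
def analyze(request: AnalyzeRequest):
    analyzer = CardAnalyzer(user_id=request.user_id)
    best = analyzer.analyze_transaction(
        merchant=request.merchant,
        mcc=request.mcc,
        amount=request.amount_usd,
    )
    return {
        "recommended_card": {
            "card_name": best["card"].name,  # assumed attribute on the card model
            "rewards_earned": round(best["rewards"], 2),
            "effective_rate": best["effective_rate"],
        },
        "cap_status": best["cap_status"],
    }
```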

## MCP Server 3: Rewards RAG

### Purpose

Semantic search across credit card benefit documents.

### Deployment

- **URL:** https://mcp-1st-birthday-rewardpilot-rewards-rag.hf.space
- **Stack:** LlamaIndex-based RAG (see `docs/llamaindex_setup.md`)
- **Hosting:** Hugging Face Spaces

### API Endpoints

#### POST `/query`

Search card benefits with natural language.

**Request:**

```json
{
  "query": "Does Amex Gold work at Costco for groceries?",
  "card_name": "American Express Gold",
  "top_k": 3
}
```

**Response:**

```json
{
  "answer": "No, American Express cards are not accepted at Costco warehouse locations due to Costco's exclusive Visa agreement. However, Amex Gold works at Costco.com for online orders.",
  "sources": [
    {
      "card_name": "American Express Gold",
      "content": "Merchant acceptance: Not accepted at Costco warehouses...",
      "relevance_score": 0.92
    }
  ]
}
```

### Implementation

See docs/llamaindex_setup.md for detailed RAG implementation.
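
For the orchestrator, querying this server is a single HTTP call. A minimal client sketch for this endpoint, assuming the RAG Space URL from the endpoint configuration below (the helper name here is illustrative):

```python
# Hypothetical client sketch for POST /query -- the URL and payload shape
# come from this section, not from a shipped client module.
import httpx

REWARDS_RAG_URL = "https://mcp-1st-birthday-rewardpilot-rewards-rag.hf.space"

async def query_rewards_rag(payload: dict) -> dict:
    """Send a natural-language benefits question to the Rewards RAG MCP."""
    async with httpx.AsyncClient(timeout=15.0) as client:
        response = await client.post(f"{REWARDS_RAG_URL}/query", json=payload)
        response.raise_for_status()
        return response.json()

# Example:
# await query_rewards_rag({
#     "query": "Does Amex Gold work at Costco for groceries?",
#     "card_name": "American Express Gold",
#     "top_k": 3,
# })
```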


## MCP Server 4: Spend Forecast

### Purpose

ML-based spending predictions and cap warnings.

### Deployment

- **URL:** https://mcp-1st-birthday-rewardpilot-spend-forecast.hf.space
- **Stack:** FastAPI + scikit-learn
- **Hosting:** Hugging Face Spaces

### API Endpoints

#### POST `/predict`

Predict spending for the next period.

**Request:**

```json
{
  "user_id": "u_alice",
  "card_id": "c_amex_gold",
  "category": "Groceries",
  "horizon_days": 30
}
```

**Response:**

```json
{
  "predicted_spending": 520.50,
  "confidence_interval": [480.00, 560.00],
  "warnings": [
    {
      "type": "cap_warning",
      "message": "Likely to exceed $500 monthly cap",
      "probability": 0.78,
      "suggested_action": "Switch to Citi Custom Cash after $500"
    }
  ]
}
```

### Implementation

```python
# forecast_server.py
from fastapi import FastAPI
from sklearn.ensemble import RandomForestRegressor
import numpy as np

app = FastAPI(title="Spend Forecast MCP")

class SpendingForecaster:
    def __init__(self):
        # Assumes the forest has been trained offline on historical
        # transaction data before the server starts answering requests.
        self.model = RandomForestRegressor(n_estimators=100)
    
    def predict(self, user_id: str, category: str, horizon_days: int):
        """Predict spending for next N days"""
        # Load historical data
        history = self.load_user_history(user_id, category)
        
        # Feature engineering
        features = self.extract_features(history)
        
        # Predict
        prediction = self.model.predict(features)
        
        # Calculate confidence interval
        predictions = [tree.predict(features) for tree in self.model.estimators_]
        lower = np.percentile(predictions, 5)
        upper = np.percentile(predictions, 95)
        
        return {
            "predicted_spending": float(prediction[0]),
            "confidence_interval": [float(lower), float(upper)]
        }
```
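
A minimal route sketch for exposing the forecaster as the documented `/predict` endpoint (the request model mirrors the example payload; the cap-warning logic is omitted here):

```python
# Hypothetical route wiring for POST /predict -- the request model follows
# the example payload above; warning generation is left out of this sketch.
from pydantic import BaseModel

forecaster = SpendingForecaster()

class PredictRequest(BaseModel):
    user_id: str
    card_id: str
    category: str
    horizon_days: int = 30

@app.post("/predict")
def predict(request: PredictRequest):
    result = forecaster.predict(
        user_id=request.user_id,
        category=request.category,
        horizon_days=request.horizon_days,
    )
    return {**result, "warnings": []}
```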

## Communication Flow

### Sequence Diagram

```
User -> Gradio: Enter transaction
Gradio -> Orchestrator: POST /recommend
Orchestrator -> Claude: Create execution plan
Claude -> Orchestrator: {plan: call all 3 MCPs}

Orchestrator -> Smart Wallet: POST /analyze
Orchestrator -> RAG: POST /query
Orchestrator -> Forecast: POST /predict

Smart Wallet -> Orchestrator: {best_card: Amex Gold, rewards: 5.10}
RAG -> Orchestrator: {benefits: "4x on groceries..."}
Forecast -> Orchestrator: {warning: "Near cap"}

Orchestrator -> Gemini: Synthesize results
Gemini -> Orchestrator: {explanation: "Use Amex Gold because..."}

Orchestrator -> Gradio: Final recommendation
Gradio -> User: Display result
```

## Deployment Instructions

### 1. Deploy Each MCP Server to Hugging Face

```bash
# Clone template
git clone https://huggingface.co/spaces/YOUR_USERNAME/rewardpilot-orchestrator

# Add files
cp orchestrator_server.py app.py
cp requirements.txt .

# Create Space on HF
huggingface-cli repo create rewardpilot-orchestrator --type space --space_sdk gradio

# Push
git add .
git commit -m "Deploy orchestrator"
git push
```

### 2. Set Environment Variables

In each Space's settings, add:

```
ANTHROPIC_API_KEY=sk-ant-xxxxx
GEMINI_API_KEY=AIzaSyxxxxx
OPENAI_API_KEY=sk-xxxxx
```

### 3. Configure Endpoints

In the main `app.py`:

```python
MCP_ENDPOINTS = {
    "orchestrator": "https://mcp-1st-birthday-rewardpilot-orchestrator.hf.space",
    "smart_wallet": "https://mcp-1st-birthday-rewardpilot-smart-wallet.hf.space",
    "rewards_rag": "https://mcp-1st-birthday-rewardpilot-rewards-rag.hf.space",
    "forecast": "https://mcp-1st-birthday-rewardpilot-spend-forecast.hf.space"
}
```
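
As a rough illustration of how the Gradio front end consumes these endpoints (the layout and labels below are illustrative; only the `/recommend` call mirrors the documented API):

```python
# Sketch of a Gradio front end calling the orchestrator -- the UI wiring is
# an assumption, not the shipped app.py.
import gradio as gr
import httpx

def recommend(user_id, merchant, mcc, amount, category):
    response = httpx.post(
        f"{MCP_ENDPOINTS['orchestrator']}/recommend",
        json={
            "user_id": user_id,
            "merchant": merchant,
            "mcc": mcc,
            "amount_usd": float(amount),
            "category": category,
        },
        timeout=60.0,
    )
    response.raise_for_status()
    data = response.json()
    return data["recommended_card"]["card_name"], data["reasoning"]

demo = gr.Interface(
    fn=recommend,
    inputs=["text", "text", "text", "number", "text"],
    outputs=["text", "text"],
    title="RewardPilot",
)

if __name__ == "__main__":
    demo.launch()
```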

## Error Handling

### Graceful Degradation

```python
import logging

import httpx

logger = logging.getLogger(__name__)

async def call_mcp_with_fallback(service_name: str, request_data: dict):
    """Call MCP server with timeout and fallback"""
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                MCP_ENDPOINTS[service_name],
                json=request_data
            )
            response.raise_for_status()
            return response.json()
    except httpx.TimeoutException:
        logger.error(f"{service_name} timeout")
        return get_fallback_response(service_name)
    except httpx.HTTPError as e:
        logger.error(f"{service_name} error: {e}")
        return get_fallback_response(service_name)
```
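
`get_fallback_response` is referenced above but not shown; the idea is a neutral default per service so the orchestrator can still answer when one server is down. A sketch, with field names mirroring the response examples in this document:

```python
# Hypothetical fallback payloads -- field names follow the response examples
# above; the real defaults may differ.
def get_fallback_response(service_name: str) -> dict:
    """Return a neutral payload so the orchestrator can degrade gracefully."""
    fallbacks = {
        "smart_wallet": {"recommended_card": None, "all_cards_comparison": []},
        "rewards_rag": {"answer": "Benefit details are temporarily unavailable.", "sources": []},
        "spend_forecast": {"predicted_spending": None, "warnings": []},
    }
    return fallbacks.get(service_name, {})
```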

## Monitoring

### Health Checks

```python
@app.get("/health")
async def health_check():
    """Check status of all MCP servers"""
    statuses = {}
    
    for service, url in MCP_ENDPOINTS.items():
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                response = await client.get(f"{url}/health")
                statuses[service] = {
                    "status": "healthy" if response.status_code == 200 else "unhealthy",
                    "latency_ms": response.elapsed.total_seconds() * 1000
                }
        except Exception as e:
            statuses[service] = {"status": "down", "error": str(e)}
    
    return statuses
```

## Performance Optimization

### Caching Strategy

```python
import json
from functools import lru_cache

import redis

# Redis cache for frequent queries
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

# In-process LRU layer on top of Redis: repeat lookups in the same worker
# skip Redis entirely; Redis still serves other workers and restarts.
@lru_cache(maxsize=1000)
def get_card_benefits(card_name: str):
    """Cache card benefits for 1 hour"""
    cache_key = f"benefits:{card_name}"
    
    # Check cache
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)
    
    # Fetch from RAG
    result = call_rewards_rag({"query": f"Get all benefits for {card_name}"})
    
    # Cache for 1 hour
    redis_client.setex(cache_key, 3600, json.dumps(result))
    
    return result
```

## Testing

### Integration Tests

```python
import pytest
import httpx

@pytest.mark.asyncio
async def test_orchestrator_end_to_end():
    """Test full recommendation flow"""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{MCP_ENDPOINTS['orchestrator']}/recommend",
            json={
                "user_id": "test_user",
                "merchant": "Whole Foods",
                "amount_usd": 100.00
            }
        )
        
        assert response.status_code == 200
        data = response.json()
        assert "recommended_card" in data
        assert "rewards" in data
        assert "reasoning" in data

## Next Steps

1. **Scale MCP servers** - Add load balancing
2. **Add authentication** - JWT tokens for API access
3. **Implement webhooks** - Real-time transaction notifications
4. **Add more MCP servers** - Travel optimization, business expenses, etc.

## Related Documentation

- `docs/llamaindex_setup.md` - RAG implementation details


---