NiWaRe's picture
separate client and server api key for http and weave tracing
7d190a0
raw
history blame
5.93 kB
"""
Authentication middleware for W&B MCP Server.
Implements Bearer token validation for HTTP transport as per
MCP specification: https://modelcontextprotocol.io/specification/draft/basic/authorization
Clients send their W&B API keys as Bearer tokens, which the server
then uses for all W&B operations on behalf of that client.
"""
import os
import logging
import re
from typing import Optional, Dict, Any
from fastapi import HTTPException, Request, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import JSONResponse
logger = logging.getLogger(__name__)
# Bearer token security scheme
bearer_scheme = HTTPBearer(auto_error=False)
class MCPAuthConfig:
"""
Configuration for MCP authentication.
For HTTP transport: Accepts any W&B API key as a Bearer token.
The server uses the client's token for all W&B operations.
"""
pass # Simple config, no OAuth metadata needed
def is_valid_wandb_api_key(token: str) -> bool:
"""
Check if a token looks like a valid W&B API key format.
W&B API keys are typically 40 characters but we'll be permissive.
"""
if not token:
return False
# Strip any whitespace that might have been included
token = token.strip()
# Be permissive - accept keys between 20 and 100 characters
# The actual W&B API will validate the exact format
if len(token) < 20 or len(token) > 100:
return False
# Basic validation - W&B keys contain alphanumeric and some special characters
if re.match(r'^[a-zA-Z0-9_\-\.]+$', token):
return True
return False
async def validate_bearer_token(
credentials: Optional[HTTPAuthorizationCredentials],
config: MCPAuthConfig
) -> str:
"""
Validate Bearer token (W&B API key) for MCP access.
Accepts any valid-looking W&B API key. The actual validation
happens when the key is used to call W&B APIs.
Returns:
The W&B API key to use for operations
Raises:
HTTPException: 401 Unauthorized with WWW-Authenticate header
"""
if not credentials or not credentials.credentials:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authorization required - please provide your W&B API key as a Bearer token",
headers={
"WWW-Authenticate": 'Bearer realm="W&B MCP"'
}
)
token = credentials.credentials.strip() # Strip any whitespace
# Basic format validation
if not is_valid_wandb_api_key(token):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Invalid W&B API key format. Got {len(token)} characters. "
f"Get your key at: https://wandb.ai/authorize",
headers={
"WWW-Authenticate": 'Bearer realm="W&B MCP", error="invalid_token"'
}
)
logger.debug(f"Bearer token validated successfully (length: {len(token)})")
return token
async def mcp_auth_middleware(request: Request, call_next):
"""
FastAPI middleware for MCP authentication on HTTP transport.
Only applies to MCP endpoints (/mcp/*).
Extracts the client's W&B API key from the Bearer token and stores it
for use in W&B operations.
"""
# Only apply auth to MCP endpoints
if not request.url.path.startswith("/mcp"):
return await call_next(request)
# Skip auth if explicitly disabled (development only)
if os.environ.get("MCP_AUTH_DISABLED", "false").lower() == "true":
logger.warning("MCP authentication is disabled - endpoints are publicly accessible")
return await call_next(request)
config = MCPAuthConfig()
try:
# Extract bearer token from Authorization header
authorization = request.headers.get("Authorization", "")
credentials = None
if authorization.startswith("Bearer "):
# Remove "Bearer " prefix and strip any whitespace
token = authorization[7:].strip()
credentials = HTTPAuthorizationCredentials(
scheme="Bearer",
credentials=token
)
# Validate and get the W&B API key
wandb_api_key = await validate_bearer_token(credentials, config)
# Make sure the key is clean (no extra whitespace or encoding issues)
wandb_api_key = wandb_api_key.strip()
# Store the API key in request state for W&B operations
request.state.wandb_api_key = wandb_api_key
# Set the API key for this request
# Note: We don't restore the original value because with streaming responses,
# the tool execution happens after call_next returns. Each request sets its own key.
os.environ["WANDB_API_KEY"] = wandb_api_key
# Debug logging
logger.debug(f"Auth middleware: Set WANDB_API_KEY with length={len(wandb_api_key)}, "
f"is_40_chars={len(wandb_api_key) == 40}")
# Continue processing without restoring the env var
# Each request will set its own API key
response = await call_next(request)
return response
except HTTPException as e:
# Return proper error response
return JSONResponse(
status_code=e.status_code,
content={"error": e.detail},
headers=e.headers
)
except Exception as e:
logger.error(f"Authentication error: {e}")
return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={"error": "Authentication failed"},
headers={
"WWW-Authenticate": 'Bearer realm="W&B MCP"'
}
)
# OAuth-related functions removed - see AUTH_README.md for details