# =============================================================================
# app/providers.py
# 09.03.2026
# LLM + Search Provider Registry + Fallback Chain
# Universal MCP Hub (Sandboxed) - based on PyFundaments Architecture
# Copyright 2026 - Volkan Kücükbudak
# Apache License 2.0 + ESOL 1.1
# Repo: https://github.com/VolkanSah/Universal-MCP-Hub-sandboxed
# =============================================================================
# ARCHITECTURE NOTE:
# This file lives exclusively in app/ and is ONLY started by app/app.py.
# NO direct access to fundaments/*, .env, or Guardian (main.py).
# All config comes from app/.pyfun via app/config.py.
#
# PROVIDER PRINCIPLE:
# No key = no provider = no tool = no crash.
# Server always starts, just with fewer providers.
# Adding a new provider = update .pyfun + add class here. Never touch mcp.py!
#
# FALLBACK CHAIN:
# Defined in .pyfun per provider via fallback_to field - see the illustrative
# wiring example at the end of this header.
# anthropic → fails → gemini → fails → openrouter → fails → RuntimeError
# Visited set prevents infinite loops.
#
# SECURITY NOTE:
# API keys are NEVER logged or included in exception messages.
# All errors are sanitized before propagation - only HTTP status codes
# and safe_url (query params stripped) are ever exposed in logs.
#
# HOW TO ADD A NEW LLM PROVIDER - 3 steps, nothing else to touch:
# 1. Add class below (copy a dummy, implement complete())
# 2. Register name → class in _PROVIDER_CLASSES dict
# 3. Add [LLM_PROVIDER.yourprovider] block in app/.pyfun
#    → env_key, base_url, default_model, fallback_to
#
# DEPENDENCY CHAIN (app/* only, no fundaments!):
# config.py    → parses app/.pyfun - single source of truth
# providers.py → LLM + Search registry + fallback chain
# tools.py     → calls providers.llm_complete() / providers.search()
# mcp.py       → calls providers.list_active_llm() / list_active_search()
# =============================================================================
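# =============================================================================
# ILLUSTRATIVE FALLBACK WIRING - example values only; the real chain lives in
# app/.pyfun, and fields like env_key / base_url / default_model are omitted
# here for brevity (see the dummy provider blocks below for the full format):
#
#   [LLM_PROVIDER.anthropic]
#   fallback_to = "gemini"
#   [LLM_PROVIDER.anthropic_END]
#
#   [LLM_PROVIDER.gemini]
#   fallback_to = "openrouter"
#   [LLM_PROVIDER.gemini_END]
#
#   [LLM_PROVIDER.openrouter]
#   fallback_to = ""              # empty string ends the chain
#   [LLM_PROVIDER.openrouter_END]
#
# llm_complete() walks this chain left to right; the visited set guarantees
# termination even if two providers accidentally point at each other.
# =============================================================================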
import os
import logging
import httpx
from . import config
logging.getLogger("httpx").setLevel(logging.WARNING)
logger = logging.getLogger("providers")
# =============================================================================
# SECTION 1 - Base Provider
# Shared HTTP logic - implemented ONCE, reused by all providers.
# =============================================================================
class BaseProvider:
"""
Base class for all LLM providers.
    Subclasses only implement complete() - HTTP logic lives here.
"""
def __init__(self, name: str, cfg: dict):
self.name = name
self.key = os.getenv(cfg.get("env_key", ""))
self.base_url = cfg.get("base_url", "")
self.fallback = cfg.get("fallback_to", "")
self.timeout = int(config.get_limits().get("REQUEST_TIMEOUT_SEC", "60"))
self.model = cfg.get("default_model", "")
        # Safe key hint for debug logs - never log the full key
self._key_hint = (
f"{self.key[:4]}...{self.key[-4:]}"
if self.key and len(self.key) > 8
else "***"
)
async def complete(self, prompt: str, model: str, max_tokens: int) -> str:
"""Override in each provider subclass."""
raise NotImplementedError
async def _post(self, url: str, headers: dict, payload: dict) -> dict:
"""
        Shared HTTP POST - used by all providers.
Raises RuntimeError with sanitized message on non-2xx responses.
API keys are never included in raised exceptions or log output.
"""
safe_url = url.split("?")[0] # strip query params (may contain API keys)
        logger.debug(f"POST → {safe_url}")
async with httpx.AsyncClient() as client:
r = await client.post(
url,
headers=headers,
json=payload,
timeout=self.timeout,
)
try:
r.raise_for_status()
except httpx.HTTPStatusError as e:
# Sanitize: only status code + safe_url, never headers or body
raise RuntimeError(
f"HTTP {e.response.status_code} from {safe_url}"
) from None
return r.json()
# =============================================================================
# SECTION 2 - LLM Provider Implementations
# Only the API-specific parsing logic differs per provider.
# =============================================================================
class AnthropicProvider(BaseProvider):
"""Anthropic Claude API β€” Messages endpoint."""
async def complete(self, prompt: str, model: str = None, max_tokens: int = 1024) -> str:
cfg = config.get_active_llm_providers().get("anthropic", {})
data = await self._post(
f"{self.base_url}/messages",
headers={
"x-api-key": self.key,
"anthropic-version": cfg.get("api_version_header", "2023-06-01"),
"content-type": "application/json",
},
payload={
"model": model or self.model,
"max_tokens": max_tokens,
"messages": [{"role": "user", "content": prompt}],
},
)
return data["content"][0]["text"]
class GeminiProvider(BaseProvider):
"""Google Gemini API β€” generateContent endpoint."""
async def complete(self, prompt: str, model: str = None, max_tokens: int = 1024) -> str:
m = model or self.model
safe_url = f"{self.base_url}/models/{m}:generateContent"
async with httpx.AsyncClient() as client:
r = await client.post(
safe_url,
params={"key": self.key}, # key in query param, never in logs
json={
"contents": [{"parts": [{"text": prompt}]}],
"generationConfig": {"maxOutputTokens": max_tokens},
},
timeout=self.timeout,
)
try:
r.raise_for_status()
except httpx.HTTPStatusError as e:
raise RuntimeError(
f"HTTP {e.response.status_code} from {safe_url}"
) from None
return r.json()["candidates"][0]["content"]["parts"][0]["text"]
class OpenRouterProvider(BaseProvider):
"""OpenRouter API β€” OpenAI-compatible chat completions endpoint.
Required headers: HTTP-Referer + X-Title (required by OpenRouter for
free models and rate limit attribution).
"""
async def complete(self, prompt: str, model: str = None, max_tokens: int = 1024) -> str:
data = await self._post(
f"{self.base_url}/chat/completions",
headers={
"Authorization": f"Bearer {self.key}",
"HTTP-Referer": os.getenv("APP_URL", "https://huggingface.co"),
"X-Title": os.getenv("HUB_NAME", "Universal MCP Hub"), # required!
"content-type": "application/json",
},
payload={
"model": model or self.model,
"max_tokens": max_tokens,
"messages": [{"role": "user", "content": prompt}],
},
)
return data["choices"][0]["message"]["content"]
class HuggingFaceProvider(BaseProvider):
"""HuggingFace Inference API β€” OpenAI-compatible serverless endpoint.
base_url in .pyfun: https://api-inference.huggingface.co/v1
Model goes in payload, not in URL.
Free tier: max ~8B models. PRO required for 70B+.
"""
async def complete(self, prompt: str, model: str = None, max_tokens: int = 512) -> str:
m = model or self.model
data = await self._post(
f"{self.base_url}/chat/completions",
headers={
"Authorization": f"Bearer {self.key}",
"content-type": "application/json",
},
payload={
"model": m,
"max_tokens": max_tokens,
"messages": [{"role": "user", "content": prompt}],
},
)
return data["choices"][0]["message"]["content"]
# =============================================================================
# DUMMY PROVIDERS - copy, uncomment, adapt
# Steps: (1) uncomment class (2) add to _PROVIDER_CLASSES (3) add to .pyfun
# =============================================================================
# --- OpenAI -------------------------------------------------------------------
# .pyfun block to add:
#
# [LLM_PROVIDER.openai]
# active = "true"
# base_url = "https://api.openai.com/v1"
# env_key = "OPENAI_API_KEY"
# default_model = "gpt-4o-mini"
# models = "gpt-4o, gpt-4o-mini, gpt-3.5-turbo"
# fallback_to = ""
# [LLM_PROVIDER.openai_END]
#
# class OpenAIProvider(BaseProvider):
#     """OpenAI API - OpenAI-compatible chat completions endpoint."""
#
#     async def complete(self, prompt: str, model: str = None, max_tokens: int = 1024) -> str:
#         data = await self._post(
#             f"{self.base_url}/chat/completions",
#             headers={
#                 "Authorization": f"Bearer {self.key}",
#                 "content-type": "application/json",
#             },
#             payload={
#                 "model": model or self.model,
#                 "max_tokens": max_tokens,
#                 "messages": [{"role": "user", "content": prompt}],
#             },
#         )
#         return data["choices"][0]["message"]["content"]
# --- Mistral ------------------------------------------------------------------
# .pyfun block to add:
#
# [LLM_PROVIDER.mistral]
# active = "true"
# base_url = "https://api.mistral.ai/v1"
# env_key = "MISTRAL_API_KEY"
# default_model = "mistral-large-latest"
# models = "mistral-large-latest, mistral-small-latest, codestral-latest"
# fallback_to = ""
# [LLM_PROVIDER.mistral_END]
#
# class MistralProvider(BaseProvider):
#     """Mistral AI API - OpenAI-compatible chat completions endpoint."""
#
#     async def complete(self, prompt: str, model: str = None, max_tokens: int = 1024) -> str:
#         data = await self._post(
#             f"{self.base_url}/chat/completions",
#             headers={
#                 "Authorization": f"Bearer {self.key}",
#                 "content-type": "application/json",
#             },
#             payload={
#                 "model": model or self.model,
#                 "max_tokens": max_tokens,
#                 "messages": [{"role": "user", "content": prompt}],
#             },
#         )
#         return data["choices"][0]["message"]["content"]
# --- xAI (Grok) ---------------------------------------------------------------
# .pyfun block to add:
#
# [LLM_PROVIDER.xai]
# active = "true"
# base_url = "https://api.x.ai/v1"
# env_key = "XAI_API_KEY"
# default_model = "grok-3-mini"
# models = "grok-3, grok-3-mini, grok-3-fast"
# fallback_to = ""
# [LLM_PROVIDER.xai_END]
#
# class XAIProvider(BaseProvider):
#     """xAI Grok API - OpenAI-compatible chat completions endpoint."""
#
#     async def complete(self, prompt: str, model: str = None, max_tokens: int = 1024) -> str:
#         data = await self._post(
#             f"{self.base_url}/chat/completions",
#             headers={
#                 "Authorization": f"Bearer {self.key}",
#                 "content-type": "application/json",
#             },
#             payload={
#                 "model": model or self.model,
#                 "max_tokens": max_tokens,
#                 "messages": [{"role": "user", "content": prompt}],
#             },
#         )
#         return data["choices"][0]["message"]["content"]
# =============================================================================
# SECTION 3 - Provider Registry
# Built from .pyfun [LLM_PROVIDERS] at initialize().
# Maps provider names β†’ classes.
# To activate a dummy: uncomment class above + add entry here.
# =============================================================================
_PROVIDER_CLASSES = {
"anthropic": AnthropicProvider,
"gemini": GeminiProvider,
"openrouter": OpenRouterProvider,
"huggingface": HuggingFaceProvider,
# "openai": OpenAIProvider, # ← uncomment to activate
# "mistral": MistralProvider, # ← uncomment to activate
# "xai": XAIProvider, # ← uncomment to activate
}
_registry: dict = {}
def initialize() -> None:
"""
Build provider registry from .pyfun [LLM_PROVIDERS].
Called once by mcp.py during startup sequence.
    Skips providers with missing ENV keys - no crash, just fewer tools.
"""
global _registry
active = config.get_active_llm_providers()
for name, cfg in active.items():
env_key = cfg.get("env_key", "")
if not env_key or not os.getenv(env_key):
            logger.info(f"Provider '{name}' skipped - ENV key not set.")
continue
cls = _PROVIDER_CLASSES.get(name)
if not cls:
            logger.info(f"Provider '{name}' has no handler yet - skipped.")
continue
_registry[name] = cls(name, cfg)
logger.info(f"Provider registered: {name}")
# =============================================================================
# SECTION 4 - LLM Execution + Fallback Chain
# =============================================================================
async def llm_complete(
prompt: str,
provider_name: str = None,
model: str = None,
max_tokens: int = 1024,
) -> str:
"""
Send prompt to LLM provider with automatic fallback chain.
Fallback order is defined in .pyfun via fallback_to field.
Raises RuntimeError if all providers in the chain fail.
Args:
prompt: Input text to send to the model.
provider_name: Provider name override. Defaults to default_provider
from .pyfun [TOOL.llm_complete].
model: Model name override. Defaults to provider's default_model.
max_tokens: Max tokens in response. Default: 1024.
Returns:
Model response as plain text string.
"""
if not provider_name:
tools_cfg = config.get_active_tools()
provider_name = tools_cfg.get("llm_complete", {}).get("default_provider", "anthropic")
visited = set()
current = provider_name
while current and current not in visited:
visited.add(current)
provider = _registry.get(current)
if not provider:
            logger.warning(f"Provider '{current}' not in registry - trying fallback.")
else:
try:
result = await provider.complete(prompt, model, max_tokens)
logger.info(f"Response from provider: '{current}'")
return f"[{current}] {result}"
except Exception as e:
                # Log only the exception type - never raw {e}, which may
                # contain headers, keys, or response bodies
                logger.warning(
                    f"Provider '{current}' failed: {type(e).__name__} - trying fallback."
                )
cfg = config.get_active_llm_providers().get(current, {})
current = cfg.get("fallback_to", "")
    raise RuntimeError("All providers failed - no fallback available.")
# Alias - used internally by tools.py
complete = llm_complete
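# -----------------------------------------------------------------------------
# Usage sketch (hypothetical standalone caller - in production only tools.py
# calls this, as described in the header):
#
#   import asyncio
#   from app import providers
#
#   async def demo():
#       providers.initialize()
#       text = await providers.llm_complete(
#           "Say hello in one word.",
#           provider_name="anthropic",  # falls back along the .pyfun chain
#           max_tokens=32,
#       )
#       print(text)  # "[anthropic] ..." or "[gemini] ..." after a fallback
#
#   asyncio.run(demo())
# -----------------------------------------------------------------------------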
# =============================================================================
# SECTION 5 - Search Execution
# Search providers not yet implemented - returns placeholder.
# Add BraveProvider, TavilyProvider here when ready.
# =============================================================================
async def search(
query: str,
provider_name: str = None,
max_results: int = 5,
) -> str:
"""
Search the web via configured search provider.
    Search providers not yet implemented - placeholder until BraveProvider ready.
Args:
query: Search query string.
provider_name: Provider name override (e.g. 'brave', 'tavily').
max_results: Maximum number of results. Default: 5.
Returns:
Formatted search results as plain text string.
"""
    # TODO: implement BraveProvider, TavilyProvider - same pattern as the LLM
    # providers: add class + register in _SEARCH_REGISTRY (a hedged Brave
    # sketch follows after this function).
    logger.info(f"web_search called - query: '{query}' - search providers not yet active.")
return f"Search not yet implemented. Query was: {query}"
# =============================================================================
# SECTION 6 - Registry Helpers
# Used by mcp.py for tool registration decisions.
# =============================================================================
def list_active_llm() -> list:
"""Returns list of active LLM provider names."""
return list(_registry.keys())
def list_active_search() -> list:
"""
Returns list of active search provider names.
Empty until search providers are implemented.
"""
# TODO: return list(_search_registry.keys()) when search providers are ready
return []
def get(name: str) -> BaseProvider:
"""Get a specific provider instance by name."""
return _registry.get(name)
# =============================================================================
# Direct execution guard
# =============================================================================
if __name__ == "__main__":
    print("WARNING: Run via main.py → app.py, not directly.")