"""Utility functions for the MCP Hub project."""

import json
import re
from typing import Dict, Any, List, Optional, Union

import aiohttp
from huggingface_hub import InferenceClient
from openai import OpenAI, AsyncOpenAI

from .config import api_config, model_config
from .exceptions import APIError, ValidationError
from .logging_config import logger


def create_nebius_client() -> OpenAI:
    """Create and return a Nebius OpenAI client."""
    return OpenAI(
        base_url=api_config.nebius_base_url,
        api_key=api_config.nebius_api_key,
    )


def create_async_nebius_client() -> AsyncOpenAI:
    """Create and return an async Nebius OpenAI client."""
    return AsyncOpenAI(
        base_url=api_config.nebius_base_url,
        api_key=api_config.nebius_api_key,
    )


def create_llm_client() -> Union[OpenAI, object]:
    """Create and return an LLM client based on the configured provider."""
    if api_config.llm_provider == "nebius":
        return create_nebius_client()
    elif api_config.llm_provider == "openai":
        return OpenAI(api_key=api_config.openai_api_key)
    elif api_config.llm_provider == "anthropic":
        try:
            import anthropic
            return anthropic.Anthropic(api_key=api_config.anthropic_api_key)
        except ImportError:
            raise APIError("Anthropic", "anthropic package not installed. Install with: pip install anthropic")
    elif api_config.llm_provider == "huggingface":
        # Prefer the provider/api_key constructor (newer huggingface_hub releases);
        # fall back to the older token-only form if that signature is unavailable.
        try:
            return InferenceClient(
                provider="hf-inference",
                api_key=api_config.huggingface_api_key,
            )
        except Exception:
            return InferenceClient(
                token=api_config.huggingface_api_key,
            )
    else:
        raise APIError("Config", f"Unsupported LLM provider: {api_config.llm_provider}")


def create_async_llm_client() -> Union[AsyncOpenAI, object]:
    """Create and return an async LLM client based on the configured provider."""
    if api_config.llm_provider == "nebius":
        return create_async_nebius_client()
    elif api_config.llm_provider == "openai":
        return AsyncOpenAI(api_key=api_config.openai_api_key)
    elif api_config.llm_provider == "anthropic":
        try:
            import anthropic
            return anthropic.AsyncAnthropic(api_key=api_config.anthropic_api_key)
        except ImportError:
            raise APIError("Anthropic", "anthropic package not installed. Install with: pip install anthropic")
    elif api_config.llm_provider == "huggingface":
        # Same constructor fallback as create_llm_client: try provider/api_key,
        # then the older token-only form.
        try:
            return InferenceClient(
                provider="hf-inference",
                api_key=api_config.huggingface_api_key,
            )
        except Exception:
            return InferenceClient(
                token=api_config.huggingface_api_key,
            )
    else:
        raise APIError("Config", f"Unsupported LLM provider: {api_config.llm_provider}")


def validate_non_empty_string(value: str, field_name: str) -> None:
    """Validate that a string is not empty or None."""
    if not value or not value.strip():
        raise ValidationError(f"{field_name} cannot be empty.")


def extract_json_from_text(text: str) -> Dict[str, Any]:
    """Extract JSON object from text that may contain markdown fences."""
    if text.startswith("```"):
        parts = text.split("```")
        if len(parts) >= 3:
            text = parts[1].strip()
        else:
            text = text.strip("```").strip()

    start_idx = text.find("{")
    end_idx = text.rfind("}")

    if start_idx == -1 or end_idx == -1 or end_idx < start_idx:
        raise ValidationError("Failed to locate JSON object in text.")

    json_candidate = text[start_idx:end_idx + 1]

    try:
        return json.loads(json_candidate)
    except json.JSONDecodeError as e:
        raise ValidationError(f"Failed to parse JSON: {str(e)}")


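# Illustrative sketch of how extract_json_from_text handles an LLM reply wrapped
# in markdown fences; the sample text below is hypothetical and not part of the
# module's public API.
def _example_extract_json() -> Dict[str, Any]:
    """Strip the ```json fence and parse the embedded object."""
    raw_reply = '```json\n{"answer": "42", "confidence": 0.9}\n```'
    # The fence and language tag are dropped before json.loads runs.
    return extract_json_from_text(raw_reply)  # -> {"answer": "42", "confidence": 0.9}

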
def extract_urls_from_text(text: str) -> List[str]:
    """Extract URLs from text using regex."""
    url_pattern = r"(https?://[^\s]+)"
    return re.findall(url_pattern, text)


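# Small illustrative sketch of extract_urls_from_text on a hypothetical sentence.
def _example_extract_urls() -> List[str]:
    """Pull both URLs out of a short sample string."""
    sample = "See https://example.com/docs and http://example.org for details."
    return extract_urls_from_text(sample)  # -> ['https://example.com/docs', 'http://example.org']

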
def make_nebius_completion(
    model: str,
    messages: List[Dict[str, str]],
    temperature: float = 0.6,
    response_format: Optional[Dict[str, Any]] = None
) -> str:
    """Make a completion request to Nebius and return the content."""
    client = create_nebius_client()

    try:
        kwargs = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
        }

        if response_format:
            kwargs["response_format"] = response_format

        completion = client.chat.completions.create(**kwargs)
        return completion.choices[0].message.content.strip()
    except Exception as e:
        raise APIError("Nebius", str(e))


async def make_async_nebius_completion(
    model: str,
    messages: List[Dict[str, Any]],
    temperature: float = 0.0,
    response_format: Optional[Dict[str, Any]] = None,
) -> str:
    """Make an async completion request to the Nebius API."""
    try:
        client = create_async_nebius_client()

        kwargs = {
            "model": model,
            "messages": messages,
            "temperature": temperature
        }

        if response_format:
            kwargs["response_format"] = response_format

        response = await client.chat.completions.create(**kwargs)

        if not response.choices:
            raise APIError("Nebius", "No completion choices returned")

        content = response.choices[0].message.content
        if content is None:
            raise APIError("Nebius", "Empty response content")

        return content.strip()

    except Exception as e:
        if isinstance(e, APIError):
            raise
        raise APIError("Nebius", f"API call failed: {str(e)}")


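# Hedged usage sketch for make_async_nebius_completion. The model id below is a
# placeholder, a valid Nebius API key must be configured, and the coroutine is
# only defined here as an illustration; it is never awaited on import.
async def _example_async_nebius_completion() -> str:
    """Request a short JSON answer from a hypothetical Nebius-hosted model."""
    messages = [
        {"role": "system", "content": "Answer in JSON."},
        {"role": "user", "content": "Name one prime number."},
    ]
    return await make_async_nebius_completion(
        model="placeholder/nebius-model-id",  # hypothetical model identifier
        messages=messages,
        temperature=0.0,
        response_format={"type": "json_object"},
    )

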
def make_llm_completion(
    model: str,
    messages: List[Dict[str, str]],
    temperature: float = 0.6,
    response_format: Optional[Dict[str, Any]] = None
) -> str:
    """Make a completion request using the configured LLM provider."""
    provider = api_config.llm_provider

    try:
        if provider == "nebius":
            return make_nebius_completion(model, messages, temperature, response_format)

elif provider == "openai":
|
|
client = create_llm_client()
|
|
kwargs = {
|
|
"model": model,
|
|
"messages": messages,
|
|
"temperature": temperature,
|
|
}
|
|
|
|
if response_format and response_format.get("type") == "json_object":
|
|
kwargs["response_format"] = {"type": "json_object"}
|
|
completion = client.chat.completions.create(**kwargs)
|
|
return completion.choices[0].message.content.strip()
|
|
|
|
elif provider == "anthropic":
|
|
client = create_llm_client()
|
|
|
|
anthropic_messages = []
|
|
system_message = None
|
|
|
|
for msg in messages:
|
|
if msg["role"] == "system":
|
|
system_message = msg["content"]
|
|
else:
|
|
anthropic_messages.append({
|
|
"role": msg["role"],
|
|
"content": msg["content"]
|
|
})
|
|
|
|
kwargs = {
|
|
"model": model,
|
|
"messages": anthropic_messages,
|
|
"temperature": temperature,
|
|
"max_tokens": 1000,
|
|
}
|
|
if system_message:
|
|
kwargs["system"] = system_message
|
|
|
|
response = client.messages.create(**kwargs)
|
|
return response.content[0].text.strip()
|
|
|
|
elif provider == "huggingface":
|
|
|
|
hf_error = None
|
|
try:
|
|
client = create_llm_client()
|
|
|
|
|
|
|
|
|
|
try:
|
|
response = client.chat.completions.create(
|
|
model=model,
|
|
messages=messages,
|
|
temperature=temperature,
|
|
max_tokens=1000,
|
|
)
|
|
|
|
|
|
if hasattr(response, 'choices') and response.choices:
|
|
return response.choices[0].message.content.strip()
|
|
else:
|
|
return str(response).strip()
|
|
|
|
except Exception as e1:
|
|
hf_error = e1
|
|
|
|
|
|
try:
|
|
response = client.chat_completion(
|
|
messages=messages,
|
|
model=model,
|
|
temperature=temperature,
|
|
max_tokens=1000,
|
|
)
|
|
|
|
|
|
if hasattr(response, 'generated_text'):
|
|
return response.generated_text.strip()
|
|
elif isinstance(response, dict) and 'generated_text' in response:
|
|
return response['generated_text'].strip()
|
|
elif isinstance(response, list) and len(response) > 0:
|
|
if isinstance(response[0], dict) and 'generated_text' in response[0]:
|
|
return response[0]['generated_text'].strip()
|
|
|
|
return str(response).strip()
|
|
|
|
except Exception as e2:
|
|
|
|
hf_error = f"Method 1: {str(e1)}. Method 2: {str(e2)}"
|
|
raise APIError("HuggingFace", f"All HuggingFace methods failed. {hf_error}")
|
|
|
|
except Exception as e:
|
|
|
|
if hf_error is None:
|
|
hf_error = str(e)
|
|
logger.warning(f"HuggingFace API failed: {hf_error}, falling back to Nebius")
|
|
|
|
try:
|
|
|
|
nebius_model = model_config.get_model_for_provider("question_enhancer", "nebius")
|
|
return make_nebius_completion(nebius_model, messages, temperature, response_format)
|
|
except Exception as nebius_error:
|
|
raise APIError("HuggingFace", f"HuggingFace failed: {hf_error}. Nebius fallback also failed: {str(nebius_error)}")
|
|
|
|
        else:
            raise APIError("Config", f"Unsupported LLM provider: {provider}")

    except Exception as e:
        # Preserve APIError details (e.g. the Nebius fallback message) instead of
        # re-wrapping them under the provider name.
        if isinstance(e, APIError):
            raise
        raise APIError(provider.title(), f"Completion failed: {str(e)}")


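# Hedged usage sketch for make_llm_completion: the call site stays the same
# regardless of which provider is configured. The model id is a placeholder and
# the matching API key is required; defined here only as an illustration.
def _example_llm_completion() -> str:
    """Ask the configured provider for a JSON-formatted reply."""
    messages = [
        {"role": "system", "content": "Reply with a JSON object."},
        {"role": "user", "content": "Summarize MCP in one sentence."},
    ]
    return make_llm_completion(
        model="placeholder-model-id",  # hypothetical model identifier
        messages=messages,
        temperature=0.6,
        response_format={"type": "json_object"},
    )

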
async def make_async_llm_completion(
    model: str,
    messages: List[Dict[str, Any]],
    temperature: float = 0.0,
    response_format: Optional[Dict[str, Any]] = None,
) -> str:
    """Make an async completion request using the configured LLM provider."""
    provider = api_config.llm_provider

    try:
        if provider == "nebius":
            return await make_async_nebius_completion(model, messages, temperature, response_format)

elif provider == "openai":
|
|
client = create_async_llm_client()
|
|
kwargs = {
|
|
"model": model,
|
|
"messages": messages,
|
|
"temperature": temperature
|
|
}
|
|
if response_format and response_format.get("type") == "json_object":
|
|
kwargs["response_format"] = {"type": "json_object"}
|
|
|
|
response = await client.chat.completions.create(**kwargs)
|
|
|
|
if not response.choices:
|
|
raise APIError("OpenAI", "No completion choices returned")
|
|
|
|
content = response.choices[0].message.content
|
|
if content is None:
|
|
raise APIError("OpenAI", "Empty response content")
|
|
|
|
return content.strip()
|
|
|
|
elif provider == "anthropic":
|
|
client = create_async_llm_client()
|
|
anthropic_messages = []
|
|
system_message = None
|
|
|
|
for msg in messages:
|
|
if msg["role"] == "system":
|
|
system_message = msg["content"]
|
|
else:
|
|
anthropic_messages.append({
|
|
"role": msg["role"],
|
|
"content": msg["content"]
|
|
})
|
|
|
|
kwargs = {
|
|
"model": model,
|
|
"messages": anthropic_messages,
|
|
"temperature": temperature,
|
|
"max_tokens": 1000,
|
|
}
|
|
if system_message:
|
|
kwargs["system"] = system_message
|
|
|
|
response = await client.messages.create(**kwargs)
|
|
return response.content[0].text.strip()
|
|
|
|
elif provider == "huggingface":
|
|
|
|
logger.warning("HuggingFace does not support async operations, falling back to Nebius")
|
|
|
|
try:
|
|
|
|
nebius_model = model_config.get_model_for_provider("question_enhancer", "nebius")
|
|
return await make_async_nebius_completion(nebius_model, messages, temperature, response_format)
|
|
except Exception as nebius_error:
|
|
raise APIError("HuggingFace", f"HuggingFace async not supported. Nebius fallback failed: {str(nebius_error)}")
|
|
|
|
        else:
            raise APIError("Config", f"Unsupported LLM provider: {provider}")

    except Exception as e:
        # Preserve APIError details rather than re-wrapping them under the provider name.
        if isinstance(e, APIError):
            raise
        raise APIError(provider.title(), f"Async completion failed: {str(e)}")


async def async_tavily_search(query: str, max_results: int = 3) -> Dict[str, Any]:
    """Perform async web search using the Tavily API."""
    try:
        async with aiohttp.ClientSession() as session:
            url = "https://api.tavily.com/search"
            headers = {
                "Content-Type": "application/json"
            }
            data = {
                "api_key": api_config.tavily_api_key,
                "query": query,
                "search_depth": "basic",
                "max_results": max_results,
                "include_answer": True
            }

            async with session.post(url, headers=headers, json=data) as response:
                if response.status != 200:
                    raise APIError("Tavily", f"HTTP {response.status}: {await response.text()}")

                result = await response.json()
                return {
                    "query": result.get("query", query),
                    "tavily_answer": result.get("answer"),
                    "results": result.get("results", []),
                    "data_source": "Tavily Search API",
                }

    except aiohttp.ClientError as e:
        raise APIError("Tavily", f"HTTP request failed: {str(e)}")
    except Exception as e:
        if isinstance(e, APIError):
            raise
        raise APIError("Tavily", f"Search failed: {str(e)}")


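# Hedged sketch of driving async_tavily_search from synchronous code via
# asyncio.run; requires a valid Tavily API key and is not executed on import.
def _example_tavily_search() -> Dict[str, Any]:
    """Run a single basic-depth search and return the raw result dict."""
    import asyncio

    return asyncio.run(async_tavily_search("What is the Model Context Protocol?", max_results=3))

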
def format_search_results(results: List[Dict[str, Any]]) -> str:
    """Format search results into a readable string."""
    if not results:
        return "No search results found."

    snippets = []
    for idx, item in enumerate(results, 1):
        title = item.get("title", "No Title")
        url = item.get("url", "")
        content = item.get("content", "")

        snippet = f"Result {idx}:\nTitle: {title}\nURL: {url}\nSnippet: {content}\n"
        snippets.append(snippet)

    return "\n".join(snippets).strip()


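# Illustrative sketch of format_search_results on a hand-written entry shaped
# like the items Tavily returns (title/url/content keys); the data is fake.
def _example_format_results() -> str:
    """Format one fabricated search hit into the numbered snippet layout."""
    fake_results = [
        {
            "title": "Example Page",
            "url": "https://example.com",
            "content": "A short snippet of page text.",
        }
    ]
    return format_search_results(fake_results)

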
def create_apa_citation(url: str, year: Optional[str] = None) -> str:
    """Create a simple APA-style citation from a URL."""
    if not year:
        year = api_config.current_year

    try:
        domain = url.split("/")[2]
        title = domain.replace("www.", "").split(".")[0].capitalize()
        return f"{title}. ({year}). Retrieved from {url}"
    except (IndexError, AttributeError):
        return f"Unknown Source. ({year}). Retrieved from {url}"


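# Small sketch of create_apa_citation; the URL is hypothetical and the year is
# pinned so the output is deterministic.
def _example_apa_citation() -> str:
    """Build a citation for a sample URL."""
    return create_apa_citation("https://www.example.com/articles/mcp", year="2025")
    # -> "Example. (2025). Retrieved from https://www.example.com/articles/mcp"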