# Nihal2000's picture
# Update services/llm_service.py
# 4228dd5 verified
import asyncio
import json
import logging
from typing import List, Dict, Any, Optional

import anthropic
import openai
from mistralai import Mistral

import config
# Module-level logger, named after this module, used by every LLMService method.
logger = logging.getLogger(__name__)
class LLMService:
    """Route text-generation requests to OpenAI, Mistral, or Anthropic.

    A client is created in ``__init__`` for every provider whose API key is
    set on ``config.config``. ``generate_text`` dispatches a prompt to one of
    them; the other public coroutines (``summarize``, ``generate_tags``,
    ``categorize``, ``answer_question``, ``extract_key_information``) build a
    task-specific prompt and delegate to ``generate_text`` in ``"auto"`` mode.
    """

    def __init__(self):
        """Store the shared config object and create the provider clients."""
        self.config = config.config
        self.anthropic_client = None
        self.mistral_client = None
        self.openai_async_client = None
        self._initialize_clients()

    def _initialize_clients(self):
        """Create a client for each provider that has an API key configured.

        Raises:
            Exception: any error raised while constructing a client is logged
                and re-raised unchanged.
        """
        try:
            if self.config.ANTHROPIC_API_KEY:
                self.anthropic_client = anthropic.Anthropic(
                    api_key=self.config.ANTHROPIC_API_KEY
                )
                logger.info("Anthropic client initialized")
            if self.config.MISTRAL_API_KEY:
                self.mistral_client = Mistral(
                    api_key=self.config.MISTRAL_API_KEY
                )
                logger.info("Mistral client initialized")
            if self.config.OPENAI_API_KEY:
                self.openai_async_client = openai.AsyncOpenAI(
                    api_key=self.config.OPENAI_API_KEY
                )
                logger.info("OpenAI client initialized")
            # A service without any client is allowed to exist; calls fail later.
            if not any([self.openai_async_client, self.mistral_client, self.anthropic_client]):
                logger.warning("No LLM clients could be initialized based on current config. Check API keys.")
            else:
                logger.info("LLM clients initialized successfully (at least one).")
        except Exception as e:
            logger.error(f"Error initializing LLM clients: {str(e)}")
            raise

    async def generate_text(
        self,
        prompt: str,
        model: str = "auto",
        max_tokens: int = 1000,
        temperature: float = 0.7,
    ) -> str:
        """Generate text for ``prompt`` with the requested model.

        Args:
            prompt: User message sent to the provider.
            model: ``"auto"`` picks the first configured provider in the order
                OpenAI, Mistral, Anthropic. Otherwise the prefix selects the
                provider: ``gpt-`` / ``openai/`` (OpenAI), ``mistral``
                (Mistral), ``claude`` (Anthropic).
            max_tokens: Completion token limit forwarded to the provider.
            temperature: Sampling temperature forwarded to the provider.

        Returns:
            The stripped completion text ("" when the provider returned none).

        Raises:
            ValueError: unsupported model name, or the required client (or, in
                auto mode, any client with a default model) is unavailable.
            Exception: provider errors are logged and re-raised.
        """
        try:
            if model == "auto":
                # Priority: 1. OpenAI, 2. Mistral, 3. Anthropic. A provider is
                # eligible only if its client exists AND a default model is set.
                if self.openai_async_client and self.config.OPENAI_MODEL:
                    auto_model = self.config.OPENAI_MODEL
                    logger.debug(f"Auto-selected OpenAI model: {auto_model}")
                    return await self._generate_with_openai(prompt, auto_model, max_tokens, temperature)
                if self.mistral_client and self.config.MISTRAL_MODEL:
                    auto_model = self.config.MISTRAL_MODEL
                    logger.debug(f"Auto-selected Mistral model: {auto_model}")
                    return await self._generate_with_mistral(prompt, auto_model, max_tokens, temperature)
                if self.anthropic_client and self.config.ANTHROPIC_MODEL:
                    auto_model = self.config.ANTHROPIC_MODEL
                    logger.debug(f"Auto-selected Anthropic model: {auto_model}")
                    return await self._generate_with_claude(prompt, auto_model, max_tokens, temperature)
                logger.error("No LLM clients available for 'auto' mode or default models not configured.")
                raise ValueError("No LLM clients available for 'auto' mode or default models not configured.")

            if model.startswith("gpt-") or model.lower().startswith("openai/"):
                if not self.openai_async_client:
                    raise ValueError("OpenAI client not available. Check API key or model prefix.")
                # "openai/gpt-4o" -> "gpt-4o"; names without a slash pass through.
                actual_model = model.rsplit("/", 1)[-1]
                return await self._generate_with_openai(prompt, actual_model, max_tokens, temperature)

            if model.startswith("mistral"):
                if not self.mistral_client:
                    raise ValueError("Mistral client not available. Check API key or model prefix.")
                return await self._generate_with_mistral(prompt, model, max_tokens, temperature)

            if model.startswith("claude"):
                if not self.anthropic_client:
                    raise ValueError("Anthropic client not available. Check API key or model prefix.")
                return await self._generate_with_claude(prompt, model, max_tokens, temperature)

            raise ValueError(f"Unsupported model: {model}. Must start with 'gpt-', 'openai/', 'claude', 'mistral', or be 'auto'.")
        except Exception as e:
            logger.error(f"Error generating text with model '{model}': {str(e)}")
            raise

    @staticmethod
    def _first_choice_text(response: Any, provider: str, model_name: str) -> str:
        """Return the stripped text of the first chat choice, or "" if absent.

        Shared by the OpenAI and Mistral paths, whose responses both expose
        ``choices[0].message.content``. ``provider`` is only used in warnings.
        """
        if response.choices and response.choices[0].message:
            content = response.choices[0].message.content
            if content is not None:
                return content.strip()
            logger.warning(f"{provider} response message content is None for model {model_name}.")
            return ""
        logger.warning(f"{provider} response did not contain expected choices or message for model {model_name}.")
        return ""

    async def _generate_with_openai(self, prompt: str, model_name: str, max_tokens: int, temperature: float) -> str:
        """Generate text with the async OpenAI client.

        Raises:
            RuntimeError: the OpenAI client was never initialized.
            Exception: API errors are logged and re-raised.
        """
        if not self.openai_async_client:
            raise RuntimeError("OpenAI async client not initialized.")
        try:
            logger.debug(f"Generating with OpenAI model: {model_name}, max_tokens: {max_tokens}, temp: {temperature}, prompt: '{prompt[:50]}...'")
            response = await self.openai_async_client.chat.completions.create(
                model=model_name,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=max_tokens,
                temperature=temperature,
            )
            return self._first_choice_text(response, "OpenAI", model_name)
        except Exception as e:
            logger.error(f"Error with OpenAI generation (model: {model_name}): {str(e)}")
            raise

    async def _generate_with_claude(self, prompt: str, model_name: str, max_tokens: int, temperature: float) -> str:
        """Generate text with the sync Anthropic client on the default executor.

        Raises:
            RuntimeError: the Anthropic client was never initialized.
            Exception: API errors are logged and re-raised.
        """
        if not self.anthropic_client:
            raise RuntimeError("Anthropic client not initialized.")
        try:
            logger.debug(f"Generating with Anthropic model: {model_name}, max_tokens: {max_tokens}, temp: {temperature}, prompt: '{prompt[:50]}...'")
            # We are inside a coroutine, so the running loop is the right one.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: self.anthropic_client.messages.create(
                    model=model_name,
                    max_tokens=max_tokens,
                    temperature=temperature,
                    messages=[{"role": "user", "content": prompt}],
                ),
            )
            # The first content block may lack ``text``; treat that as "no text".
            first_text = getattr(response.content[0], "text", None) if response.content else None
            if first_text:
                return first_text.strip()
            logger.warning(f"Anthropic response did not contain expected content for model {model_name}.")
            return ""
        except Exception as e:
            logger.error(f"Error with Anthropic (Claude) generation (model: {model_name}): {str(e)}")
            raise

    async def _generate_with_mistral(self, prompt: str, model_name: str, max_tokens: int, temperature: float) -> str:
        """Generate text with the sync Mistral client on the default executor.

        Raises:
            RuntimeError: the Mistral client was never initialized.
            Exception: API errors are logged and re-raised.
        """
        if not self.mistral_client:
            raise RuntimeError("Mistral client not initialized.")
        try:
            logger.debug(f"Generating with Mistral model: {model_name}, max_tokens: {max_tokens}, temp: {temperature}, prompt: '{prompt[:50]}...'")
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                # ``Mistral.chat`` is a resource object, not a callable; the
                # synchronous request method is ``chat.complete``.
                lambda: self.mistral_client.chat.complete(
                    model=model_name,
                    messages=[{"role": "user", "content": prompt}],
                    max_tokens=max_tokens,
                    temperature=temperature,
                ),
            )
            return self._first_choice_text(response, "Mistral", model_name)
        except Exception as e:
            logger.error(f"Error with Mistral generation (model: {model_name}): {str(e)}")
            raise

    async def summarize(self, text: str, style: str = "concise", max_length: Optional[int] = None) -> str:
        """Summarize ``text`` in the given style.

        Args:
            text: Text to summarize; blank text yields "".
            style: One of "concise", "detailed", "bullet_points", "executive";
                unknown values fall back to "concise".
            max_length: Optional approximate word limit. When set, the token
                budget is ``max_length * 2``; otherwise 500.

        Returns:
            The summary, or the literal "Error generating summary" on failure.
        """
        if not text.strip():
            return ""
        style_prompts = {
            "concise": "Provide a concise summary of the following text, focusing on the main points:",
            "detailed": "Provide a detailed summary of the following text, including key details and supporting information:",
            "bullet_points": "Summarize the following text as a list of bullet points highlighting the main ideas:",
            "executive": "Provide an executive summary of the following text, focusing on key findings and actionable insights:",
        }
        prompt_template = style_prompts.get(style, style_prompts["concise"])
        if max_length:
            prompt_template += f" Keep the summary under approximately {max_length} words."
        prompt = f"{prompt_template}\n\nText to summarize:\n{text}\n\nSummary:"
        try:
            summary_max_tokens = (max_length * 2) if max_length else 500
            summary = await self.generate_text(prompt, model="auto", max_tokens=summary_max_tokens, temperature=0.3)
            return summary.strip()
        except Exception as e:
            logger.error(f"Error generating summary: {str(e)}")
            return "Error generating summary"

    async def generate_tags(self, text: str, max_tags: int = 5) -> List[str]:
        """Return up to ``max_tags`` lowercase, de-duplicated tags for ``text``.

        Blank text or a non-positive ``max_tags`` returns ``[]`` without
        calling a model. Any generation error is logged and also yields ``[]``.
        """
        if not text.strip() or max_tags <= 0:
            return []
        prompt = (
            f"Generate up to {max_tags} relevant tags for the following text.\n"
            "Tags should be concise, descriptive keywords or phrases (1-3 words typically) that capture the main topics or themes.\n"
            "Return only the tags, separated by commas. Do not include any preamble or explanation.\n"
            "Text:\n"
            f"{text}\n"
            "Tags:"
        )
        try:
            response = await self.generate_text(prompt, model="auto", max_tokens=100, temperature=0.5)
            tags = [tag.strip().lower() for tag in response.split(",") if tag.strip()]
            tags = [tag for tag in tags if 1 < len(tag) < 50]
            # dict.fromkeys de-duplicates while keeping first-seen order.
            return list(dict.fromkeys(tags))[:max_tags]
        except Exception as e:
            logger.error(f"Error generating tags: {str(e)}")
            return []

    async def categorize(self, text: str, categories: List[str]) -> str:
        """Classify ``text`` into one of ``categories``.

        Returns:
            The matching entry of ``categories`` (compared case-insensitively),
            the first category when the model's answer is not in the list, or
            "Uncategorized" for blank text, an empty list, or on error.
        """
        if not text.strip() or not categories:
            return "Uncategorized"
        categories_str = ", ".join(f"'{cat}'" for cat in categories)
        prompt = (
            f"Classify the following text into ONE of these categories: {categories_str}.\n"
            "Choose the single most appropriate category based on the content and main theme of the text.\n"
            "Return only the category name as a string, exactly as it appears in the list provided. Do not add any other text or explanation.\n"
            "Text to classify:\n"
            f"{text}\n"
            "Category:"
        )
        try:
            response = await self.generate_text(prompt, model="auto", max_tokens=50, temperature=0.1)
            category_candidate = response.strip().strip("'\"")
            wanted = category_candidate.lower()
            for cat in categories:
                if cat.lower() == wanted:
                    return cat
            logger.warning(f"LLM returned category '{category_candidate}' which is not in the provided list: {categories}. Falling back.")
            # ``categories`` is known to be non-empty here (checked above).
            return categories[0]
        except Exception as e:
            logger.error(f"Error categorizing text: {str(e)}")
            return "Uncategorized"

    async def answer_question(self, question: str, context: str, max_context_length: int = 3000) -> str:
        """Answer ``question`` using only the supplied ``context``.

        ``context`` longer than ``max_context_length`` characters is truncated
        (with "..." appended) before prompting. Blank question/context and
        generation errors each return a fixed explanatory sentence.
        """
        if not question.strip():
            return "No question provided."
        if not context.strip():
            return "I don't have enough context to answer this question. Please provide relevant information."
        if len(context) > max_context_length:
            context = context[:max_context_length] + "..."
            logger.warning(f"Context truncated to {max_context_length} characters for question answering.")
        prompt = (
            "You are an expert Q&A assistant. Your task is to synthesize an answer to the user's question based *only* on the provided source documents.\n"
            "Analyze all the source documents provided in the context below.\n"
            "If the information is present, provide a comprehensive answer.\n"
            "Here are the source documents:\n"
            "--- START OF CONTEXT ---\n"
            f"{context}\n"
            "--- END OF CONTEXT ---\n"
            "Based on the context above, please provide a clear and concise answer to the following question.\n"
            f"Question: {question}\n"
            "Answer:"
        )
        try:
            answer = await self.generate_text(prompt, model="auto", max_tokens=800, temperature=0.5)
            return answer.strip()
        except Exception as e:
            logger.error(f"Error answering question: {str(e)}")
            return "I encountered an error while trying to answer your question."

    @staticmethod
    def _strip_code_fence(raw: str) -> str:
        """Remove a surrounding Markdown code fence (``` or ```json) if present.

        Uses prefix/suffix slicing; ``str.lstrip``/``rstrip`` would strip a
        *set of characters* and could eat into the payload.
        """
        cleaned = raw.strip()
        if not cleaned.startswith("```"):
            return cleaned
        cleaned = cleaned[3:]
        # Drop an optional "json" language tag directly after the fence.
        if cleaned[:4].lower() == "json":
            cleaned = cleaned[4:]
        cleaned = cleaned.strip()
        if cleaned.endswith("```"):
            cleaned = cleaned[:-3]
        return cleaned.strip()

    @staticmethod
    def _parse_key_value_lines(raw: str) -> Dict[str, Any]:
        """Best-effort parse of ``key: value`` lines into a dict.

        Keys are lowercased with spaces turned into underscores. For
        ``key_points`` and ``entities``, a bracketed value is split on commas
        into a list of unquoted strings. Lines without a colon or with an
        empty value are skipped.
        """
        parsed: Dict[str, Any] = {}
        for line in raw.split("\n"):
            key, sep, value = line.partition(":")
            if not sep:
                continue
            key_clean = key.strip().lower().replace(" ", "_")
            value_clean = value.strip()
            if not value_clean:
                continue
            if key_clean in ("key_points", "entities") and "[" in value_clean and "]" in value_clean:
                parsed[key_clean] = [
                    item.strip().strip("'\"")
                    for item in value_clean.strip("[]").split(",")
                    if item.strip()
                ]
            else:
                parsed[key_clean] = value_clean
        return parsed

    async def extract_key_information(self, text: str) -> Dict[str, Any]:
        """Ask the model for a JSON analysis of ``text`` and return it as a dict.

        Returns:
            ``{}`` for blank text. Otherwise the parsed JSON object; if the
            reply is not valid JSON, a dict recovered from ``key: value``
            lines; failing that, ``{"error": ..., "raw_response": ...}``.
            Any other failure returns ``{"error": ...}``.
        """
        if not text.strip():
            return {}
        prompt = (
            "Analyze the following text and extract key information.\n"
            "Provide the response as a JSON object with the following keys:\n"
            '- "main_topic": (string) The main topic or subject of the text.\n'
            '- "key_points": (array of strings) A list of 3-5 key points or takeaways.\n'
            '- "entities": (array of strings) Important people, places, organizations, or products mentioned.\n'
            '- "sentiment": (string) Overall sentiment of the text (e.g., "positive", "neutral", "negative", "mixed").\n'
            '- "content_type": (string) The perceived type of content (e.g., "article", "email", "report", "conversation", "advertisement", "other").\n'
            "If a piece of information is not found or not applicable, use null or an empty array/string as appropriate for the JSON structure.\n"
            "Text to analyze:\n"
            "---\n"
            f"{text}\n"
            "---\n"
            "JSON Analysis:"
        )
        try:
            response_str = await self.generate_text(prompt, model="auto", max_tokens=500, temperature=0.4)
            candidate = self._strip_code_fence(response_str)
            try:
                info = json.loads(candidate)
            except json.JSONDecodeError as je:
                logger.error(f"Failed to parse JSON from LLM response for key_information: {je}")
                logger.debug(f"LLM Response string was: {response_str}")
                info_fallback = self._parse_key_value_lines(candidate)
                if info_fallback:
                    logger.info("Successfully parsed key information using fallback line-based method.")
                    return info_fallback
                return {"error": "Failed to parse LLM output", "raw_response": response_str}

            # Valid JSON that is not an object (list, string, null) is unusable.
            if not isinstance(info, dict):
                logger.warning(f"Extracted information is not a JSON object. Got: {type(info).__name__}")
                return {"error": "Failed to parse LLM output", "raw_response": response_str}

            expected_keys = {"main_topic", "key_points", "entities", "sentiment", "content_type"}
            if not expected_keys.issubset(info.keys()):
                logger.warning(f"Extracted information missing some expected keys. Got: {info.keys()}")
            return info
        except Exception as e:
            logger.error(f"Error extracting key information: {str(e)}")
            return {"error": f"General error extracting key information: {str(e)}"}

    async def check_availability(self) -> Dict[str, bool]:
        """Check which LLM services are available by making a tiny test call.

        A provider counts as available when its client and default model are
        configured and a 5-token "Hello" request returns non-empty text.
        Failures are logged as warnings and leave the provider ``False``.
        """
        availability = {
            "openai": False,
            "mistral": False,
            "anthropic": False,
        }
        test_prompt = "Hello"
        test_max_tokens = 5
        test_temp = 0.1
        logger.info("Checking LLM availability...")

        # (result key, log label, client, config attribute, generator coroutine)
        providers = (
            ("openai", "OpenAI", self.openai_async_client, "OPENAI_MODEL", self._generate_with_openai),
            ("mistral", "Mistral", self.mistral_client, "MISTRAL_MODEL", self._generate_with_mistral),
            ("anthropic", "Anthropic", self.anthropic_client, "ANTHROPIC_MODEL", self._generate_with_claude),
        )
        for key, label, client, model_attr, generate in providers:
            # Only read the model setting when the client exists.
            model_name = getattr(self.config, model_attr) if client else None
            if client and model_name:
                try:
                    logger.debug(f"Testing {label} availability with model {model_name}...")
                    test_response = await generate(test_prompt, model_name, test_max_tokens, test_temp)
                    availability[key] = bool(test_response.strip())
                except Exception as e:
                    logger.warning(f"{label} availability check failed for model {model_name}: {e}")
            logger.info(f"{label} available: {availability[key]}")

        logger.info(f"Final LLM Availability: {availability}")
        return availability