import os
import json
import time
from pathlib import Path
from typing import Dict, List, Any, Union, Optional, Tuple

import numpy as np
import requests
import torch

from utils.config import AI_MODELS
from utils.logging import get_logger, log_performance, log_ai_model_usage
from utils.error_handling import handle_ai_model_exceptions, AIModelError, ValidationError
from utils.semantic_search import (
    search_content,
    find_similar_items,
    detect_duplicates,
    cluster_content,
    build_knowledge_graph,
    identify_trends,
    identify_information_gaps,
)

logger = get_logger(__name__)

# Pin inference to GPU 0 when CUDA is available; otherwise hide all GPUs so models run on CPU.
os.environ["CUDA_VISIBLE_DEVICES"] = "" if not torch.cuda.is_available() else "0"

# Module-level cache of loaded models, keyed by "<model_name>_<task>".
MODEL_CACHE = {}

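# The helpers below read per-task settings from AI_MODELS. As an illustrative sketch only
# (the real values live in utils.config; the entries below are assumptions, not the actual
# configuration), the expected shape is roughly:
#
#     AI_MODELS = {
#         "text_generation": {"name": "microsoft/DialoGPT-medium", "max_length": 100, "temperature": 0.7},
#         "summarization": {"name": "facebook/bart-large-cnn", "max_length": 150, "min_length": 40},
#     }
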
@handle_ai_model_exceptions
def get_model(task: str, model_name: Optional[str] = None):
    """
    Load and cache AI models.

    Args:
        task: Task type (text_generation, question_answering, image_captioning, etc.)
        model_name: Name of the model on HuggingFace (optional, uses default from config if None)

    Returns:
        Loaded model and tokenizer/processor

    Raises:
        AIModelError: If there's an error loading the model
        ValidationError: If the task is not supported
    """
    if model_name is None:
        if task not in AI_MODELS:
            logger.error(f"Unsupported task: {task}")
            raise ValidationError(f"Unsupported task: {task}")
        model_name = AI_MODELS[task]["name"]

    cache_key = f"{model_name}_{task}"

    if cache_key in MODEL_CACHE:
        logger.debug(f"Using cached model for {task}: {model_name}")
        return MODEL_CACHE[cache_key]

    logger.info(f"Loading model for {task}: {model_name}")
    start_time = time.time()

    try:
        if task == "text_generation":
            from transformers import AutoModelForCausalLM, AutoTokenizer
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            model = AutoModelForCausalLM.from_pretrained(model_name)
            MODEL_CACHE[cache_key] = (model, tokenizer)

        elif task == "question_answering":
            from transformers import AutoModelForQuestionAnswering, AutoTokenizer
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            model = AutoModelForQuestionAnswering.from_pretrained(model_name)
            MODEL_CACHE[cache_key] = (model, tokenizer)

        elif task == "image_captioning":
            from transformers import BlipProcessor, BlipForConditionalGeneration
            processor = BlipProcessor.from_pretrained(model_name)
            model = BlipForConditionalGeneration.from_pretrained(model_name)
            MODEL_CACHE[cache_key] = (model, processor)

        elif task == "speech_to_text":
            from transformers import WhisperProcessor, WhisperForConditionalGeneration
            processor = WhisperProcessor.from_pretrained(model_name)
            model = WhisperForConditionalGeneration.from_pretrained(model_name)
            MODEL_CACHE[cache_key] = (model, processor)

        elif task == "translation":
            from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
            MODEL_CACHE[cache_key] = (model, tokenizer)

        elif task == "sentiment":
            from transformers import AutoModelForSequenceClassification, AutoTokenizer
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            model = AutoModelForSequenceClassification.from_pretrained(model_name)
            MODEL_CACHE[cache_key] = (model, tokenizer)

        elif task == "summarization":
            from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
            MODEL_CACHE[cache_key] = (model, tokenizer)

        elif task == "code_generation":
            from transformers import AutoModel, AutoTokenizer
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            model = AutoModel.from_pretrained(model_name)
            MODEL_CACHE[cache_key] = (model, tokenizer)

        else:
            logger.error(f"Unsupported task: {task}")
            raise ValidationError(f"Unsupported task: {task}")

        elapsed_time = (time.time() - start_time) * 1000
        log_performance(f"load_model_{task}", elapsed_time)
        logger.info(f"Model loaded successfully for {task}: {model_name} in {elapsed_time:.2f}ms")

        return MODEL_CACHE[cache_key]

    except Exception as e:
        logger.error(f"Error loading model {model_name} for task {task}: {str(e)}")
        raise AIModelError(f"Error loading model {model_name} for task {task}", {"original_error": str(e)}) from e

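# Usage sketch for get_model (illustrative only): the first call downloads and caches the
# configured model; later calls for the same task/model are served from MODEL_CACHE.
#
#     model, tokenizer = get_model("summarization")
#     model, tokenizer = get_model("summarization")  # returned from the in-memory cache
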
@handle_ai_model_exceptions
def generate_text(prompt: str, max_length: Optional[int] = None, temperature: Optional[float] = None) -> str:
    """
    Generate text using DialoGPT-medium.

    Args:
        prompt: Input prompt
        max_length: Maximum length of generated text (uses config default if None)
        temperature: Temperature for sampling (uses config default if None)

    Returns:
        Generated text

    Raises:
        AIModelError: If there's an error generating text
    """
    task = "text_generation"
    model_config = AI_MODELS[task]
    model_name = model_config["name"]

    if max_length is None:
        max_length = model_config.get("max_length", 100)
    if temperature is None:
        temperature = model_config.get("temperature", 0.7)

    logger.debug(f"Generating text with prompt: {prompt[:50]}...")
    start_time = time.time()

    model, tokenizer = get_model(task)

    try:
        inputs = tokenizer.encode(prompt + tokenizer.eos_token, return_tensors="pt")
        with torch.no_grad():
            outputs = model.generate(
                inputs,
                # max_length is documented as the length of the generated text, so cap new
                # tokens rather than the combined prompt + output length.
                max_new_tokens=max_length,
                pad_token_id=tokenizer.eos_token_id,
                no_repeat_ngram_size=3,
                do_sample=True,
                top_k=50,
                top_p=0.95,
                temperature=temperature
            )

        # Decode only the newly generated tokens so the echoed prompt is not returned.
        response = tokenizer.decode(outputs[0][inputs.shape[-1]:], skip_special_tokens=True)

        elapsed_time = (time.time() - start_time) * 1000
        log_performance("generate_text", elapsed_time)
        log_ai_model_usage(model_name, "text_generation", len(inputs[0]) + len(outputs[0]))

        logger.debug(f"Text generated successfully in {elapsed_time:.2f}ms")
        return response
    except Exception as e:
        logger.error(f"Error generating text: {str(e)}")
        raise AIModelError("Error generating text", {"original_error": str(e)}) from e

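# Usage sketch for generate_text (illustrative only): max_length and temperature fall back
# to the values configured for the text_generation task when not supplied.
#
#     reply = generate_text("Suggest one way to stay focused this afternoon.", max_length=60)
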
@handle_ai_model_exceptions
def answer_question(question: str, context: str) -> str:
    """
    Answer a question based on the given context.

    Args:
        question: Question to answer
        context: Context for the question

    Returns:
        Answer to the question

    Raises:
        AIModelError: If there's an error answering the question
    """
    task = "question_answering"
    model_name = AI_MODELS[task]["name"]

    logger.debug(f"Answering question: {question}")
    start_time = time.time()

    model, tokenizer = get_model(task)

    try:
        inputs = tokenizer(question, context, return_tensors="pt")

        with torch.no_grad():
            outputs = model(**inputs)

        # The model scores every token as a possible answer start/end; take the highest-scoring span.
        answer_start = torch.argmax(outputs.start_logits)
        answer_end = torch.argmax(outputs.end_logits) + 1

        answer = tokenizer.convert_tokens_to_string(
            tokenizer.convert_ids_to_tokens(inputs.input_ids[0][answer_start:answer_end])
        )

        elapsed_time = (time.time() - start_time) * 1000
        log_performance("answer_question", elapsed_time)
        log_ai_model_usage(model_name, "question_answering", len(inputs.input_ids[0]))

        logger.debug(f"Question answered successfully in {elapsed_time:.2f}ms")
        return answer if answer else "No answer found"
    except Exception as e:
        logger.error(f"Error answering question: {str(e)}")
        raise AIModelError("Error answering question", {"original_error": str(e)}) from e

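# Usage sketch for answer_question (illustrative only; the strings are made up):
#
#     answer = answer_question(
#         question="When is the report due?",
#         context="The quarterly report is due on Friday afternoon.",
#     )
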
@handle_ai_model_exceptions
def analyze_sentiment(text: str) -> Dict[str, float]:
    """
    Analyze sentiment of text.

    Args:
        text: Text to analyze

    Returns:
        Dictionary with sentiment scores

    Raises:
        AIModelError: If there's an error analyzing sentiment
    """
    task = "sentiment"
    model_name = AI_MODELS[task]["name"]

    logger.debug(f"Analyzing sentiment of text: {text[:50]}...")
    start_time = time.time()

    model, tokenizer = get_model(task)

    try:
        inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)

        with torch.no_grad():
            outputs = model(**inputs)

        # Convert raw logits to probabilities.
        scores = torch.nn.functional.softmax(outputs.logits, dim=1)
        scores = scores.detach().numpy()[0]

        # Label order assumes the configured model is a three-class
        # negative/neutral/positive classifier.
        labels = ["negative", "neutral", "positive"]
        results = {label: float(score) for label, score in zip(labels, scores)}

        elapsed_time = (time.time() - start_time) * 1000
        log_performance("analyze_sentiment", elapsed_time)
        log_ai_model_usage(model_name, "sentiment_analysis", len(inputs.input_ids[0]))

        logger.debug(f"Sentiment analysis completed successfully in {elapsed_time:.2f}ms")
        return results
    except Exception as e:
        logger.error(f"Error analyzing sentiment: {str(e)}")
        raise AIModelError("Error analyzing sentiment", {"original_error": str(e)}) from e

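# analyze_sentiment returns one probability per label, e.g. (values are illustrative):
#
#     {"negative": 0.03, "neutral": 0.12, "positive": 0.85}
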
@handle_ai_model_exceptions
def summarize_text(text: str, max_length: Optional[int] = None, min_length: Optional[int] = None) -> str:
    """
    Summarize text using BART.

    Args:
        text: Text to summarize
        max_length: Maximum length of summary (uses config default if None)
        min_length: Minimum length of summary (uses config default if None)

    Returns:
        Summarized text

    Raises:
        AIModelError: If there's an error summarizing text
    """
    task = "summarization"
    model_config = AI_MODELS[task]
    model_name = model_config["name"]

    if max_length is None:
        max_length = model_config.get("max_length", 150)
    if min_length is None:
        min_length = model_config.get("min_length", 40)

    logger.debug(f"Summarizing text: {text[:50]}...")
    start_time = time.time()

    model, tokenizer = get_model(task)

    try:
        inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=1024)

        with torch.no_grad():
            summary_ids = model.generate(
                inputs.input_ids,
                max_length=max_length,
                min_length=min_length,
                num_beams=4,
                early_stopping=True
            )

        summary = tokenizer.decode(summary_ids[0], skip_special_tokens=True)

        elapsed_time = (time.time() - start_time) * 1000
        log_performance("summarize_text", elapsed_time)
        log_ai_model_usage(model_name, "summarization", len(inputs.input_ids[0]) + len(summary_ids[0]))

        logger.debug(f"Text summarization completed successfully in {elapsed_time:.2f}ms")
        return summary
    except Exception as e:
        logger.error(f"Error summarizing text: {str(e)}")
        raise AIModelError("Error summarizing text", {"original_error": str(e)}) from e

@handle_ai_model_exceptions
def get_weather(city: str) -> Dict[str, Any]:
    """
    Get weather information for a city.

    Note: this currently returns static placeholder data; it does not yet call a
    live weather API.

    Args:
        city: City name

    Returns:
        Weather information

    Raises:
        AIModelError: If there's an error getting weather information
    """
    logger.debug(f"Getting weather for city: {city}")
    start_time = time.time()

    try:
        # Placeholder response; a real implementation would query a weather service here.
        weather_data = {
            "location": city,
            "temperature": 22,
            "condition": "Partly Cloudy",
            "humidity": 65,
            "wind_speed": 10,
            "forecast": [
                {"day": "Today", "high": 24, "low": 18, "condition": "Partly Cloudy"},
                {"day": "Tomorrow", "high": 26, "low": 19, "condition": "Sunny"},
                {"day": "Day After", "high": 23, "low": 17, "condition": "Rain"}
            ]
        }

        elapsed_time = (time.time() - start_time) * 1000
        log_performance("get_weather", elapsed_time)

        logger.debug(f"Weather data retrieved successfully in {elapsed_time:.2f}ms")
        return weather_data
    except Exception as e:
        logger.error(f"Error getting weather: {str(e)}")
        raise AIModelError("Error getting weather information", {"original_error": str(e)}) from e

@handle_ai_model_exceptions
def generate_motivation_quote() -> str:
    """
    Generate a motivational quote using DialoGPT.

    Returns:
        Motivational quote

    Raises:
        AIModelError: If there's an error generating the quote
    """
    logger.debug("Generating motivational quote")

    prompts = [
        "Share an inspiring quote about productivity.",
        "What's a motivational quote for success?",
        "Give me a quote about achieving goals.",
        "Share wisdom about staying focused.",
        "What's a good quote about perseverance?"
    ]

    import random
    prompt = random.choice(prompts)

    return generate_text(prompt, max_length=50)

@handle_ai_model_exceptions
def generate_daily_plan(tasks: List[Dict[str, Any]], goals: List[Dict[str, Any]]) -> str:
    """
    Generate a daily plan based on tasks and goals.

    Args:
        tasks: List of tasks
        goals: List of goals

    Returns:
        Generated daily plan

    Raises:
        AIModelError: If there's an error generating the plan
    """
    logger.debug("Generating daily plan")

    active_tasks = [task for task in tasks if not task.get("completed", False)][:5]
    active_goals = [goal for goal in goals if not goal.get("completed", False)][:3]

    task_list = "\n".join([f"- {task.get('title', 'Untitled Task')}" for task in active_tasks])
    goal_list = "\n".join([f"- {goal.get('title', 'Untitled Goal')}" for goal in active_goals])

    prompt = f"""Create a productive daily plan based on these tasks and goals:

Tasks:
{task_list}

Goals:
{goal_list}

Daily Plan:"""

    return generate_text(prompt, max_length=300)

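# Input sketch for generate_daily_plan (illustrative dicts; only "title" and "completed"
# are read by this function):
#
#     plan = generate_daily_plan(
#         tasks=[{"title": "Write status update", "completed": False}],
#         goals=[{"title": "Ship the Q3 report", "completed": False}],
#     )
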
@handle_ai_model_exceptions
def break_down_task(task_title: str, task_description: str) -> List[str]:
    """
    Break down a task into subtasks using AI.

    Args:
        task_title: Title of the task
        task_description: Description of the task

    Returns:
        List of subtasks

    Raises:
        AIModelError: If there's an error breaking down the task
    """
    logger.debug(f"Breaking down task: {task_title}")

    prompt = f"""Break down this task into 3-5 actionable subtasks:

Task: {task_title}
Description: {task_description}

Subtasks:"""

    response = generate_text(prompt, max_length=200)

    # Keep lines that look like list items ("- ...", "* ...", or "1. ...").
    subtasks = []
    for line in response.split("\n"):
        line = line.strip()
        if len(line) > 2 and (line.startswith("-") or line.startswith("*") or
                              (line[0].isdigit() and line[1] == '.')):
            # Strip the leading "-", "*", or "N." marker before keeping the subtask text.
            subtask = line[2:].strip() if line[1] in ". " else line[1:].strip()
            subtasks.append(subtask)

    # Fall back to generic subtasks if the model output could not be parsed.
    if not subtasks:
        logger.warning(f"Failed to parse subtasks for {task_title}, using generic subtasks")
        subtasks = [
            f"Research for {task_title}",
            f"Create draft for {task_title}",
            f"Review and finalize {task_title}"
        ]

    return subtasks

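# Parsing sketch for break_down_task (illustrative model output and result):
#
#     "1. Outline the report\n2. Draft each section\n- Proofread"
#     -> ["Outline the report", "Draft each section", "Proofread"]
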
@handle_ai_model_exceptions
def estimate_task_time(task_title: str, task_description: str) -> int:
    """
    Estimate time needed for a task in minutes.

    Args:
        task_title: Title of the task
        task_description: Description of the task

    Returns:
        Estimated time in minutes

    Raises:
        AIModelError: If there's an error estimating the time
    """
    logger.debug(f"Estimating time for task: {task_title}")

    prompt = f"""Estimate how many minutes this task will take:

Task: {task_title}
Description: {task_description}

Estimated minutes:"""

    response = generate_text(prompt, max_length=10)

    import re
    numbers = re.findall(r'\d+', response)

    if numbers:
        try:
            minutes = int(numbers[0])
            # Clamp to a sensible range: at least 5 minutes, at most 8 hours.
            return min(max(minutes, 5), 480)
        except ValueError:
            logger.warning(f"Failed to parse time estimate from response: {response}")

    # Fallback heuristic when no number could be extracted from the model output.
    logger.warning(f"Using fallback time estimate for {task_title}")
    return min(len(task_title) * 5, 120)
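

if __name__ == "__main__":
    # Minimal manual smoke test (illustrative only). get_weather returns placeholder data;
    # the sentiment call downloads the configured model on first run.
    print(get_weather("Berlin"))
    print(analyze_sentiment("I love how productive this week has been!"))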