import os
import time
from flask import Flask, request, jsonify
from flask_cors import CORS
import logging
import glob
import re  # used by format_response

# Import the new modules
from web_content_fetcher import fetch_web_content
from token_manager import generate_token, validate_token, refresh_token, TOKEN_EXPIRATION, save_user_data, load_user_data, get_user_data_file_path, USER_DATA_DIR
from groq_api import get_completion
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Initialize Flask
app = Flask(__name__)
CORS(app)  # Enable CORS for all routes

# Dictionary to store user data:
# { token: { urls: [], documents: [], created_at: timestamp, default_message: str, contact_email: str } }
user_data = {}
# Class representing a document
class Document:
    def __init__(self, text, metadata=None):
        self.text = text
        self.metadata = metadata or {}

    def to_dict(self):
        """Convert Document to a JSON-serializable dictionary"""
        return {
            "text": self.text,
            "metadata": self.metadata
        }

    @classmethod
    def from_dict(cls, data):
        """Create Document from a dictionary"""
        return cls(text=data["text"], metadata=data["metadata"])
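# Illustrative sketch (not executed), using a hypothetical URL: a Document survives a
# to_dict()/from_dict() round trip, which is how documents are persisted to and
# restored from disk below.
#   doc = Document("Some page text", metadata={"url": "https://example.com"})
#   restored = Document.from_dict(doc.to_dict())
#   assert restored.text == doc.text and restored.metadata == doc.metadata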
# Load all existing tokens from disk on startup
def load_all_user_data():
    """Load all user data from disk on application startup"""
    logging.info("Loading all user data from disk...")
    count = 0
    try:
        # Get all JSON files in the user data directory
        json_files = glob.glob(os.path.join(USER_DATA_DIR, "*.json"))
        for file_path in json_files:
            try:
                # Extract token from filename
                token = os.path.basename(file_path).replace(".json", "")
                # Load user data
                data = load_user_data(token)
                if data:
                    # Convert documents back to Document objects
                    if "documents" in data and isinstance(data["documents"], list):
                        data["documents"] = [Document(text=doc["text"], metadata=doc["metadata"])
                                             for doc in data["documents"]]
                    # Add to in-memory cache
                    user_data[token] = data
                    count += 1
            except Exception as e:
                logging.error(f"Error loading user data from {file_path}: {e}")
                continue
    except Exception as e:
        logging.error(f"Error loading user data: {e}")
    logging.info(f"Loaded {count} tokens from disk")
    return count
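
# For reference: this assumes token_manager persists one JSON file per token inside
# USER_DATA_DIR, named "<token>.json", with the same shape built in /config below:
#   <USER_DATA_DIR>/<token>.json ->
#       {"urls": [...], "documents": [{"text": ..., "metadata": {...}}, ...],
#        "created_at": ..., "default_message": ..., "contact_email": ...}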
# Loading documents from a list of URLs
def load_documents_from_urls(urls):
    documents = []
    for url in urls:
        content = fetch_web_content(url)
        if content:
            logging.info(f"Successfully fetched content from {url}: {len(content)} characters")
            print(f"Fetched {url} - {len(content)} characters")
            print(content[:400])
            documents.append(Document(text=content, metadata={"url": url}))
        else:
            logging.warning(f"Failed to fetch content from {url}")
    return documents
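
# Illustrative sketch (not executed), using hypothetical URLs: each successfully
# fetched page becomes a Document tagged with its source URL; failed fetches are
# logged and skipped.
#   docs = load_documents_from_urls(["https://example.com/about", "https://example.com/faq"])
#   # docs -> [Document(text="...", metadata={"url": "https://example.com/about"}), ...]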
# Prepare user data for JSON serialization
def prepare_data_for_json(data):
    """Convert user data to a JSON-serializable format"""
    serializable_data = dict(data)
    if "documents" in serializable_data:
        serializable_data["documents"] = [doc.to_dict() for doc in serializable_data["documents"]]
    return serializable_data

# Restore documents from JSON data
def restore_documents_from_json(data):
    """Restore Document objects from JSON data"""
    if "documents" in data and isinstance(data["documents"], list):
        data["documents"] = [Document.from_dict(doc_dict) for doc_dict in data["documents"]]
    return data
# Chat function
def chat(user_input, token):
    if not validate_token(user_data, token):
        return "Unauthorized: Invalid or expired token."
    documents = user_data[token]['documents']
    contact_email = user_data[token].get("contact_email", "soufiane.sejjari@neologix.ma")
    default_message = user_data[token].get("default_message", "")

    # Improved keyword-based document retrieval
    most_relevant_docs = []
    query_terms = set(word.lower() for word in user_input.split() if len(word) > 3)
    for doc in documents:
        # Calculate relevance score based on keyword presence
        relevance_score = 0
        doc_text_lower = doc.text.lower()
        for term in query_terms:
            if term in doc_text_lower:
                relevance_score += 1
            # Bonus for terms in title or metadata
            if 'url' in doc.metadata and term in doc.metadata['url'].lower():
                relevance_score += 0.5
            if term in doc_text_lower[:200]:  # Term in beginning of document
                relevance_score += 0.5
        if relevance_score > 0:
            # Store document with its relevance score
            most_relevant_docs.append((doc.text, relevance_score))

    # Sort by relevance score
    most_relevant_docs.sort(key=lambda x: x[1], reverse=True)

    # Context selection and token management
    max_context_tokens = 4000  # Approximate token limit
    context_parts = []
    token_count = 0
    # Add the most relevant document content until we approach the token limit
    for doc_text, _ in most_relevant_docs[:3]:  # Limit to top 3 documents
        # Truncate long documents to avoid exceeding token limits
        truncated_text = truncate_document(doc_text, 2000)
        estimated_tokens = len(truncated_text.split()) * 1.3  # Rough estimate
        if token_count + estimated_tokens > max_context_tokens:
            # If adding would exceed the limit, add partial content
            remaining_tokens = max_context_tokens - token_count
            if remaining_tokens > 200:  # Only add if we can include meaningful content
                partial_size = int(remaining_tokens / 1.3)  # Convert back to approximate word count
                context_parts.append(truncate_document(truncated_text, partial_size))
            break
        context_parts.append(truncated_text)
        token_count += estimated_tokens

    # If no relevant docs were found or context is too small, use default content
    if not context_parts:
        if default_message:
            context = default_message
        elif documents:
            # Use beginning portions of the first two documents
            context = "\n\n".join(truncate_document(doc.text, 1000) for doc in documents[:2])
        else:
            context = "No relevant information available."
    else:
        context = "\n\n--- Next Document ---\n\n".join(context_parts)

    # Send to LLM for response generation
    response = get_completion(user_input, context, contact_email)
    return format_response(response)
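
# Rough arithmetic behind the context budget above: a document truncated to 2000
# characters holds on the order of 300-400 words, so its estimate is roughly
# 400 * 1.3 = 520 "tokens"; three such documents (about 1500) stay comfortably
# under the 4000-token ceiling, so the partial-content branch is the exception,
# not the rule.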
def truncate_document(text, max_chars):
    """Intelligently truncate a document to avoid cutting in the middle of sentences."""
    if not text or len(text) <= max_chars:
        return text
    # Try to truncate at sentence boundaries
    truncated = text[:max_chars]
    # Find the last sentence boundary
    last_period = truncated.rfind('.')
    last_question = truncated.rfind('?')
    last_exclamation = truncated.rfind('!')
    # Use the latest sentence boundary
    last_boundary = max(last_period, last_question, last_exclamation)
    if last_boundary > max_chars * 0.5:  # Only use it if it is not too early in the text
        return truncated[:last_boundary + 1]
    else:
        # Fall back to a word boundary
        last_space = truncated.rfind(' ')
        if last_space > 0:
            return truncated[:last_space] + "..."
        else:
            return truncated + "..."
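
# Illustrative sketch (not executed): truncation keeps the last full sentence when
# its boundary falls past the halfway point of the cut, otherwise it falls back to
# the last word boundary and appends an ellipsis.
#   truncate_document("The cat sat on the warm mat today. Then it slept for hours.", 40)
#   # -> "The cat sat on the warm mat today."   (cut at the last sentence boundary)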
def format_response(response_text):
    """Format the response to make it more presentable."""
    if not response_text:
        return "Je n'ai pas pu générer une réponse. Veuillez réessayer."
    # Clean up markdown artifacts that might affect readability
    response_text = re.sub(r'\*\*\*', '', response_text)  # Remove bold-italic markers (***)
    # Ensure proper spacing between paragraphs
    response_text = re.sub(r'\n\s*\n', '\n\n', response_text)
    # Add proper punctuation at the end if missing
    if response_text[-1] not in ['.', '!', '?', ':', ';']:
        response_text += '.'
    return response_text
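
# Illustrative sketch (not executed): stray *** markers are stripped, runs of blank
# lines are collapsed, and a final period is appended when the text ends without
# punctuation.
#   format_response("***Bonjour***\n\n\nBienvenue")
#   # -> "Bonjour\n\nBienvenue."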
# Endpoint /config: register URLs and get a token
@app.route('/config', methods=['POST'])
def config_endpoint():
    try:
        data = request.json
        if not data or 'urls' not in data:
            return jsonify({"status": "error", "message": "URLs required"}), 400
        urls = data['urls']
        if not urls:
            return jsonify({"status": "error", "message": "No URLs provided"}), 400
        # Optional parameters
        default_message = data.get("default_message", "")
        contact_email = data.get("contact_email", "soufiane.sejjari@neologix.ma")
        # Generate token and load documents
        token = generate_token()
        documents = load_documents_from_urls(urls)
        user_data[token] = {
            'urls': urls,
            'documents': documents,
            'created_at': time.time(),
            'default_message': default_message,
            'contact_email': contact_email
        }
        # Save user data to file (serializable format)
        save_user_data(token, prepare_data_for_json(user_data[token]))
        return jsonify({
            "status": "success",
            "message": f"Successfully indexed {len(documents)} documents",
            "token": token
        })
    except Exception as e:
        logging.exception("Error in /config endpoint")
        return jsonify({"status": "error", "message": str(e)}), 500
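
# Illustrative request (hypothetical host and URL), assuming the endpoint is served
# at /config as described above:
#   curl -X POST http://localhost:7860/config \
#     -H "Content-Type: application/json" \
#     -d '{"urls": ["https://example.com/about"], "default_message": "", "contact_email": "soufiane.sejjari@neologix.ma"}'
#   # -> {"status": "success", "message": "Successfully indexed 1 documents", "token": "<token>"}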
# Endpoint /chat
@app.route('/chat', methods=['POST'])
def chat_endpoint():
    try:
        data = request.json
        if not data or 'message' not in data:
            return jsonify({"status": "error", "message": "Message required"}), 400
        auth_header = request.headers.get('Authorization')
        if not auth_header or not auth_header.startswith('Bearer '):
            return jsonify({"status": "error", "message": "Authorization header with Bearer token required"}), 401
        token = auth_header.split(' ')[1]
        user_input = data["message"]
        response = chat(user_input, token)
        return jsonify({"status": "success", "response": response})
    except Exception as e:
        logging.exception("Error in /chat endpoint")
        return jsonify({"status": "error", "message": str(e)}), 500
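
# Illustrative request (hypothetical host, <token> obtained from /config):
#   curl -X POST http://localhost:7860/chat \
#     -H "Content-Type: application/json" \
#     -H "Authorization: Bearer <token>" \
#     -d '{"message": "What services do you offer?"}'
#   # -> {"status": "success", "response": "..."}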
# Endpoint /refresh_token
@app.route('/refresh_token', methods=['POST'])
def refresh_token_endpoint():
    try:
        data = request.json
        if not data or 'token' not in data:
            return jsonify({"status": "error", "message": "Token required"}), 400
        old_token = data['token']
        new_token = refresh_token(user_data, old_token)
        if new_token:
            return jsonify({"status": "success", "message": "Token refreshed successfully", "token": new_token})
        else:
            return jsonify({"status": "error", "message": "Invalid or expired token"}), 400
    except Exception as e:
        logging.exception("Error in /refresh_token endpoint")
        return jsonify({"status": "error", "message": str(e)}), 500
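
# Illustrative request (hypothetical host):
#   curl -X POST http://localhost:7860/refresh_token \
#     -H "Content-Type: application/json" \
#     -d '{"token": "<old_token>"}'
#   # -> {"status": "success", "message": "Token refreshed successfully", "token": "<new_token>"}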
# Test endpoint
@app.route('/test', methods=['GET'])  # route path assumed for this health-check endpoint
def test():
    return jsonify({
        "status": "ok",
        "message": "API is working!",
        "active_tokens": len(user_data)
    })
# Load all user data on startup
load_all_user_data()

if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))
    app.run(host="0.0.0.0", port=port)