# chatAi / app.py
import os
import time
from flask import Flask, request, jsonify
from flask_cors import CORS
import logging
import glob
import re  # used by format_response
# Local application modules
from web_content_fetcher import fetch_web_content
from token_manager import generate_token, validate_token, refresh_token, TOKEN_EXPIRATION, save_user_data, load_user_data, get_user_data_file_path, USER_DATA_DIR
from groq_api import get_completion
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Initialize Flask
app = Flask(__name__)
CORS(app) # Enable CORS for all routes
# Dictionary to store user data:
# { token: { urls: [], documents: [], created_at: timestamp, default_message: str, contact_email: str } }
user_data = {}
# Class representing a document
class Document:
def __init__(self, text, metadata=None):
self.text = text
self.metadata = metadata or {}
def to_dict(self):
"""Convert Document to a JSON-serializable dictionary"""
return {
"text": self.text,
"metadata": self.metadata
}
@classmethod
def from_dict(cls, data):
"""Create Document from a dictionary"""
return cls(text=data["text"], metadata=data["metadata"])
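# Round-trip sketch for Document serialization (values below are illustrative, not from the repo):
#   Document("hello", {"url": "https://example.com"}).to_dict()
#     -> {"text": "hello", "metadata": {"url": "https://example.com"}}
#   Document.from_dict({"text": "hello", "metadata": {"url": "https://example.com"}})
#     -> an equivalent Document instance (used when restoring user data from disk)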
# Load all existing tokens from disk on startup
def load_all_user_data():
"""Load all user data from disk on application startup"""
logging.info("Loading all user data from disk...")
count = 0
try:
# Get all JSON files in the user data directory
json_files = glob.glob(os.path.join(USER_DATA_DIR, "*.json"))
for file_path in json_files:
try:
# Extract token from filename
token = os.path.basename(file_path).replace(".json", "")
# Load user data
data = load_user_data(token)
if data:
# Convert documents back to Document objects
if "documents" in data and isinstance(data["documents"], list):
data["documents"] = [Document(text=doc["text"], metadata=doc["metadata"])
for doc in data["documents"]]
# Add to in-memory cache
user_data[token] = data
count += 1
except Exception as e:
logging.error(f"Error loading user data from {file_path}: {e}")
continue
except Exception as e:
logging.error(f"Error loading user data: {e}")
logging.info(f"Loaded {count} tokens from disk")
return count
# Loading documents from a list of URLs
def load_documents_from_urls(urls):
documents = []
for url in urls:
content = fetch_web_content(url)
if content:
logging.info(f"Successfully fetched content from {url}: {len(content)} characters")
print(f"Récupération de {url} - {len(content)} caractères")
print(content[:400])
documents.append(Document(text=content, metadata={"url": url}))
else:
logging.warning(f"Failed to fetch content from {url}")
return documents
# Prepare user data for JSON serialization
def prepare_data_for_json(data):
"""Convert user data to a JSON-serializable format"""
serializable_data = dict(data)
if "documents" in serializable_data:
serializable_data["documents"] = [doc.to_dict() for doc in serializable_data["documents"]]
return serializable_data
# Restore documents from JSON data
def restore_documents_from_json(data):
"""Restore Document objects from JSON data"""
if "documents" in data and isinstance(data["documents"], list):
data["documents"] = [Document.from_dict(doc_dict) for doc_dict in data["documents"]]
return data
# Chat function
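# Retrieval sketch (describes the code below): query terms longer than 3 characters are
# matched against each document; a term found in the body scores 1, plus 0.5 if it also
# appears in the document's URL metadata and 0.5 if it appears in the first 200 characters.
# The top 3 documents (each truncated to ~2000 characters) are joined into the LLM context
# under an approximate 4000-token budget, estimated at ~1.3 tokens per word.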
def chat(user_input, token):
if not validate_token(user_data, token):
return "Unauthorized: Invalid or expired token."
documents = user_data[token]['documents']
contact_email = user_data[token].get("contact_email", "soufiane.sejjari@neologix.ma")
default_message = user_data[token].get("default_message", "")
# Improved keyword-based document retrieval
most_relevant_docs = []
query_terms = set(word.lower() for word in user_input.split() if len(word) > 3)
for doc in documents:
# Calculate relevance score based on keyword presence
relevance_score = 0
doc_text_lower = doc.text.lower()
for term in query_terms:
if term in doc_text_lower:
relevance_score += 1
# Bonus for terms in title or metadata
if 'url' in doc.metadata and term in doc.metadata['url'].lower():
relevance_score += 0.5
if term in doc_text_lower[:200]: # Term in beginning of document
relevance_score += 0.5
if relevance_score > 0:
# Store document with its relevance score
most_relevant_docs.append((doc.text, relevance_score))
# Sort by relevance score
most_relevant_docs.sort(key=lambda x: x[1], reverse=True)
# Context selection and token management
max_context_tokens = 4000 # Approximate token limit
context_parts = []
token_count = 0
# Add the most relevant document content until we approach token limit
for doc_text, _ in most_relevant_docs[:3]: # Limit to top 3 documents
# Truncate long documents to avoid exceeding token limits
truncated_text = truncate_document(doc_text, 2000)
estimated_tokens = len(truncated_text.split()) * 1.3 # Rough estimate
if token_count + estimated_tokens > max_context_tokens:
# If adding would exceed limit, add partial content
remaining_tokens = max_context_tokens - token_count
if remaining_tokens > 200: # Only add if we can include meaningful content
partial_size = int(remaining_tokens / 1.3) # Convert back to approximate word count
context_parts.append(truncate_document(truncated_text, partial_size))
break
context_parts.append(truncated_text)
token_count += estimated_tokens
# If no relevant docs were found or context is too small, use default content
if not context_parts:
if default_message:
context = default_message
elif documents:
# Use beginning portions of first two documents
context = "\n\n".join(truncate_document(doc.text, 1000) for doc in documents[:2])
else:
context = "No relevant information available."
else:
context = "\n\n--- Next Document ---\n\n".join(context_parts)
# Send to LLM for response generation
response = get_completion(user_input, context, contact_email)
return format_response(response)
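# Illustrative behaviour of truncate_document (defined below); the input is hypothetical:
#   truncate_document("Hello world. Next part here.", 15) -> "Hello world."
#   (the last sentence boundary falls past 50% of max_chars, so the text is cut there)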
def truncate_document(text, max_chars):
"""Intelligently truncate a document to avoid cutting in the middle of sentences."""
if not text or len(text) <= max_chars:
return text
# Try to truncate at sentence boundaries
truncated = text[:max_chars]
# Find the last sentence boundary
last_period = truncated.rfind('.')
last_question = truncated.rfind('?')
last_exclamation = truncated.rfind('!')
# Use the latest sentence boundary
last_boundary = max(last_period, last_question, last_exclamation)
if last_boundary > max_chars * 0.5: # Only use if it's not too early in the text
return truncated[:last_boundary+1]
else:
# Fallback to word boundary
last_space = truncated.rfind(' ')
if last_space > 0:
return truncated[:last_space] + "..."
else:
return truncated + "..."
def format_response(response_text):
"""Format the response to make it more presentable."""
if not response_text:
return "Je n'ai pas pu générer une réponse. Veuillez réessayer."
# Clean up any markdown artifacts that might affect readability
response_text = re.sub(r'\*\*\*', '', response_text) # Remove bold/italic markers
# Ensure proper spacing between paragraphs
response_text = re.sub(r'\n\s*\n', '\n\n', response_text)
# Add proper punctuation at the end if missing
    if response_text and response_text[-1] not in ['.', '!', '?', ':', ';']:
response_text += '.'
return response_text
# Endpoint /config: register URLs and get a token
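# Illustrative request (URL and e-mail values are placeholders, not from the repo):
#   POST /config
#   {
#     "urls": ["https://example.com/about"],
#     "default_message": "Welcome!",           (optional)
#     "contact_email": "support@example.com"   (optional)
#   }
# On success, returns {"status": "success", "message": "Successfully indexed N documents", "token": "<token>"}.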
@app.route("/config", methods=["POST"])
def config_endpoint():
try:
data = request.json
if not data or 'urls' not in data:
return jsonify({"status": "error", "message": "URLs required"}), 400
urls = data['urls']
if not urls:
return jsonify({"status": "error", "message": "No URLs provided"}), 400
# Optional parameters
default_message = data.get("default_message", "")
contact_email = data.get("contact_email", "soufiane.sejjari@neologix.ma")
# Generate token and load documents
token = generate_token()
documents = load_documents_from_urls(urls)
user_data[token] = {
'urls': urls,
'documents': documents,
'created_at': time.time(),
'default_message': default_message,
'contact_email': contact_email
}
# Save user data to file (serializable format)
save_user_data(token, prepare_data_for_json(user_data[token]))
return jsonify({
"status": "success",
"message": f"Successfully indexed {len(documents)} documents",
"token": token
})
except Exception as e:
logging.exception("Error in /config endpoint")
return jsonify({"status": "error", "message": str(e)}), 500
# Endpoint /chat
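# Illustrative request (the token value is a placeholder returned by /config):
#   POST /chat
#   Authorization: Bearer <token>
#   {"message": "What services do you offer?"}
# On success, returns {"status": "success", "response": "<formatted answer>"}.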
@app.route("/chat", methods=["POST"])
def chat_endpoint():
try:
data = request.json
if not data or 'message' not in data:
return jsonify({"status": "error", "message": "Message required"}), 400
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({"status": "error", "message": "Authorization header with Bearer token required"}), 401
token = auth_header.split(' ')[1]
user_input = data["message"]
response = chat(user_input, token)
return jsonify({"status": "success", "response": response})
except Exception as e:
logging.exception("Error in /chat endpoint")
return jsonify({"status": "error", "message": str(e)}), 500
# Endpoint /refresh_token
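# Illustrative request (the token value is a placeholder):
#   POST /refresh_token
#   {"token": "<existing token>"}
# Returns a new token on success, or a 400 error if the token is invalid or expired.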
@app.route("/refresh_token", methods=["POST"])
def refresh_token_endpoint():
try:
data = request.json
if not data or 'token' not in data:
return jsonify({"status": "error", "message": "Token required"}), 400
old_token = data['token']
new_token = refresh_token(user_data, old_token)
if new_token:
return jsonify({"status": "success", "message": "Token refreshed successfully", "token": new_token})
else:
return jsonify({"status": "error", "message": "Invalid or expired token"}), 400
except Exception as e:
logging.exception("Error in /refresh_token endpoint")
return jsonify({"status": "error", "message": str(e)}), 500
# Test endpoint
@app.route("/test", methods=["GET"])
def test():
return jsonify({
"status": "ok",
"message": "API is working!",
"active_tokens": len(user_data)
})
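# Quick smoke test, assuming the app runs locally on the default port:
#   curl http://localhost:7860/test
#   -> {"status": "ok", "message": "API is working!", "active_tokens": 0}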
# Load all user data on startup
load_all_user_data()
if __name__ == "__main__":
port = int(os.environ.get("PORT", 7860))
app.run(host="0.0.0.0", port=port)