from flask import Flask, request, render_template, jsonify, Response import json import os from google import genai from google.genai import types import base64 from werkzeug.utils import secure_filename import mimetypes from dotenv import load_dotenv from datetime import datetime import logging # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('app.log', mode='a', encoding='utf-8') ] ) logger = logging.getLogger(__name__) app = Flask(__name__) app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max file size load_dotenv() @app.before_request def log_request_info(): logger.info(f'Request: {request.method} {request.url} from {request.remote_addr}') def load_system_instruction(): """Charge les instructions système depuis le fichier Markdown""" try: with open('instructions/system_instruction.md', 'r', encoding='utf-8') as f: return f.read().strip() except FileNotFoundError: logger.error("Fichier d'instructions système non trouvé.") return "Tu es un assistant intelligent et amical nommé Mariam. Tu assistes les utilisateurs au mieux de tes capacités. Tu as été créé par Aenir." except Exception as e: logger.exception("Erreur lors du chargement des instructions système") return "Tu es un assistant intelligent et amical nommé Mariam. Tu assistes les utilisateurs au mieux de tes capacités. Tu as été créé par Aenir." # Configuration du client Gemini API_KEY = os.getenv("GOOGLE_API_KEY") SYSTEM_INSTRUCTION = load_system_instruction() if not API_KEY: logger.warning("GOOGLE_API_KEY non définie dans les variables d'environnement") logger.warning("L'application démarrera mais les fonctionnalités de chat seront limitées") client = None else: try: client = genai.Client(api_key=API_KEY) logger.info("Client Gemini initialisé avec succès") except Exception as e: logger.exception("Erreur lors de l'initialisation du client Gemini") client = None # Configuration par défaut MODEL = "gemini-2.5-flash" DEFAULT_CONFIG = { "temperature": 0.7, "max_output_tokens": 8192, "top_p": 0.9, "top_k": 40 } # Outils activés par défaut DEFAULT_TOOLS = [ types.Tool(code_execution=types.ToolCodeExecution()), types.Tool(google_search=types.GoogleSearch()) ] # Stockage des conversations avec métadonnées (en production, utilisez une base de données) conversations = {} conversation_metadata = {} def add_message_to_history(conversation_id, role, content, has_file=False, file_data=None): """Ajoute un message à l'historique de la conversation""" if conversation_id not in conversation_metadata: conversation_metadata[conversation_id] = { 'id': conversation_id, 'created_at': datetime.now().isoformat(), 'last_activity': datetime.now().isoformat(), 'messages': [], 'status': 'active' } message_data = { 'role': role, 'content': content, 'timestamp': datetime.now().isoformat(), 'hasFile': has_file } if file_data: message_data['fileData'] = file_data conversation_metadata[conversation_id]['messages'].append(message_data) conversation_metadata[conversation_id]['last_activity'] = datetime.now().isoformat() @app.route('/') def index(): return render_template('index.html') @app.route('/admin1') def admin(): """Page d'administration""" return render_template('admin.html') @app.route('/admin/conversations') def get_conversations(): """API pour récupérer les conversations pour l'admin""" try: # Calculer les statistiques total_conversations = len(conversation_metadata) total_messages = sum(len(conv['messages']) for conv in conversation_metadata.values()) active_conversations = sum(1 for conv in conversation_metadata.values() if conv.get('status') == 'active') conversations_with_files = sum(1 for conv in conversation_metadata.values() if any(msg.get('hasFile') for msg in conv['messages'])) # Préparer les données des conversations conversations_data = [] for conv_id, conv_data in conversation_metadata.items(): conversations_data.append({ 'id': conv_id, 'createdAt': conv_data.get('created_at'), 'lastActivity': conv_data.get('last_activity'), 'status': conv_data.get('status', 'active'), 'messages': conv_data.get('messages', []) }) # Trier par dernière activité (plus récent en premier) conversations_data.sort(key=lambda x: x.get('lastActivity', ''), reverse=True) return jsonify({ 'conversations': conversations_data, 'stats': { 'total': total_conversations, 'totalMessages': total_messages, 'active': active_conversations, 'withFiles': conversations_with_files } }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/chat', methods=['POST']) def chat(): try: if not client: return jsonify({'error': 'Client Gemini non initialisé. Vérifiez GOOGLE_API_KEY.'}), 500 data = request.get_json() message = data.get('message', '') thinking_enabled = data.get('thinking_enabled', True) conversation_id = data.get('conversation_id', 'default') logger.info(f"Requête chat reçue: message='{message[:50]}...', conversation_id={conversation_id}") # Ajouter le message de l'utilisateur à l'historique add_message_to_history(conversation_id, 'user', message) # Configuration du thinking config_dict = DEFAULT_CONFIG.copy() config_dict["system_instruction"] = SYSTEM_INSTRUCTION config_dict["tools"] = DEFAULT_TOOLS # Activer thinking si demandé if thinking_enabled: config_dict["thinking_config"] = types.ThinkingConfig( thinking_budget=-1, # Dynamic thinking include_thoughts=True ) generation_config = types.GenerateContentConfig(**config_dict) # Gestion de la conversation if conversation_id not in conversations: conversations[conversation_id] = client.chats.create( model=MODEL, config=generation_config ) chat = conversations[conversation_id] # Génération de la réponse avec streaming def generate(): try: if not client: yield f"data: {json.dumps({'type': 'error', 'content': 'API Gemini non configurée. Définissez GOOGLE_API_KEY.'})}\n\n" return logger.info(f"Démarrage du streaming pour conversation {conversation_id}") response_stream = chat.send_message_stream( message, config=generation_config ) full_response = "" thoughts = "" chunk_count = 0 for chunk in response_stream: chunk_count += 1 logger.debug(f"Chunk {chunk_count} reçu") if chunk.candidates and chunk.candidates[0].content: for part in chunk.candidates[0].content.parts: if part.text: if part.thought and thinking_enabled: thoughts += part.text yield f"data: {json.dumps({'type': 'thought', 'content': part.text})}\n\n" else: full_response += part.text yield f"data: {json.dumps({'type': 'text', 'content': part.text})}\n\n" logger.info(f"Streaming terminé, réponse complète: {len(full_response)} caractères") # Ajouter la réponse de l'assistant à l'historique if full_response: add_message_to_history(conversation_id, 'assistant', full_response) # Signal de fin yield f"data: {json.dumps({'type': 'end'})}\n\n" except Exception as e: logger.exception("Erreur lors du streaming") yield f"data: {json.dumps({'type': 'error', 'content': f'Erreur API: {str(e)}'})}\n\n" return Response(generate(), mimetype='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Headers': 'Content-Type', }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/upload', methods=['POST']) def upload_file(): try: if 'file' not in request.files: return jsonify({'error': 'No file uploaded'}), 400 file = request.files['file'] if file.filename == '': return jsonify({'error': 'No file selected'}), 400 # Lire le fichier file_bytes = file.read() mime_type = file.content_type or mimetypes.guess_type(file.filename)[0] logger.info(f"Fichier uploadé: {file.filename}, taille: {len(file_bytes)} bytes, type: {mime_type}") # Encoder en base64 pour le stockage temporaire file_b64 = base64.b64encode(file_bytes).decode() return jsonify({ 'success': True, 'mime_type': mime_type, 'data': file_b64 }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/chat_with_file', methods=['POST']) def chat_with_file(): try: if not client: return jsonify({'error': 'Client Gemini non initialisé. Vérifiez GOOGLE_API_KEY.'}), 500 data = request.get_json() message = data.get('message', '') file_data_list = data.get('file_data', []) thinking_enabled = data.get('thinking_enabled', True) conversation_id = data.get('conversation_id', 'default') logger.info(f"Requête chat_with_file reçue: message='{message[:50]}...', fichiers={len(file_data_list)}, conversation_id={conversation_id}") # Ensure file_data_list is a list if not isinstance(file_data_list, list): file_data_list = [file_data_list] # Ajouter le message de l'utilisateur à l'historique (avec indication de fichiers) display_message = message if message else 'Analyse ces fichiers' if file_data_list: file_count = len(file_data_list) display_message += f" [{file_count} fichier{'s' if file_count > 1 else ''}]" add_message_to_history(conversation_id, 'user', display_message, has_file=len(file_data_list) > 0, file_data=file_data_list) # Configuration du thinking config_dict = DEFAULT_CONFIG.copy() config_dict["tools"] = DEFAULT_TOOLS config_dict["system_instruction"] = SYSTEM_INSTRUCTION # Activer thinking si demandé if thinking_enabled: config_dict["thinking_config"] = types.ThinkingConfig( thinking_budget=-1, include_thoughts=True ) generation_config = types.GenerateContentConfig(**config_dict) # Gestion de la conversation if conversation_id not in conversations: conversations[conversation_id] = client.chats.create( model=MODEL, config=generation_config ) chat = conversations[conversation_id] # Préparation du contenu avec fichiers contents = [message] for file_data in file_data_list: file_bytes = base64.b64decode(file_data['data']) file_part = types.Part.from_bytes( data=file_bytes, mime_type=file_data['mime_type'] ) contents.append(file_part) # Génération de la réponse avec streaming def generate(): try: if not client: yield f"data: {json.dumps({'type': 'error', 'content': 'API Gemini non configurée. Définissez GOOGLE_API_KEY.'})}\n\n" return logger.info(f"Démarrage du streaming avec fichiers pour conversation {conversation_id}") response_stream = chat.send_message_stream( contents, config=generation_config ) full_response = "" thoughts = "" chunk_count = 0 for chunk in response_stream: chunk_count += 1 logger.debug(f"Chunk {chunk_count} reçu (avec fichiers)") for part in chunk.candidates[0].content.parts: if part.text: if part.thought and thinking_enabled: thoughts += part.text yield f"data: {json.dumps({'type': 'thought', 'content': part.text})}\n\n" else: full_response += part.text yield f"data: {json.dumps({'type': 'text', 'content': part.text})}\n\n" logger.info(f"Streaming avec fichiers terminé, réponse complète: {len(full_response)} caractères") # Ajouter la réponse de l'assistant à l'historique if full_response: add_message_to_history(conversation_id, 'assistant', full_response) # Signal de fin yield f"data: {json.dumps({'type': 'end'})}\n\n" except Exception as e: logger.exception("Erreur lors du streaming avec fichiers") yield f"data: {json.dumps({'type': 'error', 'content': f'Erreur API avec fichiers: {str(e)}'})}\n\n" return Response(generate(), mimetype='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Headers': 'Content-Type', }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/reset_conversation', methods=['POST']) def reset_conversation(): try: data = request.get_json() conversation_id = data.get('conversation_id', 'default') if conversation_id in conversations: del conversations[conversation_id] # Marquer la conversation comme terminée dans les métadonnées if conversation_id in conversation_metadata: conversation_metadata[conversation_id]['status'] = 'reset' conversation_metadata[conversation_id]['last_activity'] = datetime.now().isoformat() logger.info(f"Conversation {conversation_id} réinitialisée") return jsonify({'success': True}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/admin/conversations/', methods=['DELETE']) def delete_conversation(conversation_id): """Supprimer une conversation (pour l'admin)""" try: if conversation_id in conversations: del conversations[conversation_id] if conversation_id in conversation_metadata: del conversation_metadata[conversation_id] logger.info(f"Conversation {conversation_id} supprimée") return jsonify({'success': True}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/admin/conversations//export') def export_conversation(conversation_id): """Exporter une conversation en JSON""" try: if conversation_id not in conversation_metadata: return jsonify({'error': 'Conversation non trouvée'}), 404 conversation_data = conversation_metadata[conversation_id] logger.info(f"Conversation {conversation_id} exportée") return jsonify({ 'conversation_id': conversation_id, 'export_date': datetime.now().isoformat(), 'data': conversation_data }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/admin/stats') def get_admin_stats(): """Statistiques détaillées pour l'admin""" try: # Statistiques générales total_conversations = len(conversation_metadata) total_messages = sum(len(conv['messages']) for conv in conversation_metadata.values()) # Statistiques par statut status_stats = {} for conv in conversation_metadata.values(): status = conv.get('status', 'active') status_stats[status] = status_stats.get(status, 0) + 1 # Conversations avec fichiers conversations_with_files = sum(1 for conv in conversation_metadata.values() if any(msg.get('hasFile') for msg in conv['messages'])) # Activité par jour (derniers 7 jours) from collections import defaultdict daily_activity = defaultdict(int) for conv in conversation_metadata.values(): for message in conv['messages']: if message.get('timestamp'): try: date = datetime.fromisoformat(message['timestamp']).date() daily_activity[date.isoformat()] += 1 except: continue return jsonify({ 'total_conversations': total_conversations, 'total_messages': total_messages, 'status_distribution': status_stats, 'conversations_with_files': conversations_with_files, 'daily_activity': dict(daily_activity) }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/debug/api_test') def debug_api_test(): """Endpoint de debug pour tester la connectivité API""" try: if not client: return jsonify({ 'status': 'error', 'message': 'Client Gemini non initialisé', 'api_key_set': bool(API_KEY) }) # Test simple de l'API response = client.models.generate_content( model=MODEL, contents="Hello", config=types.GenerateContentConfig( max_output_tokens=10, system_instruction="Réponds brièvement." ) ) return jsonify({ 'status': 'success', 'message': 'API Gemini fonctionnelle', 'model': MODEL, 'response_length': len(response.text) if response.text else 0, 'sample_response': response.text[:100] if response.text else None }) except Exception as e: return jsonify({ 'status': 'error', 'message': f'Erreur API: {str(e)}', 'api_key_set': bool(API_KEY) }) if __name__ == '__main__': app.run(debug=True, host='0.0.0.0', port=7860)