from flask import Flask, render_template, request, jsonify
from flask_cors import CORS
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from deepface import DeepFace
from werkzeug.utils import secure_filename
import os
import tempfile
import shutil
import uuid
import logging
import time
from datetime import datetime
from functools import wraps
import cv2
from PIL import Image
import io
import threading
import queue
import hashlib

# Logging configuration
logging.basicConfig(
    filename='app.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)


def timing_decorator(f):
    """Log the execution time of the wrapped function."""
    @wraps(f)
    def wrap(*args, **kwargs):
        start = time.time()
        result = f(*args, **kwargs)
        end = time.time()
        logging.info(f'{f.__name__} took {end - start:.2f} seconds to execute')
        return result
    return wrap


class FaceAnalysisApp:
    def __init__(self):
        self.app = Flask(__name__, static_folder='static')
        self.setup_app()

    def setup_app(self):
        # Basic configuration
        self.app.config['UPLOAD_FOLDER'] = 'static/uploads'
        self.app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024
        self.app.config['ALLOWED_EXTENSIONS'] = {'png', 'jpg', 'jpeg', 'gif'}
        self.app.config['SECRET_KEY'] = os.urandom(24)

        # Component initialization
        CORS(self.app)
        self.limiter = Limiter(
            key_func=get_remote_address,
            app=self.app,
            default_limits=["200 per day", "50 per hour"]
        )

        # Queue for asynchronous processing
        self.task_queue = queue.Queue()
        self.start_worker_thread()

        # Cache for analysis results
        self.results_cache = {}

    def start_worker_thread(self):
        def worker():
            while True:
                task = self.task_queue.get()
                if task is None:
                    break
                try:
                    task()
                except Exception as e:
                    logging.error(f"Error in worker thread: {str(e)}")
                self.task_queue.task_done()

        self.worker_thread = threading.Thread(target=worker, daemon=True)
        self.worker_thread.start()

    def validate_image(self, image_stream):
        """Validate and optimize the uploaded image."""
        try:
            img = Image.open(image_stream)

            # Shrink overly large images
            if img.size[0] > 2000 or img.size[1] > 2000:
                img.thumbnail((2000, 2000), Image.LANCZOS)

            # Convert to RGB if necessary
            if img.mode not in ('RGB', 'L'):
                img = img.convert('RGB')

            # Re-encode as an optimized JPEG
            output = io.BytesIO()
            img.save(output, format='JPEG', quality=85, optimize=True)
            output.seek(0)
            return output
        except Exception as e:
            logging.error(f"Image validation error: {str(e)}")
            raise ValueError("Invalid image format")

    def process_face_detection(self, image_path):
        """Run face analysis with result caching."""
        with open(image_path, 'rb') as f:
            image_hash = hashlib.md5(f.read()).hexdigest()
        if image_hash in self.results_cache:
            return self.results_cache[image_hash]

        try:
            result = DeepFace.analyze(
                img_path=image_path,
                actions=['age', 'gender', 'race', 'emotion'],
                enforce_detection=True
            )
            self.results_cache[image_hash] = result
            return result
        except Exception as e:
            logging.error(f"Face detection error: {str(e)}")
            raise

    @timing_decorator
    def verify_faces(self, image1_path, image2_path):
        """Compare two faces with additional sanity checks."""
        try:
            start_time = time.time()

            # Make sure both images are readable before running the model
            face1 = cv2.imread(image1_path)
            face2 = cv2.imread(image2_path)

            if face1 is None or face2 is None:
                raise ValueError("Unable to read one or both images")

            result = DeepFace.verify(
                img1_path=image1_path,
                img2_path=image2_path,
                enforce_detection=True,
                model_name="VGG-Face"
            )

            # Enrich the result
            result['timestamp'] = datetime.now().isoformat()
            result['confidence_score'] = 1 - result.get('distance', 0)
            result['processing_time'] = time.time() - start_time
            return result
        except Exception as e:
            logging.error(f"Face verification error: {str(e)}")
            raise

    def setup_routes(self):
        @self.app.route('/')
        def index():
            return render_template('index.html')

        @self.app.route('/verify', methods=['POST'])
        @self.limiter.limit("10 per minute")
        def verify_faces_endpoint():
            try:
                if 'image1' not in request.files or 'image2' not in request.files:
                    return jsonify({'error': 'Two images are required'}), 400

                image1 = request.files['image1']
                image2 = request.files['image2']

                # Validate both images
                try:
                    image1_stream = self.validate_image(image1)
                    image2_stream = self.validate_image(image2)
                except ValueError as e:
                    return jsonify({'error': str(e)}), 400

                # Write the optimized images to temporary files
                with tempfile.TemporaryDirectory() as temp_dir:
                    image1_path = os.path.join(temp_dir, secure_filename(image1.filename))
                    image2_path = os.path.join(temp_dir, secure_filename(image2.filename))

                    with open(image1_path, 'wb') as f:
                        f.write(image1_stream.getvalue())
                    with open(image2_path, 'wb') as f:
                        f.write(image2_stream.getvalue())

                    # Run the face comparison
                    result = self.verify_faces(image1_path, image2_path)

                    # Keep a permanent copy when the faces match
                    if result['verified']:
                        permanent_dir = os.path.join(self.app.static_folder, 'verified_faces')
                        os.makedirs(permanent_dir, exist_ok=True)

                        # Generate unique file names
                        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                        image1_name = f"face1_{timestamp}_{uuid.uuid4().hex[:8]}.jpg"
                        image2_name = f"face2_{timestamp}_{uuid.uuid4().hex[:8]}.jpg"

                        shutil.copy2(image1_path, os.path.join(permanent_dir, image1_name))
                        shutil.copy2(image2_path, os.path.join(permanent_dir, image2_name))

                        result['image1_url'] = f'/static/verified_faces/{image1_name}'
                        result['image2_url'] = f'/static/verified_faces/{image2_name}'

                    return jsonify(result)
            except Exception as e:
                logging.error(f"Verification endpoint error: {str(e)}")
                return jsonify({'error': 'An internal error occurred'}), 500

        @self.app.route('/analyze', methods=['POST'])
        @self.limiter.limit("20 per minute")
        def analyze_face_endpoint():
            try:
                if 'image' not in request.files:
                    return jsonify({'error': 'No image provided'}), 400

                image = request.files['image']

                # Validate the image
                try:
                    image_stream = self.validate_image(image)
                except ValueError as e:
                    return jsonify({'error': str(e)}), 400

                # Asynchronous processing through the worker queue
                result_queue = queue.Queue()

                def process_task():
                    temp_path = None
                    try:
                        with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as temp_file:
                            temp_file.write(image_stream.getvalue())
                            temp_path = temp_file.name
                        result = self.process_face_detection(temp_path)
                        result_queue.put(('success', result))
                    except Exception as e:
                        result_queue.put(('error', str(e)))
                    finally:
                        if temp_path is not None:
                            try:
                                os.unlink(temp_path)
                            except OSError:
                                pass

                self.task_queue.put(process_task)

                # Wait for the result with a timeout
                try:
                    status, result = result_queue.get(timeout=30)
                    if status == 'error':
                        return jsonify({'error': result}), 500
                    return jsonify(result)
                except queue.Empty:
                    return jsonify({'error': 'Processing timeout'}), 408
            except Exception as e:
                logging.error(f"Analysis endpoint error: {str(e)}")
                return jsonify({'error': 'An internal error occurred'}), 500

        @self.app.errorhandler(413)
        def request_entity_too_large(error):
            return jsonify({'error': 'File too large'}), 413

        @self.app.errorhandler(429)
        def ratelimit_handler(e):
            return jsonify({'error': 'Rate limit exceeded'}), 429

    def run(self, host='0.0.0.0', port=5000, debug=False):
        self.setup_routes()
        self.app.run(host=host, port=port, debug=debug)


if __name__ == '__main__':
    app = FaceAnalysisApp()
    app.run(debug=True)
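
# Example client usage (a minimal sketch, not part of the application itself):
# it assumes the server is running locally on port 5000 with the routes defined
# above, and that `face1.jpg` / `face2.jpg` are placeholder file names.
#
#   curl -X POST http://localhost:5000/analyze -F "image=@face1.jpg"
#   curl -X POST http://localhost:5000/verify -F "image1=@face1.jpg" -F "image2=@face2.jpg"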