Spaces:
Running
Running
from flask import Flask, render_template, request, jsonify, send_from_directory | |
from flask_cors import CORS | |
from flask_limiter import Limiter | |
from flask_limiter.util import get_remote_address | |
from deepface import DeepFace | |
from werkzeug.utils import secure_filename | |
import os | |
import tempfile | |
import shutil | |
import uuid | |
import logging | |
import time | |
from datetime import datetime | |
from functools import wraps | |
import numpy as np | |
import cv2 | |
from PIL import Image | |
import io | |
import threading | |
import queue | |
import hashlib | |
# Configuration du logging | |
logging.basicConfig( | |
filename='app.log', | |
level=logging.INFO, | |
format='%(asctime)s - %(levelname)s - %(message)s' | |
) | |
class FaceAnalysisApp: | |
def __init__(self): | |
self.app = Flask(__name__, static_folder='static') | |
self.setup_app() | |
def setup_app(self): | |
# Configuration de base | |
self.app.config['UPLOAD_FOLDER'] = 'static/uploads' | |
self.app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 | |
self.app.config['ALLOWED_EXTENSIONS'] = {'png', 'jpg', 'jpeg', 'gif'} | |
self.app.config['SECRET_KEY'] = os.urandom(24) | |
# Initialisation des composants | |
CORS(self.app) | |
self.limiter = Limiter( | |
self.app, | |
key_func=get_remote_address, | |
default_limits=["200 per day", "50 per hour"] | |
) | |
# File d'attente pour le traitement asynchrone | |
self.task_queue = queue.Queue() | |
self.start_worker_thread() | |
# Cache pour les résultats | |
self.results_cache = {} | |
def start_worker_thread(self): | |
def worker(): | |
while True: | |
task = self.task_queue.get() | |
if task is None: | |
break | |
try: | |
task() | |
except Exception as e: | |
logging.error(f"Error in worker thread: {str(e)}") | |
self.task_queue.task_done() | |
self.worker_thread = threading.Thread(target=worker, daemon=True) | |
self.worker_thread.start() | |
def timing_decorator(self, f): | |
def wrap(*args, **kwargs): | |
start = time.time() | |
result = f(*args, **kwargs) | |
end = time.time() | |
logging.info(f'{f.__name__} took {end-start:.2f} seconds to execute') | |
return result | |
return wrap | |
def validate_image(self, image_stream): | |
"""Valide et optimise l'image""" | |
try: | |
img = Image.open(image_stream) | |
# Vérification des dimensions | |
if img.size[0] > 2000 or img.size[1] > 2000: | |
img.thumbnail((2000, 2000), Image.LANCZOS) | |
# Conversion en RGB si nécessaire | |
if img.mode not in ('RGB', 'L'): | |
img = img.convert('RGB') | |
# Optimisation | |
output = io.BytesIO() | |
img.save(output, format='JPEG', quality=85, optimize=True) | |
output.seek(0) | |
return output | |
except Exception as e: | |
logging.error(f"Image validation error: {str(e)}") | |
raise ValueError("Invalid image format") | |
def process_face_detection(self, image_path): | |
"""Détection de visage avec mise en cache""" | |
image_hash = hashlib.md5(open(image_path, 'rb').read()).hexdigest() | |
if image_hash in self.results_cache: | |
return self.results_cache[image_hash] | |
try: | |
result = DeepFace.analyze( | |
img_path=image_path, | |
actions=['age', 'gender', 'race', 'emotion'], | |
enforce_detection=True | |
) | |
self.results_cache[image_hash] = result | |
return result | |
except Exception as e: | |
logging.error(f"Face detection error: {str(e)}") | |
raise | |
def verify_faces(self, image1_path, image2_path): | |
"""Comparaison des visages avec vérification approfondie""" | |
try: | |
# Vérification initiale de la présence de visages | |
face1 = cv2.imread(image1_path) | |
face2 = cv2.imread(image2_path) | |
if face1 is None or face2 is None: | |
raise ValueError("Unable to read one or both images") | |
result = DeepFace.verify( | |
img1_path=image1_path, | |
img2_path=image2_path, | |
enforce_detection=True, | |
model_name="VGG-Face" | |
) | |
# Enrichissement des résultats | |
result['timestamp'] = datetime.now().isoformat() | |
result['confidence_score'] = 1 - result.get('distance', 0) | |
result['processing_time'] = time.time() | |
return result | |
except Exception as e: | |
logging.error(f"Face verification error: {str(e)}") | |
raise | |
def setup_routes(self): | |
def index(): | |
return render_template('index.html') | |
def verify_faces_endpoint(): | |
try: | |
if 'image1' not in request.files or 'image2' not in request.files: | |
return jsonify({'error': 'Two images are required'}), 400 | |
image1 = request.files['image1'] | |
image2 = request.files['image2'] | |
# Validation des images | |
try: | |
image1_stream = self.validate_image(image1) | |
image2_stream = self.validate_image(image2) | |
except ValueError as e: | |
return jsonify({'error': str(e)}), 400 | |
# Création des fichiers temporaires | |
with tempfile.TemporaryDirectory() as temp_dir: | |
image1_path = os.path.join(temp_dir, secure_filename(image1.filename)) | |
image2_path = os.path.join(temp_dir, secure_filename(image2.filename)) | |
# Sauvegarde des images optimisées | |
with open(image1_path, 'wb') as f: | |
f.write(image1_stream.getvalue()) | |
with open(image2_path, 'wb') as f: | |
f.write(image2_stream.getvalue()) | |
# Analyse des visages | |
result = self.verify_faces(image1_path, image2_path) | |
# Sauvegarde permanente si nécessaire | |
if result['verified']: | |
permanent_dir = os.path.join(self.app.static_folder, 'verified_faces') | |
os.makedirs(permanent_dir, exist_ok=True) | |
# Génération de noms uniques | |
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
image1_name = f"face1_{timestamp}_{uuid.uuid4().hex[:8]}.jpg" | |
image2_name = f"face2_{timestamp}_{uuid.uuid4().hex[:8]}.jpg" | |
shutil.copy2(image1_path, os.path.join(permanent_dir, image1_name)) | |
shutil.copy2(image2_path, os.path.join(permanent_dir, image2_name)) | |
result['image1_url'] = f'/static/verified_faces/{image1_name}' | |
result['image2_url'] = f'/static/verified_faces/{image2_name}' | |
return jsonify(result) | |
except Exception as e: | |
logging.error(f"Verification endpoint error: {str(e)}") | |
return jsonify({'error': 'An internal error occurred'}), 500 | |
def analyze_face_endpoint(): | |
try: | |
if 'image' not in request.files: | |
return jsonify({'error': 'No image provided'}), 400 | |
image = request.files['image'] | |
# Validation de l'image | |
try: | |
image_stream = self.validate_image(image) | |
except ValueError as e: | |
return jsonify({'error': str(e)}), 400 | |
# Traitement asynchrone | |
result_queue = queue.Queue() | |
def process_task(): | |
try: | |
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as temp_file: | |
temp_file.write(image_stream.getvalue()) | |
result = self.process_face_detection(temp_file.name) | |
result_queue.put(('success', result)) | |
except Exception as e: | |
result_queue.put(('error', str(e))) | |
finally: | |
try: | |
os.unlink(temp_file.name) | |
except: | |
pass | |
self.task_queue.put(process_task) | |
# Attente du résultat avec timeout | |
try: | |
status, result = result_queue.get(timeout=30) | |
if status == 'error': | |
return jsonify({'error': result}), 500 | |
return jsonify(result) | |
except queue.Empty: | |
return jsonify({'error': 'Processing timeout'}), 408 | |
except Exception as e: | |
logging.error(f"Analysis endpoint error: {str(e)}") | |
return jsonify({'error': 'An internal error occurred'}), 500 | |
def request_entity_too_large(error): | |
return jsonify({'error': 'File too large'}), 413 | |
def ratelimit_handler(e): | |
return jsonify({'error': 'Rate limit exceeded'}), 429 | |
def run(self, host='0.0.0.0', port=5000, debug=False): | |
self.setup_routes() | |
self.app.run(host=host, port=port, debug=debug) | |
if __name__ == '__main__': | |
app = FaceAnalysisApp() | |
app.run(debug=True) |