|
import json |
|
import cv2 |
|
from fastapi import FastAPI, HTTPException |
|
from fastapi.middleware.cors import CORSMiddleware |
|
import uvicorn |
|
import logging |
|
import tempfile |
|
from pathlib import Path |
|
import firebase_admin |
|
from firebase_admin import credentials, firestore, storage |
|
from pydantic import BaseModel |
|
from deepface import DeepFace |
|
from tqdm import tqdm |
|
|
|
|
|
# Configure root logging once at import time: DEBUG verbosity, with every
# record prefixed by timestamp, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)

# Module-level logger used by the endpoint and the processing helpers.
logger = logging.getLogger(__name__)
|
|
|
|
|
# Pre-bind the Firebase handles so the names exist even when initialisation
# fails: a failed start-up is then observable as ``None`` rather than as a
# NameError the first time a request touches ``bucket`` or ``db``.
firebase_app = None
db = None
bucket = None

try:
    # Service-account credentials are read from the working directory.
    cred = credentials.Certificate("serviceAccountKey.json")
    firebase_app = firebase_admin.initialize_app(cred, {
        'storageBucket': 'future-forge-60d3f.appspot.com'
    })
    db = firestore.client()
    bucket = storage.bucket(app=firebase_app)
    logger.info("Firebase initialized successfully")
except Exception as e:
    # Start-up stays best-effort (the process keeps running), but the full
    # traceback is logged so the root cause is not lost.
    logger.exception(f"Failed to initialize Firebase: {str(e)}")
|
|
|
# ASGI application object; the route decorator below registers against it.
app = FastAPI()

# Fully permissive CORS: every origin, method and header is accepted and
# credentials are allowed.
# NOTE(review): a wildcard origin combined with allow_credentials=True is
# very permissive — confirm this is intended outside development.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)
|
|
|
|
|
# Request body accepted by the POST /facial-emotion endpoint.
class FileProcess(BaseModel):
    # Object path handed to ``bucket.blob(...)``; the text after its last
    # dot is used as the file extension.
    file_path: str
|
|
|
@app.post("/facial-emotion")
async def process_file(file_data: FileProcess):
    """Run facial-emotion analysis on a file held in Firebase Storage.

    The object named by ``file_data.file_path`` is downloaded to a temporary
    file, dispatched to ``process_image`` or ``process_video`` according to
    its extension, and the outcome is added to the ``results`` Firestore
    collection.  The temporary file is always removed afterwards.

    Args:
        file_data: Request body carrying the storage object path.

    Returns:
        A dict with ``message`` and ``result`` keys.  An ``error`` key is
        added when the analysis succeeded but the Firestore write failed.

    Raises:
        HTTPException: 400 when the extension is not a supported image or
            video type; 500 for any failure while downloading or analysing.
    """
    logger.info(f"Processing file from Firebase Storage: {file_data.file_path}")

    image_types = ('jpg', 'jpeg', 'png', 'bmp')
    video_types = ('mp4', 'avi', 'mov', 'wmv')

    # Extension = text after the last dot, compared case-insensitively.
    extension = file_data.file_path.split('.')[-1]
    file_type = extension.lower()

    # Reject unsupported types up front: nothing is downloaded, and the 400
    # is raised outside the generic handler below, which previously caught
    # it and re-reported it as a 500.
    if file_type not in image_types + video_types:
        raise HTTPException(status_code=400, detail="Unsupported file type")

    # NOTE(review): process_image/process_video are synchronous and are
    # called directly from this coroutine, so a long analysis occupies the
    # event loop for its whole duration.
    tmp_file_path = None
    try:
        blob = bucket.blob(file_data.file_path)

        # Reserve a named temp file, then close the handle before the
        # download writes to that path.
        with tempfile.NamedTemporaryFile(delete=False, suffix=f".{extension}") as tmp_file:
            tmp_file_path = Path(tmp_file.name)
        blob.download_to_filename(str(tmp_file_path))
        logger.info(f"File downloaded temporarily at: {tmp_file_path}")

        # Both helpers receive a plain string path; not every OpenCV /
        # DeepFace release accepts ``pathlib.Path``.
        if file_type in image_types:
            output_image = process_image(str(tmp_file_path))
            result = {"type": "image", "data": output_image}
        else:
            video_output = process_video(str(tmp_file_path))
            result = {"type": "video", "data": video_output}

        logger.info(f"Processing complete. Result: {result}")
    except HTTPException:
        # Deliberate HTTP errors keep their own status code.
        raise
    except Exception as e:
        logger.error(f"Error processing file: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error processing file: {str(e)}") from e
    finally:
        # Runs on success and on every failure path, including a failed
        # download, so the temp file can no longer leak.
        if tmp_file_path is not None and tmp_file_path.exists():
            tmp_file_path.unlink()

    # Persisting is best-effort: a Firestore failure is reported alongside
    # the (still valid) analysis result rather than failing the request.
    try:
        db.collection('results').add(result)
    except Exception as e:
        logger.error(f"Failed to store result in Firebase: {str(e)}")
        return {"message": "File processed successfully, but failed to store in Firebase", "result": result,
                "error": str(e)}
    return {"message": "File processed successfully", "result": result}
|
|
|
def process_video(video_path, output_video_path='output_video.mp4', frame_sample_rate=5):
    """Annotate a video with per-face emotions and summarise them.

    Every ``frame_sample_rate``-th frame is analysed with DeepFace; each
    detected face is boxed and labelled on that frame.  All frames, sampled
    or not, are written to ``output_video_path``.  The per-face results are
    dumped to ``results_video.txt`` in the working directory and then
    summarised by ``calculate_emotion_percentages``.

    Args:
        video_path: Input video location, as accepted by ``cv2.VideoCapture``.
        output_video_path: Destination of the annotated ``mp4v`` video.
        frame_sample_rate: Analyse one frame out of this many.

    Returns:
        Whatever ``calculate_emotion_percentages`` returns for the results
        file, or ``None`` when the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        logger.error("Error: Could not open video.")
        cap.release()
        return None

    out = None
    output = {}
    frame_index = 0
    try:
        # Keep the frame rate as reported (a float): truncating e.g. 29.97
        # to 29 makes the written video play at the wrong speed.
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

        with tqdm(total=total_frames, desc="Processing video") as pbar:
            while True:
                ret, frame = cap.read()
                if not ret:
                    logger.info("End of video or cannot capture the frame.")
                    break

                if frame_index % frame_sample_rate == 0:
                    try:
                        faces = DeepFace.analyze(frame, actions=['emotion'], detector_backend='retinaface',
                                                 enforce_detection=False)
                    except Exception as e:
                        # A frame that cannot be analysed is recorded with
                        # no faces and still written out unannotated.
                        logger.error(f"Error analyzing frame {frame_index}: {e}")
                        faces = []

                    # Tolerate a single-face dict result as well as a list
                    # (presumably what some DeepFace releases return —
                    # TODO confirm against the pinned version).
                    if isinstance(faces, dict):
                        faces = [faces]

                    tmp = {}
                    for face in faces:
                        region = face['region']
                        x, y, w, h = region['x'], region['y'], region['w'], region['h']
                        emotion = face['dominant_emotion']
                        emotion_scores = face['emotion']
                        tmp[(x, y, w, h)] = {'emotion': emotion, 'score': emotion_scores[emotion]}

                        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 2)
                        cv2.putText(frame, f"{emotion} ({emotion_scores[emotion]:.2f})", (x, y - 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)
                    output[frame_index] = tmp

                out.write(frame)
                frame_index += 1
                pbar.update(1)
    finally:
        # Release the capture and writer on every path, including errors.
        cap.release()
        if out is not None:
            out.release()

    # One "Frame N " prefix per sampled frame followed by one line per face;
    # calculate_emotion_percentages parses exactly this layout.
    with open('results_video.txt', 'w') as file:
        for frame_num, faces_info in output.items():
            file.write(f"Frame {frame_num} ")
            for face_key, info in faces_info.items():
                file.write(f"{face_key}: {info}\n")

    logger.info(f"Processed {frame_index} frames.")
    video_json_output = calculate_emotion_percentages('results_video.txt')
    logger.debug("%s", video_json_output)
    return video_json_output
|
|
|
def process_image(image_path):
    """Label every face in an image with its dominant emotion.

    The image is analysed with DeepFace; each reported face is boxed and
    captioned, and the annotated image is written to
    ``output_image_with_emotions.jpg`` in the working directory.

    Args:
        image_path: Location of the image, as ``str`` or ``pathlib.Path``.

    Returns:
        A JSON string mapping each face index to a dict with ``person``,
        ``emotion`` and ``score`` (formatted to three decimals).  ``"{}"``
        is returned when the analysis fails or reports no face, and ``None``
        when the image cannot be loaded at all.
    """
    # Normalise to a plain string: the endpoint hands over a ``Path``, which
    # not every OpenCV / DeepFace release accepts.
    image_path = str(image_path)

    image = cv2.imread(image_path)
    if image is None:
        logger.error(f"Error: Unable to load image from path {image_path}")
        return None

    try:
        result = DeepFace.analyze(image_path, actions=['emotion'], detector_backend='retinaface',
                                  enforce_detection=False)
    except Exception as e:
        # Previously the raw ndarray was returned here, which the caller can
        # neither store in Firestore nor serialise into the HTTP response.
        logger.error(f"Error analyzing image: {e}")
        return json.dumps({})

    # Tolerate a single-face dict result as well as a list (presumably what
    # some DeepFace releases return — TODO confirm against the pinned version).
    if isinstance(result, dict):
        result = [result]

    if not result:
        logger.info("No faces detected.")
        return json.dumps({})

    faces = {}
    for i, face in enumerate(result):
        region = face['region']
        x, y, w, h = region['x'], region['y'], region['w'], region['h']

        emotion = face['dominant_emotion']
        score = face['emotion'][emotion]
        faces[i] = {'person': i + 1, 'emotion': emotion, 'score': f"{score:.3f}"}

        cv2.rectangle(image, (x, y), (x + w, y + h), (0, 255, 0), 2)
        cv2.putText(image, f"{emotion} ({score:.3f})", (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8,
                    (0, 255, 0), 2)

    output_image_path = 'output_image_with_emotions.jpg'
    cv2.imwrite(output_image_path, image)
    logger.info(f"Processed image saved as {output_image_path}")

    # Integer face indices become string keys in the JSON text.
    return json.dumps(faces)
|
|
|
def calculate_emotion_percentages(file_path):
    """Summarise how often each emotion occurs in a results text file.

    Only lines containing ``{'emotion':`` are considered; the emotion name
    is the first quoted token after ``'emotion': `` on such a line.

    Args:
        file_path: Path of the text file to scan.

    Returns:
        A list of ``{"emotion": name, "percentage": value}`` dicts in order
        of first appearance, where ``value`` is that emotion's share of all
        matching lines, on a 0-100 scale.  Empty when nothing matches.
    """
    marker = "'emotion': "
    counts = {}

    with open(file_path, 'r') as handle:
        for raw_line in handle:
            # Skip anything that does not carry a face record.
            if "{'emotion':" not in raw_line:
                continue
            after_marker = raw_line.split(marker)[1]
            label = after_marker.split("'")[1]
            if label in counts:
                counts[label] += 1
            else:
                counts[label] = 1

    # Every matching line contributed exactly one count.
    matched_lines = sum(counts.values())

    summary = []
    for label, hits in counts.items():
        summary.append({"emotion": label, "percentage": (hits / matched_lines) * 100})
    return summary
|
|
|
# Script entry point: log the start-up banner, then serve ``app`` with
# uvicorn on all interfaces, port 7860.
if __name__ == "__main__":
    logger.info("Starting the Face Emotion Recognition API")
    uvicorn.run(app, port=7860, host="0.0.0.0")