import json
import cv2
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
import logging
import tempfile
from pathlib import Path
import firebase_admin
from firebase_admin import credentials, firestore, storage
from pydantic import BaseModel
from deepface import DeepFace
from tqdm import tqdm

# Set up logging
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Initialize Firebase. db and bucket stay None on failure so the endpoint can
# report a clear error instead of raising NameError later.
db = None
bucket = None
try:
    cred = credentials.Certificate("serviceAccountKey.json")
    firebase_app = firebase_admin.initialize_app(cred, {
        'storageBucket': 'future-forge-60d3f.appspot.com'
    })
    db = firestore.client()
    bucket = storage.bucket(app=firebase_app)
    logger.info("Firebase initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize Firebase: {str(e)}")

app = FastAPI()
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)


# Define the input model
class FileProcess(BaseModel):
file_path: str
@app.post("/facial-emotion")
async def process_file(file_data: FileProcess):
logger.info(f"Processing file from Firebase Storage: {file_data.file_path}")
try:
# Get the file from Firebase Storage
blob = bucket.blob(file_data.file_path)
# Create a temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_data.file_path.split('.')[-1]}") as tmp_file:
blob.download_to_filename(tmp_file.name)
tmp_file_path = Path(tmp_file.name)
logger.info(f"File downloaded temporarily at: {tmp_file_path}")
file_type = file_data.file_path.split('.')[-1].lower()
result = None
try:
if file_type in ['jpg', 'jpeg', 'png', 'bmp']:
output_image = process_image(tmp_file_path)
result = {"type": "image", "data": output_image}
elif file_type in ['mp4', 'avi', 'mov', 'wmv']:
video_output = process_video(str(tmp_file_path))
result = {"type": "video", "data": video_output}
else:
raise HTTPException(status_code=400, detail="Unsupported file type")
logger.info(f"Processing complete. Result: {result}")
# Store result in Firebase
try:
                    # Firestore's add() returns (update_time, doc_ref); the reference is unused here
                    db.collection('results').add(result)
                    return {"message": "File processed successfully", "result": result}
except Exception as e:
logger.error(f"Failed to store result in Firebase: {str(e)}")
return {"message": "File processed successfully, but failed to store in Firebase", "result": result,
"error": str(e)}
finally:
# Clean up the temporary file after processing
if tmp_file_path.exists():
tmp_file_path.unlink()
except Exception as e:
logger.error(f"Error processing file: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error processing file: {str(e)}")
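

# A minimal client sketch (hypothetical local host/port and bucket path;
# adjust both to your deployment):
#   curl -X POST http://localhost:7860/facial-emotion \
#        -H "Content-Type: application/json" \
#        -d '{"file_path": "uploads/interview.mp4"}'
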
def process_video(video_path, output_video_path='output_video.mp4', frame_sample_rate=5):
    """Analyze every nth frame for emotions and write an annotated copy of the video."""
    cap = cv2.VideoCapture(video_path)
    # Check if the video opened successfully
    if not cap.isOpened():
        logger.error("Error: Could not open video.")
        return None
    # Some containers report 0 fps; fall back to a sensible default so VideoWriter works
    fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # Define the codec and create the VideoWriter object
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
output = {}
frame_index = 0
# Create a progress bar
with tqdm(total=total_frames, desc="Processing video") as pbar:
while True:
ret, frame = cap.read()
if not ret:
logger.info("End of video or cannot capture the frame.")
break
if frame_index % frame_sample_rate == 0: # Only analyze every nth frame
try:
                    result = DeepFace.analyze(frame, actions=['emotion'],
                                              detector_backend='retinaface',
                                              enforce_detection=False)
except Exception as e:
logger.error(f"Error analyzing frame {frame_index}: {e}")
output[frame_index] = {}
out.write(frame) # Write the original frame
frame_index += 1
pbar.update(1)
continue # Skip to the next frame
tmp = {}
for face in result:
x, y, w, h = face['region']['x'], face['region']['y'], face['region']['w'], face['region']['h']
emotion = face['dominant_emotion']
emotion_scores = face['emotion']
tmp[(x, y, w, h)] = {'emotion': emotion, 'score': emotion_scores[emotion]}
cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 2)
cv2.putText(frame, f"{emotion} ({emotion_scores[emotion]:.2f})", (x, y - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)
output[frame_index] = tmp
out.write(frame) # Write the processed frame
frame_index += 1
pbar.update(1) # Update progress bar
# Release resources
cap.release()
out.release()
    # Save the per-frame results to a plain-text file (one detected face per line)
    with open('results_video.txt', 'w') as file:
        for frame_num, faces_info in output.items():
            file.write(f"Frame {frame_num}\n")
            for face_key, info in faces_info.items():
                file.write(f"{face_key}: {info}\n")
logger.info(f"Processed {frame_index} frames.")
video_json_output = calculate_emotion_percentages('results_video.txt')
    logger.debug(f"Video emotion summary: {video_json_output}")
return video_json_output


def process_image(image_path):
    """Run emotion analysis on a single image and save an annotated copy."""
    # cv2.imread expects a string path, not a pathlib.Path
    image = cv2.imread(str(image_path))
    if image is None:
        logger.error(f"Error: Unable to load image from path {image_path}")
        return None
    try:
        # Analyze the image for face detection and emotion analysis
        result = DeepFace.analyze(str(image_path), actions=['emotion'],
                                  detector_backend='retinaface',
                                  enforce_detection=False)
    except Exception as e:
        logger.error(f"Error analyzing image: {e}")
        return None
    if len(result) == 0:
        logger.info("No faces detected.")
        return json.dumps({})  # Keep the return type a JSON string even with no faces
output = {}
tmp = {}
for i, face in enumerate(result):
# Get bounding box coordinates for each detected face
x, y, w, h = face['region']['x'], face['region']['y'], face['region']['w'], face['region']['h']
# Extract emotion data
emotion = face['dominant_emotion']
emotion_scores = face['emotion']
        tmp[i] = {'person': i + 1, 'emotion': emotion, 'score': f"{emotion_scores[emotion]:.3f}"}
        # Draw a rectangle around the face and label it with the predicted emotion
        cv2.rectangle(image, (x, y), (x + w, y + h), (0, 255, 0), 2)
        cv2.putText(image, f"{emotion} ({emotion_scores[emotion]:.3f})", (x, y - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
output['output'] = tmp
    # Save the processed image with bounding boxes and labels
    output_image_path = 'output_image_with_emotions.jpg'
    cv2.imwrite(output_image_path, image)
    logger.info(f"Processed image saved as {output_image_path}")
    string_image_output = json.dumps(output['output'])
    return string_image_output
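

# Illustrative return value from process_image (hypothetical emotion and score;
# the keys are the face indices assigned in the loop above):
#   '{"0": {"person": 1, "emotion": "happy", "score": "0.987"}}'
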
def calculate_emotion_percentages(file_path):
    """Parse the results file and summarize dominant emotions as percentages."""
    emotions = {}
    total_frames = 0  # counts one entry per detected face, not per video frame
    with open(file_path, 'r') as file:
        for line in file:
            if "{'emotion':" in line:
                total_frames += 1
                emotion = line.split("'emotion': ")[1].split("'")[1]
                emotions[emotion] = emotions.get(emotion, 0) + 1
    # Guard against division by zero when no faces were detected at all
    if total_frames == 0:
        return []
    emotion_percentages = [
{"emotion": emotion, "percentage": (count / total_frames) * 100}
for emotion, count in emotions.items()
]
return emotion_percentages
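

# Worked example (hypothetical counts): if the results file contains 40 face
# entries, 30 labeled 'happy' and 10 labeled 'neutral', the function returns
#   [{'emotion': 'happy', 'percentage': 75.0}, {'emotion': 'neutral', 'percentage': 25.0}]
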
if __name__ == "__main__":
logger.info("Starting the Face Emotion Recognition API")
uvicorn.run(app, host="0.0.0.0", port=7860)
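
# The API can also be launched with the uvicorn CLI instead of running this file
# directly (assuming this file is saved as app.py):
#   uvicorn app:app --host 0.0.0.0 --port 7860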