AhmedIbrahim007 committed on
Commit 026cee8 · verified · 1 Parent(s): 0512f04

Update app.py

Files changed (1)
  1. app.py +207 -239
app.py CHANGED
@@ -1,239 +1,207 @@
-import numpy as np
-import cv2
-from fastapi import FastAPI, HTTPException
-from fastapi.middleware.cors import CORSMiddleware
-import uvicorn
-import logging
-import tempfile
-from pathlib import Path
-import firebase_admin
-from firebase_admin import credentials, firestore, storage
-from pydantic import BaseModel
-from deepface import DeepFace
-from tqdm import tqdm
-
-
-
-# Set up logging
-logging.basicConfig(level=logging.DEBUG,
-                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-logger = logging.getLogger(__name__)
-
-# Initialize Firebase
-try:
-    cred = credentials.Certificate("serviceAccountKey.json")
-    firebase_app = firebase_admin.initialize_app(cred, {
-        'storageBucket': 'future-forge-60d3f.appspot.com'
-    })
-    db = firestore.client()
-    bucket = storage.bucket(app=firebase_app)
-    logger.info("Firebase initialized successfully")
-except Exception as e:
-    logger.error(f"Failed to initialize Firebase: {str(e)}")
-
-app = FastAPI()
-
-# Add CORS middleware
-app.add_middleware(
-    CORSMiddleware,
-    allow_origins=["*"],
-    allow_credentials=True,
-    allow_methods=["*"],
-    allow_headers=["*"],
-)
-
-
-# Define the input model
-class FileProcess(BaseModel):
-    file_path: str
-
-
-@app.post("/process")
-async def process_file(file_data: FileProcess):
-    logger.info(f"Processing file from Firebase Storage: {file_data.file_path}")
-    try:
-        # Get the file from Firebase Storage
-        blob = bucket.blob(file_data.file_path)
-
-        # Create a temporary file
-        with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_data.file_path.split('.')[-1]}") as tmp_file:
-            blob.download_to_filename(tmp_file.name)
-            tmp_file_path = Path(tmp_file.name)
-            logger.info(f"File downloaded temporarily at: {tmp_file_path}")
-
-        file_type = file_data.file_path.split('.')[-1].lower()
-
-        result = None
-
-        try:
-            if file_type in ['jpg', 'jpeg', 'png', 'bmp']:
-                # Decode image directly from Firebase
-                image = decode_image_from_firebase(tmp_file_path)
-                processed_image = process_image(image)
-                output_path = Path('processed_image.jpg')
-                output_path.parent.mkdir(parents=True, exist_ok=True)
-                cv2.imwrite(str(output_path), processed_image)
-                result = {"type": "image", "path": str(output_path)}
-            elif file_type in ['mp4', 'avi', 'mov', 'wmv']:
-                graph_paths = process_video(str(tmp_file_path))
-                result = {"type": "video", "paths": graph_paths}
-            else:
-                raise HTTPException(status_code=400, detail="Unsupported file type")
-
-            logger.info(f"Processing complete. Result: {result}")
-
-            # Store result in Firebase
-            try:
-                doc_ref = db.collection('results').add(result)
-                return {"message": "File processed successfully", "result": result}
-            except Exception as e:
-                logger.error(f"Failed to store result in Firebase: {str(e)}")
-                return {"message": "File processed successfully, but failed to store in Firebase", "result": result,
-                        "error": str(e)}
-
-        finally:
-            # Clean up the temporary file after processing
-            if tmp_file_path.exists():
-                tmp_file_path.unlink()
-
-    except Exception as e:
-        logger.error(f"Error processing file: {str(e)}")
-        raise HTTPException(status_code=500, detail=f"Error processing file: {str(e)}")
-
-
-# Initialize face detector
-face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
-def process_video(video_path, output_video_path='output_video.mp4', frame_sample_rate=5):
-    cap = cv2.VideoCapture(video_path)
-
-    # Check if video opened successfully
-    if not cap.isOpened():
-        logger.error("Error: Could not open video.")
-        return None
-
-    fps = int(cap.get(cv2.CAP_PROP_FPS))
-    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
-    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
-    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
-
-    # Define the codec and create VideoWriter object
-    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
-    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
-
-    output = {}
-    frame_index = 0
-
-    # Create a progress bar
-    with tqdm(total=total_frames, desc="Processing video") as pbar:
-        while True:
-            ret, frame = cap.read()
-            if not ret:
-                logger.info("End of video or cannot capture the frame.")
-                break
-
-            if frame_index % frame_sample_rate == 0:  # Only analyze every nth frame
-                try:
-                    result = DeepFace.analyze(frame, actions=['emotion'], detector_backend='retinaface',
-                                              enforce_detection=False)
-                except Exception as e:
-                    logger.error(f"Error analyzing frame {frame_index}: {e}")
-                    output[frame_index] = {}
-                    out.write(frame)  # Write the original frame
-                    frame_index += 1
-                    pbar.update(1)
-                    continue  # Skip to the next frame
-
-                for face in result:
-                    x, y, w, h = face['region']['x'], face['region']['y'], face['region']['w'], face['region']['h']
-                    emotion = face['dominant_emotion']
-                    emotion_scores = face['emotion']
-
-                    cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 2)
-                    cv2.putText(frame, f"{emotion} ({emotion_scores[emotion]:.2f})", (x, y - 10),
-                                cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)
-
-            out.write(frame)  # Write the processed frame
-            frame_index += 1
-            pbar.update(1)  # Update progress bar
-
-    # Release resources
-    cap.release()
-    out.release()
-
-    # Save the results to a file
-    with open('results_video.txt', 'w') as file:
-        for frame_num, faces_info in output.items():
-            file.write(f"Frame {frame_num}\n")
-            for face_key, info in faces_info.items():
-                file.write(f" {face_key}: {info}\n")
-
-    logger.info(f"Processed {frame_index} frames.")
-    return output_video_path
-
-
-def process_image(image_path):
-    # Load the image using OpenCV
-    image = cv2.imread(image_path)
-
-    if image is None:
-        print(f"Error: Unable to load image from path {image_path}")
-        return
-
-    # Use RetinaFace for face detection and emotion analysis
-    try:
-        # Analyze the image for face detection and emotion analysis
-        result = DeepFace.analyze(image_path, actions=['emotion'], detector_backend='retinaface',
-                                  enforce_detection=False)
-    except Exception as e:
-        print(f"Error analyzing image: {e}")
-        return image
-
-    if len(result) == 0:
-        print("No faces detected.")
-        return image  # Return the original image if no faces are detected
-
-    output = {}
-    tmp = {}
-
-    for face in result:
-        # Get bounding box coordinates for each detected face
-        x, y, w, h = face['region']['x'], face['region']['y'], face['region']['w'], face['region']['h']
-
-        # Extract emotion data
-        emotion = face['dominant_emotion']
-        emotion_scores = face['emotion']
-        tmp[(x, y, w, h)] = {'emotion': emotion, 'score': emotion_scores[emotion]}
-
-        # Draw rectangle around face and label with predicted emotion
-        cv2.rectangle(image, (x, y), (x + w, y + h), (0, 255, 0), 2)
-        cv2.putText(image, f"{emotion} ({emotion_scores[emotion]:.2f})", (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8,
-                    (0, 255, 0), 2)
-
-    output['image'] = tmp
-
-    # Save the processed image with bounding boxes and labels
-    output_image_path = 'output_image_with_emotions.jpg'
-    cv2.imwrite(output_image_path, image)
-    print(f"Processed image saved as {output_image_path}")
-
-    # Save the results to a file
-    with open('results_image.txt', 'w') as file:
-        file.write(f"Image {image_path}\n")
-        for face_key, info in output['image'].items():
-            file.write(f" {face_key}: {info}\n")
-
-    return image
-
-
-def decode_image_from_firebase(temp_file_path):
-    """Reads the temporary file and decodes the image for OpenCV."""
-    with open(temp_file_path, 'rb') as f:
-        image_array = np.asarray(bytearray(f.read()), dtype=np.uint8)
-        image = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
-    return image
-
-
-
-if __name__ == "__main__":
-    logger.info("Starting the Face Emotion Recognition API")
-    uvicorn.run(app, host="0.0.0.0", port=7860)
+import json
+import cv2
+from fastapi import FastAPI, HTTPException
+from fastapi.middleware.cors import CORSMiddleware
+import uvicorn
+import logging
+import tempfile
+from pathlib import Path
+import firebase_admin
+from firebase_admin import credentials, firestore, storage
+from pydantic import BaseModel
+from deepface import DeepFace
+from tqdm import tqdm
+
+# Set up logging
+logging.basicConfig(level=logging.DEBUG,
+                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+# Initialize Firebase
+try:
+    cred = credentials.Certificate("serviceAccountKey.json")
+    firebase_app = firebase_admin.initialize_app(cred, {
+        'storageBucket': 'future-forge-60d3f.appspot.com'
+    })
+    db = firestore.client()
+    bucket = storage.bucket(app=firebase_app)
+    logger.info("Firebase initialized successfully")
+except Exception as e:
+    logger.error(f"Failed to initialize Firebase: {str(e)}")
+
+app = FastAPI()
+# Add CORS middleware
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+
+# Define the input model
+class FileProcess(BaseModel):
+    file_path: str
+
+@app.post("/facial-emotion")
+async def process_file(file_data: FileProcess):
+    logger.info(f"Processing file from Firebase Storage: {file_data.file_path}")
+    try:
+        # Get the file from Firebase Storage
+        blob = bucket.blob(file_data.file_path)
+        # Create a temporary file
+        with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_data.file_path.split('.')[-1]}") as tmp_file:
+            blob.download_to_filename(tmp_file.name)
+            tmp_file_path = Path(tmp_file.name)
+            logger.info(f"File downloaded temporarily at: {tmp_file_path}")
+
+        file_type = file_data.file_path.split('.')[-1].lower()
+
+        result = None
+
+        try:
+            if file_type in ['jpg', 'jpeg', 'png', 'bmp']:
+                output_image = process_image(tmp_file_path)
+                result = {"type": "image", "output": output_image}
+            elif file_type in ['mp4', 'avi', 'mov', 'wmv']:
+                video_output = process_video(str(tmp_file_path))
+                result = {"type": "video", "output": video_output}
+            else:
+                raise HTTPException(status_code=400, detail="Unsupported file type")
+
+            logger.info(f"Processing complete. Result: {result}")
+
+            # Store result in Firebase
+            try:
+                doc_ref = db.collection('results').add(result)
+                return {"message": "File processed successfully", "result": result}
+            except Exception as e:
+                logger.error(f"Failed to store result in Firebase: {str(e)}")
+                return {"message": "File processed successfully, but failed to store in Firebase", "result": result,
+                        "error": str(e)}
+        finally:
+            # Clean up the temporary file after processing
+            if tmp_file_path.exists():
+                tmp_file_path.unlink()
+    except Exception as e:
+        logger.error(f"Error processing file: {str(e)}")
+        raise HTTPException(status_code=500, detail=f"Error processing file: {str(e)}")
+
+def process_video(video_path, output_video_path='output_video.mp4', frame_sample_rate=5):
+    cap = cv2.VideoCapture(video_path)
+    # Check if video opened successfully
+    if not cap.isOpened():
+        logger.error("Error: Could not open video.")
+        return None
+    fps = int(cap.get(cv2.CAP_PROP_FPS))
+    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
+    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
+    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
+    # Define the codec and create VideoWriter object
+    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
+    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
+    output = {}
+    frame_index = 0
+    # Create a progress bar
+    with tqdm(total=total_frames, desc="Processing video") as pbar:
+        while True:
+            ret, frame = cap.read()
+            if not ret:
+                logger.info("End of video or cannot capture the frame.")
+                break
+
+            if frame_index % frame_sample_rate == 0:  # Only analyze every nth frame
+                try:
+                    result = DeepFace.analyze(frame, actions=['emotion'], detector_backend='retinaface', enforce_detection=False)
+                except Exception as e:
+                    logger.error(f"Error analyzing frame {frame_index}: {e}")
+                    output[frame_index] = {}
+                    out.write(frame)  # Write the original frame
+                    frame_index += 1
+                    pbar.update(1)
+                    continue  # Skip to the next frame
+                tmp = {}
+                for face in result:
+                    x, y, w, h = face['region']['x'], face['region']['y'], face['region']['w'], face['region']['h']
+                    emotion = face['dominant_emotion']
+                    emotion_scores = face['emotion']
+                    tmp[(x, y, w, h)] = {'emotion': emotion, 'score': emotion_scores[emotion]}
+
+                    cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 2)
+                    cv2.putText(frame, f"{emotion} ({emotion_scores[emotion]:.2f})", (x, y - 10),
+                                cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)
+                output[frame_index] = tmp
+            out.write(frame)  # Write the processed frame
+            frame_index += 1
+            pbar.update(1)  # Update progress bar
+    # Release resources
+    cap.release()
+    out.release()
+    # Save the results to a file
+    with open('results_video.txt', 'w') as file:
+        for frame_num, faces_info in output.items():
+            file.write(f"Frame {frame_num} ")
+            for face_key, info in faces_info.items():
+                file.write(f"{face_key}: {info}\n")
+
+    logger.info(f"Processed {frame_index} frames.")
+    video_json_output = calculate_emotion_percentages('results_video.txt')
+    print(video_json_output)
+    return video_json_output
+
+def process_image(image_path):
+    image = cv2.imread(image_path)
+    if image is None:
+        print(f"Error: Unable to load image from path {image_path}")
+        return
+    try:
+        # Analyze the image for face detection and emotion analysis
+        result = DeepFace.analyze(image_path, actions=['emotion'], detector_backend='retinaface', enforce_detection=False)
+    except Exception as e:
+        print(f"Error analyzing image: {e}")
+        return image
+
+    if len(result) == 0:
+        print("No faces detected.")
+        return image  # Return the original image if no faces are detected
+
+    output = {}
+    tmp = {}
+    for i, face in enumerate(result):
+        # Get bounding box coordinates for each detected face
+        x, y, w, h = face['region']['x'], face['region']['y'], face['region']['w'], face['region']['h']
+        # Extract emotion data
+        emotion = face['dominant_emotion']
+        emotion_scores = face['emotion']
+        tmp[i] = {'person': i + 1, 'emotion': emotion, 'score': f"{emotion_scores[emotion]:.3f}"}
+
+        # Draw rectangle around face and label with predicted emotion
+        cv2.rectangle(image, (x, y), (x + w, y + h), (0, 255, 0), 2)
+        cv2.putText(image, f"{emotion} ({emotion_scores[emotion]:.3f})", (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
+    output['output'] = tmp
+    # Save the processed image with bounding boxes and labels
+    output_image_path = 'output_image_with_emotions.jpg'
+    cv2.imwrite(output_image_path, image)
+    print(f"Processed image saved as {output_image_path}")
+    string_image_output = json.dumps(output['output'])
+    return string_image_output
+
+def calculate_emotion_percentages(file_path):
+    emotions = {}
+    total_frames = 0
+    with open(file_path, 'r') as file:
+        for line in file:
+            if "{'emotion':" in line:
+                total_frames += 1
+                emotion = line.split("'emotion': ")[1].split("'")[1]
+                emotions[emotion] = emotions.get(emotion, 0) + 1
+
+    emotion_percentages = [
+        {"emotion": emotion, "percentage": (count / total_frames) * 100}
+        for emotion, count in emotions.items()
+    ]
+    return emotion_percentages
+
+if __name__ == "__main__":
+    logger.info("Starting the Face Emotion Recognition API")
+    uvicorn.run(app, host="0.0.0.0", port=7860)
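
For a quick smoke test of the renamed endpoint, a minimal client sketch follows. It assumes the server above is running locally on the port from the uvicorn.run call; videos/interview.mp4 is a hypothetical object path in the Firebase Storage bucket, used only for illustration.

# Minimal client sketch for the /facial-emotion endpoint. Assumes the
# server defined above is running on localhost:7860; the storage path is
# a hypothetical example object, not a file from this repo.
import requests

response = requests.post(
    "http://localhost:7860/facial-emotion",
    json={"file_path": "videos/interview.mp4"},  # hypothetical Storage path
    timeout=600,  # RetinaFace analysis can take minutes on long videos
)
response.raise_for_status()
print(response.json())
# A video result carries the aggregated list built by calculate_emotion_percentages, e.g.
# {"message": "File processed successfully",
#  "result": {"type": "video",
#             "output": [{"emotion": "happy", "percentage": 60.0}, ...]}}
# An image result carries the JSON string produced by json.dumps in process_image.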
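
And a small standalone check of calculate_emotion_percentages, fed a synthetic file in the line format process_video writes for frames with one detected face (Frame N (x, y, w, h): {'emotion': ..., 'score': ...}); the coordinates and scores below are made-up values.

# Standalone check of calculate_emotion_percentages against a synthetic
# results file in the same line format process_video writes; all values
# here are invented for the example.
sample_lines = (
    "Frame 0 (10, 20, 50, 50): {'emotion': 'happy', 'score': 91.2}\n"
    "Frame 5 (12, 22, 50, 50): {'emotion': 'happy', 'score': 88.7}\n"
    "Frame 10 (11, 21, 50, 50): {'emotion': 'sad', 'score': 67.4}\n"
)
with open('results_video_sample.txt', 'w') as f:
    f.write(sample_lines)

print(calculate_emotion_percentages('results_video_sample.txt'))
# -> [{'emotion': 'happy', 'percentage': 66.66...},
#     {'emotion': 'sad', 'percentage': 33.33...}]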