# online_teaching / app.py
import os
import cv2
import numpy as np
from collections import defaultdict
import matplotlib.pyplot as plt
from rmn import RMN
import gradio as gr
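# Note: `rmn` is assumed here to be the Residual Masking Network facial-expression
# package (installable as `pip install rmn`); the code below relies on its
# RMN().detect_faces() and detect_emotion_for_single_face_image() methods.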
def process_video(video_path, share_screen_mode):
    # Initialize the output directory
    output_dir = 'output'
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Initialize the emotion detection model
    print("Initializing emotion detection model...")
    m = RMN()

    # Open the video file
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_interval = int(fps * 1)  # process one frame per second
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    print(f"Total frames: {total_frames}, FPS: {fps}")

    # Create the video writer
    output_video_path = os.path.join(output_dir, 'output_video.avi')
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))

    current_frame = 0

    # Face IDs and per-face emotion data
    face_ids = []
    max_face_id = 0
    face_emotions = defaultdict(list)
    max_faces = 0
    initial_faces = []
    last_detections = {}

    print("Starting video processing...")
    while True:
        ret, frame = cap.read()
        if not ret:
            print("Finished processing video.")
            break

        if share_screen_mode:
            # Crop the right 1/5 of the frame (the webcam strip in screen-share layouts)
            x_start = int(frame_width * 4 / 5)
            frame_to_process = frame[:, x_start:]
        else:
            frame_to_process = frame.copy()
            x_start = 0  # no offset

        if current_frame % frame_interval == 0:
            print(f"Processing frame {current_frame}...")
            # Detect faces
            detections = m.detect_faces(frame_to_process)
            print(f"Detected {len(detections)} faces.")

            # Update the maximum face count seen so far
            if len(detections) > max_faces:
                max_faces = len(detections)

            for det in detections:
                xmin = det['xmin']
                ymin = det['ymin']
                xmax = det['xmax']
                ymax = det['ymax']

                matched_id = None
                max_iou = 0
                # Compare against known faces
                for face in initial_faces:
                    ixmin, iymin, ixmax, iymax = face['bbox']
                    # Compute IoU (intersection over union) of the two boxes
                    xx1 = max(xmin, ixmin)
                    yy1 = max(ymin, iymin)
                    xx2 = min(xmax, ixmax)
                    yy2 = min(ymax, iymax)
                    inter_area = max(0, xx2 - xx1) * max(0, yy2 - yy1)
                    area1 = (xmax - xmin) * (ymax - ymin)
                    area2 = (ixmax - ixmin) * (iymax - iymin)
                    iou = inter_area / float(area1 + area2 - inter_area + 1e-5)
                    if iou > 0.3 and iou > max_iou:
                        matched_id = face['id']
                        max_iou = iou

                if matched_id is None:
                    if len(initial_faces) < max_faces:
                        # Create a new face ID
                        matched_id = max_face_id
                        max_face_id += 1
                        initial_faces.append({'id': matched_id, 'bbox': (xmin, ymin, xmax, ymax)})
                    else:
                        # Fall back to matching by distance between box centers
                        cx = (xmin + xmax) / 2
                        cy = (ymin + ymax) / 2
                        min_dist = float('inf')
                        for face in initial_faces:
                            fx = (face['bbox'][0] + face['bbox'][2]) / 2
                            fy = (face['bbox'][1] + face['bbox'][3]) / 2
                            dist = np.sqrt((cx - fx) ** 2 + (cy - fy) ** 2)
                            if dist < min_dist:
                                min_dist = dist
                                matched_id = face['id']

                # Update the matched face's bounding box
                for face in initial_faces:
                    if face['id'] == matched_id:
                        face['bbox'] = (xmin, ymin, xmax, ymax)
                        break

                # Get the emotion label for this face crop
                face_img = frame_to_process[ymin:ymax, xmin:xmax]
                if face_img.size == 0:
                    continue
                emo_label, _, _ = m.detect_emotion_for_single_face_image(face_img)
                if emo_label not in ['neutral', 'happy']:
                    emo_label = 'confused'

                # Record the emotion with its timestamp (in seconds)
                face_emotions[matched_id].append((current_frame / fps, emo_label))
                print(f"Face {matched_id} emotion: {emo_label}")

                # Update the latest detection, shifting coordinates back to the original frame
                xmin_global = xmin + x_start
                xmax_global = xmax + x_start
                last_detections[matched_id] = (xmin_global, ymin, xmax_global, ymax, emo_label)

        # Draw the latest detections on the original frame
        for face_id, (xmin, ymin, xmax, ymax, emo_label) in last_detections.items():
            cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
            cv2.putText(frame, f"ID:{face_id} {emo_label}", (xmin, ymin + 20),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 0, 0), 2)

        # Write the processed frame to the output video
        out.write(frame)
        current_frame += 1

    cap.release()
    out.release()
    print("Finished processing video.")

    # Return the output video path and the per-face emotion data
    return output_video_path, face_emotions
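
# process_video returns the path of the annotated video plus face_emotions, a dict
# mapping face ID -> list of (timestamp in seconds, emotion label) tuples;
# generate_graphs below consumes that structure for the IDs selected in the UI.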
def generate_graphs(selected_ids, face_emotions):
    # Convert selected_ids from strings (checkbox values) to integers
    selected_ids = [int(face_id) for face_id in selected_ids]
    selected_face_emotions = {face_id: emotions for face_id, emotions in face_emotions.items() if face_id in selected_ids}
    output_dir = 'output'
    emotion_labels = ['confused', 'neutral', 'happy']

    # Plot emotion changes over time, one subplot per selected face
    plt.figure(figsize=(15, 10))
    for i, (face_id, emotions) in enumerate(selected_face_emotions.items(), 1):
        times = [t for t, _ in emotions]
        labels = [emotion_labels.index(emo) for _, emo in emotions]
        plt.subplot(len(selected_face_emotions), 1, i)
        plt.plot(times, labels, marker='o')
        plt.title(f"Emotion changes for face {face_id}")
        plt.xlabel('Time (s)')
        plt.ylabel('Emotion')
        plt.yticks([0, 1, 2], emotion_labels)
    plt.tight_layout()
    graph_path = os.path.join(output_dir, "selected_faces_emotions.png")
    plt.savefig(graph_path)
    plt.close()
    print("Saved emotion change graph for selected faces.")

    # Plot the proportion of each emotion over time across the selected faces
    time_points = sorted(set(t for emotions in selected_face_emotions.values() for t, _ in emotions))
    emotion_counts_over_time = {t: defaultdict(int) for t in time_points}
    for emotions in selected_face_emotions.values():
        for t, emo in emotions:
            emotion_counts_over_time[t][emo] += 1
    emotion_proportions_over_time = {t: {emo: 0 for emo in emotion_labels} for t in time_points}
    for t in time_points:
        total_faces = sum(emotion_counts_over_time[t].values())
        if total_faces > 0:
            for emo in emotion_labels:
                emotion_proportions_over_time[t][emo] = emotion_counts_over_time[t][emo] / total_faces

    plt.figure(figsize=(15, 10))
    for i, emo in enumerate(emotion_labels, 1):
        proportions = [emotion_proportions_over_time[t][emo] for t in time_points]
        plt.subplot(len(emotion_labels), 1, i)
        plt.plot(time_points, proportions, marker='o')
        plt.title(f"Proportion of {emo} over time")
        plt.xlabel('Time (s)')
        plt.ylabel('Proportion')
        plt.ylim(0, 1)
    plt.tight_layout()
    emotion_proportions_path = os.path.join(output_dir, "selected_emotion_proportions_over_time.png")
    plt.savefig(emotion_proportions_path)
    plt.close()
    print("Saved emotion proportion graph for selected faces.")

    return graph_path, emotion_proportions_path
# Gradio Interface
with gr.Blocks() as demo:
    gr.Markdown("# Emotion Detection in Videos")
    video_input = gr.Video(label="Upload a video")
    share_screen_checkbox = gr.Checkbox(label="Turn on share mode", value=False)
    process_btn = gr.Button("Process Video")
    video_output = gr.Video(label="Processed Video Output")

    # State holding the per-face emotion data between clicks
    face_emotions_state = gr.State()

    # Checkbox group listing the detected face IDs
    id_checkbox_group = gr.CheckboxGroup(label="Select Face IDs")
    generate_graphs_btn = gr.Button("Generate Graphs")
    graph_output = gr.Image(label="Emotion Change Graph")
    emotion_proportions_output = gr.Image(label="Emotion Proportions Graph")

    def process_and_get_ids(video, share_screen_mode):
        video_output_path, face_emotions = process_video(video, share_screen_mode)
        face_ids = [str(face_id) for face_id in face_emotions.keys()]
        return video_output_path, gr.update(choices=face_ids), face_emotions

    process_btn.click(
        fn=process_and_get_ids,
        inputs=[video_input, share_screen_checkbox],
        outputs=[video_output, id_checkbox_group, face_emotions_state]
    )

    generate_graphs_btn.click(
        fn=generate_graphs,
        inputs=[id_checkbox_group, face_emotions_state],
        outputs=[graph_output, emotion_proportions_output]
    )

demo.launch(share=True)
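
# Usage (a sketch, assuming the dependencies imported above are installed,
# e.g. `pip install gradio opencv-python numpy matplotlib rmn`):
#     python app.py
# share=True additionally requests a temporary public Gradio link alongside
# the local server URL.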