| import os
|
| import cv2
|
| import json
|
| import glob
|
| import pickle
|
| import shutil
|
| import subprocess
|
| from typing import List, Optional
|
| from cog import BasePredictor, BaseModel, Input, Path
|
|
|
|
|
class Output(BaseModel):
    """Prediction result container; exactly one field is populated per call.

    ``json_str`` is set when ``predict`` is called with ``return_json=True``;
    ``media_path`` is set otherwise (see ``Predictor.predict``).
    """

    # Converted .mp4 clips produced from the TalkNet .avi outputs.
    media_path: Optional[List[Path]]

    # Per-frame face bounding boxes + speaking flags, serialized as JSON.
    json_str: Optional[str]
|
|
|
|
|
class Predictor(BasePredictor):
    """Cog predictor that runs TalkNet active-speaker detection on a video.

    ``predict`` shells out to ``demoTalkNet.py``, loads the resulting face
    tracks and speaking scores from the pickles it writes, and returns either
    a JSON summary of per-frame face boxes (with a speaking flag) or the
    rendered .mp4 clips.
    """

    def setup(self):
        # Nothing to preload here: demoTalkNet.py loads the pretrained
        # model itself inside the subprocess launched by predict().
        pass

    def predict(
        self,
        video: Path = Input(description="Path to the video"),
        face_det_scale: float = Input(
            default=0.25,
            description="Scale factor for face detection, the frames will be scaled to 0.25 of the original",
            ge=0,
            le=1,
        ),
        min_track: int = Input(
            default=10, description="Number of min frames for each shot"
        ),
        num_failed_det: int = Input(
            default=10,
            description="Number of missed detections allowed before tracking is stopped",
            ge=1,
        ),
        min_face_size: int = Input(
            default=1, description="Minimum face size in pixels", ge=1
        ),
        crop_scale: float = Input(
            default=0.40, description="Scale bounding box", ge=0, le=1
        ),
        start: int = Input(default=0, description="The start time of the video", ge=0),
        duration: int = Input(
            default=-1,
            description="The duration of the video, when set as -1, will extract the whole video",
        ),
        return_json: bool = Input(
            description="Return results in json format", default=True
        ),
        return_boundingbox_percentages: bool = Input(
            description="Return bounding box coordinates as percentages of the video width and height",
            default=False,
        ),
    ) -> Output:
        """Run TalkNet on *video*; return detections as JSON or media files.

        Raises:
            RuntimeError: if demoTalkNet.py produced no ``pywork`` results
                folder (the subprocess's stderr is printed above the raise).
        """
        video_path = str(video)
        video_name = os.path.splitext(os.path.basename(video_path))[0]
        video_folder = "demo"

        # Start from a clean working directory so stale results from a
        # previous run can never be picked up.
        shutil.rmtree(video_folder, ignore_errors=True)
        os.makedirs(video_folder, exist_ok=True)

        target_video_path = os.path.join(video_folder, os.path.basename(video_path))
        shutil.copy(video_path, target_video_path)

        # NOTE(review): the -1 "whole video" sentinel is clamped to 0 here;
        # presumably demoTalkNet treats duration==0 as "whole video" — confirm.
        duration = max(0, duration)
        n_data_loader_thread = 32

        # Argument-list invocation (shell=False): file names containing
        # spaces or shell metacharacters are passed through safely, which the
        # previous f-string + shell=True construction could not guarantee.
        command = [
            "python", "demoTalkNet.py",
            "--videoName", video_name,
            "--videoFolder", video_folder,
            "--pretrainModel", "pretrain_TalkSet.model",
            "--nDataLoaderThread", str(n_data_loader_thread),
            "--facedetScale", str(face_det_scale),
            "--minTrack", str(min_track),
            "--numFailedDet", str(num_failed_det),
            "--minFaceSize", str(min_face_size),
            "--cropScale", str(crop_scale),
            "--start", str(start),
            "--duration", str(duration),
        ]

        result = subprocess.run(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        print(f"Command output: {result.stdout.decode()}")
        if result.stderr:
            print(f"Command errors: {result.stderr.decode()}")

        # demoTalkNet writes its intermediate results into a per-video
        # "pywork" folder; take the most recently created one.
        pywork_folders = glob.glob(os.path.join(video_folder, "*", "pywork"))
        if not pywork_folders:
            raise RuntimeError(
                "demoTalkNet.py produced no pywork results folder; "
                "see the command errors printed above"
            )
        latest_pywork_folder = max(pywork_folders, key=os.path.getctime)

        tracks_file = os.path.join(latest_pywork_folder, "tracks.pckl")
        scores_file = os.path.join(latest_pywork_folder, "scores.pckl")
        # These pickles are produced by our own subprocess, not untrusted
        # input, so pickle.load is acceptable here.
        with open(tracks_file, "rb") as f:
            face_tracks = pickle.load(f)
        with open(scores_file, "rb") as f:
            scores = pickle.load(f)

        # Probe the copied video for its dimensions (needed for percentage
        # output). A dedicated name avoids shadowing the `video` input.
        capture = cv2.VideoCapture(target_video_path)
        video_width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        video_height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
        capture.release()

        output_data = self._collect_frame_data(
            face_tracks,
            scores,
            video_width,
            video_height,
            return_boundingbox_percentages,
        )
        json_str = json.dumps(output_data)

        if return_json:
            return Output(json_str=json_str)
        return Output(media_path=self._convert_avi_outputs(video_folder))

    @staticmethod
    def _collect_frame_data(
        face_tracks, scores, video_width, video_height, as_percentages
    ):
        """Flatten face tracks and speaking scores into per-frame records.

        Returns a list of ``{"frame_number": int, "faces": [...]}`` dicts in
        order of first appearance. Indexing frames in a dict makes this O(n)
        instead of the previous O(n^2) linear rescan per detection, with
        identical output ordering.
        """
        frames_by_number = {}
        for track_idx, track in enumerate(face_tracks):
            frames = track["track"]["frame"]
            boxes = track["proc_track"]
            # Some tracks may have no score entry; treat them as not speaking.
            speaking_scores = scores[track_idx] if track_idx < len(scores) else []

            for i, frame in enumerate(frames):
                # The box arrays can be shorter than the frame list; stop at
                # the first missing entry.
                if (
                    i >= len(boxes["x"])
                    or i >= len(boxes["y"])
                    or i >= len(boxes["s"])
                ):
                    break

                # proc_track stores a center (x, y) and half-size s; expand
                # to corner coordinates.
                x0 = int(boxes["x"][i] - boxes["s"][i])
                y0 = int(boxes["y"][i] - boxes["s"][i])
                x1 = int(boxes["x"][i] + boxes["s"][i])
                y1 = int(boxes["y"][i] + boxes["s"][i])

                if as_percentages:
                    # NOTE(review): assumes cv2 reported non-zero dimensions;
                    # a failed probe (0x0) would divide by zero — confirm.
                    x0 /= video_width
                    y0 /= video_height
                    x1 /= video_width
                    y1 /= video_height

                # A non-negative TalkNet score marks the face as speaking.
                speaking = (
                    bool(speaking_scores[i] >= 0) if i < len(speaking_scores) else False
                )

                frame_number = int(frame)
                frame_data = frames_by_number.get(frame_number)
                if frame_data is None:
                    frame_data = {"frame_number": frame_number, "faces": []}
                    frames_by_number[frame_number] = frame_data
                frame_data["faces"].append(
                    {
                        "face_id": track_idx,
                        "x0": x0,
                        "y0": y0,
                        "x1": x1,
                        "y1": y1,
                        "speaking": speaking,
                    }
                )
        # dicts preserve insertion order, so this matches the original
        # first-appearance ordering of frames.
        return list(frames_by_number.values())

    @staticmethod
    def _convert_avi_outputs(video_folder):
        """Convert rendered .avi files under *video_folder* to .mp4.

        The raw intermediates (video.avi / video_only.avi) are skipped and
        only successful conversions are returned.
        """
        mp4_files = []
        excluded_files = {"video_only.avi", "video.avi"}
        for avi_file in Path(video_folder).rglob("*.avi"):
            if avi_file.name in excluded_files:
                continue
            mp4_file = avi_file.with_suffix(".mp4")
            # Argument-list invocation keeps paths with spaces intact and
            # avoids shell interpretation of the file names.
            conversion_process = subprocess.run(
                ["ffmpeg", "-i", str(avi_file), str(mp4_file)],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            if conversion_process.returncode == 0:
                mp4_files.append(Path(mp4_file))
        return mp4_files
|
|
|