import os
import sys


def _run_setup_command(command):
    """Run a shell set-up command and report a non-zero exit status.

    Set-up stays best-effort (a failing step does not abort start-up), but
    the failure is printed instead of being silently discarded.

    :param command: str, the shell command line to execute.
    :return: the status value returned by ``os.system``.
    """
    status = os.system(command)
    if status != 0:
        print("Setup command failed (status {}): {}".format(status, command))
    return status


# ---- Runtime environment set-up -----
# The AV-HuBERT sources (with the fairseq submodule) are fetched and
# installed at start-up, before the imports below that depend on them.
_run_setup_command('git clone https://github.com/facebookresearch/av_hubert.git')
os.chdir('/home/user/app/av_hubert')
_run_setup_command('git submodule init')
_run_setup_command('git submodule update')
os.chdir('/home/user/app/av_hubert/fairseq')
_run_setup_command('pip install ./')

_PIP_PACKAGES = (
    'scipy',
    'sentencepiece',
    'python_speech_features',
    'scikit-video',
    'transformers',
    'gradio==3.12',
    'numpy==1.23.3',
)
for _package in _PIP_PACKAGES:
    _run_setup_command('pip install {}'.format(_package))

# Make the avhubert package directory importable (``preparation.align_mouth``
# below is resolved through this entry).
sys.path.append('/home/user/app/av_hubert/avhubert')
print(sys.path)
print(os.listdir())
print(sys.argv, type(sys.argv))
# NOTE(review): presumably a placeholder argument needed by argv-based parsing
# in the imported AV-HuBERT/fairseq code -- confirm before removing.
sys.argv.append('dummy')

# These imports intentionally come after the set-up above: several of the
# packages are only installed / put on sys.path by the commands that precede
# them.
import dlib, cv2
import numpy as np
import skvideo
import skvideo.io
from tqdm import tqdm
from preparation.align_mouth import landmarks_interpolate, crop_patch, write_video_ffmpeg
from IPython.display import HTML
from base64 import b64encode
import torch
import tempfile
from argparse import Namespace
import fairseq
from fairseq import checkpoint_utils, options, tasks, utils
from fairseq.dataclass.configs import GenerationConfig
from huggingface_hub import hf_hub_download
import gradio as gr
from pytube import YouTube

# ---- Configuration -----
user_dir = "/home/user/app/av_hubert/avhubert"
utils.import_user_module(Namespace(user_dir=user_dir))
data_dir = "/home/user/app/video"

# The checkpoint path must be defined before the model is loaded below; with
# this assignment commented out, start-up failed with a NameError.
ckpt_path = hf_hub_download('vumichien/AV-HuBERT', 'model.pt')
face_detector_path = "/home/user/app/mmod_human_face_detector.dat"
face_predictor_path = "/home/user/app/shape_predictor_68_face_landmarks.dat"
mean_face_path = "/home/user/app/20words_mean_face.npy"
mouth_roi_path = "/home/user/app/roi.mp4"
output_video_path = "/home/user/app/video/và/test"
modalities = ["video"]
gen_subset = "test"
gen_cfg = GenerationConfig(beam=20)

# ---- Model loading -----
models, saved_cfg, task = checkpoint_utils.load_model_ensemble_and_task([ckpt_path])
# Put every model in eval mode, moving it to the GPU when one is available.
models = [model.eval().cuda() if torch.cuda.is_available() else model.eval() for model in models]
saved_cfg.task.modalities = modalities
saved_cfg.task.data = data_dir
saved_cfg.task.label_dir = data_dir
task = tasks.setup_task(saved_cfg.task)
generator = task.build_generator(models, gen_cfg)


def get_youtube(video_url):
    """Download a YouTube video as a progressive mp4 and return its path.

    The highest-resolution progressive mp4 stream is selected and downloaded
    with pytube's default output location.

    :param video_url: str, URL of the YouTube video.
    :return: the path returned by the stream's ``download()`` call.
    :raises ValueError: if the video offers no progressive mp4 stream.
    """
    yt = YouTube(video_url)
    stream = (
        yt.streams
        .filter(progressive=True, file_extension='mp4')
        .order_by('resolution')
        .desc()
        .first()
    )
    if stream is None:
        # ``first()`` yields None for an empty query; fail with a clear
        # message instead of an AttributeError on ``None.download``.
        raise ValueError("No progressive mp4 stream found for {}".format(video_url))
    abs_video_path = stream.download()
    print("Success download video")
    print(abs_video_path)
    return abs_video_path


def convert_bgr2gray(data):
    """Convert a sequence of BGR frames to a single stacked grayscale array.

    :param data: iterable of BGR images accepted by ``cv2.cvtColor``.
    :return: ndarray with the grayscale frames stacked along a new axis 0.
    """
    # np.stack(array_1, array_2, axis=0): the given axis is created as a
    # brand-new axis of the result.
    return np.stack([cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) for frame in data], axis=0)


def save2npz(filename, data=None):
    """Save an array to a compressed ``.npz`` archive under the key ``data``.

    :param filename: str, the filename where the data will be saved.
    :param data: ndarray, array to save to the file.
    :raises ValueError: if ``data`` is None.
    """
    if data is None:
        raise ValueError("data is {}".format(data))
    target_dir = os.path.dirname(filename)
    # A bare filename has an empty dirname, which ``os.makedirs`` rejects;
    # ``exist_ok`` avoids the check-then-create race.
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)
    np.savez_compressed(filename, data=data)


def detect_landmark(image, detector, predictor):
    """Detect 68 facial landmark coordinates in one RGB frame.

    When several faces are detected, the landmarks of the last detection are
    returned (each detection overwrites the previous one).

    :param image: RGB image (converted with ``cv2.COLOR_RGB2GRAY``).
    :param detector: dlib face detector, called as ``detector(gray, 1)``.
    :param predictor: dlib shape predictor, called as ``predictor(gray, rect)``.
    :return: int32 ndarray of shape (68, 2) holding (x, y) points, or None
        when no face is detected.
    """
    gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    face_locations = detector(gray, 1)
    coords = None
    for face_location in face_locations:
        # CNN detections wrap the rectangle in a ``.rect`` attribute while the
        # frontal-face detector yields the rectangle itself; inspect the
        # detection rather than inferring its type from CUDA availability.
        rect = getattr(face_location, "rect", face_location)
        shape = predictor(gray, rect)
        coords = np.array(
            [(shape.part(i).x, shape.part(i).y) for i in range(68)],
            dtype=np.int32,
        )
    return coords


def preprocess_video(input_video_path):
    """Crop the mouth region of a video and write it out as a new video.

    Landmarks are detected frame by frame, interpolated, and used to crop
    96x96 patches. The grayscale patches are saved as a compressed ``.npz``
    at ``output_video_path`` and the patches are encoded to
    ``mouth_roi_path`` with ffmpeg.

    :param input_video_path: str, path of the video to process.
    :return: ``mouth_roi_path``, the path of the written mouth-region video.
    :raises ValueError: if landmark interpolation yields no landmarks.
    """
    # The CNN detector is used only when a GPU is available; otherwise the
    # HOG-based frontal face detector is used.
    if torch.cuda.is_available():
        detector = dlib.cnn_face_detection_model_v1(face_detector_path)
    else:
        detector = dlib.get_frontal_face_detector()
    predictor = dlib.shape_predictor(face_predictor_path)

    STD_SIZE = (256, 256)
    mean_face_landmarks = np.load(mean_face_path)
    stablePntsIDs = [33, 36, 39, 42, 45]

    # Iterate the decoded frames directly; no intermediate copy is needed.
    frames = skvideo.io.vread(input_video_path)
    landmarks = [detect_landmark(frame, detector, predictor) for frame in tqdm(frames)]

    preprocessed_landmarks = landmarks_interpolate(landmarks)
    if preprocessed_landmarks is None:
        # NOTE(review): assumes landmarks_interpolate returns None when no
        # frame has a detected face -- confirm against align_mouth.
        raise ValueError("No face landmarks detected in {}".format(input_video_path))

    rois = crop_patch(
        input_video_path,
        preprocessed_landmarks,
        mean_face_landmarks,
        stablePntsIDs,
        STD_SIZE,
        window_margin=12,
        start_idx=48,
        stop_idx=68,
        crop_height=96,
        crop_width=96,
    )
    rois_gray = convert_bgr2gray(rois)
    save2npz(output_video_path, data=rois_gray)
    write_video_ffmpeg(rois, mouth_roi_path, "/usr/bin/ffmpeg")
    return mouth_roi_path


def predict(process_video):
    """Run the ``TestVisual.sh`` script from the application directory.

    :param process_video: value supplied by the Gradio input; not used here.
    :return: the status value returned by ``os.system`` for the script.
    """
    os.chdir('/home/user/app')
    return os.system('bash TestVisual.sh')


# ---- Gradio Layout -----
youtube_url_in = gr.Textbox(label="Youtube url", lines=1, interactive=True)
video_in = gr.Video(label="Input Video", mirror_webcam=False, interactive=True)
video_out = gr.Video(label="Audio Visual Video", mirror_webcam=False, interactive=True)
demo = gr.Blocks()
demo.encrypt = False
text_output = gr.Textbox()

# NOTE(review): the nesting below is reconstructed from the statements'
# syntax (the original indentation was lost) -- confirm against the intended
# page layout.
with demo:
    # Disabled page header, kept for reference:
    # gr.Markdown('''
    #     Speech Recognition from Visual Lip Movement by Audio-Visual Hidden Unit BERT Model (AV-HuBERT)
    #     This space uses AV-HuBERT models from Meta Research to recognize the speech from Lip Movement 🤗
    #     Audio-Visual Speech Recognition
    #     Speech Recognition from visual lip movement
    # ''')

    # Disabled step-by-step instructions, kept for reference:
    # with gr.Row():
    #     gr.Markdown('''
    #     ### Reading Lip movement with youtube link using Avhubert
    #     ##### Step 1a. Download video from youtube (Note: the length of video should be less than 10 seconds if not it will be cut and the face should be stable for better result)
    #     ##### Step 1b. You also can upload video directly
    #     ##### Step 2. Generating landmarks surrounding mouth area
    #     ##### Step 3. Reading lip movement.
    #     ''')

    with gr.Row():
        gr.Markdown('''
        ### You can test by following examples:
        ''')
    examples = gr.Examples(
        examples=[
            "https://www.youtube.com/watch?v=ZXVDnuepW2s",
            "https://www.youtube.com/watch?v=X8_glJn1B8o",
            "https://www.youtube.com/watch?v=80yqL2KzBVw",
        ],
        label="Examples",
        inputs=[youtube_url_in],
    )

    # Step 1: fetch a video from YouTube into the input video component.
    with gr.Column():
        youtube_url_in.render()
        download_youtube_btn = gr.Button("Download Youtube video")
        download_youtube_btn.click(get_youtube, [youtube_url_in], [video_in])

    # Input video next to the cropped mouth-region video.
    with gr.Row():
        video_in.render()
        video_out.render()

    # Step 2 (landmark detection / mouth crop) and step 3 (prediction).
    with gr.Row():
        detect_landmark_btn = gr.Button("Phát hiện mốc/cắt môi")
        detect_landmark_btn.click(preprocess_video, [video_in], [video_out])
        predict_btn = gr.Button("Dự đoán")
        predict_btn.click(predict, [video_out], [text_output])

    # Output of the prediction step.
    with gr.Row():
        text_output.render()

demo.launch(debug=True)