Spaces:
Build error
Build error
from multiprocessing import Pool | |
from PIL import Image | |
import sys | |
# sys.path.append('/mnt/data10t/dazuoye/GROUP2024-GEN6/FakeSV/code_test') | |
# from imageocr import imageocr | |
# from Extract_audio import extract_audio | |
from FakeVD.code_test.VGGish_Feature_Extractor.my_vggish_fun import vggish_audio as extract_audio | |
from FakeVD.code_test.VGG19_Feature_Extractor.vgg19_feature import load_model_vgg19 | |
# from Imghash import imghash | |
# from Extract_frame import V2frame | |
from FakeVD.code_test.Text_Feature_Extractor.wav2text import wav2text | |
import os | |
import shutil | |
# from Sentence_distance import sentence_distance | |
import time | |
import json | |
import cv2 | |
# audio_path = '/mnt/data10t/dazuoye/GROUP2024-GEN6/FakeSV/dataset/1.wav' | |
# frame_path = '' | |
# datasave_path = '/mnt/data10t/dazuoye/GROUP2024-GEN6/FakeSV/code_test/Text_Feature/data' | |
# video_path = '/mnt/data10t/dazuoye/GROUP2024-GEN6/FakeSV/code_test/Text_Feature/videos' | |
# def get_string_from_list(A): | |
# if type(A) == list: | |
# ret = '' | |
# for i in A: | |
# ret += get_string_from_list(i) | |
# return ret | |
# if type(A) == str: | |
# return A | |
# return "" | |
# ocr_ret = [] | |
# def ocr_frame(frame): | |
# img_path = os.path.join(frame_path, str(frame)+'.jpg') | |
# now_ocr_result = imageocr.work(img_path) | |
# ocr_ret.append({'frame': frame, 'result': now_ocr_result}) | |
# def get_frame(x): | |
# return x['frame'] | |
# def OCR(path): | |
# imgH = imghash() | |
# cropimgH = imghash() | |
# # 返回结果 | |
# ret = [] | |
# # 当前帧 | |
# now_frame = -15 | |
# # 上一次执行ocr的帧 | |
# last_frame = 0 | |
# # 上一次的ocr结果拼接字符串 | |
# last_ocr_result_string = "ocr at first" | |
# # 限 | |
# k = 8 | |
# # 历史最高k点 | |
# kmax = 20 | |
# # 匹配标识 | |
# marchflag = False | |
# while True: | |
# now_frame += 15 | |
# img_path = os.path.join(frame_path, str(now_frame)+'.jpg') | |
# if not os.path.exists(img_path): | |
# print('[OCR all] done', now_frame) | |
# break | |
# # 相似度高,无需ocr | |
# if not imgH.work(Image.open(img_path), k): | |
# print("continue " + str(now_frame)) | |
# continue | |
# print('[OCR working] ocr at frame', now_frame) | |
# # 进行ocr | |
# now_ocr_result = imageocr.work(img_path, cropimgH) | |
# # 将识别结果添加 | |
# ret.append({'frame': now_frame, 'result': now_ocr_result}) | |
# # print('[OCR done] ocr at frame', now_frame) | |
# return ret | |
# Process one video: extract its audio track and transcribe the speech to text.
def video_work(
    w2t,
    model_vggish,
    video_file_path: str,
    output_path: str,
    audio_path: str = './FakeVD/code_test/preprocessed_feature/1.wav',
    feature_path: str = './FakeVD/code_test/preprocessed_feature/1.pkl',
) -> None:
    """Extract the audio of one video, transcribe it, and save the text as JSON.

    Steps:
      1. Call ``extract_audio`` (the project's ``vggish_audio`` helper) with
         the video, ``feature_path`` and ``audio_path``.
      2. Call ``w2t.work(audio_path)`` to obtain the transcription.
      3. Write ``{"video_id": ..., "text": ...}`` to ``output_path`` as
         UTF-8 JSON.

    Args:
        w2t: Speech-to-text object exposing ``work(audio_path)``. The result
            is indexed as ``result[0]['text']``.
        model_vggish: Model object forwarded unchanged to ``extract_audio``.
        video_file_path: Path of the input video. Its base name without the
            extension becomes ``video_id``.
        output_path: Destination of the JSON result file (overwritten).
        audio_path: Path handed to ``extract_audio`` as ``audio_file_path``
            and then read back by ``w2t.work``.
        feature_path: Path handed to ``extract_audio`` as ``feature_path``.
            Defaults to the location that was previously hard-coded.

    Returns:
        None. The result is only written to ``output_path``.
    """
    video_file_name = os.path.basename(video_file_path)
    video_id = os.path.splitext(video_file_name)[0]

    st = time.time()
    print('[video_work] working')

    # Separate the audio track from the video.
    # NOTE(review): presumably extract_audio writes the wav to `audio_path`
    # and the VGGish features to `feature_path` - confirm in my_vggish_fun.
    print('[extract audio]', video_file_path, audio_path)
    extract_audio(
        model_vggish,
        video_file_path,
        feature_path=feature_path,
        audio_file_path=audio_path,
    )

    # Run speech-to-text on the extracted audio.
    print('[wav2text] working')
    wav_result = w2t.work(audio_path)

    print("time: " + str(time.time() - st))

    # NOTE(review): assumes w2t.work returns a non-empty sequence of dicts
    # with a 'text' key; an empty result would raise IndexError here.
    result = {"video_id": video_id, 'text': wav_result[0]['text']}

    # ensure_ascii=False emits raw non-ASCII characters, so the file must be
    # opened as UTF-8 explicitly; the locale default may be unable to encode
    # the transcript (UnicodeEncodeError) or may yield a non-UTF-8 file.
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(json.dumps(result, ensure_ascii=False))
# if __name__ == '__main__': | |
# file = sys.argv[1] | |
# r = open(file, "r").readlines() | |
# for name in r: | |
# name = name.strip() | |
# data_path = os.path.join(datasave_path, name[:-4]+'.json') | |
# if os.path.exists(data_path): | |
# # print("continue " + name) | |
# continue | |
# if name == '': | |
# continue | |
# path = os.path.join(video_path, name) | |
# result = video_work(path, name[:-4]) | |
# with open(data_path, 'w') as f: | |
# f.write(json.dumps(result, ensure_ascii=False)) | |