from multiprocessing import Pool
from PIL import Image
import sys
# sys.path.append('/mnt/data10t/dazuoye/GROUP2024-GEN6/FakeSV/code_test')
# from imageocr import imageocr
# from Extract_audio import extract_audio
from FakeVD.code_test.VGGish_Feature_Extractor.my_vggish_fun import vggish_audio as extract_audio
from FakeVD.code_test.VGG19_Feature_Extractor.vgg19_feature import load_model_vgg19
# from Imghash import imghash
# from Extract_frame import V2frame
from FakeVD.code_test.Text_Feature_Extractor.wav2text import wav2text
import os
import shutil
# from Sentence_distance import sentence_distance
import time
import json
import cv2

# audio_path = '/mnt/data10t/dazuoye/GROUP2024-GEN6/FakeSV/dataset/1.wav'
# frame_path = ''
# datasave_path = '/mnt/data10t/dazuoye/GROUP2024-GEN6/FakeSV/code_test/Text_Feature/data'
# video_path = '/mnt/data10t/dazuoye/GROUP2024-GEN6/FakeSV/code_test/Text_Feature/videos'


# def get_string_from_list(A):
#     if type(A) == list:
#         ret = ''
#         for i in A:
#             ret += get_string_from_list(i)
#         return ret
#     if type(A) == str:
#         return A
#     return ""


# ocr_ret = []


# def ocr_frame(frame):
#     img_path = os.path.join(frame_path, str(frame)+'.jpg')
#     now_ocr_result = imageocr.work(img_path)
#     ocr_ret.append({'frame': frame, 'result': now_ocr_result})


# def get_frame(x):
#     return x['frame']


# def OCR(path):
#     imgH = imghash()
#     cropimgH = imghash()
#     # results to return
#     ret = []
#     # current frame index
#     now_frame = -15
#     # frame at which OCR last ran
#     last_frame = 0
#     # concatenated string of the previous OCR result
#     last_ocr_result_string = "ocr at first"
#     # similarity threshold
#     k = 8
#     # historical maximum of k
#     kmax = 20
#     # match flag
#     marchflag = False
#     while True:
#         now_frame += 15
#         img_path = os.path.join(frame_path, str(now_frame)+'.jpg')
#         if not os.path.exists(img_path):
#             print('[OCR all] done', now_frame)
#             break
#         # frame is highly similar to the previous one, no OCR needed
#         if not imgH.work(Image.open(img_path), k):
#             print("continue " + str(now_frame))
#             continue
#         print('[OCR working] ocr at frame', now_frame)
#         # run OCR
#         now_ocr_result = imageocr.work(img_path, cropimgH)
#         # append the recognition result
#         ret.append({'frame': now_frame, 'result': now_ocr_result})
#         # print('[OCR done] ocr at frame', now_frame)
#     return ret


# process a single video
def video_work(w2t, model_vggish, video_file_path, output_path,
               audio_path='./FakeVD/code_test/preprocessed_feature/1.wav'):
    """Extract the audio track from a video, run speech-to-text on it, and write
    the transcript (keyed by video_id) to output_path as JSON."""
    video_file_name = os.path.basename(video_file_path)
    video_id = os.path.splitext(video_file_name)[0]
    st = time.time()
    print('[video_work] working')
    # v2f = V2frame()
    ##################
    # w2t = wav2text()
    # extract the wav audio track
    print('[extract audio]', video_file_path, audio_path)
    extract_audio(model_vggish, video_file_path,
                  feature_path="./FakeVD/code_test/preprocessed_feature/1.pkl",
                  audio_file_path=audio_path)
    # extract frames
    # print('[extract frame]', path, audio_path)
    # v2f.work(path, frame_path)
    # run speech-to-text on the extracted wav
    print('[wav2text] working')
    wav_result = w2t.work(audio_path)
    # print('[wav_result]', wav_result)
    # run OCR on the frames
    # ocr_result = OCR(frame_path)
    # print('ocr_result', ocr_result)
    # cap = cv2.VideoCapture(path)
    # fps = cap.get(cv2.CAP_PROP_FPS)
    print("time: " + str(time.time() - st))
    result = {"video_id": video_id, 'text': wav_result[0]['text']}  # , 'ocr_result': ocr_result, "fps": fps
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(json.dumps(result, ensure_ascii=False))


# Legacy driver for the old video_work(path, save_name) signature, kept for reference.
# if __name__ == '__main__':
#     file = sys.argv[1]
#     r = open(file, "r").readlines()
#     for name in r:
#         name = name.strip()
#         data_path = os.path.join(datasave_path, name[:-4]+'.json')
#         if os.path.exists(data_path):
#             # print("continue " + name)
#             continue
#         if name == '':
#             continue
#         path = os.path.join(video_path, name)
#         result = video_work(path, name[:-4])
#         with open(data_path, 'w') as f:
#             f.write(json.dumps(result, ensure_ascii=False))
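

# A minimal example driver for the current video_work() signature (a sketch, kept
# commented out like the legacy driver above). It assumes a list file of video
# filenames plus a video directory and an output directory on the command line.
# load_vggish_model() is a hypothetical placeholder -- substitute the actual VGGish
# loader provided by FakeVD.code_test.VGGish_Feature_Extractor.
# if __name__ == '__main__':
#     list_file = sys.argv[1]    # text file with one video filename per line
#     video_dir = sys.argv[2]    # directory containing the video files
#     output_dir = sys.argv[3]   # directory for the per-video JSON transcripts
#     w2t = wav2text()                    # shared speech-to-text instance
#     model_vggish = load_vggish_model()  # hypothetical placeholder, not imported above
#     for name in open(list_file, 'r').read().splitlines():
#         name = name.strip()
#         if name == '':
#             continue
#         out_path = os.path.join(output_dir, os.path.splitext(name)[0] + '.json')
#         if os.path.exists(out_path):
#             continue  # skip videos that already have a transcript
#         video_work(w2t, model_vggish, os.path.join(video_dir, name), out_path)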