# ybbwcwaps
# some FakeVD
# b0c0f32
# NOTE(review): the three lines above are stray non-Python residue (possibly a
# commit hash / paste artifact); commented out so the module can be imported.
from multiprocessing import Pool
from PIL import Image
import sys
# sys.path.append('/mnt/data10t/dazuoye/GROUP2024-GEN6/FakeSV/code_test')
# from imageocr import imageocr
# from Extract_audio import extract_audio
from FakeVD.code_test.VGGish_Feature_Extractor.my_vggish_fun import vggish_audio as extract_audio
from FakeVD.code_test.VGG19_Feature_Extractor.vgg19_feature import load_model_vgg19
# from Imghash import imghash
# from Extract_frame import V2frame
from FakeVD.code_test.Text_Feature_Extractor.wav2text import wav2text
import os
import shutil
# from Sentence_distance import sentence_distance
import time
import json
import cv2
# audio_path = '/mnt/data10t/dazuoye/GROUP2024-GEN6/FakeSV/dataset/1.wav'
# frame_path = ''
# datasave_path = '/mnt/data10t/dazuoye/GROUP2024-GEN6/FakeSV/code_test/Text_Feature/data'
# video_path = '/mnt/data10t/dazuoye/GROUP2024-GEN6/FakeSV/code_test/Text_Feature/videos'
# def get_string_from_list(A):
# if type(A) == list:
# ret = ''
# for i in A:
# ret += get_string_from_list(i)
# return ret
# if type(A) == str:
# return A
# return ""
# ocr_ret = []
# def ocr_frame(frame):
# img_path = os.path.join(frame_path, str(frame)+'.jpg')
# now_ocr_result = imageocr.work(img_path)
# ocr_ret.append({'frame': frame, 'result': now_ocr_result})
# def get_frame(x):
# return x['frame']
# def OCR(path):
# imgH = imghash()
# cropimgH = imghash()
# # 返回结果
# ret = []
# # 当前帧
# now_frame = -15
# # 上一次执行ocr的帧
# last_frame = 0
# # 上一次的ocr结果拼接字符串
# last_ocr_result_string = "ocr at first"
# # 限
# k = 8
# # 历史最高k点
# kmax = 20
# # 匹配标识
# marchflag = False
# while True:
# now_frame += 15
# img_path = os.path.join(frame_path, str(now_frame)+'.jpg')
# if not os.path.exists(img_path):
# print('[OCR all] done', now_frame)
# break
# # 相似度高,无需ocr
# if not imgH.work(Image.open(img_path), k):
# print("continue " + str(now_frame))
# continue
# print('[OCR working] ocr at frame', now_frame)
# # 进行ocr
# now_ocr_result = imageocr.work(img_path, cropimgH)
# # 将识别结果添加
# ret.append({'frame': now_frame, 'result': now_ocr_result})
# # print('[OCR done] ocr at frame', now_frame)
# return ret
# 处理视频
def video_work(w2t, model_vggish, video_file_path, output_path, audio_path='./FakeVD/code_test/preprocessed_feature/1.wav'):
    """Extract the audio track of one video, transcribe it, and save the text as JSON.

    Parameters
    ----------
    w2t :
        Speech-to-text worker; ``w2t.work(audio_path)`` is expected to return a
        list whose first element is a dict with a ``'text'`` key
        (NOTE(review): assumed from usage below — confirm against wav2text).
    model_vggish :
        VGGish model handed through to ``extract_audio`` for feature extraction.
    video_file_path : str
        Path of the input video file.
    output_path : str
        Path of the JSON file written, shaped ``{"video_id": ..., "text": ...}``.
    audio_path : str, optional
        Where the intermediate WAV file is written before transcription.
    """
    video_file_name = os.path.basename(video_file_path)
    # video id = file name without its extension
    video_id = os.path.splitext(video_file_name)[0]
    start_time = time.time()
    print('[video_work] working')
    # Split the WAV track out of the video (also dumps VGGish features to a pickle).
    print('[extract audio]', video_file_path, audio_path)
    extract_audio(model_vggish, video_file_path,
                  feature_path="./FakeVD/code_test/preprocessed_feature/1.pkl",
                  audio_file_path=audio_path)
    # Run speech-to-text on the extracted WAV.
    print('[wav2text] working')
    wav_result = w2t.work(audio_path)
    print("time: " + str(time.time() - start_time))
    # NOTE(review): assumes the transcription result is a non-empty list;
    # an empty result would raise IndexError here — confirm upstream guarantee.
    result = {"video_id": video_id, 'text': wav_result[0]['text']}
    # encoding='utf-8' is required: ensure_ascii=False emits raw CJK characters,
    # which crashes with the platform default encoding on e.g. Windows (cp1252/gbk).
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(result, f, ensure_ascii=False)
# if __name__ == '__main__':
# file = sys.argv[1]
# r = open(file, "r").readlines()
# for name in r:
# name = name.strip()
# data_path = os.path.join(datasave_path, name[:-4]+'.json')
# if os.path.exists(data_path):
# # print("continue " + name)
# continue
# if name == '':
# continue
# path = os.path.join(video_path, name)
# result = video_work(path, name[:-4])
# with open(data_path, 'w') as f:
# f.write(json.dumps(result, ensure_ascii=False))