# ViDove / pipeline.py
# Last change: "Added auto retry" (commit ce7a58b, author worldqwq)
import argparse
import os
import subprocess
import time

import openai
import stable_whisper
from pytube import YouTube
from tqdm import tqdm

from SRT import SRT_script
# Command-line interface.
# Input is taken from one of: a YouTube link, a local video (optionally with a
# pre-extracted audio file), or an existing English .srt file.
# The two boolean switches are declared with a single dash, so they are read
# back as `args.only_srt` and `args.v`.
parser = argparse.ArgumentParser()
parser.add_argument("--link", help="youtube video link here", default=None, type=str, required=False)
parser.add_argument("--video_file", help="local video path here", default=None, type=str, required=False)
parser.add_argument("--audio_file", help="local audio path here", default=None, type=str, required=False)
parser.add_argument("--srt_file", help="srt file input path here", default=None, type=str, required=False)
parser.add_argument("--download", help="download path", default='./downloads', type=str, required=False)
parser.add_argument("--output_dir", help="translate result path", default='./results', type=str, required=False)
parser.add_argument("--video_name", help="video name, if use video link as input, the name will auto-filled by youtube video name", default='placeholder', type=str, required=False)
# `choices` rejects unsupported model names up front; without it an unknown
# name is only noticed after transcription, when no translation is produced.
parser.add_argument(
    "--model_name",
    help="model name only support text-davinci-003 and gpt-3.5-turbo",
    type=str,
    required=False,
    default="gpt-3.5-turbo",
    choices=("text-davinci-003", "gpt-3.5-turbo"),
)
parser.add_argument("-only_srt", help="set script output to only .srt file", action='store_true')
parser.add_argument("-v", help="auto encode script with video", action='store_true')
args = parser.parse_args()
# Validate the inputs and prepare the working directories.
# At least one of: YouTube link, local video file, or existing srt file.
if args.link is None and args.video_file is None and args.srt_file is None:
    print("need video source or srt file")
    # Non-zero status so that shells and calling scripts can detect the failure.
    raise SystemExit(1)

# set up
openai.api_key = os.getenv("OPENAI_API_KEY")

DOWNLOAD_PATH = args.download
# makedirs(exist_ok=True) also creates the sub-folders when the download root
# already exists, and copes with nested paths such as "a/b/downloads".
os.makedirs(f'{DOWNLOAD_PATH}/audio', exist_ok=True)
os.makedirs(f'{DOWNLOAD_PATH}/video', exist_ok=True)

RESULT_PATH = args.output_dir
os.makedirs(RESULT_PATH, exist_ok=True)

VIDEO_NAME = args.video_name
model_name = args.model_name
# get source audio
# Defines `audio_path` (input for transcription) and `video_path` (input for
# the optional -v encoding step), and creates the per-video result folder.
if args.link is not None and args.video_file is None:
    # Download the video and an audio-only stream from YouTube.
    video_link = args.link
    video = None
    audio = None
    try:
        yt = YouTube(video_link)
        video = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first()
        if video:
            video.download(f'{DOWNLOAD_PATH}/video')
            print('Video download completed!')
        else:
            print("Error: Video stream not found")
        audio = yt.streams.filter(only_audio=True, file_extension='mp4').first()
        if audio:
            audio.download(f'{DOWNLOAD_PATH}/audio')
            print('Audio download completed!')
        else:
            print("Error: Audio stream not found")
    except Exception as e:
        print("Connection Error")
        print(e)
        raise SystemExit(1)

    # A missing stream was already reported above. Stop here instead of
    # failing later with an AttributeError on None: the audio is needed for
    # the file name and transcription, the video only when -v was requested.
    if audio is None or (video is None and args.v):
        raise SystemExit(1)

    video_path = f'{DOWNLOAD_PATH}/video/{video.default_filename}' if video is not None else None
    audio_path = '{}/audio/{}'.format(DOWNLOAD_PATH, audio.default_filename)
    if VIDEO_NAME == 'placeholder':
        # splitext drops only the extension, so titles containing dots survive.
        VIDEO_NAME = os.path.splitext(audio.default_filename)[0]
elif args.video_file is not None:
    # Read from local
    video_path = args.video_file
    if args.audio_file is not None:
        audio_path = args.audio_file
        # Fail fast on a wrong path (previously done implicitly by opening
        # the file, which leaked the handle).
        if not os.path.isfile(audio_path):
            print(f"audio file not found: {audio_path}")
            raise SystemExit(1)
    else:
        # Extract the audio track with ffmpeg. The argument list form keeps
        # paths with spaces intact, and check=True surfaces ffmpeg failures.
        audio_path = f'{DOWNLOAD_PATH}/audio/{VIDEO_NAME}.mp3'
        os.makedirs(f'{DOWNLOAD_PATH}/audio', exist_ok=True)
        subprocess.run(
            ['ffmpeg', '-i', args.video_file, '-f', 'mp3', '-ab', '192000', '-vn', audio_path],
            check=True,
        )

os.makedirs(f'{RESULT_PATH}/{VIDEO_NAME}', exist_ok=True)
# Build the English subtitle object `srt`.
# Priority: a user supplied srt file, then a previously generated
# <RESULT_PATH>/<VIDEO_NAME>/<VIDEO_NAME>_en.srt, and only if neither exists a
# fresh transcription of `audio_path` with the local stable-whisper model.
user_srt = args.srt_file
if user_srt is not None:
    srt_file_en = user_srt
else:
    srt_file_en = f"{RESULT_PATH}/{VIDEO_NAME}/{VIDEO_NAME}_en.srt"

if user_srt is not None or os.path.exists(srt_file_en):
    # A user supplied path is parsed without an existence check.
    srt = SRT_script.parse_from_srt_file(srt_file_en)
else:
    # Speech-to-text with the local 'base' stable-whisper model.
    model = stable_whisper.load_model('base')
    transcript = model.transcribe(audio_path, regroup=False)
    # Regroup the segments; the return value of the chain is not used.
    transcript.split_by_punctuation(['.', '。', '?']).merge_by_gap(
        .15, max_words=3
    ).merge_by_punctuation([' ']).split_by_punctuation(['.', '。', '?'])
    transcript = transcript.to_dict()
    # read segments to SRT class
    srt = SRT_script(transcript['segments'])

# srt preprocess, then store the source subtitles under `srt_file_en`
srt.form_whole_sentence()
srt.spell_check_term()
srt.correct_with_force_term()
srt.write_srt_file_src(srt_file_en)

# Text handed to the translation step.
script_input = srt.get_source_only()

if not args.only_srt:
    # The converter is only imported when .ass output is requested.
    from srt2ass import srt2ass
    assSub_en = srt2ass(srt_file_en, "default", "No", "Modest")
    print('ASS subtitle saved as: ' + assSub_en)
# Split the video script by sentences and create chunks within the size limit
def script_split(script_in, chunk_size=1000):
    """Group blank-line separated sentences into chunks of bounded size.

    Args:
        script_in: Script text whose sentences are separated by a blank line
            ("\\n\\n"). A trailing separator is allowed and ignored.
        chunk_size: Soft upper bound for a chunk, measured in characters
            (not tokens). A single sentence longer than this still becomes
            its own chunk.

    Returns:
        A tuple ``(script_arr, range_arr)`` of equal length. ``script_arr[i]``
        is the stripped chunk text and ``range_arr[i]`` is the inclusive,
        1-based ``(first, last)`` sentence range the chunk covers.
    """
    sentences = script_in.split('\n\n')
    # A trailing separator yields one empty last element; it is not a sentence.
    if sentences[-1] == '':
        sentences.pop()

    script_arr = []
    range_arr = []
    start = 1   # 1-based index of the first sentence in the current chunk
    end = 0     # 1-based index of the last sentence consumed so far
    script = ""
    for sentence in sentences:
        # Close the current chunk when the next sentence would overflow it.
        # The `script` guard avoids emitting an empty chunk with the bogus
        # range (start, start - 1) when a sentence alone exceeds chunk_size.
        if script and len(script) + len(sentence) + 1 > chunk_size:
            script_arr.append(script.strip())
            range_arr.append((start, end))
            start = end + 1
            script = ""
        script += sentence + '\n\n'
        end += 1

    if script.strip():
        script_arr.append(script.strip())
        # `end` is the true index of the last sentence, whether or not the
        # input carried a trailing separator.
        range_arr.append((start, end))

    return script_arr, range_arr
# Chunk the source script with the default chunk_size; script_arr holds the
# chunk texts and range_arr the matching sentence ranges, index for index.
script_arr, range_arr = script_split(script_input)
def get_response(model_name, text=None):
    """Translate one chunk of English text to Chinese with an OpenAI model.

    Args:
        model_name: "gpt-3.5-turbo" (chat completion) or "text-davinci-003"
            (plain completion).
        text: The English text to translate. When omitted, the text is read
            from the module-level variable ``s`` (the current chunk of the
            translation loop), which keeps the one-argument call working.

    Returns:
        The stripped translation returned by the API, or ``None`` when
        ``model_name`` is not one of the two supported models.

    Exceptions raised by the OpenAI client are not caught here.
    """
    if model_name not in ("gpt-3.5-turbo", "text-davinci-003"):
        return None

    if text is None:
        # Legacy behaviour: fall back to the loop variable of the caller.
        text = s

    if model_name == "gpt-3.5-turbo":
        response = openai.ChatCompletion.create(
            model=model_name,
            messages=[
                {"role": "system", "content": "You are a helpful assistant that translates English to Chinese and have decent background in starcraft2."},
                {"role": "system", "content": "Your translation has to keep the orginal format and be as accurate as possible."},
                {"role": "system", "content": "There is no need for you to add any comments or notes."},
                {"role": "user", "content": 'Translate the following English text to Chinese: "{}"'.format(text)}
            ],
            temperature=0.15
        )
        return response['choices'][0]['message']['content'].strip()

    # model_name == "text-davinci-003"
    prompt = f"Please help me translate this into Chinese:\n\n{text}\n\n"
    response = openai.Completion.create(
        model=model_name,
        prompt=prompt,
        temperature=0.1,
        max_tokens=2000,
        top_p=1.0,
        frequency_penalty=0.0,
        presence_penalty=0.0
    )
    return response['choices'][0]['text'].strip()
# Translate and save
# Upper bound for attempts per chunk, so a permanent failure (bad API key,
# unknown model, ...) ends the run instead of retrying forever.
MAX_TRANSLATION_ATTEMPTS = 6

# NOTE: the loop variable must stay named `s`; get_response() is called
# without the text and picks the current chunk up from this module-level name.
for s, sentence_range in tqdm(zip(script_arr, range_arr), total=len(script_arr)):
    # using chatgpt model
    print(f"now translating sentences {sentence_range}")
    for attempt in range(1, MAX_TRANSLATION_ATTEMPTS + 1):
        try:
            translate = get_response(model_name)
            break
        except Exception as e:
            print("An error has occurred during translation:", e)
            if attempt == MAX_TRANSLATION_ATTEMPTS:
                raise
            print("Retrying...")
            # Exponential backoff (2, 4, 8, ... seconds) to ride out rate limits.
            time.sleep(2 ** attempt)
    srt.set_translation(translate, sentence_range)

srt.check_len_and_split()
srt.write_srt_file_translate(f"{RESULT_PATH}/{VIDEO_NAME}/{VIDEO_NAME}_zh.srt")
srt.write_srt_file_bilingual(f"{RESULT_PATH}/{VIDEO_NAME}/{VIDEO_NAME}_bi.srt")

if not args.only_srt:
    assSub_zh = srt2ass(f"{RESULT_PATH}/{VIDEO_NAME}/{VIDEO_NAME}_zh.srt", "default", "No", "Modest")
    print('ASS subtitle saved as: ' + assSub_zh)

if args.v:
    if args.link is None and args.video_file is None:
        # Only an srt file was given, so there is no video to encode into.
        print("cannot encode subtitles: no video source was provided")
    else:
        # Burn the .srt subtitles when only srt output was requested,
        # otherwise the styled .ass file.
        subtitle_ext = "srt" if args.only_srt else "ass"
        subtitle_path = f"{RESULT_PATH}/{VIDEO_NAME}/{VIDEO_NAME}_zh.{subtitle_ext}"
        # Argument list instead of a shell string: paths with spaces stay
        # intact. check=True reports a failed encode instead of ignoring it.
        subprocess.run(
            [
                "ffmpeg",
                "-i", video_path,
                "-vf", f"subtitles={subtitle_path}",
                f"{RESULT_PATH}/{VIDEO_NAME}/{VIDEO_NAME}.mp4",
            ],
            check=True,
        )