Spaces:
Build error
Build error
import gradio as gr | |
import ffmpeg | |
from pathlib import Path | |
import os | |
import ast | |
import json | |
import base64 | |
import requests | |
import moviepy.editor as mp | |
API_URL = "https://api-inference.huggingface.co/models/facebook/wav2vec2-base-960h" | |
headers = {"Authorization": "Bearer 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"} | |
#HF_TOKEN = os.environ["HF_TOKEN"] | |
#headers = {"Authorization": f"Bearer {HF_TOKEN}"} | |
def generate_transcripts(in_video): #generate_gifs(in_video, gif_transcript): | |
print("********* Inside generate_transcripts() **********") | |
#convert video to audio | |
print(f" input video is : {in_video}") | |
video_path = Path("./ShiaLaBeouf.mp4") | |
audio_memory, _ = ffmpeg.input(video_path).output('-', format="wav", ac=1, ar='16k').overwrite_output().global_args('-loglevel', 'quiet').run(capture_stdout=True) | |
#Getting transcripts using wav2Vec2 huggingface hosted accelerated inference | |
#sending audio file in request along with stride and chunk length information | |
model_response = query_api(audio_memory) | |
#model response has both - transcripts as well as character timestamps or chunks | |
print(f"model_response is : {model_response}") | |
transcription = model_response["text"].lower() | |
chnk = model_response["chunks"] | |
#creating lists from chunks to consume downstream easily | |
timestamps = [[chunk["text"].lower(), chunk["timestamp"][0], chunk["timestamp"][1]] | |
for chunk in chnk] | |
#getting words and word timestamps | |
words, words_timestamp = get_word_timestamps(timestamps) | |
print(f"Total words in the audio transcript is:{len(words)}, transcript word list is :{words}, type of words is :{type(words)} ") | |
print(f"Total Word timestamps derived fromcharacter timestamp are :{len(words_timestamp)}, Word timestamps are :{words_timestamp}") | |
return transcription, words, words_timestamp | |
def generate_gifs(gif_transcript, words, words_timestamp): | |
print("********* Inside generate_gifs() **********") | |
#creating list from input gif transcript | |
gif = "don't let your dreams be dreams" | |
#gif = gif_transcript | |
giflist = gif.split() | |
#getting gif indexes from the generator | |
# Converting string to list | |
words = ast.literal_eval(words) | |
print(f"words is :{words}") | |
print(f"type of words is :{type(words)}") | |
print(f"length of words is :{len(words)}") | |
print(f"giflist is :{giflist}") | |
#print(f"haystack and needle function returns value as : {list(get_gif_word_indexes(words, giflist))}") | |
#indx_tmp = [num for num in get_gif_word_indexes(words, giflist)] | |
#print(f"index temp is : {indx_tmp}") | |
giflist_indxs = list(list(get_gif_word_indexes(words, giflist))[0]) | |
#getting start and end timestamps for a gif video | |
start_seconds, end_seconds = get_gif_timestamps(giflist_indxs, words_timestamp) | |
#generated .gif image | |
gen_moviepy_gif(start_seconds, end_seconds) | |
#("./gifimage.gif") | |
html_out = "<img src='./gifimage.gif' />" | |
return html_out | |
#calling the hosted model | |
def query_api(audio_bytes: bytes): | |
""" | |
Query for Huggingface Inference API for Automatic Speech Recognition task | |
""" | |
print("********* Inside query_api() **********") | |
payload = json.dumps({ | |
"inputs": base64.b64encode(audio_bytes).decode("utf-8"), | |
"parameters": { | |
"return_timestamps": "char", | |
"chunk_length_s": 10, | |
"stride_length_s": [4, 2] | |
}, | |
"options": {"use_gpu": False} | |
}).encode("utf-8") | |
response = requests.request( | |
"POST", API_URL, headers=headers, data=payload) | |
json_reponse = json.loads(response.content.decode("utf-8")) | |
print(f"json_reponse is :{json_reponse}") | |
return json_reponse | |
#getting word timestamps from character timestamps | |
def get_word_timestamps(timestamps): | |
words, word = [], [] | |
letter_timestamp, word_timestamp, words_timestamp = [], [], [] | |
for idx,entry in enumerate(timestamps): | |
word.append(entry[0]) | |
letter_timestamp.append(entry[1]) | |
if entry[0] == ' ': | |
words.append(''.join(word)) | |
word_timestamp.append(letter_timestamp[0]) | |
word_timestamp.append(timestamps[idx-1][2]) | |
words_timestamp.append(word_timestamp) | |
word, word_timestamp, letter_timestamp = [], [], [] | |
words = [word.strip() for word in words] | |
return words, words_timestamp | |
#getting index of gif words in main transcript | |
def get_gif_word_indexes(total_words_list, gif_words_list): | |
if not gif_words_list: | |
print("THIS IS 1") | |
return | |
# just optimization | |
COUNT=0 | |
lengthgif_words_list = len(gif_words_list) | |
print("THIS IS 2") | |
firstgif_words_list = gif_words_list[0] | |
print("THIS IS 3") | |
print(f"total_words_list is :{total_words_list}") | |
print(f"length of total_words_list is :{len(total_words_list)}") | |
print(f"gif_words_list is :{gif_words_list}") | |
print(f"length of gif_words_list is :{len(gif_words_list)}") | |
for idx, item in enumerate(total_words_list): | |
COUNT+=1 | |
#print("COUNT IS :", COUNT) | |
if item == firstgif_words_list: | |
print("THIS IS 5") | |
if total_words_list[idx:idx+lengthgif_words_list] == gif_words_list: | |
print("THIS IS 6") | |
print(f"value 1 is: {range(idx, idx+lengthgif_words_list)}") | |
print(f"value of tuple is : {tuple(range(idx, idx+lengthgif_words_list))}") | |
yield tuple(range(idx, idx+lengthgif_words_list)) | |
#getting start and end timestamps for gif transcript | |
def get_gif_timestamps(giflist_indxs, words_timestamp): | |
#giflist_indxs = list(list(get_gif_word_indexes(words, giflist))[0]) | |
min_idx = min(giflist_indxs) | |
max_idx = max(giflist_indxs) | |
gif_words_timestamp = words_timestamp[min_idx : max_idx+1] | |
start_seconds, end_seconds = gif_words_timestamp[0][0], gif_words_timestamp[-1][-1] | |
return start_seconds, end_seconds | |
#extracting the video and building and serving a .gif image | |
def gen_moviepy_gif(start_seconds, end_seconds): | |
video_path = Path("./ShiaLaBeouf.mp4") | |
video = mp.VideoFileClip(videopath) | |
final_clip = video.subclip(start_seconds, end_seconds) | |
#final_clip.write_videofile("/content/gdrive/My Drive/AI/videoedit/gif1.mp4") | |
final_clip.write_gif("./gifimage.gif",) | |
final_clip.close() | |
return | |
sample_video = ['./ShiaLaBeouf.mp4'] | |
sample_vid = gr.Video(label='Video file') #for displaying the example | |
examples = gr.components.Dataset(components=[sample_vid], samples=[sample_video], type='values') | |
demo = gr.Blocks() | |
with demo: | |
with gr.Row(): | |
input_video = gr.Video(label="Upload a Video", visible=True) #for incoming video | |
text_transcript = gr.Textbox(label="Transcripts", lines = 10, interactive = True ) #to generate and display transcriptions for input video | |
text_words = gr.Textbox(visible=False) | |
text_wordstimestamps = gr.Textbox(visible=False) | |
text_gif_transcript = gr.Textbox(label="Transcripts", placeholder="Copy paste transcripts here to create GIF image" , lines = 3, interactive = True ) #to copy paste required gif transcript | |
out_gif = gr.HTML(label="Generated GIF from transcript selected", show_label=True) | |
examples.render() | |
def load_examples(video): #to load sample video into input_video upon clicking on it | |
print("****** inside load_example() ******") | |
print("in_video is : ", video[0]) | |
return video[0] | |
examples.click(load_examples, examples, input_video) | |
with gr.Row(): | |
button_transcript = gr.Button("Generate transcripts") | |
button_gifs = gr.Button("Create Gif") | |
#def load_gif(): | |
# print("****** inside load_gif() ******") | |
# #created embedding width='560' height='315' | |
# html_out = "<img src='./gifimage.gif' />" | |
# print(f"html output is : {html_out}") | |
# return | |
button_transcript.click(generate_transcripts, input_video, [text_transcript, text_words, text_wordstimestamps ]) | |
button_gifs.click(generate_gifs, [text_gif_transcript, text_words, text_wordstimestamps], out_gif ) | |
demo.launch(debug=True) |