#final
import gradio as gr
import json
#from difflib import Differ
import ffmpeg
import os
import base64
import requests
from pathlib import Path
#import time
from moviepy.editor import VideoFileClip

API_URL = "https://api-inference.huggingface.co/models/facebook/wav2vec2-base-960h"
HF_TOKEN = os.environ["HF_TOKEN"]
headers = {"Authorization": f"Bearer {HF_TOKEN}"}

#convert video to audio (mono, 16 kHz wav kept in memory)
video_path = Path("./ShiaLaBeouf.mp4")
audio_memory, _ = (
    ffmpeg.input(video_path)
    .output('-', format="wav", ac=1, ar='16k')
    .overwrite_output()
    .global_args('-loglevel', 'quiet')
    .run(capture_stdout=True)
)

#load the full video with moviepy so a clip can be cut out later (used by generate_gif below)
video = VideoFileClip(str(video_path))


#calling the hosted model
def query_api(audio_bytes: bytes):
    """Query the Hugging Face Inference API for the Automatic Speech Recognition task."""
    payload = json.dumps({
        "inputs": base64.b64encode(audio_bytes).decode("utf-8"),
        "parameters": {
            "return_timestamps": "char",
            "chunk_length_s": 10,
            "stride_length_s": [4, 2]
        },
        "options": {"use_gpu": False}
    }).encode("utf-8")
    response = requests.request("POST", API_URL, headers=headers, data=payload)
    json_response = json.loads(response.content.decode("utf-8"))
    return json_response


#getting the transcript using wav2vec2 via Hugging Face hosted accelerated inference
#the audio bytes are sent in the request along with stride and chunk length information
model_response = query_api(audio_memory)

#the model response has both the transcript and character-level timestamps ("chunks")
transcription = model_response["text"].lower()
chnk = model_response["chunks"]

#creating lists from the chunks so they are easy to consume downstream
timestamps = [[chunk["text"].lower(), chunk["timestamp"][0], chunk["timestamp"][1]]
              for chunk in chnk]


#getting word timestamps from the character timestamps
def get_word_timestamps(timestamps):
    words, word = [], []
    letter_timestamp, word_timestamp, words_timestamp = [], [], []
    for idx, entry in enumerate(timestamps):
        word.append(entry[0])
        letter_timestamp.append(entry[1])
        if entry[0] == ' ':
            #a space closes the current word: store its text, start time and end time
            words.append(''.join(word))
            word_timestamp.append(letter_timestamp[0])
            word_timestamp.append(timestamps[idx - 1][2])
            words_timestamp.append(word_timestamp)
            word, word_timestamp, letter_timestamp = [], [], []

    words = [word.strip() for word in words]
    return words, words_timestamp


words, words_timestamp = get_word_timestamps(timestamps)
print(f"Total words in the audio transcript: {len(words)}, transcript word list: {words}")
print(f"Total word timestamps derived from character timestamps: {len(words_timestamp)}, word timestamps: {words_timestamp}")

#creating a word list from the input gif transcript
gif = "don't let your dreams be dreams"
giflist = gif.split()


#getting the indexes of the gif words in the main transcript
def get_gif_word_indexes(total_words_list, gif_words_list):
    if not gif_words_list:
        return
    # just an optimization
    lengthgif_words_list = len(gif_words_list)
    firstgif_words_list = gif_words_list[0]
    for idx, item in enumerate(total_words_list):
        if item == firstgif_words_list:
            if total_words_list[idx:idx + lengthgif_words_list] == gif_words_list:
                yield tuple(range(idx, idx + lengthgif_words_list))


#getting the gif indexes from the generator
giflist_indxs = list(list(get_gif_word_indexes(words, giflist))[0])


#getting start and end timestamps for the gif transcript
def get_gif_timestamps(giflist_indxs):
    min_idx = min(giflist_indxs)
    max_idx = max(giflist_indxs)
    gif_words_timestamp = words_timestamp[min_idx : max_idx + 1]
    start_seconds, end_seconds = gif_words_timestamp[0][0], gif_words_timestamp[-1][-1]
    return start_seconds, end_seconds


#getting start and end timestamps for the gif video
start_seconds, end_seconds = get_gif_timestamps(giflist_indxs)


#extracting the clip and writing it out as a .gif image
def generate_gif(start_seconds, end_seconds):
    final_clip = video.subclip(start_seconds, end_seconds)
    #final_clip.write_videofile("/content/gdrive/My Drive/AI/videoedit/gif1.mp4")
    final_clip.write_gif("/content/gdrive/My Drive/AI/videoedit/gif1.gif")
    final_clip.close()
    return


generate_gif(start_seconds, end_seconds)
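

# --- Optional Gradio wiring (sketch) ---
# gradio is imported above but not used in this script. The block below is a
# minimal, hypothetical sketch of how the pipeline could be exposed as a demo:
# the wrapper name make_gif, the Interface layout, and the reuse of the
# hard-coded Drive output path are assumptions, not part of the original code.
def make_gif(phrase: str):
    # Hypothetical wrapper: locate the phrase in the transcript, cut the clip,
    # and return the path of the written GIF so Gradio can display it.
    matches = list(get_gif_word_indexes(words, phrase.lower().split()))
    if not matches:
        return None
    start, end = get_gif_timestamps(list(matches[0]))
    generate_gif(start, end)
    return "/content/gdrive/My Drive/AI/videoedit/gif1.gif"

#demo = gr.Interface(fn=make_gif,
#                    inputs=gr.Textbox(label="Phrase from the transcript"),
#                    outputs=gr.Image(label="Generated GIF"))
#demo.launch()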