ysharma's picture
ysharma HF staff
update
9e468d6
import gradio as gr
import ffmpeg
from pathlib import Path
import os
import ast
import json
import base64
import requests
import moviepy.editor as mp
from PIL import Image, ImageSequence
import cv2
API_URL = "https://api-inference.huggingface.co/models/facebook/wav2vec2-base-960h"
HF_TOKEN = os.environ["HF_TOKEN"]
headers = {"Authorization": f"Bearer {HF_TOKEN}"}
video_list = []
def generate_transcripts(in_video): #generate_gifs(in_video, gif_transcript):
print("********* Inside generate_transcripts() **********")
#convert video to audio
print(f" input video is : {in_video}")
#sample
#video_path = Path("./ShiaLaBeouf.mp4")
audio_memory, _ = ffmpeg.input(in_video).output('-', format="wav", ac=1, ar='16k').overwrite_output().global_args('-loglevel', 'quiet').run(capture_stdout=True)
#audio_memory, _ = ffmpeg.input(video_path).output('-', format="wav", ac=1, ar='16k').overwrite_output().global_args('-loglevel', 'quiet').run(capture_stdout=True)
#Getting transcripts using wav2Vec2 huggingface hosted accelerated inference
#sending audio file in request along with stride and chunk length information
model_response = query_api(audio_memory)
#model response has both - transcripts as well as character timestamps or chunks
print(f"model_response is : {model_response}")
transcription = model_response["text"].lower()
chnk = model_response["chunks"]
#creating lists from chunks to consume downstream easily
timestamps = [[chunk["text"].lower(), chunk["timestamp"][0], chunk["timestamp"][1]]
for chunk in chnk]
#getting words and word timestamps
words, words_timestamp = get_word_timestamps(timestamps)
print(f"Total words in the audio transcript is:{len(words)}, transcript word list is :{words}, type of words is :{type(words)} ")
print(f"Total Word timestamps derived fromcharacter timestamp are :{len(words_timestamp)}, Word timestamps are :{words_timestamp}")
return transcription, words, words_timestamp
def generate_gifs(in_video, gif_transcript, words, words_timestamp, vid_speed):
print("********* Inside generate_gifs() **********")
#creating list from input gif transcript
#gif = "don't let your dreams be dreams"
gif = gif_transcript
#gif = gif_transcript
giflist = gif.split()
#getting gif indexes from the generator
# Converting string to list
words = ast.literal_eval(words)
words_timestamp = ast.literal_eval(words_timestamp)
print(f"words is :{words}")
print(f"type of words is :{type(words)}")
print(f"length of words is :{len(words)}")
print(f"giflist is :{giflist}")
giflist_indxs = list(list(get_gif_word_indexes(words, giflist))[0])
print(f"giflist_indxs is : {giflist_indxs}")
#getting start and end timestamps for a gif video
start_seconds, end_seconds = get_gif_timestamps(giflist_indxs, words_timestamp)
print(f"start_seconds, end_seconds are : ({start_seconds}, {end_seconds})")
#generated .gif image
#gif_out, vid_out = gen_moviepy_gif(in_video, start_seconds, end_seconds)
print(f"vid_speed from SLider is : {vid_speed}")
speededit_vids_list, concat_vid = gen_moviepy_gif(in_video, start_seconds, end_seconds, float(vid_speed), video_list)
return concat_vid #speededit_vids_list
#calling the hosted model
def query_api(audio_bytes: bytes):
"""
Query for Huggingface Inference API for Automatic Speech Recognition task
"""
print("********* Inside query_api() **********")
payload = json.dumps({
"inputs": base64.b64encode(audio_bytes).decode("utf-8"),
"parameters": {
"return_timestamps": "char",
"chunk_length_s": 10,
"stride_length_s": [4, 2]
},
"options": {"use_gpu": False}
}).encode("utf-8")
response = requests.request(
"POST", API_URL, headers=headers, data=payload)
json_reponse = json.loads(response.content.decode("utf-8"))
print(f"json_reponse is :{json_reponse}")
return json_reponse
#getting word timestamps from character timestamps
def get_word_timestamps(timestamps):
print("********* inside get_word_timestamps() **************")
words, word = [], []
letter_timestamp, word_timestamp, words_timestamp = [], [], []
for idx,entry in enumerate(timestamps):
word.append(entry[0])
letter_timestamp.append(entry[1])
if entry[0] == ' ':
words.append(''.join(word))
word_timestamp.append(letter_timestamp[0])
word_timestamp.append(timestamps[idx-1][2])
words_timestamp.append(word_timestamp)
word, word_timestamp, letter_timestamp = [], [], []
words = [word.strip() for word in words]
print(f"words created from timestamps are : {words}")
return words, words_timestamp
#getting index of gif words in main transcript
def get_gif_word_indexes(total_words_list, gif_words_list):
if not gif_words_list:
return
# just optimization
COUNT=0
lengthgif_words_list = len(gif_words_list)
firstgif_words_list = gif_words_list[0]
print(f"total_words_list is :{total_words_list}")
print(f"length of total_words_list is :{len(total_words_list)}")
print(f"gif_words_list is :{gif_words_list}")
print(f"length of gif_words_list is :{len(gif_words_list)}")
for idx, item in enumerate(total_words_list):
COUNT+=1
if item == firstgif_words_list:
if total_words_list[idx:idx+lengthgif_words_list] == gif_words_list:
print(f"value of tuple is : {tuple(range(idx, idx+lengthgif_words_list))}")
yield tuple(range(idx, idx+lengthgif_words_list))
#getting start and end timestamps for gif transcript
def get_gif_timestamps(giflist_indxs, words_timestamp):
print(f"******** Inside get_gif_timestamps() **********")
min_idx = min(giflist_indxs)
max_idx = max(giflist_indxs)
print(f"min_idx is :{min_idx}")
print(f"max_idx is :{max_idx}")
gif_words_timestamp = words_timestamp[min_idx : max_idx+1]
print(f"words_timestamp is :{words_timestamp}")
print(f"gif_words_timestamp is :{gif_words_timestamp}")
start_seconds, end_seconds = gif_words_timestamp[0][0], gif_words_timestamp[-1][-1]
print(f"start_seconds, end_seconds are :{start_seconds},{end_seconds}")
return start_seconds, end_seconds
#extracting the video and building and serving a .gif image
def gen_moviepy_gif(in_video, start_seconds, end_seconds, vid_speed, vid_list):
print("******** inside moviepy_gif () ***************")
#sample
#video_path = "./ShiaLaBeouf.mp4"
video = mp.VideoFileClip(in_video)
#video = mp.VideoFileClip(video_path)
leftover_clip_start = video.subclip(0, int(start_seconds) + float("{:.2f}".format(1-start_seconds%1))).without_audio() #float("{:.2f}".format(1-a%1))
final_clip = video.subclip(start_seconds, end_seconds)
tmp = int(end_seconds) + float("{:.2f}".format(1-end_seconds%1))
if tmp < video.duration:
leftover_clip_end = video.subclip(int(end_seconds) + float("{:.2f}".format(1-end_seconds%1)) ).without_audio() #end=None
else:
leftover_clip_end = video.subclip(int(end_seconds)).without_audio()
#slowmo
print(f"vid_speed from calling function is : {vid_speed}")
speededit_clip = final_clip.fx(mp.vfx.speedx, vid_speed)
speededit_clip = speededit_clip.without_audio()
#concat
concatenated_clip = mp.concatenate_videoclips([leftover_clip_start, speededit_clip, leftover_clip_end])
concatenated_clip.write_videofile("concat.mp4")
filename = f"speededit{len(vid_list)}"
print("filename is :",filename)
speededit_clip.write_videofile("speededit.mp4") #(filename)
vid_list.append("speededit.mp4") #(filename)
#might use later
#if len(vid_list) == 1:
# speededit_clip.write_videofile("slomo.mp4")
#elif len(vid_list) == 2:
# speededit_clip.write_videofile("timelapse.mp4")
#writing to RAM - gif and smaller clip
#final_clip.write_gif("gifimage.gif") #, program='ffmpeg', tempfiles=True, fps=15, fuzz=3)
#final_clip.write_videofile("gifimage.mp4")
final_clip.close()
#reading in a variable
#gif_img = mp.VideoFileClip("gifimage.gif")
#gif_vid = mp.VideoFileClip("gifimage.mp4")
#im = Image.open("gifimage.gif")
#vid_cap = cv2.VideoCapture('gifimage.mp4')
return vid_list, "concat.mp4" #"slomo.mp4", "timelapse.mp4", #"gifimage.gif", "gifimage.mp4" #im, gif_img, gif_vid, vid_cap, #"gifimage.mp4"
sample_video = ["olympic100m.mp4"] #[['./ShiaLaBeouf.mp4']]
sample_vid = gr.Video(label='Video file') #for displaying the example
examples = gr.components.Dataset(components=[sample_vid], samples=[sample_video], type='values')
demo = gr.Blocks()
with demo:
gr.Markdown("""# **Watch your video in SloMo or in Timelapse!** """)
gr.Markdown("""
### Editing your video using ASR pipeline..
A Space by [Yuvraj Sharma](https://huggingface.co/ysharma).
**Background:** In this Gradio BLocks Party Space, I am trying to -
- Provide a capability to slow down your video
- Timelapse your video
**How To Use:** 1. Upload a video or simply click on the sample provided here.
2. Then click on 'Generate transcripts' button and first textbox will display the extract Transcript from the audio associated with your sample.
3. Clip the text from transcript or type transcripts manually in the second Textbox provided.
4. A slowed down or timelapsed version of your video will get generated on the right hand side !
Hope you have fun using this πŸ˜€
""")
with gr.Row():
#for incoming video
input_video = gr.Video(label="Upload a Video", visible=True)
#to generate and display transcriptions for input video
text_transcript = gr.Textbox(label="Transcripts", lines = 10, interactive = True )
#Just to move data between function hence keeping visible false
text_words = gr.Textbox(visible=False)
text_wordstimestamps = gr.Textbox(visible=False)
with gr.Row():
button_transcript = gr.Button("Generate transcripts")
#For SlowMo
with gr.Row():
#to copy paste required gif transcript / or to populate by itself on pressing the button
text_slomo_transcript = gr.Textbox(label="Transcripts", placeholder="Copy paste transcripts here to create SlowMo Video" , lines = 5, interactive = True )
def load_slomo_text(text):
print("****** inside load_slomo_text() ******")
print("text for slomo video is : ", text)
return text
text_transcript.change(load_slomo_text, text_transcript, text_slomo_transcript )
#out_gif = gr.Image(label="Generated GIF image")
out_slomo_vid = gr.Video(label="Generated SlowMo Video")
with gr.Row():
#button_transcript = gr.Button("Generate transcripts")
vid_speed_slomo = gr.Slider(0.1,0.9, step=0.1)
button_slomo = gr.Button("Create SloMo")
#For TimeLapse
with gr.Row():
#to copy paste required gif transcript / or to populate by itself on pressing the button
text_timelapse_transcript = gr.Textbox(label="Transcripts", placeholder="Copy paste transcripts here to create GIF image" , lines = 5) #, interactive = True )
def load_timelapse_text(text):
print("****** inside load_timelapse_text() ******")
print("text for timelapse video is : ", text)
return text
text_transcript.change(load_timelapse_text, text_transcript, text_timelapse_transcript )
#out_gif = gr.Image(label="Generated GIF image")
out_timelapse_vid = gr.Video(label="Generated TimeLapse Video")
with gr.Row():
#button_transcript = gr.Button("Generate transcripts")
vid_speed_timelapse = gr.Slider(1,2, step=0.25)
button_timelapse = gr.Button("Create TimeLapse")
with gr.Row():
#to render video example on mouse hover/click
examples.render()
#to load sample video into input_video upon clicking on it
def load_examples(video):
print("****** inside load_example() ******")
print("in_video is : ", video[0])
return video[0]
examples.click(load_examples, examples, input_video)
#vid_speed = gr.Slider(0.1,0.9, step=0.1)
button_transcript.click(generate_transcripts, input_video, [text_transcript, text_words, text_wordstimestamps ])
button_slomo.click(generate_gifs, [input_video, text_slomo_transcript, text_words, text_wordstimestamps, vid_speed_slomo], out_slomo_vid )
button_timelapse.click(generate_gifs, [out_slomo_vid, text_timelapse_transcript, text_words, text_wordstimestamps, vid_speed_timelapse], out_timelapse_vid )
demo.launch(debug=True)