File size: 10,303 Bytes
c92afc1
 
 
 
 
 
 
 
 
 
 
 
 
 
bf13b5b
 
c92afc1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed8a3b7
c92afc1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed8a3b7
c92afc1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed8a3b7
c92afc1
 
 
 
 
 
 
 
 
ed8a3b7
c92afc1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5f19ebb
c92afc1
5f19ebb
 
 
c92afc1
5f19ebb
 
 
c92afc1
5f19ebb
c92afc1
5f19ebb
 
c92afc1
5f19ebb
c92afc1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed8a3b7
0e5164a
c92afc1
 
 
ed8a3b7
c92afc1
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
import gradio as gr
import ffmpeg
from pathlib import Path
import os
import ast
import json
import base64 
import requests
import moviepy.editor as mp
from PIL import Image, ImageSequence
import cv2


API_URL = "https://api-inference.huggingface.co/models/facebook/wav2vec2-base-960h"
#HF_TOKEN = os.environ["HF_TOKEN"]
#headers = {"Authorization": f"Bearer {HF_TOKEN}"}


def generate_transcripts(in_video): #generate_gifs(in_video, gif_transcript):
    print("********* Inside generate_transcripts() **********")
    #convert video to audio
    print(f" input video is : {in_video}")
    
    #sample
    video_path = Path("./ShiaLaBeouf.mp4")
    audio_memory, _ = ffmpeg.input(in_video).output('-', format="wav", ac=1, ar='16k').overwrite_output().global_args('-loglevel', 'quiet').run(capture_stdout=True)
    #audio_memory, _ = ffmpeg.input(video_path).output('-', format="wav", ac=1, ar='16k').overwrite_output().global_args('-loglevel', 'quiet').run(capture_stdout=True)
    
    #Getting transcripts using wav2Vec2 huggingface hosted accelerated inference
    #sending audio file in request along with stride and chunk length information
    model_response = query_api(audio_memory)
    
    #model response has both - transcripts as well as character timestamps or chunks
    print(f"model_response is : {model_response}")
    transcription = model_response["text"].lower()
    chnk = model_response["chunks"]
    
    #creating lists from chunks to consume downstream easily
    timestamps = [[chunk["text"].lower(), chunk["timestamp"][0], chunk["timestamp"][1]]
              for chunk in chnk]
    
    #getting words and word timestamps
    words, words_timestamp = get_word_timestamps(timestamps)
    print(f"Total words in the audio transcript is:{len(words)}, transcript word list is :{words}, type of words is :{type(words)} ") 
    print(f"Total Word timestamps derived fromcharacter timestamp are :{len(words_timestamp)}, Word timestamps are :{words_timestamp}")
    
    return transcription, words, words_timestamp
    
    
def generate_gifs(in_video, gif_transcript, words, words_timestamp, vid_speed):
    print("********* Inside generate_gifs() **********")
    
    #creating list from input gif transcript 
    #gif = "don't let your dreams be dreams"
    gif = gif_transcript
    #gif = gif_transcript
    giflist = gif.split()
    
    #getting gif indexes from the generator
    # Converting string to list
    words = ast.literal_eval(words)
    words_timestamp = ast.literal_eval(words_timestamp)
    print(f"words is :{words}")
    print(f"type of words is :{type(words)}")
    print(f"length of words is :{len(words)}")
    print(f"giflist is :{giflist}")

    giflist_indxs = list(list(get_gif_word_indexes(words, giflist))[0])
    print(f"giflist_indxs is : {giflist_indxs}")
    #getting start and end timestamps for a gif video
    start_seconds, end_seconds = get_gif_timestamps(giflist_indxs, words_timestamp)
    print(f"start_seconds, end_seconds  are : ({start_seconds}, {end_seconds})")
    #generated .gif image
    #gif_out, vid_out = gen_moviepy_gif(in_video, start_seconds, end_seconds)
    slomo_vid = gen_moviepy_gif(in_video, start_seconds, end_seconds, vid_speed)
    
    return slomo_vid 

    
#calling the hosted model
def query_api(audio_bytes: bytes):
    """
    Query for Huggingface Inference API for Automatic Speech Recognition task
    """
    print("********* Inside query_api() **********")
    payload = json.dumps({
        "inputs": base64.b64encode(audio_bytes).decode("utf-8"),
        "parameters": {
            "return_timestamps": "char",
            "chunk_length_s": 10,
            "stride_length_s": [4, 2]
        },
        "options": {"use_gpu": False}
    }).encode("utf-8")

    response = requests.request(
        "POST", API_URL, headers=headers, data=payload)
    json_reponse = json.loads(response.content.decode("utf-8"))
    print(f"json_reponse is :{json_reponse}")
    return json_reponse


#getting word timestamps from character timestamps
def get_word_timestamps(timestamps): 
  words, word = [], []
  letter_timestamp, word_timestamp, words_timestamp = [], [], []
  for idx,entry in enumerate(timestamps):
    word.append(entry[0])
    letter_timestamp.append(entry[1])
    if entry[0] == ' ':
      words.append(''.join(word))
      word_timestamp.append(letter_timestamp[0])
      word_timestamp.append(timestamps[idx-1][2])
      words_timestamp.append(word_timestamp)
      word, word_timestamp, letter_timestamp = [], [], []

  words = [word.strip() for word in words]
  return words, words_timestamp


#getting index of gif words in main transcript
def get_gif_word_indexes(total_words_list, gif_words_list):  
    if not gif_words_list:
        return
    # just optimization
    COUNT=0
    lengthgif_words_list = len(gif_words_list)
    firstgif_words_list = gif_words_list[0]
    
    print(f"total_words_list is :{total_words_list}")
    print(f"length of total_words_list is :{len(total_words_list)}")
    print(f"gif_words_list is :{gif_words_list}")
    print(f"length of gif_words_list is :{len(gif_words_list)}")
    
    for idx, item in enumerate(total_words_list):
        COUNT+=1
        if item == firstgif_words_list:
            if total_words_list[idx:idx+lengthgif_words_list] == gif_words_list:
                print(f"value of tuple is : {tuple(range(idx, idx+lengthgif_words_list))}")
                yield tuple(range(idx, idx+lengthgif_words_list))


#getting start and end timestamps for gif transcript
def get_gif_timestamps(giflist_indxs, words_timestamp):    
  print(f"******** Inside get_gif_timestamps() **********")
  min_idx = min(giflist_indxs)
  max_idx = max(giflist_indxs)
  print(f"min_idx is :{min_idx}")
  print(f"max_idx is :{max_idx}")
  
  gif_words_timestamp = words_timestamp[min_idx : max_idx+1]
  print(f"words_timestamp is :{words_timestamp}")
  print(f"gif_words_timestamp is :{gif_words_timestamp}")
  
  start_seconds, end_seconds = gif_words_timestamp[0][0], gif_words_timestamp[-1][-1]
  print(f"start_seconds, end_seconds are :{start_seconds},{end_seconds}")
  
  return start_seconds, end_seconds


#extracting the  video and building and serving a .gif image
def gen_moviepy_gif(in_video, start_seconds, end_seconds, vid_speed):
  print("******** inside moviepy_gif () ***************")
  #sample
  video_path = "./ShiaLaBeouf.mp4"
  video = mp.VideoFileClip(in_video) 
  #video = mp.VideoFileClip(video_path) 
  
  final_clip = video.subclip(start_seconds, end_seconds)
  
  #slowmo
  slomo_clip = video.subclip(mp.vfx.speedx, vid_speed)
  slomo_clip.write_videofile("slomo.mp4")
  
  #writing to RAM
  final_clip.write_gif("gifimage.gif") #, program='ffmpeg', tempfiles=True, fps=15, fuzz=3)
  final_clip.write_videofile("gifimage.mp4")
  final_clip.close()
  #reading in a variable
  gif_img = mp.VideoFileClip("gifimage.gif")
  #gif_vid = mp.VideoFileClip("gifimage.mp4")
  #im = Image.open("gifimage.gif")
  #vid_cap = cv2.VideoCapture('gifimage.mp4')
  return "slomo.mp4" #"gifimage.gif", "gifimage.mp4" #im, gif_img, gif_vid, vid_cap,  #"gifimage.mp4"


sample_video = ['./ShiaLaBeouf.mp4']
sample_vid = gr.Video(label='Video file')  #for displaying the example
examples = gr.components.Dataset(components=[sample_vid], samples=[sample_video], type='values')


demo = gr.Blocks()

with demo:
    gr.Markdown("""# **Watch a part of your video in SloMo or in Timelapse!** """)
    gr.Markdown("""
    ### Editing your video using ASR pipeline..
    
    A Space by [Yuvraj Sharma](https://huggingface.co/ysharma). 
        
    **Motivation and background:** In this Gradio BLocks Party Space, I am trying to - 
    - Provide a capability to slow down your video 
    - Timelapse your video 
    
    **How To Use:** 1. Upload a video or simply click on the sample provided here. 
    2. Then click on 'Generate transcripts' button and first textbox will display the extract Transcript from the audio associated with your sample.
    3. Clip the text from transcript or type transcripts manually in the second Textbox provided.
    4. A slowed down or timelapsed version of your video will get generated on the right hand side ! 
    
    Hope you have fun using this 😀
    """)
    
    with gr.Row():
        #for incoming video
        input_video = gr.Video(label="Upload a Video", visible=True)  
        #to generate and display transcriptions for input video
        text_transcript = gr.Textbox(label="Transcripts", lines = 10, interactive = True )
        
        #Just to move dgata between function hence keeping visible false
        text_words = gr.Textbox(visible=False)
        text_wordstimestamps = gr.Textbox(visible=False)
        
        #to copy paste required gif transcript / or to populate by itslef on pressing the button
        text_gif_transcript = gr.Textbox(label="Transcripts", placeholder="Copy paste transcripts here to create GIF image" , lines = 3, interactive = True ) 
        
        def load_gif_text(text):
            print("****** inside load_gif_text() ******")
            print("text for gif is : ", text)
            return text
             
        text_transcript.change(load_gif_text, text_transcript, text_gif_transcript )
        
        #out_gif = gr.Image(label="Generated GIF image") 
        out_slomo_vid = gr.Video(label="Generated GIF image")         
        
    with gr.Row():
        button_transcript = gr.Button("Generate transcripts")
        button_gifs = gr.Button("Create Gif")
        
    with gr.Row():
        #to render video example on mouse hover/click        
        examples.render()
        #to load sample video into input_video upon clicking on it
        def load_examples(video):  
            print("****** inside load_example() ******")
            print("in_video is : ", video[0])
            return video[0]
        
        examples.click(load_examples, examples, input_video) 
        
        vid_speed = gr.Slider(0.9,0.1, step=0.1)
        
        
    button_transcript.click(generate_transcripts, input_video, [text_transcript, text_words, text_wordstimestamps ])
    button_gifs.click(generate_gifs, [input_video, text_gif_transcript, text_words, text_wordstimestamps, vid_speed], out_slomo_vid )
    
   
demo.launch(debug=True)