import base64
import json
import os
from pathlib import Path

import ffmpeg
import gradio as gr
import requests
from moviepy.editor import VideoFileClip

API_URL = "https://api-inference.huggingface.co/models/facebook/wav2vec2-base-960h"
#do not hardcode the secret token; read it from the environment instead
#(the HF_API_TOKEN variable name is an illustrative choice, not from the original script)
headers = {"Authorization": f"Bearer {os.environ.get('HF_API_TOKEN', '')}"}

#convert the video track to audio: mono, 16 kHz WAV bytes kept in memory
video_path = Path("/content/gdrive/My Drive/AI/videoedit/ShiaLaBeouf.mp4")
audio_memory, _ = (
    ffmpeg.input(video_path)
    .output('-', format="wav", ac=1, ar='16k')
    .overwrite_output()
    .global_args('-loglevel', 'quiet')
    .run(capture_stdout=True)
)

#calling the hosted model
def query_api(audio_bytes: bytes):
    """
    Query the Hugging Face Inference API for the automatic speech recognition task.
    """
    payload = json.dumps({
        "inputs": base64.b64encode(audio_bytes).decode("utf-8"),
        "parameters": {
            "return_timestamps": "char",
            "chunk_length_s": 10,
            "stride_length_s": [4, 2]
        },
        "options": {"use_gpu": False}
    }).encode("utf-8")

    response = requests.post(API_URL, headers=headers, data=payload)
    return response.json()
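
#the response, per the chunk handling below, is expected to be a dict holding "text"
#(the full transcript) and "chunks", each chunk like {"text": "h", "timestamp": [0.1, 0.15]}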

#getting the transcript using the Wav2Vec2 model hosted on the Hugging Face accelerated
#Inference API, sending the audio bytes along with stride and chunk-length information
model_response = query_api(audio_memory)

#the model response holds both the transcript and the character-level timestamps (chunks)
transcription = model_response["text"].lower()
chunks = model_response["chunks"]

#creating [character, start, end] lists from the chunks for easy downstream use
timestamps = [[chunk["text"].lower(), chunk["timestamp"][0], chunk["timestamp"][1]]
              for chunk in chunks]
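#e.g. (hypothetical values, not model output) for the phrase "hi yo":
#  [['h', 0.1, 0.15], ['i', 0.15, 0.2], [' ', 0.2, 0.25],
#   ['y', 0.25, 0.3], ['o', 0.3, 0.35], [' ', 0.35, 0.4]]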


#getting word timestamps from the character timestamps
def get_word_timestamps(timestamps):
    words, word = [], []
    letter_timestamp, word_timestamp, words_timestamp = [], [], []
    for idx, entry in enumerate(timestamps):
        word.append(entry[0])
        letter_timestamp.append(entry[1])
        if entry[0] == ' ':
            words.append(''.join(word))
            word_timestamp.append(letter_timestamp[0])
            word_timestamp.append(timestamps[idx - 1][2])
            words_timestamp.append(word_timestamp)
            word, word_timestamp, letter_timestamp = [], [], []

    #flush the last word when the transcript does not end with a space
    if word and ''.join(word).strip():
        words.append(''.join(word))
        words_timestamp.append([letter_timestamp[0], timestamps[-1][2]])

    words = [word.strip() for word in words]
    return words, words_timestamp
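
#a quick illustration (hypothetical values) of how the character triples collapse:
#  get_word_timestamps([['h', 0.1, 0.15], ['i', 0.15, 0.2], [' ', 0.2, 0.25],
#                       ['y', 0.25, 0.3], ['o', 0.3, 0.35], [' ', 0.35, 0.4]])
#  -> (['hi', 'yo'], [[0.1, 0.2], [0.25, 0.35]])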

words, words_timestamp = get_word_timestamps(timestamps)

print(f"Total words in the audio transcript: {len(words)}; transcript word list: {words}")
print(f"Total word timestamps derived from character timestamps: {len(words_timestamp)}; word timestamps: {words_timestamp}")

#creating a word list from the input GIF transcript
gif = "don't let your dreams be dreams"
giflist = gif.split()

#getting the indexes of the GIF words within the main transcript
def get_gif_word_indexes(total_words_list, gif_words_list):
    if not gif_words_list:
        return
    #small optimization: cache the phrase length and its first word
    gif_phrase_length = len(gif_words_list)
    first_gif_word = gif_words_list[0]
    for idx, item in enumerate(total_words_list):
        if item == first_gif_word:
            if total_words_list[idx:idx + gif_phrase_length] == gif_words_list:
                yield tuple(range(idx, idx + gif_phrase_length))
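
#e.g. (hypothetical lists) with total_words_list = ['do', 'it', 'just', 'do', 'it'] and
#gif_words_list = ['do', 'it'], the generator yields (0, 1) and then (3, 4)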

#taking the first match from the generator (this fails if the phrase is not in the transcript)
giflist_indxs = list(list(get_gif_word_indexes(words, giflist))[0])

#getting the start and end timestamps for the GIF transcript
def get_gif_timestamps(giflist_indxs):
    min_idx = min(giflist_indxs)
    max_idx = max(giflist_indxs)

    gif_words_timestamp = words_timestamp[min_idx : max_idx + 1]
    start_seconds, end_seconds = gif_words_timestamp[0][0], gif_words_timestamp[-1][-1]
    return start_seconds, end_seconds

#getting the start and end timestamps for the GIF clip
start_seconds, end_seconds = get_gif_timestamps(giflist_indxs)

#extracting the clip from the source video and serving it as a .gif image
#NOTE: `video` is loaded with moviepy here; VideoFileClip is an assumption
#based on the subclip()/write_gif() calls below
video = VideoFileClip(str(video_path))

def generate_gif(start_seconds, end_seconds):
    final_clip = video.subclip(start_seconds, end_seconds)
    final_clip.write_gif("/content/gdrive/My Drive/AI/videoedit/gif1.gif")
    final_clip.close()

generate_gif(start_seconds, end_seconds)
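
#a minimal Gradio wiring sketch, assuming the gradio import above is meant for a UI;
#the engine() name, the labels, and the output component are illustrative, not from the
#original script, and whether gr.Image animates the GIF depends on the Gradio version
def engine(gif_transcript: str):
    matches = list(get_gif_word_indexes(words, gif_transcript.lower().split()))
    if not matches:
        return None
    start, end = get_gif_timestamps(list(matches[0]))
    generate_gif(start, end)
    return "/content/gdrive/My Drive/AI/videoedit/gif1.gif"

demo = gr.Interface(fn=engine,
                    inputs=gr.Textbox(label="Phrase from the video to turn into a GIF"),
                    outputs=gr.Image(label="Generated GIF"))
#demo.launch()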