Spaces:
Sleeping
Sleeping
import json
import os
import re
import string
import tempfile

import gradio as gr
import spaces
import torch
from llama_index.core.llms import ChatMessage
from llama_index.llms.groq import Groq
from pytube import YouTube
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
from transformers import BitsAndBytesConfig
from youtube_transcript_api import YouTubeTranscriptApi

HF_TOKEN = os.environ["HF_TOKEN"]
# Whisper-Small speech recognition model, loaded with 4-bit NF4 quantization.
model_id = "openai/whisper-small"
torch_dtype = torch.float16

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch_dtype,
)

model = AutoModelForSpeechSeq2Seq.from_pretrained(
    model_id,
    torch_dtype=torch_dtype,
    low_cpu_mem_usage=True,
    use_safetensors=True,
    quantization_config=quantization_config,
)
processor = AutoProcessor.from_pretrained(model_id)

# Module-level ASR pipeline, built once at import time and reused by every
# request; audio is processed in 30-second windows, 16 windows per batch.
pipe = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    max_new_tokens=128,
    chunk_length_s=30,
    batch_size=16,
    return_timestamps=True,
    torch_dtype=torch_dtype,
)
# Instruction given to the LLM: return verbatim, double-quoted excerpts.
_SYSTEM_PROMPT = '''You are provided with the audio-to-text transcript of a YouTube video. Your task is to IDENTIFY AND EXTRACT, WITHOUT ANY MODIFICATION, the most valuable and informative phrases from the transcript. This includes retaining ORIGINAL punctuation, capitalization, and wording. The goal is to create a list of verbatim excerpts that distill the essence of the video's content. There should NOT be any sort of modification done to the text, simply copy and paste it here!!
Example:- If you found this line/phrase to be important "we want to basically make generative ai available and accessible to the people in the country and that's that's the intent and when we said that we want to do this there was a resonance in the investment community". Then, the output should include "we want to basically make generative ai available and accessible to the people in the country and that's that's the intent and when we said that we want to do this there was a resonance in the investment community" exactly without any alteration.'''

# Pulls every double-quoted span out of the LLM reply.
_QUOTED_PHRASE = re.compile(r'"(.*?)"')


def preprocess_extracted_text(extracted_phrases):
    """Return the phrases lowercased and with ASCII punctuation removed."""
    strip_punctuation = str.maketrans('', '', string.punctuation)
    return [
        phrase.lower().translate(strip_punctuation)
        for phrase in extracted_phrases
    ]


def preprocess_word_timing_data(word_timing_data):
    """Return word entries lowercased, with punctuation and spaces removed.

    Each input entry is a mapping with ``'text'`` and ``'timestamp'`` keys;
    the timestamp is passed through unchanged.
    """
    strip_chars = str.maketrans('', '', string.punctuation + ' ')
    return [
        {
            'text': entry['text'].lower().translate(strip_chars),
            'timestamp': entry['timestamp'],
        }
        for entry in word_timing_data
    ]


def create_semantic_chunks(word_timing_data, extracted_text, max_chunk_length=30.0):
    """Align phrases to timed transcript words and emit timed chunks.

    For every phrase, the longest common subsequence between the phrase's
    words and the transcript words is computed; the matched transcript words
    (which need not be contiguous) supply the text and timing of the chunks.

    Args:
        word_timing_data: Sequence of mappings with a ``'text'`` word and a
            ``'timestamp'`` pair ``(start, end)``.
        extracted_text: Iterable of phrases (whitespace-separated words).
        max_chunk_length: A chunk is closed as soon as the span from its
            start to the current word's end exceeds this value, so a chunk
            may overshoot the limit by one word.

    Returns:
        A list of dicts with keys ``chunk_id`` (1-based, running across all
        phrases), ``chunk_length``, ``text``, ``start_time`` and
        ``end_time``. Phrases that match no transcript word contribute
        nothing.
    """
    # Lowercase once instead of inside the O(phrase * transcript) loops.
    transcript_words = [entry['text'].lower() for entry in word_timing_data]
    n_transcript = len(transcript_words)

    output_chunks = []
    chunk_id = 1
    for phrase in extracted_text:
        phrase_words = phrase.lower().split()
        n_phrase = len(phrase_words)

        # Longest-common-subsequence table: dp[i][j] is the LCS length of
        # the first i phrase words and the first j transcript words.
        dp = [[0] * (n_transcript + 1) for _ in range(n_phrase + 1)]
        for i in range(1, n_phrase + 1):
            for j in range(1, n_transcript + 1):
                if phrase_words[i - 1] == transcript_words[j - 1]:
                    dp[i][j] = dp[i - 1][j - 1] + 1
                else:
                    dp[i][j] = max(dp[i - 1][j], dp[i][j - 1])

        # Walk the table backwards to recover the matched transcript words.
        i, j = n_phrase, n_transcript
        matched = []
        while i > 0 and j > 0:
            if phrase_words[i - 1] == transcript_words[j - 1]:
                matched.append(word_timing_data[j - 1])
                i -= 1
                j -= 1
            elif dp[i - 1][j] > dp[i][j - 1]:
                i -= 1
            else:
                j -= 1
        matched.reverse()

        # A phrase the LLM paraphrased or invented may match nothing at all;
        # there is no timing to report for it, so skip it.
        if not matched:
            continue

        start_time = matched[0]['timestamp'][0]
        chunk_end_time = matched[0]['timestamp'][1]
        chunk_words = []
        for entry in matched:
            word_end = entry['timestamp'][1]
            chunk_words.append(entry['text'])
            if word_end - start_time > max_chunk_length:
                output_chunks.append({
                    "chunk_id": chunk_id,
                    "chunk_length": round(word_end - start_time, 2),
                    "text": ' '.join(chunk_words).strip(),
                    "start_time": start_time,
                    "end_time": word_end,
                })
                chunk_id += 1
                # The next chunk starts where this one ended.
                start_time = word_end
                chunk_words = []
            chunk_end_time = max(chunk_end_time, word_end)

        # Flush whatever is left after the last split.
        if chunk_words:
            output_chunks.append({
                "chunk_id": chunk_id,
                "chunk_length": round(chunk_end_time - start_time, 2),
                "text": ' '.join(chunk_words).strip(),
                "start_time": start_time,
                "end_time": chunk_end_time,
            })
            chunk_id += 1
    return output_chunks


def _extract_key_phrases(transcript):
    """Ask the Groq-hosted LLM for verbatim key phrases of *transcript*.

    Returns the list of double-quoted spans found in the model's reply.

    Raises:
        RuntimeError: If the ``GROQ_API_KEY`` environment variable is unset.
    """
    # The key must come from the environment; it must never live in source.
    api_key = os.environ.get("GROQ_API_KEY")
    if not api_key:
        raise RuntimeError("GROQ_API_KEY environment variable is not set")

    llm = Groq(model="llama3-8b-8192", api_key=api_key)
    messages = [
        ChatMessage(role="system", content=_SYSTEM_PROMPT),
        ChatMessage(role="user", content=transcript.lower()),
    ]
    resp = llm.chat(messages)
    # Use the parsed chat message rather than digging through the raw
    # provider payload, whose shape is not part of the response contract.
    content = resp.message.content or ""
    return _QUOTED_PHRASE.findall(content)


def chunk_it(yt_link):
    """Turn a YouTube link into a list of timed key-phrase chunks.

    Steps: download the video's audio, transcribe it with the module-level
    ``pipe`` using word timestamps, have the LLM pick verbatim key phrases,
    then align those phrases back to the timed words.

    Args:
        yt_link: URL of the YouTube video.

    Returns:
        The list of chunk dicts produced by ``create_semantic_chunks``.

    Raises:
        ValueError: If the video exposes no audio-only stream.
        RuntimeError: If ``GROQ_API_KEY`` is not configured.
    """
    # Youtube audio extraction
    stream = YouTube(yt_link).streams.filter(only_audio=True).first()
    if stream is None:
        raise ValueError(f"No audio-only stream available for {yt_link!r}")

    # Download into a throwaway directory so audio files do not pile up on
    # disk and concurrent requests cannot clobber each other's file.
    with tempfile.TemporaryDirectory() as workdir:
        path = stream.download(output_path=workdir)
        # Whisper-Small inference
        result = pipe(path, return_timestamps="word")

    # LLaMa3-8B extraction
    extracted_phrases = _extract_key_phrases(result["text"])

    # Final stage: normalise both sides the same way, then align.
    processed_phrases = preprocess_extracted_text(extracted_phrases)
    processed_data = preprocess_word_timing_data(result['chunks'])
    final_output = create_semantic_chunks(
        processed_data, processed_phrases, max_chunk_length=30.0
    )
    return final_output
# Gradio front end: one text input (the YouTube link) mapped to one text
# output (the chunks returned by chunk_it).
iface = gr.Interface(
    fn=chunk_it,
    inputs="text",
    outputs="text",
    title="Youtube Chunker",
)

# Start serving the interface.
iface.launch(inline=False)