Spaces:
Running
Running
import heapq
import logging
import os
import re
from typing import List

import ffmpeg
import gradio as gr
import nltk
import requests
import speech_recognition as sr
import torchaudio
from gtts import gTTS
from huggingface_hub import InferenceClient
from langchain import HuggingFaceHub, PromptTemplate, LLMChain
from langchain_openai import AzureChatOpenAI
from moviepy.editor import VideoFileClip
from openai import AzureOpenAI
from pydub import AudioSegment
from pydub.exceptions import CouldntDecodeError
from pydub.silence import split_on_silence
from pytube import YouTube
from sklearn.feature_extraction.text import TfidfVectorizer
from speechbrain.inference.classifiers import EncoderClassifier
# Fetch the NLTK resources at import time; 'stopwords' is read by
# VideoAnalytics.extractive_summary and 'punkt' is presumably what the
# nltk tokenizer calls in that method rely on.
for _nltk_resource in ('punkt', 'stopwords'):
    nltk.download(_nltk_resource)
class VideoAnalytics:
    """
    Analytics pipeline for a single video.

    The pipeline extracts the audio track, detects the spoken language,
    transcribes the speech, translates the transcript to English and then
    produces a summary, a topic list and a list of important sentences with
    either the Azure OpenAI client or the Mixtral inference client. A Gradio
    UI wraps the whole flow.
    """

    # Longest video, in seconds, that main() will process (6 * 600 s = 60 min).
    MAX_VIDEO_SECONDS = 6 * 600

    # Instruction texts shared by the OpenAI and Mixtral code paths.
    _SUMMARY_INSTRUCTION = (
        "summarize the following text delimited by triple backticks."
        "Output must in english.give me a detailed summary."
        "abstractive summary working be like summary of what about the given text."
        "don't make bullet points write like a passage.\n"
        "In this format of Outputs given below:\n"
        "Abstractive Summary:\n"
    )
    _TOPICS_INSTRUCTION = (
        "generate single Topics from the following text don't make sentence "
        "for topic generation,delimited by triple backticks."
        "Output must in english.\n"
        "list out the topics:\n"
        "Topics:\n"
    )
    _SENTENCES_INSTRUCTION = (
        " Extract Most important of the sentences from text."
        "the text is given in triple backtics.\n"
        "listout the sentences:\n"
    )

    def __init__(self):
        """
        Initialize the VideoAnalytics object.

        Creates the Azure OpenAI client, the Mixtral inference client (token
        read from the ``HF_TOKEN`` environment variable), the speech
        recognizer, the spoken-language classifier and the LangChain chat
        model, and configures logging.
        """
        # Initialize AzureOpenAI client
        self.client = AzureOpenAI()
        hf_key = os.getenv("HF_TOKEN")
        self.mistral_client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1", token=hf_key)
        # Raw transcript in the language spoken in the video
        self.transcribed_text = ""
        self.r = sr.Recognizer()
        self.language_id = EncoderClassifier.from_hparams(
            source="speechbrain/lang-id-voxlingua107-ecapa", savedir="tmp"
        )
        # English translation of the transcript; all generators read this
        self.english_text = ""
        self.openai_llm = AzureChatOpenAI(
            deployment_name="ChatGPT",
        )
        # Configure logging settings
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    def mp3_to_wav(self, mp3_file: str, wav_file: str) -> str:
        """
        Convert an MP3 audio file to WAV format.

        Args:
            mp3_file (str): The path to the input MP3 file.
            wav_file (str): The path to save the output WAV file.

        Returns:
            str: The filename of the converted WAV file.

        Raises:
            Exception: If there's an error during the conversion process.
        """
        try:
            audio = AudioSegment.from_mp3(mp3_file)
            audio.export(wav_file, format="wav")
            logging.info(f"MP3 file '{mp3_file}' converted to WAV successfully: {wav_file}")
            return wav_file
        except Exception as e:
            logging.error(f"Error occurred while converting MP3 to WAV: {e}")
            # Bare raise keeps the original traceback intact
            raise

    def split_audio(self, input_file: str, segment_length: int = 60000) -> List["AudioSegment"]:
        """
        Split an audio file into consecutive segments of fixed length.

        Args:
            input_file (str): Path to the input audio file.
            segment_length (int): Segment length in milliseconds.
                Defaults to 60,000 ms (1 minute).

        Returns:
            List[AudioSegment]: Segments in playback order; the last one may be
            shorter. Empty if the file cannot be decoded.

        Raises:
            ValueError: If ``segment_length`` is not positive.
        """
        if segment_length <= 0:
            raise ValueError("segment_length must be a positive number of milliseconds")
        try:
            audio = AudioSegment.from_file(input_file)
            total_length = len(audio)
            return [
                audio[start_time:min(start_time + segment_length, total_length)]
                for start_time in range(0, total_length, segment_length)
            ]
        except CouldntDecodeError as e:
            logging.error(f"Error decoding audio: {e}")
            return []

    def transcribe_audio(self, path: str, lang: str):
        """
        Transcribe speech from an audio file.

        Args:
            path (str): Path to the audio file.
            lang (str): Language code passed to the recognizer.

        Returns:
            str: Recognized text, or "" when recognition fails.
        """
        try:
            with sr.AudioFile(path) as source:
                audio_listened = self.r.record(source)
                return self.r.recognize_google(audio_listened, language=lang)
        except sr.UnknownValueError as e:
            logging.error(f"Speech recognition could not understand audio: {e}")
            return ""
        except sr.RequestError as e:
            logging.error(f"Could not request results from Google Speech Recognition service: {e}")
            return ""

    def get_large_audio_transcription_on_silence(self, path: str, lang: str):
        """
        Split a large audio file on silence and transcribe each chunk.

        Args:
            path (str): Path to the audio file.
            lang (str): Language code passed to the recognizer.

        Returns:
            str: Concatenated transcript, or "" when processing fails.
        """
        try:
            sound = AudioSegment.from_file(path)
            chunks = split_on_silence(
                sound, min_silence_len=500, silence_thresh=sound.dBFS - 14, keep_silence=500
            )
            folder_name = "audio-chunks"
            os.makedirs(folder_name, exist_ok=True)
            pieces = []
            for i, audio_chunk in enumerate(chunks, start=1):
                chunk_filename = os.path.join(folder_name, f"chunk{i}.wav")
                audio_chunk.export(chunk_filename, format="wav")
                text = self.transcribe_audio(chunk_filename, lang)
                if text:
                    text = f"{text.capitalize()}. "
                    logging.info(f"Transcribed {chunk_filename}: {text}")
                    pieces.append(text)
                else:
                    logging.warning(f"No speech recognized in {chunk_filename}")
            return "".join(pieces)
        except Exception as e:
            logging.error(f"Error processing audio: {e}")
            return ""

    def transcribe_video(self, vid: str) -> str:
        """
        Transcribe the audio of the video and translate it to English.

        Updates ``self.transcribed_text`` and ``self.english_text``; both are
        cleared first so a failure never leaves a previous video's text behind.

        Args:
            vid (str): Path to the video file.

        Returns:
            str: Transcribed text, or "" on failure.
        """
        self.transcribed_text = ""
        self.english_text = ""
        try:
            video = VideoFileClip(vid)
            try:
                audio = video.audio
                if audio is None:
                    logging.error("Error transcribing video: the video has no audio track")
                    return ""
                audio.write_audiofile("output_audio.mp3")
            finally:
                # Release the reader even when audio extraction fails
                video.close()
            audio_filename = self.mp3_to_wav("output_audio.mp3", 'output.wav')
            segments = self.split_audio(audio_filename)
            if not segments:
                logging.error("Error transcribing video: no audio segments to analyse")
                return ""
            # Only the first segment is used for language detection
            sample_path = "segment_for_1_min.wav"
            segments[0].export(sample_path, format="wav").close()
            signal = self.language_id.load_audio(sample_path)
            prediction = self.language_id.classify_batch(signal)
            # The text before ':' in the predicted label is used as the language code
            lang = prediction[3][0].split(":")[0]
            text = self.get_large_audio_transcription_on_silence(audio_filename, lang)
            self.transcribed_text = text
            self.english_text = self.translation()
            return text
        except Exception as e:
            logging.error(f"Error transcribing video: {e}")
            return ""

    def extractive_summary(self, text: str):
        """
        Generate an extractive summary of the input text.

        Sentences are scored by the normalised frequency of their
        non-stopword words; up to 12 top-scoring sentences shorter than 30
        words are joined, highest score first.

        Args:
            text (str): The input text to be summarized.

        Returns:
            str: The extractive summary, or "" for empty input or on error.
        """
        try:
            if not text or not text.strip():
                return ""
            # Removing square-bracket references and extra spaces
            article_text = re.sub(r'\[[0-9]*\]', ' ', text)
            article_text = re.sub(r'\s+', ' ', article_text)
            # Removing special characters and digits
            formatted_article_text = re.sub('[^a-zA-Z]', ' ', article_text)
            formatted_article_text = re.sub(r'\s+', ' ', formatted_article_text)
            sentence_list = nltk.sent_tokenize(article_text)
            stopwords = set(nltk.corpus.stopwords.words('english'))
            # Count lowercased words so they match the lowercased sentence
            # tokens used for scoring below
            word_frequencies = {}
            for word in nltk.word_tokenize(formatted_article_text.lower()):
                if word not in stopwords:
                    word_frequencies[word] = word_frequencies.get(word, 0) + 1
            if not word_frequencies:
                return ""
            maximum_frequency = max(word_frequencies.values())
            word_frequencies = {
                word: count / maximum_frequency for word, count in word_frequencies.items()
            }
            sentence_scores = {}
            for sent in sentence_list:
                if len(sent.split(' ')) >= 30:
                    continue
                for word in nltk.word_tokenize(sent.lower()):
                    if word in word_frequencies:
                        sentence_scores[sent] = sentence_scores.get(sent, 0) + word_frequencies[word]
            summary_sentences = heapq.nlargest(12, sentence_scores, key=sentence_scores.get)
            return ' '.join(summary_sentences)
        except Exception as e:
            logging.error(f"Error occurred during summarization: {e}")
            return ""

    def _chat_completion(self, system_role: str, user_content: str) -> str:
        """Send one system+user exchange to the Azure OpenAI client and return the reply text."""
        response = self.client.chat.completions.create(
            model="GPT-3",
            messages=[
                {"role": "system", "content": system_role},
                {"role": "user", "content": user_content},
            ],
            temperature=0,
            max_tokens=1000,
        )
        return response.choices[0].message.content or ""

    def _run_text_task(self, model, system_role: str, instruction: str, data_prefix: str = "") -> str:
        """Run an instruction over ``self.english_text`` with the selected model; "" if the model is unknown."""
        if model == "OpenAI":
            return self._chat_completion(system_role, f"{instruction}```{self.english_text}```\n")
        if model == "Mixtral":
            prompt = f"<s>[INST]{instruction}```{data_prefix}{self.english_text}```[/INST]"
            return self.generate(prompt)
        logging.error(f"Unsupported model: {model!r}")
        return ""

    def generate_video_summary(self, model) -> str:
        """
        Generate an abstractive summary of the translated transcript.

        Args:
            model: "OpenAI" or "Mixtral".

        Returns:
            str: Generated summary, or "" on error or unknown model.
        """
        try:
            return self._run_text_task(
                model, "You are a Summarizer", self._SUMMARY_INSTRUCTION, data_prefix="data:"
            )
        except Exception as e:
            logging.error(f"Error generating video summary: {e}")
            return ""

    def generate_topics(self, model) -> str:
        """
        Generate topics from the translated transcript.

        Args:
            model: "OpenAI" or "Mixtral".

        Returns:
            str: Generated topics, or "" on error or unknown model.
        """
        try:
            return self._run_text_task(
                model, "You are a Topic Generator", self._TOPICS_INSTRUCTION, data_prefix="data:"
            )
        except Exception as e:
            logging.error(f"Error generating topics: {e}")
            return ""

    def extract_video_important_sentence(self, model) -> str:
        """
        Extract important sentences from the translated transcript.

        Args:
            model: "OpenAI" or "Mixtral".

        Returns:
            str: Extracted important sentences, or "" on error or unknown model.
        """
        try:
            return self._run_text_task(
                model, "You are a Sentence Extracter", self._SENTENCES_INSTRUCTION
            )
        except Exception as e:
            logging.error(f"Error Extracting Important Sentence: {e}")
            return ""

    def translation(self) -> str:
        """
        Translate ``self.transcribed_text`` to English.

        Returns:
            str: English translation, or "" when there is nothing to
            translate or the request fails.
        """
        if not self.transcribed_text:
            # Nothing to translate; skip the API call
            return ""
        try:
            user_content = (
                " Translate the following text in English ,delimited by triple backticks.\n"
                f"```{self.transcribed_text}```\n"
            )
            return self._chat_completion("You are a Multilingual Translator", user_content)
        except Exception as e:
            logging.error(f"Error translating text: {e}")
            return ""

    def format_prompt(self, question: str, data: str) -> str:
        """
        Format a question-answering prompt for the Mixtral model.

        Args:
            question (str): The user's question.
            data (str): The data to be analyzed.

        Returns:
            str: Prompt starting with ``<s>``, containing the instruction
            block with data and question, followed by the question repeated
            in its own instruction block.
        """
        instruction = (
            "[INST] you are the german language and universal language expert ."
            "your task is analyze the given data and user ask any question about "
            "given data answer to the user question."
            "your returning answer must in user's language."
            "otherwise reply i don't know.\n"
            f"data:{data}\n"
            f"question:{question}[/INST]"
        )
        repeated_question = f"[INST] {question} [/INST]"
        return "<s>" + instruction + repeated_question

    def generate(self, task: str, temperature=0.9, max_new_tokens=5000, top_p=0.95,
                 repetition_penalty=1.0) -> str:
        """
        Generate text with the Mixtral inference client.

        Args:
            task (str): The complete prompt sent to the model.
            temperature (float): Controls the randomness of the sampling;
                clamped to at least 0.01. Default is 0.9.
            max_new_tokens (int): Maximum number of tokens to generate. Default is 5000.
            top_p (float): Nucleus sampling parameter. Default is 0.95.
            repetition_penalty (float): Penalty for repeating the same token. Default is 1.0.

        Returns:
            str: Generated text, or a fixed error message on failure.
        """
        try:
            generate_kwargs = dict(
                temperature=max(float(temperature), 1e-2),
                max_new_tokens=max_new_tokens,
                top_p=float(top_p),
                repetition_penalty=repetition_penalty,
                do_sample=True,
                seed=42,
            )
            stream = self.mistral_client.text_generation(
                task, **generate_kwargs, stream=True, details=True, return_full_text=False
            )
            output = "".join(response.token.text for response in stream)
            return output.replace("</s>", "")
        except Exception as e:
            logging.error(f"Error in text generation: {e}")
            return "An error occurred during text generation."

    def video_qa(self, question: str, model: str) -> str:
        """
        Answer a question about the translated transcript.

        Args:
            question (str): The question asked by the user.
            model (str): The language model to be used ("OpenAI" or "Mixtral").

        Returns:
            str: Answer to the user's question, or an error message.
        """
        try:
            if model == "OpenAI":
                template = (
                    "you are the universal language expert ."
                    "your task is analyze the given text and user ask any question "
                    "about given text answer to the user question."
                    "otherwise reply i don't know.\n"
                    "english_text:{text}\n"
                    "user_question:{question}"
                )
                prompt = PromptTemplate(template=template, input_variables=["text", "question"])
                llm_chain = LLMChain(prompt=prompt, verbose=True, llm=self.openai_llm)
                return llm_chain.run({"text": self.english_text, "question": question})
            if model == "Mixtral":
                # The transcript must be embedded in the prompt; generate()'s
                # second positional parameter is the temperature.
                return self.generate(self.format_prompt(question, self.english_text))
            logging.error(f"Unsupported model: {model!r}")
            return "Please select a model: OpenAI or Mixtral."
        except Exception as e:
            logging.error(f"Error in video question answering: {e}")
            return "An error occurred during video question answering."

    def write_text_files(self, text: str, filename: str) -> None:
        """
        Write text to ``<filename>.txt``.

        Args:
            text (str): Text to be written to the file.
            filename (str): Name of the file without the ``.txt`` extension.
        """
        try:
            file_path = f"{filename}.txt"
            # Explicit UTF-8 so non-ASCII text does not depend on the platform default
            with open(file_path, 'w', encoding="utf-8") as file:
                file.write(text)
        except Exception as e:
            logging.error(f"Error writing text to file: {e}")

    def Download(self, link: str) -> str:
        """
        Download a video from YouTube.

        Args:
            link (str): YouTube video link.

        Returns:
            str: Path to the downloaded video file, or "" on failure.
        """
        try:
            stream = YouTube(link).streams.get_highest_resolution()
            if stream is None:
                logging.error("Error downloading video: no downloadable stream found")
                return ""
            file_name = stream.download()
            logging.info("Download is completed successfully")
            return file_name
        except Exception as e:
            logging.error(f"Error downloading video: {e}")
            return ""

    def save_audio_with_gtts(self, text: str, filename: str) -> str:
        """
        Generate an audio file from the given text using gTTS and save it.

        Args:
            text (str): The text to be converted into speech.
            filename (str): The filename (including path) to save the audio file.

        Returns:
            str: The filename of the saved audio file.

        Raises:
            Exception: If there's an error during the conversion or saving process.
        """
        try:
            tts = gTTS(text=text, lang='en')
            tts.save(filename)
            logging.info(f"Audio file saved successfully: {filename}")
            return filename
        except Exception as e:
            logging.error(f"Error occurred while saving audio: {e}")
            raise

    def _narrate(self, text: str, filename: str):
        """Best-effort text-to-speech: return the audio path, or None if text is empty or TTS fails."""
        if not text or not text.strip():
            return None
        try:
            return self.save_audio_with_gtts(text, filename)
        except Exception:
            # Already logged by save_audio_with_gtts; keep the text results usable
            return None

    def main(self, video: str = None, input_path: str = None, model: str = None) -> tuple:
        """
        Perform video analytics.

        Args:
            video (str): Path to an uploaded video file.
            input_path (str): YouTube link; takes precedence over ``video``.
            model (str): "OpenAI" or "Mixtral".

        Returns:
            tuple: Always six items matching the Gradio outputs: summary text,
            important sentences, topics, then the three audio file paths
            (``None`` when unavailable). On a handled problem the first item
            carries the message for the user.
        """
        def failure(message: str = "") -> tuple:
            return message, "", "", None, None, None

        try:
            if input_path:
                source = self.Download(input_path)
                if not source:
                    return failure("Unable to download the video from the given link")
            elif video:
                source = video
            else:
                return failure("Provide a YouTube link or upload a video")

            clip = VideoFileClip(source)
            try:
                duration = clip.duration
            finally:
                clip.close()
            if round(duration) > self.MAX_VIDEO_SECONDS:
                limit = self.MAX_VIDEO_SECONDS // 60
                return failure(f"Video Duration Above {limit} Minutes,Try Below {limit} Minutes Video")

            self.transcribe_video(source)

            # Generate summary, important sentences, and topics
            summary = self.generate_video_summary(model)
            extractive_summary = self.extractive_summary(self.english_text)
            overall_summary = summary + "\n\n Extractive Summary: \n\n" + extractive_summary
            self.write_text_files(overall_summary, "Summary")
            summary_voice = self._narrate(overall_summary, "summary.mp3")

            important_sentences = self.extract_video_important_sentence(model)
            self.write_text_files(important_sentences, "Important_Sentence")
            important_sentences_voice = self._narrate(important_sentences, "important_sentences.mp3")

            topics = self.generate_topics(model)
            self.write_text_files(topics, "Topics")
            topics_voice = self._narrate(topics, "topics.mp3")

            return (overall_summary, important_sentences, topics,
                    summary_voice, important_sentences_voice, topics_voice)
        except Exception as e:
            logging.error(f"Error in main function: {e}")
            return failure()

    def gradio_interface(self):
        """Build the Gradio UI, wire it to ``main`` and ``video_qa``, and launch it."""
        with gr.Blocks(css="style.css", theme=gr.themes.Soft()) as demo:
            gr.HTML("""<center><h1>Video Analytics</h1></center>""")
            with gr.Row():
                # Integer scales keep the original 70/30 width ratio
                with gr.Column(scale=7):
                    yt_link = gr.Textbox(label="Youtube Link", placeholder="https://www.youtube.com/watch?v=")
                with gr.Column(scale=3):
                    # The default must be one of the choices, otherwise no model branch runs
                    model_selection = gr.Dropdown(["OpenAI", "Mixtral"], label="Model", value="OpenAI")
            with gr.Row():
                video = gr.Video(sources="upload", height=200, width=300)
            with gr.Row():
                submit_btn = gr.Button(value="Submit")
            with gr.Tab("Summary"):
                with gr.Row():
                    summary = gr.Textbox(show_label=False, lines=10)
                with gr.Row():
                    summary_download = gr.DownloadButton(label="Download", value="Summary.txt", visible=True, size='lg', elem_classes="download_button")
                with gr.Row():
                    summary_audio = gr.Audio(show_label=False, elem_classes='audio_class')
            with gr.Tab("Important Sentences"):
                with gr.Row():
                    Important_Sentences = gr.Textbox(show_label=False, lines=10)
                with gr.Row():
                    sentence_download = gr.DownloadButton(label="Download", value="Important_Sentence.txt", visible=True, size='lg', elem_classes="download_button")
                with gr.Row():
                    important_sentence_audio = gr.Audio(show_label=False, elem_classes='audio_class')
            with gr.Tab("Topics"):
                with gr.Row():
                    Topics = gr.Textbox(show_label=False, lines=10)
                with gr.Row():
                    topics_download = gr.DownloadButton(label="Download", value="Topics.txt", visible=True, size='lg', elem_classes="download_button")
                with gr.Row():
                    topics_audio = gr.Audio(show_label=False, elem_classes='audio_class')
            with gr.Tab("Video QA"):
                with gr.Row():
                    with gr.Column(scale=7):
                        question = gr.Textbox(show_label=False, placeholder="Ask Your Questions...")
                    with gr.Column(scale=3):
                        model = gr.Dropdown(["OpenAI", "Mixtral"], show_label=False, value="OpenAI")
                with gr.Row():
                    result = gr.Textbox(label='Answer', lines=10)
            # Input order matches main(video, input_path, model)
            submit_btn.click(self.main, [video, yt_link, model_selection], [summary, Important_Sentences, Topics, summary_audio, important_sentence_audio, topics_audio])
            question.submit(self.video_qa, [question, model], result)
        demo.launch()
if __name__ == "__main__":
    # Script entry point: build the analytics object and start the Gradio UI.
    VideoAnalytics().gradio_interface()