# Hugging Face Spaces page header ("Spaces: Running") -- scrape residue, not part of the program.
import logging
import os
from typing import List

import ffmpeg
import gradio as gr
import nltk
import requests
import speech_recognition as sr
from gtts import gTTS
from huggingface_hub import InferenceClient
from langchain import HuggingFaceHub, PromptTemplate, LLMChain
from langchain_openai import AzureChatOpenAI
from moviepy.editor import VideoFileClip
from openai import AzureOpenAI
from pydub import AudioSegment
from pydub.silence import split_on_silence
from pytube import YouTube
from sklearn.feature_extraction.text import TfidfVectorizer
# Fetch the NLTK data packages this module relies on at import time.
# "punkt" holds the sentence tokenizer models used by nltk.sent_tokenize.
nltk.download('punkt')
# NOTE(review): recent NLTK releases look up "punkt_tab" instead of "punkt"
# for sentence tokenization; requesting both keeps either version working.
# Confirm against the pinned NLTK version.
nltk.download('punkt_tab')
# The stopwords corpus is downloaded although nothing in the visible code
# reads it.
nltk.download('stopwords')
class VideoAnalytics:
    """
    Class for performing analytics on videos including transcription, summarization, topic generation,
    and extraction of important sentences.
    """

    def __init__(self):
        """
        Initialize the VideoAnalytics object.

        Takes no arguments. The Azure OpenAI clients are constructed without
        explicit credentials -- presumably they are read from the
        environment; confirm against the deployment configuration.
        """
        # Azure OpenAI client used for summary, topic and translation requests
        self.client = AzureOpenAI()
        # Hugging Face inference client used for the "Mixtral" QA path
        self.mistral_client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1")
        # Raw transcription of the most recently processed video
        self.transcribed_text = ""
        # Speech recognizer shared by all transcription calls
        self.r = sr.Recognizer()
        # English translation of the transcription
        self.english_text = ""
        # LangChain chat model used for the "OpenAI" QA path
        self.openai_llm = AzureChatOpenAI(
            deployment_name="ChatGPT",
        )
        # Configure logging settings
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def mp3_to_wav(self, mp3_file: str, wav_file: str) -> str:
    """
    Convert an MP3 audio file to WAV format.

    Args:
        mp3_file (str): The path to the input MP3 file.
        wav_file (str): The path to save the output WAV file.

    Returns:
        str: ``wav_file``, the path the WAV audio was written to.

    Raises:
        Exception: Any error raised while loading or exporting the audio is
            logged and then re-raised unchanged.
    """
    try:
        # Load the MP3 file
        audio = AudioSegment.from_mp3(mp3_file)
        # export() hands back the open output file; close it so the handle
        # is not leaked.
        audio.export(wav_file, format="wav").close()
    except Exception as e:
        logging.error(f"Error occurred while converting MP3 to WAV: {e}")
        # Bare raise keeps the original traceback intact.
        raise
    logging.info(f"MP3 file '{mp3_file}' converted to WAV successfully: {wav_file}")
    return wav_file
# Recognize speech in a single audio file
def transcribe_audio(self, path):
    """
    Transcribe speech from an audio file.

    Args:
        path: Path of the audio file to open with ``sr.AudioFile``.

    Returns:
        str: The text returned by ``recognize_google``, or an empty string
        when the audio could not be understood or the recognition request
        failed (both cases are logged).
    """
    try:
        with sr.AudioFile(path) as source:
            # Use the recognizer created in __init__; the previous bare name
            # ``r`` was undefined and raised NameError on every call.
            audio_listened = self.r.record(source)
        return self.r.recognize_google(audio_listened)
    except sr.UnknownValueError as e:
        logging.error(f"Speech recognition could not understand audio: {e}")
        return ""
    except sr.RequestError as e:
        logging.error(f"Could not request results from Google Speech Recognition service: {e}")
        return ""
# Split the audio file into chunks on silence and apply speech recognition
def get_large_audio_transcription_on_silence(self, path):
    """
    Split a large audio file into chunks and apply speech recognition on each chunk.

    The audio is cut wherever ``split_on_silence`` (from ``pydub.silence``)
    detects silence, each chunk is written to ``audio-chunks/chunk<i>.wav``
    and passed to ``transcribe_audio``.

    Args:
        path: Path of the audio file to transcribe.

    Returns:
        str: The recognized chunks, each capitalized and terminated with
        ". ", concatenated in order. An empty string is returned if any
        step raises (the error is logged).
    """
    try:
        sound = AudioSegment.from_file(path)
        chunks = split_on_silence(
            sound,
            min_silence_len=500,
            silence_thresh=sound.dBFS - 14,
            keep_silence=500,
        )
        folder_name = "audio-chunks"
        # exist_ok avoids the check-then-create race of isdir() + mkdir().
        os.makedirs(folder_name, exist_ok=True)
        pieces = []
        for i, audio_chunk in enumerate(chunks, start=1):
            chunk_filename = os.path.join(folder_name, f"chunk{i}.wav")
            # export() returns the open output file; close it right away.
            audio_chunk.export(chunk_filename, format="wav").close()
            text = self.transcribe_audio(chunk_filename)
            if text:
                text = f"{text.capitalize()}. "
                logging.info(f"Transcribed {chunk_filename}: {text}")
                pieces.append(text)
            else:
                logging.warning(f"No speech recognized in {chunk_filename}")
        return "".join(pieces)
    except Exception as e:
        logging.error(f"Error processing audio: {e}")
        return ""
def transcribe_video(self, vid: str) -> str:
    """
    Transcribe the audio of the video.

    The audio track is written to ``output_audio.mp3``, converted to
    ``output.wav`` and transcribed chunk by chunk. On success
    ``self.transcribed_text`` holds the transcription and
    ``self.english_text`` the result of ``self.translation()``.

    Args:
        vid (str): Path to the video file.

    Returns:
        str: Transcribed text, or an empty string if any step fails (the
        error is logged).
    """
    # Reset first so a failed run cannot leave the previous video's text
    # behind for the summary / topic / QA steps.
    self.transcribed_text = ""
    self.english_text = ""
    try:
        video = VideoFileClip(vid)
        try:
            audio = video.audio
            if audio is None:
                logging.error(f"Error transcribing video: no audio track found in '{vid}'")
                return ""
            # Write audio to a temporary file
            audio.write_audiofile("output_audio.mp3")
        finally:
            # Always release the clip's reader, even when extraction fails.
            video.close()
        audio_filename = self.mp3_to_wav("output_audio.mp3", 'output.wav')
        text = self.get_large_audio_transcription_on_silence(audio_filename)
        # Update the transcribed_text attribute with the transcription result
        self.transcribed_text = text
        # Store the English translation of the transcription
        self.english_text = self.translation()
        return text
    except Exception as e:
        logging.error(f"Error transcribing video: {e}")
        return ""
def generate_video_summary(self) -> str:
    """
    Generate a summary of the transcribed video.

    Sends ``self.english_text`` to the chat-completions client and asks for
    an abstractive and an extractive summary.

    Returns:
        str: Generated summary, or an empty string when there is no
        transcript to summarize or the request fails (the error is logged).
    """
    try:
        # Nothing to summarize: skip the request instead of asking the
        # model to summarize an empty block.
        if not self.english_text.strip():
            logging.warning("No transcript available; skipping summary generation")
            return ""
        user_prompt = (
            "summarize the following text delimited by triple backticks.Output must in english.\n"
            "In two format of Outputs given below:\n"
            "Abstractive Summary:\n"
            "Extractive Summary:\n"
            f"```{self.english_text}```\n"
        )
        conversation = [
            {"role": "system", "content": "You are a Summarizer"},
            {"role": "user", "content": user_prompt},
        ]
        # "ChatGPT" is presumably the Azure deployment name -- TODO confirm.
        response = self.client.chat.completions.create(
            model="ChatGPT",
            messages=conversation,
            temperature=0,
            max_tokens=1000,
        )
        return response.choices[0].message.content
    except Exception as e:
        logging.error(f"Error generating video summary: {e}")
        return ""
def generate_topics(self) -> str:
    """
    Generate topics from the transcribed video.

    Sends ``self.english_text`` to the chat-completions client and asks for
    a list of topics.

    Returns:
        str: Generated topics, or an empty string when there is no
        transcript or the request fails (the error is logged).
    """
    try:
        # Without a transcript there is nothing to derive topics from.
        if not self.english_text.strip():
            logging.warning("No transcript available; skipping topic generation")
            return ""
        user_prompt = (
            "generate single Topics from the following text don't make sentence for topic generation,delimited by triple backticks.Output must in english.\n"
            "list out the topics:\n"
            "Topics:\n"
            f"```{self.english_text}```\n"
        )
        conversation = [
            {"role": "system", "content": "You are a Topic Generator"},
            {"role": "user", "content": user_prompt},
        ]
        # "ChatGPT" is presumably the Azure deployment name -- TODO confirm.
        response = self.client.chat.completions.create(
            model="ChatGPT",
            messages=conversation,
            temperature=0,
            max_tokens=1000,
        )
        return response.choices[0].message.content
    except Exception as e:
        logging.error(f"Error generating topics: {e}")
        return ""
def translation(self) -> str:
    """
    Translate the transcribed video text into English.

    Sends ``self.transcribed_text`` to the chat-completions client with a
    translation instruction.

    Returns:
        str: The translated text, or an empty string when there is nothing
        to translate or the request fails (the error is logged).
    """
    try:
        # Nothing transcribed: skip the request entirely.
        if not self.transcribed_text.strip():
            logging.warning("No transcribed text available; skipping translation")
            return ""
        user_prompt = (
            " Translate the following text in English ,delimited by triple backticks.\n"
            f"```{self.transcribed_text}```\n"
        )
        conversation = [
            {"role": "system", "content": "You are a Multilingual Translator"},
            {"role": "user", "content": user_prompt},
        ]
        # NOTE(review): max_tokens=1000 caps the reply, so a long
        # transcript may come back truncated -- confirm this is intended.
        response = self.client.chat.completions.create(
            model="ChatGPT",
            messages=conversation,
            temperature=0,
            max_tokens=1000,
        )
        return response.choices[0].message.content
    except Exception as e:
        # The message previously read "Error generating topics", a
        # copy-paste slip that pointed at the wrong method.
        logging.error(f"Error translating text: {e}")
        return ""
def format_prompt(self, question: str, data: str) -> str:
    """
    Formats the prompt for the language model.

    The result is an ``[INST] ... [/INST]`` block carrying the instructions,
    the data and the question, immediately followed by a second
    ``[INST] question [/INST]`` block that repeats the question.

    Args:
        question (str): The user's question.
        data (str): The data to be analyzed.

    Returns:
        str: Formatted prompt.
    """
    # NOTE(review): the original assigned "<s>" and then overwrote it on the
    # next line, so the marker never reached the returned prompt. The dead
    # store is dropped and the output kept unchanged; confirm whether a
    # leading "<s>" was actually intended.
    instruction_block = (
        "[INST] you are the german language and universal language expert .your task is analyze the given data and user ask any question about given data answer to the user question.your returning answer must in user's language.otherwise reply i don't know.\n"
        f"data:{data}\n"
        f"question:{question}[/INST]"
    )
    question_block = f"[INST] {question} [/INST]"
    return instruction_block + question_block
def generate(self, prompt: str, transcribed_text: str, temperature=0.9, max_new_tokens=5000, top_p=0.95,
             repetition_penalty=1.0) -> str:
    """
    Generates text based on the prompt and transcribed text.

    Args:
        prompt (str): The prompt (user question) for generating text.
        transcribed_text (str): The transcribed text for analysis.
        temperature (float): Controls the randomness of the sampling. Values
            below 0.01 are raised to 0.01. Default is 0.9.
        max_new_tokens (int): Maximum number of tokens to generate. Default is 5000.
        top_p (float): Nucleus sampling parameter. Default is 0.95.
        repetition_penalty (float): Penalty for repeating the same token. Default is 1.0.

    Returns:
        str: Generated text with any "</s>" marker removed, or a fixed error
        message if generation fails (the error is logged).
    """
    try:
        generate_kwargs = dict(
            # Clamp to a small positive floor, as the original did.
            temperature=max(float(temperature), 1e-2),
            max_new_tokens=max_new_tokens,
            top_p=float(top_p),
            repetition_penalty=repetition_penalty,
            do_sample=True,
            seed=42,
        )
        formatted_prompt = self.format_prompt(prompt, transcribed_text)
        # Stream the completion token by token from the Mixtral client
        stream = self.mistral_client.text_generation(
            formatted_prompt,
            **generate_kwargs,
            stream=True,
            details=True,
            return_full_text=False,
        )
        # join() avoids rebuilding the string for every streamed token.
        output = "".join(response.token.text for response in stream)
        return output.replace("</s>", "")
    except Exception as e:
        logging.error(f"Error in text generation: {e}")
        return "An error occurred during text generation."
def video_qa(self, question: str, model: str) -> str:
    """
    Performs video question answering.

    Args:
        question (str): The question asked by the user.
        model (str): The language model to be used ("OpenAI" or "Mixtral").

    Returns:
        str: Answer to the user's question. An explanatory message is
        returned for an unrecognised ``model`` and a fixed error message if
        answering fails (the error is logged).
    """
    try:
        if model == "OpenAI":
            template = (
                "you are the universal language expert .your task is analyze the given text and user ask any question about given text answer to the user question.otherwise reply i don't know.\n"
                "extracted_text:{text}\n"
                "user_question:{question}"
            )
            prompt = PromptTemplate(template=template, input_variables=["text", "question"])
            llm_chain = LLMChain(prompt=prompt, verbose=True, llm=self.openai_llm)
            # Run the language model chain
            return llm_chain.run({"text": self.english_text, "question": question})
        if model == "Mixtral":
            # Generate answer using Mixtral model
            return self.generate(question, self.english_text)
        # Previously any other value fell through and returned None, which
        # left the answer box silently empty.
        logging.warning(f"Unsupported model requested for video QA: {model}")
        return f"Unsupported model '{model}'. Please choose 'OpenAI' or 'Mixtral'."
    except Exception as e:
        logging.error(f"Error in video question answering: {e}")
        return "An error occurred during video question answering."
def extract_video_important_sentence(self, threshold: float = 2.5) -> str:
    """
    Extract important sentences from the transcribed video.

    Each sentence of ``self.english_text`` is scored by the sum of its
    TF-IDF weights; sentences scoring at least ``threshold`` are kept,
    ordered from highest to lowest score.

    Args:
        threshold (float): Minimum score a sentence needs to be selected.
            Default is 2.5.

    Returns:
        str: Selected sentences joined by blank lines, or an empty string
        when there is no text or extraction fails (the error is logged).
    """
    try:
        # No transcript: nothing to rank, and the vectorizer would raise on
        # an empty corpus.
        if not self.english_text.strip():
            return ""
        import numpy as np

        sentences = nltk.sent_tokenize(self.english_text)
        if not sentences:
            return ""
        tfidf_matrix = TfidfVectorizer().fit_transform(sentences)
        # Flatten the per-row sums to plain floats so sorting and the
        # threshold test compare numbers rather than 1x1 matrix rows.
        scores = np.asarray(tfidf_matrix.sum(axis=1)).ravel().tolist()
        # Highest score first; ties fall back to the sentence text, as the
        # original tuple sort did.
        ranked = sorted(zip(scores, sentences), reverse=True)
        return '\n\n'.join(sentence for score, sentence in ranked if score >= threshold)
    except Exception as e:
        logging.error(f"Error extracting important sentences: {e}")
        return ""
def write_text_files(self, text: str, filename: str) -> None:
    """
    Write text to a file.

    The text is written to ``<filename>.txt``, replacing any existing file.
    Failures are logged and not raised.

    Args:
        text (str): Text to be written to the file.
        filename (str): Name of the file, without the ".txt" extension.
    """
    try:
        file_path = f"{filename}.txt"
        # Explicit UTF-8 so non-ASCII text does not depend on the platform's
        # default encoding.
        with open(file_path, 'w', encoding='utf-8') as file:
            file.write(text)
    except Exception as e:
        logging.error(f"Error writing text to file: {e}")
def Download(self, link: str) -> str:
    """
    Download a video from YouTube.

    Args:
        link (str): YouTube video link.

    Returns:
        str: Path to the downloaded video file, or an empty string if the
        download fails for any reason (the error is logged).
    """
    try:
        # Pick the highest-resolution stream offered for the link
        stream = YouTube(link).streams.get_highest_resolution()
        if stream is None:
            logging.error("Error downloading video: no downloadable stream found")
            return ""
        file_name = stream.download()
    except Exception as e:
        # The original used a bare except here, then logged success and
        # returned None; every failure now ends in a logged error and "".
        logging.error(f"Error downloading video: {e}")
        return ""
    logging.info("Download is completed successfully")
    return file_name
def save_audio_with_gtts(self, text: str, filename: str) -> str:
    """
    Generate an audio file from the given text using gTTS and save it.

    Args:
        text (str): The text to be converted into speech.
        filename (str): The filename (including path) to save the audio file.

    Returns:
        str: The filename of the saved audio file.

    Raises:
        ValueError: If ``text`` is empty or only whitespace.
        Exception: Any error raised during conversion or saving is logged
            and re-raised unchanged.
    """
    try:
        # Fail early with a clear message instead of relying on the TTS
        # library to reject empty input.
        if not text or not text.strip():
            raise ValueError("Cannot generate audio from empty text")
        tts = gTTS(text=text, lang='en')
        tts.save(filename)
    except Exception as e:
        logging.error(f"Error occurred while saving audio: {e}")
        # Bare raise keeps the original traceback intact.
        raise
    logging.info(f"Audio file saved successfully: {filename}")
    return filename
def main(self, video: str = None, input_path: str = None) -> tuple:
    """
    Perform video analytics.

    The video is downloaded when ``input_path`` (a link) is given, otherwise
    the uploaded ``video`` is used. It is then transcribed, and a summary,
    important sentences and topics are generated. Each result is written to
    a text file ("Summary.txt", "Important_Sentence.txt", "Topics.txt") and,
    when non-empty, to an MP3 file.

    Args:
        video (str): Path to the video file.
        input_path (str): Link of the video to download; takes precedence
            over ``video``.

    Returns:
        tuple: Six values -- summary, important sentences, topics, and the
        audio file path (or None) for each of the three. When the video
        cannot be processed the first value carries a message and the rest
        are "", "", None, None, None. On an unexpected error all three texts
        are "" and all three audio values None.
    """
    # NOTE(review): the original limit was written 6*600 (3600 s, i.e. 60
    # minutes) while its message said 10 minutes. The limit is kept and the
    # message now reports it; confirm which value was intended.
    max_duration_seconds = 6 * 600
    limit_minutes = max_duration_seconds // 60

    def failure(message: str) -> tuple:
        # Every exit must supply all six interface outputs.
        return message, "", "", None, None, None

    try:
        # Download the video if input_path is provided, otherwise use the provided video path
        if input_path:
            source = self.Download(input_path)
            if not source:
                return failure("Unable to download the video from the provided link")
        elif video:
            source = video
        else:
            return failure("Please provide a YouTube link or upload a video")

        clip = VideoFileClip(source)
        try:
            duration = clip.duration
        finally:
            clip.close()
        if round(duration) > max_duration_seconds:
            return failure(
                f"Video Duration Above {limit_minutes} Minutes,Try Below {limit_minutes} Minutes Video"
            )

        if not self.transcribe_video(source):
            # Without a transcript the later steps would work on empty or
            # stale text.
            return failure("No speech could be transcribed from the video")

        def voice(text: str, filename: str):
            # Speech synthesis rejects empty text; skip the audio instead of
            # failing the whole run.
            if text and text.strip():
                return self.save_audio_with_gtts(text, filename)
            return None

        # Generate summary, important sentences, and topics
        summary = self.generate_video_summary()
        self.write_text_files(summary, "Summary")
        summary_voice = voice(summary, "summary.mp3")
        important_sentences = self.extract_video_important_sentence()
        self.write_text_files(important_sentences, "Important_Sentence")
        important_sentences_voice = voice(important_sentences, "important_sentences.mp3")
        topics = self.generate_topics()
        self.write_text_files(topics, "Topics")
        topics_voice = voice(topics, "topics.mp3")
        return summary, important_sentences, topics, summary_voice, important_sentences_voice, topics_voice
    except Exception as e:
        # Log any errors that occur during video analytics
        logging.error(f"Error in main function: {e}")
        # Six values, matching the six outputs; the original returned three.
        return failure("")
def gradio_interface(self):
    """
    Build and launch the Gradio user interface.

    The page offers a YouTube link box, a video upload and a submit button
    that runs ``self.main``; result tabs for the summary, important
    sentences and topics (text, download button and audio player each); and
    a "Video QA" tab whose question box runs ``self.video_qa``.

    NOTE(review): the source had lost its indentation, so the nesting of
    rows, tabs and columns below is reconstructed -- confirm the layout.
    """
    with gr.Blocks(css="style.css", theme=gr.themes.Soft()) as demo:
        gr.HTML("""<center><h1>Video Analytics</h1></center>""")
        with gr.Row():
            yt_link = gr.Textbox(label="Youtube Link", placeholder="https://www.youtube.com/watch?v=")
        with gr.Row():
            video = gr.Video(sources="upload", height=200, width=300)
        with gr.Row():
            submit_btn = gr.Button(value="Submit")
        with gr.Tab("Summary"):
            with gr.Row():
                summary = gr.Textbox(show_label=False, lines=10)
            with gr.Row():
                gr.DownloadButton(label="Download", value="Summary.txt", visible=True, size='lg', elem_classes="download_button")
            with gr.Row():
                summary_audio = gr.Audio(show_label=False, elem_classes='audio_class')
        with gr.Tab("Important Sentences"):
            with gr.Row():
                important_sentences = gr.Textbox(show_label=False, lines=10)
            with gr.Row():
                gr.DownloadButton(label="Download", value="Important_Sentence.txt", visible=True, size='lg', elem_classes="download_button")
            with gr.Row():
                important_sentence_audio = gr.Audio(show_label=False, elem_classes='audio_class')
        with gr.Tab("Topics"):
            with gr.Row():
                topics = gr.Textbox(show_label=False, lines=10)
            with gr.Row():
                gr.DownloadButton(label="Download", value="Topics.txt", visible=True, size='lg', elem_classes="download_button")
            with gr.Row():
                topics_audio = gr.Audio(show_label=False, elem_classes='audio_class')
        with gr.Tab("Video QA"):
            with gr.Row():
                # Integer scales keep the original 70/30 split; the
                # fractional values 0.70 / 0.30 are not valid scale values.
                with gr.Column(scale=7):
                    question = gr.Textbox(show_label=False, placeholder="Ask Your Questions...")
                with gr.Column(scale=3):
                    # The default must be one of the choices; the original
                    # "model" matched neither branch in video_qa.
                    model = gr.Dropdown(["OpenAI", "Mixtral"], show_label=False, value="OpenAI")
            with gr.Row():
                result = gr.Textbox(label='Answer', lines=10)
        # Inputs are ordered to match main(video, input_path).
        submit_btn.click(
            self.main,
            [video, yt_link],
            [summary, important_sentences, topics, summary_audio, important_sentence_audio, topics_audio],
        )
        question.submit(self.video_qa, [question, model], result)
    demo.launch()
# Script entry point: build the analytics object and start the web interface.
if __name__ == "__main__":
    video_analytics = VideoAnalytics()
    video_analytics.gradio_interface()