from elevenlabs import VoiceSettings
from elevenlabs.client import ElevenLabs
from transformers import M2M100ForConditionalGeneration, M2M100Tokenizer
import whisper
from ai71 import AI71
from datetime import datetime
import os
import time
from pydub import AudioSegment
from base64 import b64encode
import gradio as gr
import concurrent.futures

AI71_API_KEY = os.getenv('AI71_API_KEY')
XI_API_KEY = os.getenv('ELEVEN_LABS_API_KEY')
client = ElevenLabs(api_key=XI_API_KEY)

model = M2M100ForConditionalGeneration.from_pretrained("facebook/m2m100_1.2B")
tokenizer = M2M100Tokenizer.from_pretrained("facebook/m2m100_1.2B")
transcriber = whisper.load_model("turbo")

language_codes = {"English": "en", "Hindi": "hi", "Portuguese": "pt", "Chinese": "zh", "Spanish": "es",
                  "French": "fr", "German": "de", "Japanese": "ja", "Arabic": "ar", "Russian": "ru",
                  "Korean": "ko", "Indonesian": "id", "Italian": "it", "Dutch": "nl", "Turkish": "tr",
                  "Polish": "pl", "Swedish": "sv", "Filipino": "fil", "Malay": "ms", "Romanian": "ro",
                  "Ukrainian": "uk", "Greek": "el", "Czech": "cs", "Danish": "da", "Finnish": "fi",
                  "Bulgarian": "bg", "Croatian": "hr", "Slovak": "sk"}

meeting_texts = []
n_participants = 4
language_choices = ["English", "Polish", "Hindi", "Arabic"]
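
# Illustrative shape of meeting_texts after two turns (hypothetical values):
#   [{"Speaker_1": "Hello everyone."}, {"Speaker_2": "Good morning."}]
# Each entry records one speaker turn, kept in Participant 1's language
# (see process_speaker below).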


def wait_for_dubbing_completion(dubbing_id: str) -> bool:
    """
    Waits for the dubbing process to complete by periodically checking the status.

    Args:
        dubbing_id (str): The dubbing project id.

    Returns:
        bool: True if the dubbing is successful, False otherwise.
    """
    MAX_ATTEMPTS = 120
    CHECK_INTERVAL = 10

    for _ in range(MAX_ATTEMPTS):
        metadata = client.dubbing.get_dubbing_project_metadata(dubbing_id)
        if metadata.status == "dubbed":
            return True
        elif metadata.status == "dubbing":
            print(
                "Dubbing in progress... Will check status again in",
                CHECK_INTERVAL,
                "seconds.",
            )
            time.sleep(CHECK_INTERVAL)
        else:
            print("Dubbing failed:", metadata.error_message)
            return False

    print("Dubbing timed out")
    return False


def download_dubbed_file(dubbing_id: str, language_code: str) -> str:
    """
    Downloads the dubbed file for a given dubbing ID and language code.

    Args:
        dubbing_id: The ID of the dubbing project.
        language_code: The language code for the dubbing.

    Returns:
        The file path to the downloaded dubbed file.
    """
    dir_path = f"data/{dubbing_id}"
    os.makedirs(dir_path, exist_ok=True)

    file_path = f"{dir_path}/{language_code}.mp4"
    with open(file_path, "wb") as file:
        for chunk in client.dubbing.get_dubbed_file(dubbing_id, language_code):
            file.write(chunk)

    return file_path


def create_dub_from_file(
    input_file_path: str,
    file_format: str,
    source_language: str,
    target_language: str,
):
    """
    Dubs an audio or video file from one language to another and saves the output.

    Args:
        input_file_path (str): The file path of the audio or video to dub.
        file_format (str): The file format of the input file.
        source_language (str): The language of the input file.
        target_language (str): The target language to dub into.

    Returns:
        Optional[str]: The file path of the dubbed file, or None if the operation failed.
    """
    if not os.path.isfile(input_file_path):
        raise FileNotFoundError(f"The input file does not exist: {input_file_path}")

    with open(input_file_path, "rb") as audio_file:
        response = client.dubbing.dub_a_video_or_an_audio_file(
            file=(os.path.basename(input_file_path), audio_file, file_format),
            target_lang=target_language,
            source_lang=source_language,
            num_speakers=1,
            watermark=True,
        )

    dubbing_id = response.dubbing_id
    if wait_for_dubbing_completion(dubbing_id):
        output_file_path = download_dubbed_file(dubbing_id, target_language)
        return output_file_path
    else:
        return None
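

# A minimal usage sketch of the dubbing helpers above, assuming a local
# "sample.mp4" (hypothetical path) and a valid ELEVEN_LABS_API_KEY; the
# function is defined for illustration only and never called, so the app's
# behavior is unchanged.
def _example_dub():
    return create_dub_from_file(
        input_file_path="sample.mp4",
        file_format="audio/mpeg",
        source_language="en",
        target_language="es",
    )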


def summarize(meeting_texts=meeting_texts):
    """Summarizes the speaker-wise conversation into minutes of meeting using Falcon-180B."""
    mt = '\n'.join([f"{k}: {v}" for turn in meeting_texts for k, v in turn.items()])
    meeting_date_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    meeting_conversation_processed = meeting_date_time + '\n' + mt

    minutes_of_meeting = ""
    for chunk in AI71(AI71_API_KEY.strip()).chat.completions.create(
        model="tiiuae/falcon-180b-chat",
        messages=[
            {"role": "system", "content": f"""You are an experienced secretary who can summarize meeting discussions into minutes of meeting.
            Summarize the meeting discussion provided as a speaker-wise conversation. Ensure to mention the title as 'Minutes of Meeting held on {meeting_date_time}' and present the summary in a well-structured format with the title in bold letters."""},
            {"role": "user", "content": meeting_conversation_processed},
        ],
        stream=True,
    ):
        if chunk.choices[0].delta.content:
            summary = chunk.choices[0].delta.content
            minutes_of_meeting += summary
    minutes_of_meeting = minutes_of_meeting.replace('User:', '').strip()
    print("\n")
    print("minutes_of_meeting:", minutes_of_meeting)
    return minutes_of_meeting


def speech_to_text(video):
    """Extracts the audio track from a video and transcribes it with Whisper."""
    print('Started transcribing')
    audio = AudioSegment.from_file(video)
    audio.export('temp.mp3', format="mp3")
    transcript = transcriber.transcribe('temp.mp3')['text']
    print('transcript:', transcript)
    return transcript
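

# Quick sketch: transcribing a standalone recording, assuming "clip.mp4"
# exists locally (hypothetical path); defined but never invoked.
def _example_transcribe():
    return speech_to_text("clip.mp4")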


def translate_text(text, source_language, target_language):
    """Translates text between two M2M100 language codes (e.g. "en" to "hi")."""
    tokenizer.src_lang = source_language
    encoded_ln = tokenizer(text, return_tensors="pt")
    generated_tokens = model.generate(**encoded_ln, forced_bos_token_id=tokenizer.get_lang_id(target_language))
    translated_text = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)[0]
    print('translated_text:', translated_text)
    return translated_text
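

# Quick sketch of a direct translation call, using the two-letter codes from
# language_codes above; defined but never invoked.
def _example_translate():
    return translate_text("Hello, team.", "en", "hi")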


def synthesize_speech(video, source_language, target_language):
    """Dubs the speaker's video into the target language via ElevenLabs."""
    print('Started dubbing')
    dub_video = create_dub_from_file(input_file_path=video,
                                     file_format='audio/mpeg',
                                     source_language=source_language,
                                     target_language=target_language)
    return dub_video


def process_speaker(video, speaker_idx, n_participants, *language_list):
    transcript = speech_to_text(video)

    outputs = []
    global meeting_texts

    def process_translation_dubbing(i):
        # Translate and dub only for the other participants; the speaker's own
        # slot gets the raw transcript instead.
        if i != speaker_idx:
            participant_language = language_codes[language_list[i]]
            speaker_language = language_codes[language_list[speaker_idx]]
            translated_text = translate_text(transcript, speaker_language, participant_language)
            dubbed_video = synthesize_speech(video, speaker_language, participant_language)
            return translated_text, dubbed_video
        return None, None

    # Run translation and dubbing for all participants in parallel.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(process_translation_dubbing, i) for i in range(n_participants)]
        results = [f.result() for f in futures]

    # Assemble the outputs so the transcript comes first, followed by a
    # (translated_text, dubbed_video) pair for every other participant.
    for i, (translated_text, dubbed_video) in enumerate(results):
        if i == speaker_idx:
            outputs.insert(0, transcript)
        else:
            outputs.append(translated_text)
            outputs.append(dubbed_video)

    # Log the turn for the meeting minutes: the raw transcript when
    # Participant 1 speaks, otherwise the translation into Participant 1's
    # language (English by default), so the log stays in one language.
    if speaker_idx == 0:
        meeting_texts.append({f"Speaker_{speaker_idx+1}": outputs[0]})
    else:
        meeting_texts.append({f"Speaker_{speaker_idx+1}": outputs[1]})

    print(len(outputs))
    print(outputs)
    print('meeting_texts: ', meeting_texts)
    return outputs
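
# Illustrative return value (hypothetical): with 4 participants and
# speaker_idx=1, process_speaker returns
#   [transcript, t1, v1, t3, v3, t4, v4]
# where tj/vj are the translated text and dubbed video for participant j,
# matching the output ordering wired up in create_gradio_interface below.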


def create_participant_row(i, language_choices):
    """Creates the UI for a single participant."""
    with gr.Row():
        video_input = gr.Video(label=f"Participant {i+1} Video", interactive=True)
        language_dropdown = gr.Dropdown(choices=language_choices, label=f"Participant {i+1} Language", value=language_choices[i])
        transcript_output = gr.Textbox(label=f"Participant {i+1} Transcript")
        translated_text = gr.Textbox(label="Speaker's Translated Text")
        dubbed_video = gr.Video(label="Speaker's Dubbed Video")
    return video_input, language_dropdown, transcript_output, translated_text, dubbed_video


def create_gradio_interface(n_participants, language_choices):
    with gr.Blocks() as demo:
        gr.Markdown("# LinguaPolis: Bridging Languages, Uniting Teams Globally - Multilingual Conference Call Simulation")

        video_inputs = []
        language_dropdowns = []
        transcript_outputs = []
        translated_texts = []
        dubbed_videos = []

        for i in range(n_participants):
            video_input, language_dropdown, transcript_output, translated_text, dubbed_video = create_participant_row(i, language_choices)
            video_inputs.append(video_input)
            language_dropdowns.append(language_dropdown)
            transcript_outputs.append(transcript_output)
            translated_texts.append(translated_text)
            dubbed_videos.append(dubbed_video)

        # Each submit button sends the speaker's video plus every participant's
        # language choice, and receives the speaker's transcript followed by a
        # (translated text, dubbed video) pair for every other participant,
        # matching the ordering process_speaker returns.
        for i in range(n_participants):
            gr.Button(f"Submit Speaker {i+1}'s Speech").click(
                process_speaker,
                [video_inputs[i], gr.State(i), gr.State(n_participants)] + [language_dropdowns[j] for j in range(n_participants)],
                [transcript_outputs[i]] + [k for j in zip(translated_texts[:i] + translated_texts[i+1:], dubbed_videos[:i] + dubbed_videos[i+1:]) for k in j]
            )
        minutes = gr.Textbox(label="Minutes of Meeting")
        gr.Button("Generate Minutes of Meeting").click(summarize, None, minutes)

    demo.queue().launch(debug=True, share=True)


create_gradio_interface(n_participants, language_choices)