import os import json import uuid import requests from helperfunctions import * from models import load_models from languages import CODE2LANG from s3_handler import S3Handler from media_download import YoutubeDownloader from transcription import StableWhisper from translation import Translation from summarizer import Extract_Summary, AudioBookNarration from audiobook import AudioBook from celery import Celery celery = Celery('testing.py', broker='pyamqp://guest:guest@localhost//', backend='rpc://') # Output Directory for Files Storage output_folder = 'Output' # S3 Handler s3 = S3Handler() # Create a context variable to store the contexts for each user users_context = dict() @celery.task def get_media_metadata_task(user_id: str, url: str): # User Folder Path user_folder_path = os.path.join(output_folder, user_id) # Getting User's Youtube Downloader youtube_downloader = YoutubeDownloader(url, user_folder_path) # Getting Youtube Media Info media_metadata = youtube_downloader.get_media_metadata() # Storing User's Media Metadata to Directory media_metadata_path = os.path.join(user_folder_path, 'media_metadata.json') with open(media_metadata_path, "w") as outfile: json.dump(media_metadata, outfile) # Storing User's Media Metadata to S3 s3_path = s3.upload_file(user_id, 'media_metadata.json', media_metadata_path) # Getting Status status = 1 if media_metadata else 0 if status: # Storing Info in the context for this user's session users_context[user_id] = dict() users_context[user_id]['downloader'] = youtube_downloader # users_context[user_id]['media_metadata'] = media_metadata users_context[user_id]['url'] = url return {'status': status, 'user_id': user_id, 'media_metadata': media_metadata, 'media_metadata_path': s3_path} @celery.task def get_media_formats_task(user_id): # Getting Media Formats for User media_formats = users_context[user_id]['downloader'].get_media_formats() # User Folder Path user_folder_path = os.path.join(output_folder, user_id) # Storing User's Media Formats to Directory media_formats_path = os.path.join(user_folder_path, 'media_formats.json') with open(media_formats_path, "w") as outfile: json.dump(media_formats, outfile) # Storing User's Media Formats to S3 s3_path = s3.upload_file(user_id, 'media_formats.json', media_formats_path) # Getting Status status = 1 if media_formats else 0 if status: # Storing Media Info in the context for this user's session users_context[user_id]['media_formats'] = media_formats return {'status': status, 'media_formats': media_formats, 'media_formats_path': s3_path} @celery.task def download_media_task(user_id,media_type: str, media_format: str, media_quality: str): # Downloading Media for User media_path = users_context[user_id]['downloader'].download(media_type, media_format, media_quality) # Storing User's Downloaded Media to S3 media_file = f"{media_type.lower()}_{media_quality.lower()}.{media_format.lower()}" s3_path = s3.upload_file(user_id, media_file, media_path) # Getting Status status = 1 if media_path else 0 if status: # Storing Media Info in the context for this user's session users_context[user_id]['media_path'] = media_path users_context[user_id]['media_type'] = media_type return {'status': status, 'media_path': s3_path} @celery.task def get_transcript_task(user_id: str, subtitle_format: str = 'srt', word_level: bool = False): # If Video Already Downloaded if 'media_path' in users_context[user_id].keys(): # Retrieving the media_path from the context for this user's session media_path = users_context[user_id]['media_path'] # Checking if the media_type is Video, then extract it's audio media_type = users_context[user_id]['media_type'] if media_type == 'video': media_path = extract_audio(media_path) else: # Downloading Audio for Transcription media_path = users_context[user_id]['downloader'].download('audio', 'mp3', '128kbps') # Whisper based transcription user_folder_path = os.path.join(output_folder, user_id) stable_whisper_transcript = StableWhisper(model=MODELS['transcription'], media_path=media_path, output_path=user_folder_path, subtitle_format=subtitle_format, word_level=word_level) transcript = stable_whisper_transcript.generate_transcript() transcript_path = stable_whisper_transcript.save_transcript() subtitles_path = stable_whisper_transcript.save_subtitles() # Storing User's Transcripts to S3 s3_transcript_path = s3.upload_file(user_id, 'transcript.txt', transcript_path) s3_subtitles_path = s3.upload_file(user_id, f'subtitles.{subtitle_format}', subtitles_path) # Getting Status status = 1 if transcript and s3_transcript_path and s3_subtitles_path else 0 if status: # Storing Transcript Info in the context for this user's session users_context[user_id]['transcript'] = transcript users_context[user_id]['transcript_path'] = transcript_path users_context[user_id]['subtitles_path'] = subtitles_path return {'status': status, 'transcript': transcript, 'transcript_path': s3_transcript_path, 'subtitles_path': s3_subtitles_path} @celery.task def get_translation_task(user_id: str, target_language: str = 'en'): # If Transcript Available if 'transcript' in users_context[user_id].keys(): # Retrieving the transcript from the context for this user's session transcript = users_context[user_id]['transcript'] else: return {'status': 0, 'message': 'Transcript not generated yet'} # NLLB based Translation user_folder_path = os.path.join(output_folder, user_id) nllb_translator = Translation(model=MODELS['translation'], transcript_dict=transcript, source_lang=transcript['language'], target_lang=target_language, output_path=user_folder_path) translated_transcript = nllb_translator.get_translated_transcript() translated_subtitles = nllb_translator.get_translated_subtitles() # Storing Translated Transcript as TXT file in UTF-8 format translated_transcript_path = os.path.join(user_folder_path, 'translated_transcript.txt') with open(translated_transcript_path, 'w', encoding='utf-8') as file: file.write(translated_transcript) # Storing Translated Transcript to S3 s3_transcript_path = s3.upload_file(user_id, 'translated_transcript.txt', translated_transcript_path) # TODO: Write Translated Transcript as SRT, VTT, ASS files # Storing Translated Subtitles as JSON file (For Now) translated_subtitles_path = os.path.join(user_folder_path, 'translated_subtitles.json') with open(translated_subtitles_path, "w", encoding='utf-8') as file: json.dump(translated_subtitles_path, file) # Storing Translated Subtitles to S3 s3_subtitles_path = s3.upload_file(user_id, 'translated_subtitles.json', translated_subtitles_path) # Getting Status status = 1 if translated_transcript and translated_subtitles else 0 if status: # Storing Translated Transcript Info in the context for this user's session users_context[user_id]['translated_transcript'] = translated_transcript users_context[user_id]['translated_transcript_path'] = translated_transcript_path users_context[user_id]['translated_subtitles'] = translated_subtitles users_context[user_id]['translated_subtitles_path'] = translated_subtitles_path return {'status': status, 'transcript': translated_transcript, 'subtitles': translated_subtitles, 'transcript_path': s3_transcript_path, 'subtitles_path': s3_subtitles_path} @celery.task def get_summary_task(user_id: str, Summary_type: str, Summary_strategy: str, Target_Person_type: str, Response_length: str, Writing_style: str): # If Transcript Available if 'transcript' in users_context[user_id].keys(): # Retrieving the transcript from the context for this user's session text_input = users_context[user_id]['transcript'] else: return {'status': 0, 'message': 'Transcript not generated yet'} # Extracting Summary summary_extractor = Extract_Summary(text_input=text_input) output = summary_extractor.define_chain(Summary_type=Summary_type, Summary_strategy=Summary_strategy, Target_Person_type=Target_Person_type, Response_length=Response_length, Writing_style=Writing_style, key_information=False) # Getting Status status = 1 if output else 0 if status: # Storing Summary Info in the context for this user's session users_context[user_id]['summary'] = output return {'status': status, "summary": output} @celery.task def get_key_info_task(user_id: str, Summary_type: str, Summary_strategy: str, Target_Person_type: str, Response_length: str, Writing_style: str): # If Transcript Available if 'transcript' in users_context[user_id].keys(): # Retrieving the transcript from the context for this user's session text_input = users_context[user_id]['transcript'] else: return {'status': 0, 'message': 'Transcript not generated yet'} # Extracting Key Value Info summary_extractor = Extract_Summary(text_input=text_input) output = summary_extractor.define_chain(Summary_type=Summary_type, Summary_strategy=Summary_strategy, Target_Person_type=Target_Person_type, Response_length=Response_length, Writing_style=Writing_style, key_information=True) # Getting Status status = 1 if output else 0 if status: # Storing Key Info in the context for this user's session users_context[user_id]['key_info'] = output return {'status': status, "key_info": output} @celery.task def get_audiobook_task(user_id: str, narration_style: str, speaker: str = "male", audio_format: str = "mp3", audio_quality: str = "128kbps"): # If Transcript Available if 'transcript' in users_context[user_id].keys(): # Retrieving the transcript from the context for this user's session text_input = users_context[user_id]['transcript'] else: return {'status': 0, 'message': 'Transcript not generated yet'} # Extracting Narration narrator = AudioBookNarration(text_input=text_input) output = narrator.define_chain(narration_style=narration_style) # Generating Audiobook audiobook = AudioBook(output_folder=output_folder) audio_path = audiobook.generate_audio_from_text(output, speaker=speaker, filename="output_audio") # Converting the Audio to Required Audio Parameters audio_path = convert_audio(audio_path, audio_format, audio_quality) # Storing User's Audiobook to S3 media_file = f"audiobook_{audio_quality.lower()}.{audio_format.lower()}" s3_path = s3.upload_file(user_id, media_file, audio_path) # Getting Status status = 1 if audio_path else 0 if status: # Storing Audiobook path in the context for this user's session users_context[user_id]['audiobook_path'] = audio_path return {'status': status, "audiobook_path": s3_path} @celery.task def get_rendered_video_task(user_id: str, video_format: str, video_quality: str, subtitles_type: str = 'original'): # # Retrieving the media_path from the context for this user's session # media_path = users_context[user_id]['media_path'] # Downloading Video with Required Video Parameters for User media_path = users_context[user_id]['downloader'].download('video', video_format, video_quality) # Getting Required Subtitles if 'original' in subtitles_type.lower(): subtitles_path = users_context[user_id]['subtitles_path'] elif 'translated' in subtitles_type.lower(): # Getting Translated Subtitles from the context for this user's session translated_subtitles = users_context[user_id]['translated_subtitles_path'] # Saving Translated Subtitles subtitles_path = save_translated_subtitles(translated_subtitles, media_path) # Burning Subtitles & Rendering Video rendered_video_path = burn_subtitles(media_path, subtitles_path) # Storing User's Rendered Video to S3 media_file = f"subtitles_video_{video_quality.lower()}.{video_format.lower()}" s3_path = s3.upload_file(user_id, media_file, media_path) # Getting Status status = 1 if rendered_video_path else 0 return {'status': status, "rendered_video_path": s3_path}