import os import json # import pytorch_test import uvicorn from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from media_download import YoutubeDownloader # from transcription import StableWhisper # from summarizer import Extract_Summary, AudioBookNarration # from audiobook import AudioBook from helperfunctions import * ### API Configurations app = FastAPI() # Output Directory for Files Storage output_folder = 'Output' # Create a context variable to store the contexts for each user users_context = dict() # # CORS (Cross-Origin Resource Sharing) # origins = [ # "http://localhost", # "http://localhost:4200", # ] # app.add_middleware( # CORSMiddleware, # allow_origins=["*"], # origins, # allow_credentials=True, # allow_methods=["*"], # allow_headers=["*"], # ) ### APIs @app.get("/get_media_metadata") async def get_media_metadata(request: Request, url: str): # Getting User's IP # user_ip = request.client.host user_ip = 1 # Getting User's Youtube Downloader youtube_downloader = YoutubeDownloader(url, output_folder) # Getting Youtube Media Info media_metadata = youtube_downloader.get_media_metadata() # Getting Status status = 1 if media_metadata else 0 if status: # Storing Info in the context for this user's session users_context[user_ip] = dict() users_context[user_ip]['downloader'] = youtube_downloader # users_context[user_ip]['media_metadata'] = media_metadata users_context[user_ip]['url'] = url return {'status': status, 'media_metadata': media_metadata} @app.get("/get_media_formats") async def get_media_formats(request: Request): # Getting User's IP # user_ip = request.client.host user_ip = 1 # Downloading Media for User media_formats = users_context[user_ip]['downloader'].get_media_formats() # Getting Status status = 1 if media_formats else 0 if status: # Storing Media Info in the context for this user's session users_context[user_ip]['media_formats'] = media_formats return {'status': status, 'media_formats': media_formats} @app.get("/download_media") async def download_media(request: Request, media_type: str, media_format: str, media_quality: str): # Getting User's IP # user_ip = request.client.host user_ip = 1 # Downloading Media for User media_path = users_context[user_ip]['downloader'].download(media_type, media_format, media_quality) # Getting Status status = 1 if media_path else 0 if status: # Storing Media Info in the context for this user's session users_context[user_ip]['media_path'] = media_path users_context[user_ip]['media_type'] = media_type return {'status': status, 'media_path': media_path} @app.get("/get_transcript") async def get_transcript(request: Request, subtitle_format: str = 'srt', word_level: bool = False): # Getting User's IP # user_ip = request.client.host user_ip = 1 # Retrieving the media_path from the context for this user's session media_path = users_context[user_ip]['media_path'] # Checking if the media_type is Video, then extract it's audio media_type = users_context[user_ip]['media_type'] if media_type == 'video': media_path = extract_audio(media_path) # # Whisper based transcription # stable_whisper_transcript = StableWhisper(media_path, output_folder, subtitle_format=subtitle_format, word_level=word_level) # transcript = stable_whisper_transcript.generate_transcript() # transcript_path = stable_whisper_transcript.save_transcript() temp_dir = 'temp' if word_level: transcript_path = os.path.join(temp_dir, 'word_level_transcript.json') with open(transcript_path, "r") as json_file: transcript = json.load(json_file) else: transcript_path = os.path.join(temp_dir, 'sentence_level_transcript.json') with open(transcript_path, "r") as json_file: transcript = json.load(json_file) # Getting Status status = 1 if transcript else 0 if status: # Storing Transcript Info in the context for this user's session users_context[user_ip]['transcript'] = transcript users_context[user_ip]['transcript_path'] = transcript_path return {'status': status, "transcript": transcript} @app.get("/get_translation") async def get_translation(request: Request, target_language: str = 'en'): # Getting User's IP # user_ip = request.client.host user_ip = 1 # Retrieving the transcript from the context for this user's session transcript = users_context[user_ip]['transcript'] # # # NLLB based Translation # nllb_translator = Translation(transcript, transcript['language'], target_language, 'output_path') # translated_transcript = nllb_translator.get_translated_transcript() # translated_subtitles = nllb_translator.get_translated_subtitles() temp_dir = 'temp' translated_transcript_path = os.path.join(temp_dir, 'translated_transcript.txt') with open(translated_transcript_path, "r", encoding="utf-8") as f: translated_transcript = f.read() translated_subtitles_path = os.path.join(temp_dir, 'translated_subtitles.json') with open(translated_subtitles_path, "r", encoding="utf-8") as json_file: translated_subtitles = json.load(json_file) # Getting Status status = 1 if translated_transcript and translated_subtitles else 0 if status: # Storing Translated Transcript Info in the context for this user's session users_context[user_ip]['translated_transcript'] = translated_transcript users_context[user_ip]['translated_subtitles'] = translated_subtitles # users_context[user_ip]['transcript_path'] = transcript_path return {'status': status, "transcript": translated_transcript, "subtitles": translated_subtitles} @app.get("/get_summary") async def get_summary(request: Request, Summary_type: str, Summary_strategy: str, Target_Person_type: str, Response_length: str, Writing_style: str, text_input: str = None): # Getting User's IP # user_ip = request.client.host user_ip = 1 # Getting Transcript if not provided if not text_input: text_input = users_context[user_ip]['transcript'] # # Extracting Summary # summary_extractor = Extract_Summary(text_input=text_input) # output = summary_extractor.define_chain(Summary_type=Summary_type, # Summary_strategy=Summary_strategy, # Target_Person_type=Target_Person_type, # Response_length=Response_length, # Writing_style=Writing_style, # key_information=False) temp_dir = 'temp' file_path = os.path.join(temp_dir, 'summary.txt') with open(file_path, 'r') as file: output = file.read() # Getting Status status = 1 if output else 0 if status: # Storing Summary Info in the context for this user's session users_context[user_ip]['summary'] = output return {'status': status, "summary": output} @app.get("/get_key_info") async def get_key_info(request: Request, Summary_type: str, Summary_strategy: str, Target_Person_type: str, Response_length: str, Writing_style: str, text_input: str = None): # Getting User's IP # user_ip = request.client.host user_ip = 1 # Getting Transcript if not provided if not text_input: text_input = users_context[user_ip]['transcript'] # # Extracting Summary # summary_extractor = Extract_Summary(text_input=text_input) # output = summary_extractor.define_chain(Summary_type=Summary_type, # Summary_strategy=Summary_strategy, # Target_Person_type=Target_Person_type, # Response_length=Response_length, # Writing_style=Writing_style, # key_information=True) temp_dir = 'temp' file_path = os.path.join(temp_dir, 'key_info.txt') with open(file_path, 'r') as file: output = file.read() # Getting Status status = 1 if output else 0 if status: # Storing Key Info in the context for this user's session users_context[user_ip]['key_info'] = output return {'status': status, "key_info": output} # @app.get("/get_narration") # async def get_narration(request: Request, narration_style: str, text_input: str = None): # # Getting User's IP # # user_ip = request.client.host # user_ip = 1 # # Getting Transcript if not provided # if not text_input: # text_input = users_context[user_ip]['transcript'] # # # Extracting Narration # # narrator = AudioBookNarration(text_input=text_input) # # output = narrator.define_chain(narration_style=narration_style) # temp_dir = 'temp' # file_path = os.path.join(temp_dir, 'narration.txt') # with open(file_path, 'r') as file: # output = file.read() # # Getting Status # status = 1 if output else 0 # if status: # # Storing Narration Info in the context for this user's session # users_context[user_ip]['narration'] = output # return {'status': status, "narration": output} @app.get("/get_audiobook") async def get_audiobook(request: Request, output_type : str, narration_style: str, speaker: str = "male", text_input: str = None): # Getting User's IP # user_ip = request.client.host user_ip = 1 # Getting Transcript if not provided if not text_input: text_input = users_context[user_ip]['transcript'] # # Extracting Narration # narrator = AudioBookNarration(text_input=text_input) # output = narrator.define_chain(narration_style=narration_style) # # Generating Audiobook # audiobook = AudioBook(output_folder=output_folder) # audio_path = audiobook.generate_audio_from_text(output, speaker=speaker, filename="output_audio") temp_dir = 'temp' file_path = os.path.join(temp_dir, 'narration.txt') audio_path = file_path # Getting Status status = 1 if audio_path else 0 if status: # Storing Audiobook path in the context for this user's session users_context[user_ip]['audiobook_path'] = audio_path return {'status': status, "audiobook_path": audio_path} @app.get("/get_rendered_video") async def get_rendered_video(request: Request, subtitles_type: str = 'original'): # Getting User's IP # user_ip = request.client.host user_ip = 1 # Retrieving the media_path from the context for this user's session media_path = users_context[user_ip]['media_path'] # Getting Required Subtitles if subtitles_type == 'original': subtitles_path = users_context[user_ip]['transcript_path'] elif subtitles_type == 'translated': # Getting Translated Subtitles from the context for this user's session translated_subtitles = users_context[user_ip]['translated_subtitles'] # Saving Translated Subtitles subtitles_path = save_translated_subtitles(translated_subtitles, media_path) # Burning Subtitles & Rendering Video rendered_video_path = burn_subtitles(media_path, subtitles_path) # Getting Status status = 1 if rendered_video_path else 0 return {'status': status, "rendered_video_path": rendered_video_path} if __name__ == "__main__": uvicorn.run(app, host="127.0.0.1", port=8000)