import asyncio
import os
import json
import uuid
import requests
import uvicorn
from fastapi import (
    HTTPException,
    FastAPI,
    Request,
    WebSocket,
    WebSocketDisconnect,
    Depends,
)
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
# NOTE(review): the star import is kept (in its original position) because
# this module relies on names it provides, e.g. ``generate_uuid``; prefer
# explicit imports once the full list of used helpers is known.
from helperfunctions import *
from celery import Celery, states
from celery.result import AsyncResult
from celery_config import broker_url, result_backend
import api_functions
from languages import CODE2LANG

# Initialize Celery
celery = Celery(
    'tasks',
    broker=broker_url,
    backend=result_backend,
)

# Load Celery configuration from file and register the task module
celery.config_from_object('celery_config')
celery.autodiscover_tasks(['api_functions'])

### API Configurations

# Registry of loaded ML models. Defined at import time so that it exists
# even before the application startup hook has run.
MODELS = {}

# Seconds to wait between two status pushes on the task-status WebSocket.
TASK_POLL_INTERVAL_SECONDS = 1


# Context Manager for FastAPI Start/Shutdown
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup code before the app serves requests and cleanup after.

    Startup resets the module-level ``MODELS`` registry; shutdown clears it
    again. The cleanup sits in ``finally`` so it also runs when the
    application exits through an exception.
    """
    ## FastAPI Startup Code
    print('Loading ML Models..')
    MODELS.clear()
    # TODO: model loading is currently disabled; populate ``MODELS`` here
    # (the original placeholder was ``MODELS = load_models()``).
    print('ML Models Loaded!')
    try:
        yield
    finally:
        ## FastAPI Shutdown Code
        # Cleaning ML Models & Releasing the Resources
        MODELS.clear()


# Initializing FastAPI App
app = FastAPI(lifespan=lifespan)

# CORS (Cross-Origin Resource Sharing)
origins = [
    "http://localhost",
    "http://localhost:4200",
]

# NOTE(review): ``origins`` above is currently unused; every origin is allowed
# *with* credentials, which is very permissive. Consider passing
# ``allow_origins=origins`` before exposing this service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

### APIs


def _task_snapshot(task_id: str) -> dict:
    """Return the current ``status`` and ``result`` of a Celery task.

    If the stored result is an exception instance it is converted to its
    string form so that the snapshot can always be JSON-encoded.
    """
    task = AsyncResult(task_id, app=celery)
    outcome = task.result
    if isinstance(outcome, BaseException):
        outcome = str(outcome)
    return {"status": task.status, "result": outcome}


# WebSocket route to monitor task status
@app.websocket("/ws/task_status/{task_id}")
async def websocket_endpoint(websocket: WebSocket, task_id: str):
    """Push the task's status to the client once per poll interval.

    The loop stops, and the socket is closed, as soon as the task reports a
    terminal state (one of ``celery.states.READY_STATES``) or the client
    disconnects. Celery reports ``PENDING`` for task ids it does not know, so
    for such ids the stream continues until the client goes away.
    """
    await websocket.accept()
    try:
        while True:
            snapshot = _task_snapshot(task_id)
            await websocket.send_json(snapshot)
            if snapshot["status"] in states.READY_STATES:
                # Final state delivered; nothing more will change.
                await websocket.close()
                return
            await asyncio.sleep(TASK_POLL_INTERVAL_SECONDS)
    except WebSocketDisconnect:
        # Client went away; stop polling instead of looping forever.
        return


# API for monitoring task status
@app.get("/task_status/{task_id}")
async def get_task_status(request: Request, task_id: str):
    """Return a one-off ``{"status", "result"}`` snapshot for ``task_id``."""
    return _task_snapshot(task_id)


# Function to create a unique task ID
def create_task_id():
    """Return a task id of the form ``task_`` + 8 hex chars of a UUID4.

    NOTE(review): only 32 random bits are kept, so ids can collide under
    heavy load; the format is left unchanged for existing clients.
    """
    return f"task_{uuid.uuid4().hex[:8]}"
# Function to run a background task and return task ID
def run_background_task(task_function, *args):
    """Send a Celery task by name and return ``(task_id, task)``.

    Args:
        task_function: Registered Celery task name, e.g.
            ``"api_functions.get_summary_task"``.
        *args: Positional arguments forwarded to the task.

    Returns:
        A tuple of the generated task id and the object returned by
        ``celery.send_task``.
    """
    task_id = create_task_id()
    task = celery.send_task(task_function, args=args, task_id=task_id)
    return task_id, task


def _start_task(task_name, *args, **extra_fields):
    """Enqueue ``task_name`` and build the JSON response shared by endpoints.

    The response is ``{'status': 1 or 0, 'task_id': ...}`` plus any
    ``extra_fields``. The original dict literals also carried a
    ``'status': 'Task Started'`` entry, but it was always overwritten by the
    numeric flag under the same key; it is omitted here and the emitted JSON
    is unchanged.

    Raises:
        HTTPException: 500 with the error text if enqueueing fails.
    """
    try:
        task_id, _ = run_background_task(task_name, *args)
    except Exception as e:
        print(f"An unexpected error occurred:\n{str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {'status': 1 if task_id else 0, 'task_id': task_id, **extra_fields}


@app.get("/get_media_metadata")
async def get_media_metadata(request: Request, url: str):
    """Start the media-metadata task for ``url``.

    A ``user_id`` is derived from the caller's IP and the URL via
    ``generate_uuid`` and returned alongside the task id.
    """
    try:
        # Getting User's IP & Generating UUID
        user_ip = request.client.host
        user_id = generate_uuid(user_ip, url)
    except Exception as e:
        print(f"An unexpected error occurred:\n{str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e
    return _start_task(
        "api_functions.get_media_metadata_task", user_id, url, user_id=user_id
    )


@app.get("/get_media_formats")
async def get_media_formats(user_id: str):
    """Start the media-formats task for ``user_id``."""
    return _start_task("api_functions.get_media_formats_task", user_id)


@app.get("/download_media")
async def download_media(user_id: str, media_type: str, media_format: str,
                         media_quality: str):
    """Start the media-download task with the requested type/format/quality."""
    return _start_task(
        "api_functions.download_media_task",
        user_id, media_type, media_format, media_quality,
    )


@app.get("/get_transcript")
async def get_transcript(user_id: str, subtitle_format: str = 'srt',
                         word_level: bool = False):
    """Start the transcript task for ``user_id``."""
    return _start_task(
        "api_functions.get_transcript_task",
        user_id, subtitle_format, word_level,
    )


@app.get("/get_languages")
async def get_languages(user_id: str):
    """Return the ``CODE2LANG`` mapping.

    ``user_id`` is not used but stays a required query parameter so existing
    clients keep working. The handler was previously also named
    ``get_translation`` and was shadowed by the translation endpoint.
    """
    return CODE2LANG


@app.get("/get_translation")
async def get_translation(user_id: str, target_language: str = 'en'):
    """Start the translation task targeting ``target_language``."""
    return _start_task(
        "api_functions.get_translation_task", user_id, target_language
    )


@app.get("/get_summary")
async def get_summary(user_id: str, Summary_type: str, Summary_strategy: str,
                      Target_Person_type: str, Response_length: str,
                      Writing_style: str):
    """Start the summary task with the given summarisation options."""
    return _start_task(
        "api_functions.get_summary_task",
        user_id, Summary_type, Summary_strategy,
        Target_Person_type, Response_length, Writing_style,
    )


@app.get("/get_key_info")
async def get_key_info(user_id: str, Summary_type: str, Summary_strategy: str,
                       Target_Person_type: str, Response_length: str,
                       Writing_style: str):
    """Start the key-info task with the given summarisation options."""
    return _start_task(
        "api_functions.get_key_info_task",
        user_id, Summary_type, Summary_strategy,
        Target_Person_type, Response_length, Writing_style,
    )


@app.get("/get_audiobook")
async def get_audiobook(user_id: str, narration_style: str,
                        speaker: str = "male", audio_format: str = "mp3",
                        audio_quality: str = "128kbps"):
    """Start the audiobook task with the given narration/audio options."""
    return _start_task(
        "api_functions.get_audiobook_task",
        user_id, narration_style, speaker, audio_format, audio_quality,
    )


@app.get("/get_rendered_video")
async def get_rendered_video(user_id: str, video_format: str,
                             video_quality: str,
                             subtitles_type: str = 'original'):
    """Start the video-rendering task with the given video options."""
    return _start_task(
        "api_functions.get_rendered_video_task",
        user_id, video_format, video_quality, subtitles_type,
    )


if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000)