"""Celery worker module: Redis-backed Celery app plus the ``process_job`` task.

On import this module loads environment variables from a ``.env`` file,
builds a Redis connection pool and a Celery app from ``REDIS_HOST``,
``REDIS_PORT`` and ``REDIS_PASSWORD``, and registers an ``atexit`` hook that
terminates running processes named ``celery``.
"""
import atexit
import importlib
import json
import os
import subprocess
import sys
import time

from dotenv import load_dotenv

# Load .env before anything below reads the environment (including the
# third-party imports that follow).
load_dotenv()

import psutil

try:
    ### OPTIONAL DEPENDENCIES ### - pip install redis and celery only when a
    # user opts into using the async endpoints which require both
    from celery import Celery
    import redis
except ImportError:
    subprocess.check_call(
        [sys.executable, "-m", "pip", "install", "redis", "celery"]
    )
    # The packages were installed after the interpreter started: refresh the
    # import system's finder caches, then actually bind the names. Without
    # this re-import, `redis` and `Celery` would be undefined below.
    importlib.invalidate_caches()
    from celery import Celery
    import redis

sys.path.insert(
    0, os.path.abspath("../../..")
)  # Adds the parent directory to the system path - for litellm local dev
import litellm

# Redis connection setup
pool = redis.ConnectionPool(
    host=os.getenv("REDIS_HOST"),
    port=os.getenv("REDIS_PORT"),
    password=os.getenv("REDIS_PASSWORD"),
    db=0,
    max_connections=5,
)
redis_client = redis.Redis(connection_pool=pool)

# Celery setup: the same Redis URL serves as both broker and result backend.
_REDIS_URL = (
    f"redis://default:{os.getenv('REDIS_PASSWORD')}"
    f"@{os.getenv('REDIS_HOST')}:{os.getenv('REDIS_PORT')}"
)
celery_app = Celery("tasks", broker=_REDIS_URL, backend=_REDIS_URL)
celery_app.conf.update(
    broker_pool_limit=None,
    broker_transport_options={"connection_pool": pool},
    result_backend_transport_options={"connection_pool": pool},
)


# Celery task
@celery_app.task(name="process_job", max_retries=3)
def process_job(*args, **kwargs):
    """Run one completion through a freshly built ``litellm.Router``.

    Args:
        *args: Positional arguments forwarded to ``Router.completion``.
        **kwargs: Keyword arguments forwarded to ``Router.completion``,
            except ``llm_model_list``, which is removed and used as the
            router's ``model_list``.

    Returns:
        The response parsed from its JSON dump when it is a
        ``litellm.ModelResponse``; otherwise ``str(response)``.

    Raises:
        KeyError: If ``llm_model_list`` is not present in ``kwargs``.
        Exception: Anything raised while building the router or running the
            completion propagates unchanged.
    """
    llm_router: litellm.Router = litellm.Router(
        model_list=kwargs.pop("llm_model_list")
    )
    response = llm_router.completion(*args, **kwargs)  # type: ignore
    if isinstance(response, litellm.ModelResponse):
        # Round-trip through JSON so the task result is plain data.
        return json.loads(response.model_dump_json())
    return str(response)


# Ensure Celery workers are terminated when the script exits
def cleanup():
    """Terminate every running process whose name is exactly ``celery``.

    Best-effort: errors are printed, never raised, so this is safe to run as
    an ``atexit`` hook. A failure on one process does not stop the remaining
    ones from being terminated.
    """
    try:
        # Get a list of all running processes
        for process in psutil.process_iter(attrs=["pid", "name"]):
            # Skip anything that is not a Celery worker process
            if process.info["name"] != "celery":
                continue
            print(f"Terminating Celery worker with PID {process.info['pid']}")
            try:
                # Use the Process object we already hold rather than looking
                # the pid up again.
                process.terminate()
            except Exception as e:
                print(f"Error during cleanup: {e}")
    except Exception as e:
        print(f"Error during cleanup: {e}")


# Register the cleanup function to run when the script exits
atexit.register(cleanup)