litellm / litellm /proxy /queue /celery_app.py
ka1kuk's picture
Upload 235 files
7db0ae4 verified
# from dotenv import load_dotenv
# load_dotenv()
# import json, subprocess
# import psutil # Import the psutil library
# import atexit
# try:
# ### OPTIONAL DEPENDENCIES ### - pip install redis and celery only when a user opts into using the async endpoints which require both
# from celery import Celery
# import redis
# except:
# import sys
# subprocess.check_call([sys.executable, "-m", "pip", "install", "redis", "celery"])
# import time
# import sys, os
# sys.path.insert(
# 0, os.path.abspath("../../..")
# ) # Adds the parent directory to the system path - for litellm local dev
# import litellm
# # Redis connection setup
# pool = redis.ConnectionPool(
# host=os.getenv("REDIS_HOST"),
# port=os.getenv("REDIS_PORT"),
# password=os.getenv("REDIS_PASSWORD"),
# db=0,
# max_connections=5,
# )
# redis_client = redis.Redis(connection_pool=pool)
# # Celery setup
# celery_app = Celery(
# "tasks",
# broker=f"redis://default:{os.getenv('REDIS_PASSWORD')}@{os.getenv('REDIS_HOST')}:{os.getenv('REDIS_PORT')}",
# backend=f"redis://default:{os.getenv('REDIS_PASSWORD')}@{os.getenv('REDIS_HOST')}:{os.getenv('REDIS_PORT')}",
# )
# celery_app.conf.update(
# broker_pool_limit=None,
# broker_transport_options={"connection_pool": pool},
# result_backend_transport_options={"connection_pool": pool},
# )
# # Celery task
# @celery_app.task(name="process_job", max_retries=3)
# def process_job(*args, **kwargs):
# try:
# llm_router: litellm.Router = litellm.Router(model_list=kwargs.pop("llm_model_list")) # type: ignore
# response = llm_router.completion(*args, **kwargs) # type: ignore
# if isinstance(response, litellm.ModelResponse):
# response = response.model_dump_json()
# return json.loads(response)
# return str(response)
# except Exception as e:
# raise e
# # Ensure Celery workers are terminated when the script exits
# def cleanup():
# try:
# # Get a list of all running processes
# for process in psutil.process_iter(attrs=["pid", "name"]):
# # Check if the process is a Celery worker process
# if process.info["name"] == "celery":
# print(f"Terminating Celery worker with PID {process.info['pid']}")
# # Terminate the Celery worker process
# psutil.Process(process.info["pid"]).terminate()
# except Exception as e:
# print(f"Error during cleanup: {e}")
# # Register the cleanup function to run when the script exits
# atexit.register(cleanup)