Spaces:
Paused
Paused
| """ | |
| Call Hook for LiteLLM Proxy which allows Langfuse prompt management. | |
| """ | |
| import os | |
| from functools import lru_cache | |
| from typing import TYPE_CHECKING, Any, List, Literal, Optional, Tuple, Union, cast | |
| from packaging.version import Version | |
| from typing_extensions import TypeAlias | |
| from litellm.integrations.custom_logger import CustomLogger | |
| from litellm.integrations.prompt_management_base import PromptManagementClient | |
| from litellm.litellm_core_utils.asyncify import run_async_function | |
| from litellm.types.llms.openai import AllMessageValues, ChatCompletionSystemMessage | |
| from litellm.types.utils import StandardCallbackDynamicParams, StandardLoggingPayload | |
| from ...litellm_core_utils.specialty_caches.dynamic_logging_cache import ( | |
| DynamicLoggingCache, | |
| ) | |
| from ..prompt_management_base import PromptManagementBase | |
| from .langfuse import LangFuseLogger | |
| from .langfuse_handler import LangFuseHandler | |
| if TYPE_CHECKING: | |
| from langfuse import Langfuse | |
| from langfuse.client import ChatPromptClient, TextPromptClient | |
| LangfuseClass: TypeAlias = Langfuse | |
| PROMPT_CLIENT = Union[TextPromptClient, ChatPromptClient] | |
| else: | |
| PROMPT_CLIENT = Any | |
| LangfuseClass = Any | |
| in_memory_dynamic_logger_cache = DynamicLoggingCache() | |
| def langfuse_client_init( | |
| langfuse_public_key=None, | |
| langfuse_secret=None, | |
| langfuse_secret_key=None, | |
| langfuse_host=None, | |
| flush_interval=1, | |
| ) -> LangfuseClass: | |
| """ | |
| Initialize Langfuse client with caching to prevent multiple initializations. | |
| Args: | |
| langfuse_public_key (str, optional): Public key for Langfuse. Defaults to None. | |
| langfuse_secret (str, optional): Secret key for Langfuse. Defaults to None. | |
| langfuse_host (str, optional): Host URL for Langfuse. Defaults to None. | |
| flush_interval (int, optional): Flush interval in seconds. Defaults to 1. | |
| Returns: | |
| Langfuse: Initialized Langfuse client instance | |
| Raises: | |
| Exception: If langfuse package is not installed | |
| """ | |
| try: | |
| import langfuse | |
| from langfuse import Langfuse | |
| except Exception as e: | |
| raise Exception( | |
| f"\033[91mLangfuse not installed, try running 'pip install langfuse' to fix this error: {e}\n\033[0m" | |
| ) | |
| # Instance variables | |
| secret_key = ( | |
| langfuse_secret or langfuse_secret_key or os.getenv("LANGFUSE_SECRET_KEY") | |
| ) | |
| public_key = langfuse_public_key or os.getenv("LANGFUSE_PUBLIC_KEY") | |
| langfuse_host = langfuse_host or os.getenv( | |
| "LANGFUSE_HOST", "https://cloud.langfuse.com" | |
| ) | |
| if not ( | |
| langfuse_host.startswith("http://") or langfuse_host.startswith("https://") | |
| ): | |
| # add http:// if unset, assume communicating over private network - e.g. render | |
| langfuse_host = "http://" + langfuse_host | |
| langfuse_release = os.getenv("LANGFUSE_RELEASE") | |
| langfuse_debug = os.getenv("LANGFUSE_DEBUG") | |
| parameters = { | |
| "public_key": public_key, | |
| "secret_key": secret_key, | |
| "host": langfuse_host, | |
| "release": langfuse_release, | |
| "debug": langfuse_debug, | |
| "flush_interval": LangFuseLogger._get_langfuse_flush_interval( | |
| flush_interval | |
| ), # flush interval in seconds | |
| } | |
| if Version(langfuse.version.__version__) >= Version("2.6.0"): | |
| parameters["sdk_integration"] = "litellm" | |
| client = Langfuse(**parameters) | |
| return client | |
| class LangfusePromptManagement(LangFuseLogger, PromptManagementBase, CustomLogger): | |
| def __init__( | |
| self, | |
| langfuse_public_key=None, | |
| langfuse_secret=None, | |
| langfuse_host=None, | |
| flush_interval=1, | |
| ): | |
| import langfuse | |
| self.langfuse_sdk_version = langfuse.version.__version__ | |
| self.Langfuse = langfuse_client_init( | |
| langfuse_public_key=langfuse_public_key, | |
| langfuse_secret=langfuse_secret, | |
| langfuse_host=langfuse_host, | |
| flush_interval=flush_interval, | |
| ) | |
| def integration_name(self): | |
| return "langfuse" | |
| def _get_prompt_from_id( | |
| self, langfuse_prompt_id: str, langfuse_client: LangfuseClass | |
| ) -> PROMPT_CLIENT: | |
| return langfuse_client.get_prompt(langfuse_prompt_id) | |
| def _compile_prompt( | |
| self, | |
| langfuse_prompt_client: PROMPT_CLIENT, | |
| langfuse_prompt_variables: Optional[dict], | |
| call_type: Union[Literal["completion"], Literal["text_completion"]], | |
| ) -> List[AllMessageValues]: | |
| compiled_prompt: Optional[Union[str, list]] = None | |
| if langfuse_prompt_variables is None: | |
| langfuse_prompt_variables = {} | |
| compiled_prompt = langfuse_prompt_client.compile(**langfuse_prompt_variables) | |
| if isinstance(compiled_prompt, str): | |
| compiled_prompt = [ | |
| ChatCompletionSystemMessage(role="system", content=compiled_prompt) | |
| ] | |
| else: | |
| compiled_prompt = cast(List[AllMessageValues], compiled_prompt) | |
| return compiled_prompt | |
| def _get_optional_params_from_langfuse( | |
| self, langfuse_prompt_client: PROMPT_CLIENT | |
| ) -> dict: | |
| config = langfuse_prompt_client.config | |
| optional_params = {} | |
| for k, v in config.items(): | |
| if k != "model": | |
| optional_params[k] = v | |
| return optional_params | |
| async def async_get_chat_completion_prompt( | |
| self, | |
| model: str, | |
| messages: List[AllMessageValues], | |
| non_default_params: dict, | |
| prompt_id: Optional[str], | |
| prompt_variables: Optional[dict], | |
| dynamic_callback_params: StandardCallbackDynamicParams, | |
| ) -> Tuple[ | |
| str, | |
| List[AllMessageValues], | |
| dict, | |
| ]: | |
| return self.get_chat_completion_prompt( | |
| model, | |
| messages, | |
| non_default_params, | |
| prompt_id, | |
| prompt_variables, | |
| dynamic_callback_params, | |
| ) | |
| def should_run_prompt_management( | |
| self, | |
| prompt_id: str, | |
| dynamic_callback_params: StandardCallbackDynamicParams, | |
| ) -> bool: | |
| langfuse_client = langfuse_client_init( | |
| langfuse_public_key=dynamic_callback_params.get("langfuse_public_key"), | |
| langfuse_secret=dynamic_callback_params.get("langfuse_secret"), | |
| langfuse_secret_key=dynamic_callback_params.get("langfuse_secret_key"), | |
| langfuse_host=dynamic_callback_params.get("langfuse_host"), | |
| ) | |
| langfuse_prompt_client = self._get_prompt_from_id( | |
| langfuse_prompt_id=prompt_id, langfuse_client=langfuse_client | |
| ) | |
| return langfuse_prompt_client is not None | |
| def _compile_prompt_helper( | |
| self, | |
| prompt_id: str, | |
| prompt_variables: Optional[dict], | |
| dynamic_callback_params: StandardCallbackDynamicParams, | |
| ) -> PromptManagementClient: | |
| langfuse_client = langfuse_client_init( | |
| langfuse_public_key=dynamic_callback_params.get("langfuse_public_key"), | |
| langfuse_secret=dynamic_callback_params.get("langfuse_secret"), | |
| langfuse_secret_key=dynamic_callback_params.get("langfuse_secret_key"), | |
| langfuse_host=dynamic_callback_params.get("langfuse_host"), | |
| ) | |
| langfuse_prompt_client = self._get_prompt_from_id( | |
| langfuse_prompt_id=prompt_id, langfuse_client=langfuse_client | |
| ) | |
| ## SET PROMPT | |
| compiled_prompt = self._compile_prompt( | |
| langfuse_prompt_client=langfuse_prompt_client, | |
| langfuse_prompt_variables=prompt_variables, | |
| call_type="completion", | |
| ) | |
| template_model = langfuse_prompt_client.config.get("model") | |
| template_optional_params = self._get_optional_params_from_langfuse( | |
| langfuse_prompt_client | |
| ) | |
| return PromptManagementClient( | |
| prompt_id=prompt_id, | |
| prompt_template=compiled_prompt, | |
| prompt_template_model=template_model, | |
| prompt_template_optional_params=template_optional_params, | |
| completed_messages=None, | |
| ) | |
| def log_success_event(self, kwargs, response_obj, start_time, end_time): | |
| return run_async_function( | |
| self.async_log_success_event, kwargs, response_obj, start_time, end_time | |
| ) | |
| async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): | |
| standard_callback_dynamic_params = kwargs.get( | |
| "standard_callback_dynamic_params" | |
| ) | |
| langfuse_logger_to_use = LangFuseHandler.get_langfuse_logger_for_request( | |
| globalLangfuseLogger=self, | |
| standard_callback_dynamic_params=standard_callback_dynamic_params, | |
| in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache, | |
| ) | |
| langfuse_logger_to_use.log_event_on_langfuse( | |
| kwargs=kwargs, | |
| response_obj=response_obj, | |
| start_time=start_time, | |
| end_time=end_time, | |
| user_id=kwargs.get("user", None), | |
| ) | |
| async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time): | |
| standard_callback_dynamic_params = kwargs.get( | |
| "standard_callback_dynamic_params" | |
| ) | |
| langfuse_logger_to_use = LangFuseHandler.get_langfuse_logger_for_request( | |
| globalLangfuseLogger=self, | |
| standard_callback_dynamic_params=standard_callback_dynamic_params, | |
| in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache, | |
| ) | |
| standard_logging_object = cast( | |
| Optional[StandardLoggingPayload], | |
| kwargs.get("standard_logging_object", None), | |
| ) | |
| if standard_logging_object is None: | |
| return | |
| langfuse_logger_to_use.log_event_on_langfuse( | |
| start_time=start_time, | |
| end_time=end_time, | |
| response_obj=None, | |
| user_id=kwargs.get("user", None), | |
| status_message=standard_logging_object["error_str"], | |
| level="ERROR", | |
| kwargs=kwargs, | |
| ) | |