#### What this does ####
#    On success, logs events to Langfuse
import os
import traceback
from datetime import datetime

import dotenv
import requests
from packaging.version import Version

dotenv.load_dotenv()  # Loading env variables using dotenv


class LangFuseLogger:
    """Success-callback logger that forwards completion calls to Langfuse.

    Credentials and settings are read from the environment
    (``LANGFUSE_SECRET_KEY``, ``LANGFUSE_PUBLIC_KEY``, ``LANGFUSE_HOST``,
    ``LANGFUSE_RELEASE``, ``LANGFUSE_DEBUG``) when the logger is constructed.
    """

    # LANGFUSE_DEBUG values that explicitly switch debug off. Any other
    # non-empty value keeps enabling debug, as it did when the raw string was
    # passed straight through to the client.
    _FALSY_ENV_VALUES = frozenset({"", "0", "false", "no", "off"})

    # Optional params that are dropped before logging.
    _DROPPED_OPTIONAL_PARAMS = ("functions", "tools")

    def __init__(self):
        """Create the Langfuse client from environment variables.

        Raises:
            Exception: if the ``langfuse`` package cannot be imported. The
                original import error is chained as ``__cause__``.
        """
        try:
            from langfuse import Langfuse
        except Exception as e:
            raise Exception(
                f"\033[91mLangfuse not installed, try running 'pip install langfuse' to fix this error: {e}\033[0m"
            ) from e
        # Instance variables
        self.secret_key = os.getenv("LANGFUSE_SECRET_KEY")
        self.public_key = os.getenv("LANGFUSE_PUBLIC_KEY")
        self.langfuse_host = os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com")
        self.langfuse_release = os.getenv("LANGFUSE_RELEASE")
        # os.getenv returns a string, so "false"/"0" would otherwise be truthy
        # and silently turn debug logging on.
        raw_debug = os.getenv("LANGFUSE_DEBUG")
        self.langfuse_debug = (
            raw_debug is not None
            and raw_debug.strip().lower() not in self._FALSY_ENV_VALUES
        )
        self.Langfuse = Langfuse(
            public_key=self.public_key,
            secret_key=self.secret_key,
            host=self.langfuse_host,
            release=self.langfuse_release,
            debug=self.langfuse_debug,
        )

    @classmethod
    def _sanitize_optional_params(cls, optional_params):
        """Return a loggable copy of ``optional_params``.

        The ``functions`` and ``tools`` entries are dropped, and every value
        that is not a ``str``/``int``/``bool``/``float`` is converted with
        ``str()``. The input mapping is never modified, so the caller's
        ``kwargs`` stay intact for any code that reads them afterwards.
        """
        sanitized = {}
        for param, value in (optional_params or {}).items():
            if param in cls._DROPPED_OPTIONAL_PARAMS:
                continue
            # langfuse only accepts str, int, bool, float for logging
            if not isinstance(value, (str, int, bool, float)):
                try:
                    value = str(value)
                except Exception:
                    # if casting value to str fails don't block logging;
                    # the original value is kept as-is
                    pass
            sanitized[param] = value
        return sanitized

    def log_event(
        self, kwargs, response_obj, start_time, end_time, user_id, print_verbose
    ):
        """Log one completed call to Langfuse.

        Args:
            kwargs: call arguments; ``litellm_params``, ``messages``,
                ``optional_params`` and ``model`` are read from it.
            response_obj: response whose ``["choices"][0]["message"]`` and
                ``["usage"]`` entries are logged.
            start_time: start timestamp forwarded to Langfuse.
            end_time: end timestamp forwarded to Langfuse.
            user_id: user identifier attached to the trace.
            print_verbose: callable used for diagnostic output.

        Logging is best-effort: any ``Exception`` is printed and reported
        through ``print_verbose`` and never propagates to the caller.
        """
        try:
            print_verbose(
                f"Langfuse Logging - Enters logging function for model {kwargs}"
            )
            litellm_params = kwargs.get("litellm_params", {}) or {}
            metadata = (
                litellm_params.get("metadata", {}) or {}
            )  # if litellm_params['metadata'] == None
            prompt = [kwargs.get("messages")]
            optional_params = self._sanitize_optional_params(
                kwargs.get("optional_params")
            )
            # end of processing langfuse ########################
            message = response_obj["choices"][0]["message"]
            output = message.json()
            print_verbose(f"OUTPUT IN LANGFUSE: {output}; original: {message}")

            # Pick the code path matching the installed langfuse SDK version.
            if self._is_langfuse_v2():
                log_to_langfuse = self._log_langfuse_v2
            else:
                log_to_langfuse = self._log_langfuse_v1
            log_to_langfuse(
                user_id,
                metadata,
                output,
                start_time,
                end_time,
                kwargs,
                optional_params,
                prompt,
                response_obj,
            )

            self.Langfuse.flush()
            print_verbose(
                f"Langfuse Layer Logging - final response object: {response_obj}"
            )
        except Exception:
            # Never let a logging failure break the caller, but do not swallow
            # KeyboardInterrupt / SystemExit either.
            traceback.print_exc()
            print_verbose(f"Langfuse Layer Error - {traceback.format_exc()}")

    async def _async_log_event(
        self, kwargs, response_obj, start_time, end_time, user_id, print_verbose
    ):
        """Async entry point; delegates directly to the synchronous ``log_event``."""
        self.log_event(
            kwargs, response_obj, start_time, end_time, user_id, print_verbose
        )

    def _is_langfuse_v2(self):
        """Return True when the installed ``langfuse`` package is version 2.0.0 or newer."""
        import langfuse

        return Version(langfuse.version.__version__) >= Version("2.0.0")

    def _log_langfuse_v1(
        self,
        user_id,
        metadata,
        output,
        start_time,
        end_time,
        kwargs,
        optional_params,
        input,
        response_obj,
    ):
        """Log a trace and its generation through the pre-2.0 model classes.

        Prints an upgrade notice on every call, then creates a trace and one
        generation built from ``CreateTrace`` / ``CreateGeneration``.
        """
        from langfuse.model import CreateTrace, CreateGeneration

        print(
            "Please upgrade langfuse to v2.0.0 or higher: https://github.com/langfuse/langfuse-python/releases/tag/v2.0.1"
        )

        generation_name = metadata.get("generation_name", "litellm-completion")
        usage = response_obj["usage"]

        trace = self.Langfuse.trace(
            CreateTrace(
                name=generation_name,
                input=input,
                output=output,
                userId=user_id,
            )
        )

        trace.generation(
            CreateGeneration(
                name=generation_name,
                startTime=start_time,
                endTime=end_time,
                model=kwargs["model"],
                modelParameters=optional_params,
                input=input,
                output=output,
                usage={
                    "prompt_tokens": usage["prompt_tokens"],
                    "completion_tokens": usage["completion_tokens"],
                },
                metadata=metadata,
            )
        )

    def _log_langfuse_v2(
        self,
        user_id,
        metadata,
        output,
        start_time,
        end_time,
        kwargs,
        optional_params,
        input,
        response_obj,
    ):
        """Log a trace and its generation through the keyword-argument API.

        ``metadata`` may override the trace user (``trace_user_id``), the
        trace id (``trace_id``) and the generation id (``generation_id``).
        """
        generation_name = metadata.get("generation_name", "litellm-completion")
        usage = response_obj["usage"]

        trace = self.Langfuse.trace(
            name=generation_name,
            input=input,
            output=output,
            user_id=metadata.get("trace_user_id", user_id),
            id=metadata.get("trace_id", None),
        )

        # NOTE(review): startTime / endTime / modelParameters are camelCase
        # while the other keywords are snake_case — confirm against the
        # installed Langfuse SDK's generation() signature.
        trace.generation(
            name=generation_name,
            id=metadata.get("generation_id", None),
            startTime=start_time,
            endTime=end_time,
            model=kwargs["model"],
            modelParameters=optional_params,
            input=input,
            output=output,
            usage={
                "prompt_tokens": usage["prompt_tokens"],
                "completion_tokens": usage["completion_tokens"],
            },
            metadata=metadata,
        )