Spaces:
Runtime error
Runtime error
import importlib.metadata | |
import logging | |
import os | |
import traceback | |
import warnings | |
from contextvars import ContextVar | |
from typing import Any, Dict, List, Union, cast | |
from uuid import UUID | |
import requests | |
from langchain_core.agents import AgentAction, AgentFinish | |
from langchain_core.messages import BaseMessage | |
from langchain_core.outputs import LLMResult | |
from packaging.version import parse | |
from langchain.callbacks.base import BaseCallbackHandler | |
logger = logging.getLogger(__name__) | |
DEFAULT_API_URL = "https://app.llmonitor.com" | |
user_ctx = ContextVar[Union[str, None]]("user_ctx", default=None) | |
user_props_ctx = ContextVar[Union[str, None]]("user_props_ctx", default=None) | |
PARAMS_TO_CAPTURE = [ | |
"temperature", | |
"top_p", | |
"top_k", | |
"stop", | |
"presence_penalty", | |
"frequence_penalty", | |
"seed", | |
"function_call", | |
"functions", | |
"tools", | |
"tool_choice", | |
"response_format", | |
"max_tokens", | |
"logit_bias", | |
] | |
class UserContextManager: | |
"""Context manager for LLMonitor user context.""" | |
def __init__(self, user_id: str, user_props: Any = None) -> None: | |
user_ctx.set(user_id) | |
user_props_ctx.set(user_props) | |
def __enter__(self) -> Any: | |
pass | |
def __exit__(self, exc_type: Any, exc_value: Any, exc_tb: Any) -> Any: | |
user_ctx.set(None) | |
user_props_ctx.set(None) | |
def identify(user_id: str, user_props: Any = None) -> UserContextManager: | |
"""Builds an LLMonitor UserContextManager | |
Parameters: | |
- `user_id`: The user id. | |
- `user_props`: The user properties. | |
Returns: | |
A context manager that sets the user context. | |
""" | |
return UserContextManager(user_id, user_props) | |
def _serialize(obj: Any) -> Union[Dict[str, Any], List[Any], Any]: | |
if hasattr(obj, "to_json"): | |
return obj.to_json() | |
if isinstance(obj, dict): | |
return {key: _serialize(value) for key, value in obj.items()} | |
if isinstance(obj, list): | |
return [_serialize(element) for element in obj] | |
return obj | |
def _parse_input(raw_input: Any) -> Any: | |
if not raw_input: | |
return None | |
# if it's an array of 1, just parse the first element | |
if isinstance(raw_input, list) and len(raw_input) == 1: | |
return _parse_input(raw_input[0]) | |
if not isinstance(raw_input, dict): | |
return _serialize(raw_input) | |
input_value = raw_input.get("input") | |
inputs_value = raw_input.get("inputs") | |
question_value = raw_input.get("question") | |
query_value = raw_input.get("query") | |
if input_value: | |
return input_value | |
if inputs_value: | |
return inputs_value | |
if question_value: | |
return question_value | |
if query_value: | |
return query_value | |
return _serialize(raw_input) | |
def _parse_output(raw_output: dict) -> Any: | |
if not raw_output: | |
return None | |
if not isinstance(raw_output, dict): | |
return _serialize(raw_output) | |
text_value = raw_output.get("text") | |
output_value = raw_output.get("output") | |
output_text_value = raw_output.get("output_text") | |
answer_value = raw_output.get("answer") | |
result_value = raw_output.get("result") | |
if text_value: | |
return text_value | |
if answer_value: | |
return answer_value | |
if output_value: | |
return output_value | |
if output_text_value: | |
return output_text_value | |
if result_value: | |
return result_value | |
return _serialize(raw_output) | |
def _parse_lc_role( | |
role: str, | |
) -> str: | |
if role == "human": | |
return "user" | |
else: | |
return role | |
def _get_user_id(metadata: Any) -> Any: | |
if user_ctx.get() is not None: | |
return user_ctx.get() | |
metadata = metadata or {} | |
user_id = metadata.get("user_id") | |
if user_id is None: | |
user_id = metadata.get("userId") # legacy, to delete in the future | |
return user_id | |
def _get_user_props(metadata: Any) -> Any: | |
if user_props_ctx.get() is not None: | |
return user_props_ctx.get() | |
metadata = metadata or {} | |
return metadata.get("user_props", None) | |
def _parse_lc_message(message: BaseMessage) -> Dict[str, Any]: | |
keys = ["function_call", "tool_calls", "tool_call_id", "name"] | |
parsed = {"text": message.content, "role": _parse_lc_role(message.type)} | |
parsed.update( | |
{ | |
key: cast(Any, message.additional_kwargs.get(key)) | |
for key in keys | |
if message.additional_kwargs.get(key) is not None | |
} | |
) | |
return parsed | |
def _parse_lc_messages(messages: Union[List[BaseMessage], Any]) -> List[Dict[str, Any]]: | |
return [_parse_lc_message(message) for message in messages] | |
class LLMonitorCallbackHandler(BaseCallbackHandler): | |
"""Callback Handler for LLMonitor`. | |
#### Parameters: | |
- `app_id`: The app id of the app you want to report to. Defaults to | |
`None`, which means that `LLMONITOR_APP_ID` will be used. | |
- `api_url`: The url of the LLMonitor API. Defaults to `None`, | |
which means that either `LLMONITOR_API_URL` environment variable | |
or `https://app.llmonitor.com` will be used. | |
#### Raises: | |
- `ValueError`: if `app_id` is not provided either as an | |
argument or as an environment variable. | |
- `ConnectionError`: if the connection to the API fails. | |
#### Example: | |
```python | |
from langchain.llms import OpenAI | |
from langchain.callbacks import LLMonitorCallbackHandler | |
llmonitor_callback = LLMonitorCallbackHandler() | |
llm = OpenAI(callbacks=[llmonitor_callback], | |
metadata={"userId": "user-123"}) | |
llm.predict("Hello, how are you?") | |
``` | |
""" | |
__api_url: str | |
__app_id: str | |
__verbose: bool | |
__llmonitor_version: str | |
__has_valid_config: bool | |
def __init__( | |
self, | |
app_id: Union[str, None] = None, | |
api_url: Union[str, None] = None, | |
verbose: bool = False, | |
) -> None: | |
super().__init__() | |
self.__has_valid_config = True | |
try: | |
import llmonitor | |
self.__llmonitor_version = importlib.metadata.version("llmonitor") | |
self.__track_event = llmonitor.track_event | |
except ImportError: | |
logger.warning( | |
"""[LLMonitor] To use the LLMonitor callback handler you need to | |
have the `llmonitor` Python package installed. Please install it | |
with `pip install llmonitor`""" | |
) | |
self.__has_valid_config = False | |
return | |
if parse(self.__llmonitor_version) < parse("0.0.32"): | |
logger.warning( | |
f"""[LLMonitor] The installed `llmonitor` version is | |
{self.__llmonitor_version} | |
but `LLMonitorCallbackHandler` requires at least version 0.0.32 | |
upgrade `llmonitor` with `pip install --upgrade llmonitor`""" | |
) | |
self.__has_valid_config = False | |
self.__has_valid_config = True | |
self.__api_url = api_url or os.getenv("LLMONITOR_API_URL") or DEFAULT_API_URL | |
self.__verbose = verbose or bool(os.getenv("LLMONITOR_VERBOSE")) | |
_app_id = app_id or os.getenv("LLMONITOR_APP_ID") | |
if _app_id is None: | |
logger.warning( | |
"""[LLMonitor] app_id must be provided either as an argument or | |
as an environment variable""" | |
) | |
self.__has_valid_config = False | |
else: | |
self.__app_id = _app_id | |
if self.__has_valid_config is False: | |
return None | |
try: | |
res = requests.get(f"{self.__api_url}/api/app/{self.__app_id}") | |
if not res.ok: | |
raise ConnectionError() | |
except Exception: | |
logger.warning( | |
f"""[LLMonitor] Could not connect to the LLMonitor API at | |
{self.__api_url}""" | |
) | |
def on_llm_start( | |
self, | |
serialized: Dict[str, Any], | |
prompts: List[str], | |
*, | |
run_id: UUID, | |
parent_run_id: Union[UUID, None] = None, | |
tags: Union[List[str], None] = None, | |
metadata: Union[Dict[str, Any], None] = None, | |
**kwargs: Any, | |
) -> None: | |
if self.__has_valid_config is False: | |
return | |
try: | |
user_id = _get_user_id(metadata) | |
user_props = _get_user_props(metadata) | |
params = kwargs.get("invocation_params", {}) | |
params.update( | |
serialized.get("kwargs", {}) | |
) # Sometimes, for example with ChatAnthropic, `invocation_params` is empty | |
name = ( | |
params.get("model") | |
or params.get("model_name") | |
or params.get("model_id") | |
) | |
if not name and "anthropic" in params.get("_type"): | |
name = "claude-2" | |
extra = { | |
param: params.get(param) | |
for param in PARAMS_TO_CAPTURE | |
if params.get(param) is not None | |
} | |
input = _parse_input(prompts) | |
self.__track_event( | |
"llm", | |
"start", | |
user_id=user_id, | |
run_id=str(run_id), | |
parent_run_id=str(parent_run_id) if parent_run_id else None, | |
name=name, | |
input=input, | |
tags=tags, | |
extra=extra, | |
metadata=metadata, | |
user_props=user_props, | |
app_id=self.__app_id, | |
) | |
except Exception as e: | |
warnings.warn(f"[LLMonitor] An error occurred in on_llm_start: {e}") | |
def on_chat_model_start( | |
self, | |
serialized: Dict[str, Any], | |
messages: List[List[BaseMessage]], | |
*, | |
run_id: UUID, | |
parent_run_id: Union[UUID, None] = None, | |
tags: Union[List[str], None] = None, | |
metadata: Union[Dict[str, Any], None] = None, | |
**kwargs: Any, | |
) -> Any: | |
if self.__has_valid_config is False: | |
return | |
try: | |
user_id = _get_user_id(metadata) | |
user_props = _get_user_props(metadata) | |
params = kwargs.get("invocation_params", {}) | |
params.update( | |
serialized.get("kwargs", {}) | |
) # Sometimes, for example with ChatAnthropic, `invocation_params` is empty | |
name = ( | |
params.get("model") | |
or params.get("model_name") | |
or params.get("model_id") | |
) | |
if not name and "anthropic" in params.get("_type"): | |
name = "claude-2" | |
extra = { | |
param: params.get(param) | |
for param in PARAMS_TO_CAPTURE | |
if params.get(param) is not None | |
} | |
input = _parse_lc_messages(messages[0]) | |
self.__track_event( | |
"llm", | |
"start", | |
user_id=user_id, | |
run_id=str(run_id), | |
parent_run_id=str(parent_run_id) if parent_run_id else None, | |
name=name, | |
input=input, | |
tags=tags, | |
extra=extra, | |
metadata=metadata, | |
user_props=user_props, | |
app_id=self.__app_id, | |
) | |
except Exception as e: | |
logger.error(f"[LLMonitor] An error occurred in on_chat_model_start: {e}") | |
def on_llm_end( | |
self, | |
response: LLMResult, | |
*, | |
run_id: UUID, | |
parent_run_id: Union[UUID, None] = None, | |
**kwargs: Any, | |
) -> None: | |
if self.__has_valid_config is False: | |
return | |
try: | |
token_usage = (response.llm_output or {}).get("token_usage", {}) | |
parsed_output: Any = [ | |
_parse_lc_message(generation.message) | |
if hasattr(generation, "message") | |
else generation.text | |
for generation in response.generations[0] | |
] | |
# if it's an array of 1, just parse the first element | |
if len(parsed_output) == 1: | |
parsed_output = parsed_output[0] | |
self.__track_event( | |
"llm", | |
"end", | |
run_id=str(run_id), | |
parent_run_id=str(parent_run_id) if parent_run_id else None, | |
output=parsed_output, | |
token_usage={ | |
"prompt": token_usage.get("prompt_tokens"), | |
"completion": token_usage.get("completion_tokens"), | |
}, | |
app_id=self.__app_id, | |
) | |
except Exception as e: | |
logger.error(f"[LLMonitor] An error occurred in on_llm_end: {e}") | |
def on_tool_start( | |
self, | |
serialized: Dict[str, Any], | |
input_str: str, | |
*, | |
run_id: UUID, | |
parent_run_id: Union[UUID, None] = None, | |
tags: Union[List[str], None] = None, | |
metadata: Union[Dict[str, Any], None] = None, | |
**kwargs: Any, | |
) -> None: | |
if self.__has_valid_config is False: | |
return | |
try: | |
user_id = _get_user_id(metadata) | |
user_props = _get_user_props(metadata) | |
name = serialized.get("name") | |
self.__track_event( | |
"tool", | |
"start", | |
user_id=user_id, | |
run_id=str(run_id), | |
parent_run_id=str(parent_run_id) if parent_run_id else None, | |
name=name, | |
input=input_str, | |
tags=tags, | |
metadata=metadata, | |
user_props=user_props, | |
app_id=self.__app_id, | |
) | |
except Exception as e: | |
logger.error(f"[LLMonitor] An error occurred in on_tool_start: {e}") | |
def on_tool_end( | |
self, | |
output: str, | |
*, | |
run_id: UUID, | |
parent_run_id: Union[UUID, None] = None, | |
tags: Union[List[str], None] = None, | |
**kwargs: Any, | |
) -> None: | |
if self.__has_valid_config is False: | |
return | |
try: | |
self.__track_event( | |
"tool", | |
"end", | |
run_id=str(run_id), | |
parent_run_id=str(parent_run_id) if parent_run_id else None, | |
output=output, | |
app_id=self.__app_id, | |
) | |
except Exception as e: | |
logger.error(f"[LLMonitor] An error occurred in on_tool_end: {e}") | |
def on_chain_start( | |
self, | |
serialized: Dict[str, Any], | |
inputs: Dict[str, Any], | |
*, | |
run_id: UUID, | |
parent_run_id: Union[UUID, None] = None, | |
tags: Union[List[str], None] = None, | |
metadata: Union[Dict[str, Any], None] = None, | |
**kwargs: Any, | |
) -> Any: | |
if self.__has_valid_config is False: | |
return | |
try: | |
name = serialized.get("id", [None, None, None, None])[3] | |
type = "chain" | |
metadata = metadata or {} | |
agentName = metadata.get("agent_name") | |
if agentName is None: | |
agentName = metadata.get("agentName") | |
if name == "AgentExecutor" or name == "PlanAndExecute": | |
type = "agent" | |
if agentName is not None: | |
type = "agent" | |
name = agentName | |
if parent_run_id is not None: | |
type = "chain" | |
user_id = _get_user_id(metadata) | |
user_props = _get_user_props(metadata) | |
input = _parse_input(inputs) | |
self.__track_event( | |
type, | |
"start", | |
user_id=user_id, | |
run_id=str(run_id), | |
parent_run_id=str(parent_run_id) if parent_run_id else None, | |
name=name, | |
input=input, | |
tags=tags, | |
metadata=metadata, | |
user_props=user_props, | |
app_id=self.__app_id, | |
) | |
except Exception as e: | |
logger.error(f"[LLMonitor] An error occurred in on_chain_start: {e}") | |
def on_chain_end( | |
self, | |
outputs: Dict[str, Any], | |
*, | |
run_id: UUID, | |
parent_run_id: Union[UUID, None] = None, | |
**kwargs: Any, | |
) -> Any: | |
if self.__has_valid_config is False: | |
return | |
try: | |
output = _parse_output(outputs) | |
self.__track_event( | |
"chain", | |
"end", | |
run_id=str(run_id), | |
parent_run_id=str(parent_run_id) if parent_run_id else None, | |
output=output, | |
app_id=self.__app_id, | |
) | |
except Exception as e: | |
logger.error(f"[LLMonitor] An error occurred in on_chain_end: {e}") | |
def on_agent_action( | |
self, | |
action: AgentAction, | |
*, | |
run_id: UUID, | |
parent_run_id: Union[UUID, None] = None, | |
**kwargs: Any, | |
) -> Any: | |
if self.__has_valid_config is False: | |
return | |
try: | |
name = action.tool | |
input = _parse_input(action.tool_input) | |
self.__track_event( | |
"tool", | |
"start", | |
run_id=str(run_id), | |
parent_run_id=str(parent_run_id) if parent_run_id else None, | |
name=name, | |
input=input, | |
app_id=self.__app_id, | |
) | |
except Exception as e: | |
logger.error(f"[LLMonitor] An error occurred in on_agent_action: {e}") | |
def on_agent_finish( | |
self, | |
finish: AgentFinish, | |
*, | |
run_id: UUID, | |
parent_run_id: Union[UUID, None] = None, | |
**kwargs: Any, | |
) -> Any: | |
if self.__has_valid_config is False: | |
return | |
try: | |
output = _parse_output(finish.return_values) | |
self.__track_event( | |
"agent", | |
"end", | |
run_id=str(run_id), | |
parent_run_id=str(parent_run_id) if parent_run_id else None, | |
output=output, | |
app_id=self.__app_id, | |
) | |
except Exception as e: | |
logger.error(f"[LLMonitor] An error occurred in on_agent_finish: {e}") | |
def on_chain_error( | |
self, | |
error: BaseException, | |
*, | |
run_id: UUID, | |
parent_run_id: Union[UUID, None] = None, | |
**kwargs: Any, | |
) -> Any: | |
if self.__has_valid_config is False: | |
return | |
try: | |
self.__track_event( | |
"chain", | |
"error", | |
run_id=str(run_id), | |
parent_run_id=str(parent_run_id) if parent_run_id else None, | |
error={"message": str(error), "stack": traceback.format_exc()}, | |
app_id=self.__app_id, | |
) | |
except Exception as e: | |
logger.error(f"[LLMonitor] An error occurred in on_chain_error: {e}") | |
def on_tool_error( | |
self, | |
error: BaseException, | |
*, | |
run_id: UUID, | |
parent_run_id: Union[UUID, None] = None, | |
**kwargs: Any, | |
) -> Any: | |
if self.__has_valid_config is False: | |
return | |
try: | |
self.__track_event( | |
"tool", | |
"error", | |
run_id=str(run_id), | |
parent_run_id=str(parent_run_id) if parent_run_id else None, | |
error={"message": str(error), "stack": traceback.format_exc()}, | |
app_id=self.__app_id, | |
) | |
except Exception as e: | |
logger.error(f"[LLMonitor] An error occurred in on_tool_error: {e}") | |
def on_llm_error( | |
self, | |
error: BaseException, | |
*, | |
run_id: UUID, | |
parent_run_id: Union[UUID, None] = None, | |
**kwargs: Any, | |
) -> Any: | |
if self.__has_valid_config is False: | |
return | |
try: | |
self.__track_event( | |
"llm", | |
"error", | |
run_id=str(run_id), | |
parent_run_id=str(parent_run_id) if parent_run_id else None, | |
error={"message": str(error), "stack": traceback.format_exc()}, | |
app_id=self.__app_id, | |
) | |
except Exception as e: | |
logger.error(f"[LLMonitor] An error occurred in on_llm_error: {e}") | |
__all__ = ["LLMonitorCallbackHandler", "identify"] | |