litellm / litellm /tests /test_alangfuse.py
ka1kuk's picture
Upload 235 files
7db0ae4 verified
import json
import sys
import os
import io, asyncio
import logging
logging.basicConfig(level=logging.DEBUG)
sys.path.insert(0, os.path.abspath("../.."))
from litellm import completion
import litellm
litellm.num_retries = 3
litellm.success_callback = ["langfuse"]
os.environ["LANGFUSE_DEBUG"] = "True"
import time
import pytest
def search_logs(log_file_path, num_good_logs=1):
"""
Searches the given log file for logs containing the "/api/public" string.
Parameters:
- log_file_path (str): The path to the log file to be searched.
Returns:
- None
Raises:
- Exception: If there are any bad logs found in the log file.
"""
import re
print("\n searching logs")
bad_logs = []
good_logs = []
all_logs = []
try:
with open(log_file_path, "r") as log_file:
lines = log_file.readlines()
print(f"searching logslines: {lines}")
for line in lines:
all_logs.append(line.strip())
if "/api/public" in line:
print("Found log with /api/public:")
print(line.strip())
print("\n\n")
match = re.search(
r'"POST /api/public/ingestion HTTP/1.1" (\d+) (\d+)',
line,
)
if match:
status_code = int(match.group(1))
print("STATUS CODE", status_code)
if (
status_code != 200
and status_code != 201
and status_code != 207
):
print("got a BAD log")
bad_logs.append(line.strip())
else:
good_logs.append(line.strip())
print("\nBad Logs")
print(bad_logs)
if len(bad_logs) > 0:
raise Exception(f"bad logs, Bad logs = {bad_logs}")
assert (
len(good_logs) == num_good_logs
), f"Did not get expected number of good logs, expected {num_good_logs}, got {len(good_logs)}. All logs \n {all_logs}"
print("\nGood Logs")
print(good_logs)
if len(good_logs) <= 0:
raise Exception(
f"There were no Good Logs from Langfuse. No logs with /api/public status 200. \nAll logs:{all_logs}"
)
except Exception as e:
raise e
def pre_langfuse_setup():
"""
Set up the logging for the 'pre_langfuse_setup' function.
"""
# sends logs to langfuse.log
import logging
# Configure the logging to write to a file
logging.basicConfig(filename="langfuse.log", level=logging.DEBUG)
logger = logging.getLogger()
# Add a FileHandler to the logger
file_handler = logging.FileHandler("langfuse.log", mode="w")
file_handler.setLevel(logging.DEBUG)
logger.addHandler(file_handler)
return
@pytest.mark.skip(reason="beta test - checking langfuse output")
def test_langfuse_logging_async():
try:
pre_langfuse_setup()
litellm.set_verbose = True
litellm.success_callback = ["langfuse"]
async def _test_langfuse():
response = await litellm.acompletion(
model="azure/chatgpt-v-2",
messages=[{"role": "user", "content": "This is a test"}],
max_tokens=100,
temperature=0.7,
timeout=5,
user="test_user",
)
await asyncio.sleep(1)
return response
response = asyncio.run(_test_langfuse())
print(f"response: {response}")
# # check langfuse.log to see if there was a failed response
search_logs("langfuse.log")
except litellm.Timeout as e:
pass
except Exception as e:
pytest.fail(f"An exception occurred - {e}")
# def test_langfuse_logging_async_text_completion():
# try:
# pre_langfuse_setup()
# litellm.set_verbose = False
# litellm.success_callback = ["langfuse"]
# async def _test_langfuse():
# response = await litellm.atext_completion(
# model="gpt-3.5-turbo-instruct",
# prompt="this is a test",
# max_tokens=5,
# temperature=0.7,
# timeout=5,
# user="test_user",
# stream=True
# )
# async for chunk in response:
# print()
# print(chunk)
# await asyncio.sleep(1)
# return response
# response = asyncio.run(_test_langfuse())
# print(f"response: {response}")
# # # check langfuse.log to see if there was a failed response
# search_logs("langfuse.log")
# except litellm.Timeout as e:
# pass
# except Exception as e:
# pytest.fail(f"An exception occurred - {e}")
# test_langfuse_logging_async_text_completion()
@pytest.mark.skip(reason="beta test - checking langfuse output")
def test_langfuse_logging():
try:
pre_langfuse_setup()
litellm.set_verbose = True
response = completion(
model="claude-instant-1.2",
messages=[{"role": "user", "content": "Hi πŸ‘‹ - i'm claude"}],
max_tokens=10,
temperature=0.2,
)
print(response)
# time.sleep(5)
# # check langfuse.log to see if there was a failed response
# search_logs("langfuse.log")
except litellm.Timeout as e:
pass
except Exception as e:
pytest.fail(f"An exception occurred - {e}")
# test_langfuse_logging()
@pytest.mark.skip(reason="beta test - checking langfuse output")
def test_langfuse_logging_stream():
try:
litellm.set_verbose = True
response = completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": "this is a streaming test for llama2 + langfuse",
}
],
max_tokens=20,
temperature=0.2,
stream=True,
)
print(response)
for chunk in response:
pass
# print(chunk)
except litellm.Timeout as e:
pass
except Exception as e:
print(e)
# test_langfuse_logging_stream()
@pytest.mark.skip(reason="beta test - checking langfuse output")
def test_langfuse_logging_custom_generation_name():
try:
litellm.set_verbose = True
response = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hi πŸ‘‹ - i'm claude"}],
max_tokens=10,
metadata={
"langfuse/foo": "bar",
"langsmith/fizz": "buzz",
"prompt_hash": "asdf98u0j9131123",
"generation_name": "ishaan-test-generation",
"generation_id": "gen-id22",
"trace_id": "trace-id22",
"trace_user_id": "user-id2",
},
)
print(response)
except litellm.Timeout as e:
pass
except Exception as e:
pytest.fail(f"An exception occurred - {e}")
print(e)
test_langfuse_logging_custom_generation_name()
@pytest.mark.skip(reason="beta test - checking langfuse output")
def test_langfuse_logging_function_calling():
litellm.set_verbose = True
function1 = [
{
"name": "get_current_weather",
"description": "Get the current weather in a given location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
},
"required": ["location"],
},
}
]
try:
response = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "what's the weather in boston"}],
temperature=0.1,
functions=function1,
)
print(response)
except litellm.Timeout as e:
pass
except Exception as e:
print(e)
# test_langfuse_logging_function_calling()
def test_langfuse_logging_tool_calling():
litellm.set_verbose = True
def get_current_weather(location, unit="fahrenheit"):
"""Get the current weather in a given location"""
if "tokyo" in location.lower():
return json.dumps(
{"location": "Tokyo", "temperature": "10", "unit": "celsius"}
)
elif "san francisco" in location.lower():
return json.dumps(
{"location": "San Francisco", "temperature": "72", "unit": "fahrenheit"}
)
elif "paris" in location.lower():
return json.dumps(
{"location": "Paris", "temperature": "22", "unit": "celsius"}
)
else:
return json.dumps({"location": location, "temperature": "unknown"})
messages = [
{
"role": "user",
"content": "What's the weather like in San Francisco, Tokyo, and Paris?",
}
]
tools = [
{
"type": "function",
"function": {
"name": "get_current_weather",
"description": "Get the current weather in a given location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
},
"required": ["location"],
},
},
}
]
response = litellm.completion(
model="gpt-3.5-turbo-1106",
messages=messages,
tools=tools,
tool_choice="auto", # auto is default, but we'll be explicit
)
print("\nLLM Response1:\n", response)
response_message = response.choices[0].message
tool_calls = response.choices[0].message.tool_calls
# test_langfuse_logging_tool_calling()