Final_Assignment_Template / observability.py
Humanlearning's picture
updated error handling for search api
df7388a
"""
Observability module for Langfuse v3 integration with OpenTelemetry support.
This module provides:
- Single global CallbackHandler for LangChain integration
- Root span management for user requests
- Session and user tracking
- Background flushing for async operations
"""
import os
import base64
from typing import Optional, Dict, Any
from contextlib import contextmanager
from dotenv import load_dotenv
# Langfuse v3 imports
from langfuse import get_client
from langfuse.langchain import CallbackHandler
# OpenTelemetry imports for v3 compatibility
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
# Load environment variables (e.g. the LANGFUSE_* settings checked by
# initialize_observability) from the local "env.local" file.
load_dotenv(dotenv_path="env.local")

# Module-level singletons, populated by initialize_observability().
_langfuse_handler: Optional[CallbackHandler] = None  # shared LangChain callback handler
_tracer_provider: Optional[TracerProvider] = None  # OpenTelemetry provider built at init
def initialize_observability() -> bool:
    """
    Initialize Langfuse observability with OTEL integration.

    Safe to call repeatedly. Once the global callback handler exists, the call
    returns True immediately. The OpenTelemetry tracer provider is created and
    registered globally at most once, even if an earlier attempt failed after
    registering it (e.g. while building the callback handler).

    Returns:
        bool: True if observability is initialized (now or previously),
        False if required settings are missing or setup failed.
    """
    global _langfuse_handler, _tracer_provider

    # Already initialized: never build a second handler/provider.
    if _langfuse_handler is not None:
        return True

    try:
        # Check required environment variables
        required_vars = ["LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_HOST"]
        missing_vars = [var for var in required_vars if not os.getenv(var)]
        if missing_vars:
            print(f"Warning: Missing required environment variables: {missing_vars}")
            return False

        public_key = os.environ["LANGFUSE_PUBLIC_KEY"]
        secret_key = os.environ["LANGFUSE_SECRET_KEY"]
        # Drop a trailing slash so the endpoint never becomes ".../ /api/..." ("//api").
        host = os.environ["LANGFUSE_HOST"].rstrip("/")

        # HTTP Basic credentials for Langfuse's OTLP endpoint: base64("public:secret").
        langfuse_auth = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()

        # OTLPSpanExporter() below is constructed without arguments, so it picks up
        # its endpoint and headers from these standard OTEL environment variables.
        os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = f"{host}/api/public/otel"
        os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {langfuse_auth}"

        # OpenTelemetry accepts only one global tracer provider per process, so
        # only create/register ours the first time; a retry reuses it.
        if _tracer_provider is None:
            provider = TracerProvider()
            provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter()))
            trace.set_tracer_provider(provider)
            # Record it only once it is actually registered.
            _tracer_provider = provider

        # Create single global callback handler
        _langfuse_handler = CallbackHandler()
        print("✅ Langfuse observability initialized successfully")
        return True
    except Exception as e:
        print(f"❌ Failed to initialize observability: {e}")
        return False
def get_callback_handler() -> Optional[CallbackHandler]:
    """
    Return the shared Langfuse LangChain callback handler.

    Initialization is lazy: while no handler exists, every call attempts
    initialize_observability() again.

    Returns:
        The global CallbackHandler, or None if observability could not be
        initialized.
    """
    if _langfuse_handler is None and not initialize_observability():
        return None
    return _langfuse_handler
@contextmanager
def start_root_span(
    name: str,
    user_id: str,
    session_id: str,
    metadata: Optional[Dict[str, Any]] = None
):
    """
    Context manager for creating root spans with user and session tracking.

    The span is opened via Langfuse's ``start_as_current_span``. Its trace is
    tagged with ``user_id``, ``session_id``, the ``ENV`` environment value
    (default "dev") and "multi-agent-system".

    Tracing is best-effort. If the span cannot be created or tagged, a warning
    is printed and ``None`` is yielded. Exceptions raised inside the ``with``
    body are passed to the span's own context manager when it is closed, and
    are then re-raised unchanged to the caller.

    Args:
        name: Span name (e.g., "user-request")
        user_id: User identifier for session tracking
        session_id: Session identifier for conversation continuity
        metadata: Optional additional metadata attached to the trace

    Yields:
        Langfuse span context or None if creation fails
    """
    span_cm = None        # context manager returned by start_as_current_span
    span_context = None   # value handed to the caller
    entered = False       # True once span_cm.__enter__() has succeeded

    # Setup is deliberately kept separate from the yield below. A yield inside
    # a try/except that also yields makes the generator yield twice when the
    # caller's body raises. contextlib then replaces the caller's error with
    # "RuntimeError: generator didn't stop after throw()".
    try:
        client = get_client()
        span_cm = client.start_as_current_span(name=name)
        span_context = span_cm.__enter__()
        entered = True
        # Update trace with user and session information
        span_context.update_trace(
            user_id=user_id,
            session_id=session_id,
            tags=[
                os.getenv("ENV", "dev"),  # Environment tag
                "multi-agent-system",  # System identifier
            ],
        )
        # Add metadata if provided
        if metadata:
            span_context.update_trace(metadata=metadata)
    except Exception as e:
        print(f"Warning: Failed to create root span: {e}")
        # Yield None so code doesn't break. A span that was already entered
        # is still closed once the body finishes.
        span_context = None

    exit_args = (None, None, None)
    try:
        yield span_context
    except Exception as exc:
        # Forward the caller's error to the span, then propagate it unchanged.
        exit_args = (type(exc), exc, exc.__traceback__)
        raise
    finally:
        if entered:
            try:
                span_cm.__exit__(*exit_args)
            except Exception as e:
                print(f"Warning: Error closing span: {e}")
def flush_traces(background: bool = True) -> None:
    """
    Flush pending traces to Langfuse.

    Args:
        background: If True (default), run the flush on a separate thread and
            return immediately. The thread is non-daemon, so the interpreter
            waits for it to finish at exit instead of dropping queued traces.
            If False, flush synchronously in the calling thread; this is the
            mode shutdown_observability() uses.

    Never raises. Any failure is printed as a warning.
    """
    import threading  # only needed for the background path

    try:
        client = get_client()
    except Exception as e:
        print(f"Warning: Failed to flush traces: {e}")
        return

    def _flush() -> None:
        # Runs inline or on the worker thread; failures stay non-fatal either way.
        try:
            client.flush()
        except Exception as e:
            print(f"Warning: Failed to flush traces: {e}")

    if not background:
        _flush()
        return

    try:
        threading.Thread(target=_flush, name="langfuse-flush", daemon=False).start()
    except RuntimeError as e:
        # Thread creation can fail (resource exhaustion, interpreter shutdown);
        # fall back to a blocking flush rather than losing traces.
        print(f"Warning: Background flush unavailable, flushing inline: {e}")
        _flush()
def shutdown_observability() -> None:
    """
    Clean shutdown of observability components.

    Steps:
    - Flush pending traces synchronously via flush_traces(background=False).
    - Shut down the OpenTelemetry tracer provider created by
      initialize_observability(), if there is one.
    - Clear the module reference to that provider, so repeated calls do not
      shut it down twice.

    Errors are printed as warnings and never raised. The provider is shut
    down even if flushing fails.
    """
    global _tracer_provider

    # Flush any remaining traces
    try:
        flush_traces(background=False)
    except Exception as e:
        print(f"Warning: Error during observability shutdown: {e}")

    # Detach the provider before shutting it down, so a later call (or a
    # shutdown that raises) never targets the same provider again.
    provider, _tracer_provider = _tracer_provider, None
    if provider is not None:
        try:
            provider.shutdown()
        except Exception as e:
            print(f"Warning: Error during observability shutdown: {e}")
# Agent span helpers for consistent naming
@contextmanager
def agent_span(agent_name: str, metadata: Optional[Dict[str, Any]] = None):
    """
    Context manager for agent-level spans named ``agent/<agent_name>``.

    Tracing is best-effort. If the span cannot be created, a warning is
    printed and ``None`` is yielded. Exceptions raised inside the ``with``
    body are passed to the span's own context manager when it is closed, and
    are then re-raised unchanged to the caller.

    Args:
        agent_name: Name of the agent (e.g., "lead", "research", "code")
        metadata: Optional metadata for the span

    Yields:
        Langfuse span context or None if creation fails
    """
    span_name = f"agent/{agent_name}"
    span_cm = None        # context manager returned by start_as_current_span
    span_context = None   # value handed to the caller
    entered = False       # True once span_cm.__enter__() has succeeded

    # Setup is kept separate from the yield. Yielding inside a try/except that
    # also yields would make contextlib replace the caller's error with
    # "RuntimeError: generator didn't stop after throw()".
    try:
        client = get_client()
        span_cm = client.start_as_current_span(name=span_name)
        span_context = span_cm.__enter__()
        entered = True
        if metadata:
            # Attach to this span itself; update_trace() would put the agent's
            # metadata on the whole trace instead.
            span_context.update(metadata=metadata)
    except Exception as e:
        print(f"Warning: Failed to create agent span for {agent_name}: {e}")
        span_context = None

    exit_args = (None, None, None)
    try:
        yield span_context
    except Exception as exc:
        # Forward the caller's error to the span, then propagate it unchanged.
        exit_args = (type(exc), exc, exc.__traceback__)
        raise
    finally:
        if entered:
            try:
                span_cm.__exit__(*exit_args)
            except Exception as e:
                print(f"Warning: Error closing agent span: {e}")
@contextmanager
def tool_span(tool_name: str, metadata: Optional[Dict[str, Any]] = None):
    """
    Context manager for tool-level spans named ``tool/<tool_name>``.

    Tracing is best-effort. If the span cannot be created, a warning is
    printed and ``None`` is yielded. Exceptions raised inside the ``with``
    body are passed to the span's own context manager when it is closed, and
    are then re-raised unchanged to the caller.

    Args:
        tool_name: Name of the tool (e.g., "tavily_search", "calculator")
        metadata: Optional metadata for the span

    Yields:
        Langfuse span context or None if creation fails
    """
    span_name = f"tool/{tool_name}"
    span_cm = None        # context manager returned by start_as_current_span
    span_context = None   # value handed to the caller
    entered = False       # True once span_cm.__enter__() has succeeded

    # Setup is kept separate from the yield. Yielding inside a try/except that
    # also yields would make contextlib replace the caller's error with
    # "RuntimeError: generator didn't stop after throw()".
    try:
        client = get_client()
        span_cm = client.start_as_current_span(name=span_name)
        span_context = span_cm.__enter__()
        entered = True
        if metadata:
            # Attach to this span itself; update_trace() would put the tool's
            # metadata on the whole trace instead.
            span_context.update(metadata=metadata)
    except Exception as e:
        print(f"Warning: Failed to create tool span for {tool_name}: {e}")
        span_context = None

    exit_args = (None, None, None)
    try:
        yield span_context
    except Exception as exc:
        # Forward the caller's error to the span, then propagate it unchanged.
        exit_args = (type(exc), exc, exc.__traceback__)
        raise
    finally:
        if entered:
            try:
                span_cm.__exit__(*exit_args)
            except Exception as e:
                print(f"Warning: Error closing tool span: {e}")