Spaces:
Sleeping
Sleeping
| import sys | |
| import os | |
| import traceback | |
| import time | |
| import datetime | |
| import requests | |
| from threading import Lock | |
| from typing import Any, Iterator, Sequence, Optional, TYPE_CHECKING | |
| from contextvars import Token | |
| from urllib.parse import urljoin | |
| import opentelemetry.context as otlp_context_api | |
| from opentelemetry.trace import ( | |
| SpanKind, | |
| set_span_in_context, | |
| get_current_span as get_current_otlp_span, | |
| NonRecordingSpan, | |
| SpanContext, | |
| TraceFlags | |
| ) | |
| from opentelemetry.trace.status import StatusCode | |
| from opentelemetry.sdk.trace import ( | |
| ReadableSpan, | |
| SynchronousMultiSpanProcessor, | |
| Tracer as SDKTracer, | |
| Span as SDKSpan, | |
| TracerProvider as SDKTracerProvider | |
| ) | |
| from opentelemetry.context import Context as OTLPContext | |
| from opentelemetry.semconv.trace import SpanAttributes | |
| from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor | |
| from aworld.trace.base import ( | |
| AttributeValueType, | |
| NoOpTracer, | |
| SpanType, | |
| TraceProvider, | |
| Tracer, | |
| Span, | |
| TraceContext, | |
| set_tracer_provider | |
| ) | |
| from aworld.trace.span_cosumer import SpanConsumer | |
| from aworld.trace.propagator import get_global_trace_context | |
| from aworld.trace.baggage.sofa_tracer import SofaSpanHelper | |
| from aworld.logs.util import logger | |
| from aworld.utils.common import get_local_ip | |
| from .memory_storage import InMemorySpanExporter, InMemoryStorage | |
| from ..constants import ATTRIBUTES_MESSAGE_KEY | |
| from .export import FileSpanExporter, NoOpSpanExporter, SpanConsumerExporter | |
| from ..server import set_trace_server | |
| class OTLPTraceProvider(TraceProvider): | |
| """A TraceProvider that wraps an existing `SDKTracerProvider`. | |
| This class provides a way to use a `SDKTracerProvider` as a `TraceProvider`. | |
| When the context manager is entered, it returns the `SDKTracerProvider` itself. | |
| When the context manager is exited, it calls `shutdown` on the `SDKTracerProvider`. | |
| Args: | |
| provider: The internal provider to wrap. | |
| """ | |
| def __init__(self, provider: SDKTracerProvider, suppressed_scopes: Optional[set[str]] = None): | |
| self._provider: SDKTracerProvider = provider | |
| self._suppressed_scopes = set() | |
| if suppressed_scopes: | |
| self._suppressed_scopes.update(suppressed_scopes) | |
| self._lock: Lock = Lock() | |
| def get_tracer( | |
| self, | |
| name: str, | |
| version: Optional[str] = None | |
| ): | |
| with self._lock: | |
| if name in self._suppressed_scopes: | |
| return NoOpTracer() | |
| else: | |
| tracer = self._provider.get_tracer(instrumenting_module_name=name, | |
| instrumenting_library_version=version) | |
| return OTLPTracer(tracer) | |
| def shutdown(self) -> None: | |
| with self._lock: | |
| if isinstance(self._provider, SDKTracerProvider): | |
| self._provider.shutdown() | |
| def force_flush(self, timeout: Optional[float] = None) -> bool: | |
| with self._lock: | |
| if isinstance(self._provider, SDKTracerProvider): | |
| return self._provider.force_flush(timeout) | |
| else: | |
| return False | |
| def get_current_span(self) -> Optional["Span"]: | |
| otlp_span = get_current_otlp_span() | |
| return OTLPSpan(otlp_span, is_new_span=False) | |
| class OTLPTracer(Tracer): | |
| """A Tracer represents a collection of Spans. | |
| Args: | |
| tracer: The internal tracer to wrap. | |
| """ | |
| def __init__(self, tracer: SDKTracer): | |
| self._tracer = tracer | |
| def start_span( | |
| self, | |
| name: str, | |
| span_type: SpanType = SpanType.INTERNAL, | |
| attributes: dict[str, AttributeValueType] = None, | |
| start_time: Optional[int] = None, | |
| record_exception: bool = True, | |
| set_status_on_exception: bool = True, | |
| trace_context: Optional[TraceContext] = None | |
| ) -> "Span": | |
| otel_context = None | |
| trace_context = trace_context or get_global_trace_context().get_and_clear() | |
| if trace_context: | |
| otel_context = self._get_otel_context_from_trace_context( | |
| trace_context) | |
| start_time = start_time or time.time_ns() | |
| attributes = {**(attributes or {})} | |
| attributes.setdefault(ATTRIBUTES_MESSAGE_KEY, name) | |
| SofaSpanHelper.set_sofa_context_to_attr(attributes) | |
| attributes = {k: v for k, v in attributes.items( | |
| ) if is_valid_attribute_value(k, v)} | |
| span_kind = self._convert_to_span_kind( | |
| span_type) if span_type else SpanKind.INTERNAL | |
| span = self._tracer.start_span(name=name, | |
| kind=span_kind, | |
| context=otel_context, | |
| attributes=attributes, | |
| start_time=start_time, | |
| record_exception=record_exception, | |
| set_status_on_exception=set_status_on_exception) | |
| return OTLPSpan(span) | |
| def start_as_current_span( | |
| self, | |
| name: str, | |
| span_type: SpanType = SpanType.INTERNAL, | |
| attributes: dict[str, AttributeValueType] = None, | |
| start_time: Optional[int] = None, | |
| record_exception: bool = True, | |
| set_status_on_exception: bool = True, | |
| end_on_exit: bool = True, | |
| trace_context: Optional[TraceContext] = None | |
| ) -> Iterator["Span"]: | |
| start_time = start_time or time.time_ns() | |
| attributes = {**(attributes or {})} | |
| attributes.setdefault(ATTRIBUTES_MESSAGE_KEY, name) | |
| SofaSpanHelper.set_sofa_context_to_attr(attributes) | |
| attributes = {k: v for k, v in attributes.items( | |
| ) if is_valid_attribute_value(k, v)} | |
| span_kind = self._convert_to_span_kind( | |
| span_type) if span_type else SpanKind.INTERNAL | |
| otel_context = None | |
| trace_context = trace_context or get_global_trace_context().get_and_clear() | |
| if trace_context: | |
| otel_context = self._get_otel_context_from_trace_context( | |
| trace_context) | |
| class _OTLPSpanContextManager: | |
| def __init__(self, tracer: SDKTracer): | |
| self._span_cm = None | |
| self._tracer = tracer | |
| def __enter__(self): | |
| self._span_cm = self._tracer.start_as_current_span( | |
| name=name, | |
| kind=span_kind, | |
| context=otel_context, | |
| attributes=attributes, | |
| start_time=start_time, | |
| record_exception=record_exception, | |
| set_status_on_exception=set_status_on_exception, | |
| end_on_exit=end_on_exit | |
| ) | |
| inner_span = self._span_cm.__enter__() | |
| return OTLPSpan(inner_span) | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| return self._span_cm.__exit__(exc_type, exc_val, exc_tb) | |
| return _OTLPSpanContextManager(self._tracer) | |
| def _convert_to_span_kind(self, span_type: SpanType) -> str: | |
| if span_type == SpanType.INTERNAL: | |
| return SpanKind.INTERNAL | |
| elif span_type == SpanType.CLIENT: | |
| return SpanKind.CLIENT | |
| elif span_type == SpanType.SERVER: | |
| return SpanKind.SERVER | |
| elif span_type == SpanType.PRODUCER: | |
| return SpanKind.PRODUCER | |
| elif span_type == SpanType.CONSUMER: | |
| return SpanKind.CONSUMER | |
| else: | |
| return SpanKind.INTERNAL | |
| def _get_otel_context_from_trace_context(self, trace_context: TraceContext) -> OTLPContext: | |
| trace_flags = None | |
| if trace_context.trace_flags: | |
| trace_flags = TraceFlags(int(trace_context.trace_flags, 16)) | |
| otel_context = otlp_context_api.Context() | |
| return set_span_in_context( | |
| NonRecordingSpan( | |
| SpanContext( | |
| trace_id=int(trace_context.trace_id, 16), | |
| span_id=int(trace_context.span_id, 16), | |
| is_remote=True, | |
| trace_flags=trace_flags | |
| ) | |
| ), | |
| otel_context, | |
| ) | |
| class OTLPSpan(Span, ReadableSpan): | |
| """A Span represents a single operation within a trace. | |
| """ | |
| def __init__(self, span: SDKSpan, is_new_span=True): | |
| self._span = span | |
| self._token: Optional[Token[OTLPContext]] = None | |
| if is_new_span: | |
| self._attach() | |
| self._add_to_open_spans() | |
| if not TYPE_CHECKING: # pragma: no branch | |
| def __getattr__(self, name: str) -> Any: | |
| return getattr(self._span, name) | |
| def end(self, end_time: Optional[int] = None) -> None: | |
| self._remove_from_open_spans() | |
| end_time = end_time or time.time_ns() | |
| if not self._span._status or self._span._status.status_code == StatusCode.UNSET: | |
| self._span.set_status( | |
| status=StatusCode.OK, | |
| description="", | |
| ) | |
| self._span.end(end_time=end_time) | |
| self._detach() | |
| def set_attribute(self, key: str, value: Any) -> None: | |
| if not is_valid_attribute_value(key, value): | |
| return | |
| self._span.set_attribute(key=key, value=value) | |
| def set_attributes(self, attributes: dict[str, Any]) -> None: | |
| attributes = {k: v for k, v in attributes.items( | |
| ) if is_valid_attribute_value(k, v)} | |
| self._span.set_attributes(attributes=attributes) | |
| def is_recording(self) -> bool: | |
| return self._span.is_recording() | |
| def record_exception( | |
| self, | |
| exception: BaseException, | |
| attributes: dict[str, Any] = None, | |
| timestamp: Optional[int] = None, | |
| escaped: bool = False, | |
| ) -> None: | |
| timestamp = timestamp or time.time_ns() | |
| attributes = {**(attributes or {})} | |
| stacktrace = ''.join(traceback.format_exception( | |
| type(exception), exception, exception.__traceback__)) | |
| self._span.set_attributes({ | |
| SpanAttributes.EXCEPTION_STACKTRACE: stacktrace, | |
| SpanAttributes.EXCEPTION_TYPE: type(exception).__name__, | |
| SpanAttributes.EXCEPTION_MESSAGE: str(exception), | |
| SpanAttributes.EXCEPTION_ESCAPED: escaped | |
| }) | |
| if exception is not sys.exc_info()[1]: | |
| attributes[SpanAttributes.EXCEPTION_STACKTRACE] = stacktrace | |
| self._span.record_exception(exception=exception, | |
| attributes=attributes, | |
| timestamp=timestamp, | |
| escaped=escaped) | |
| self._span.set_status( | |
| status=StatusCode.ERROR, | |
| description=str(exception), | |
| ) | |
| def get_trace_id(self) -> str: | |
| """Get the trace ID of the span. | |
| Returns: | |
| The trace ID of the span. | |
| """ | |
| if not self._span or not self._span.get_span_context() or not self.is_recording(): | |
| return None | |
| return f"{self._span.get_span_context().trace_id:032x}" | |
| def get_span_id(self) -> str: | |
| """Get the span ID of the span. | |
| Returns: | |
| The span ID of the span. | |
| """ | |
| if not self._span or not self._span.get_span_context() or not self.is_recording(): | |
| return None | |
| return f"{self._span.get_span_context().span_id:016x}" | |
| def _attach(self): | |
| if self._token is not None: | |
| return | |
| self._token = otlp_context_api.attach(set_span_in_context(self._span)) | |
| def _detach(self): | |
| if self._token is None: | |
| return | |
| try: | |
| otlp_context_api.detach(self._token) | |
| except ValueError as e: | |
| logger.warning(f"Failed to detach context: {e}") | |
| finally: | |
| self._token = None | |
| def configure_otlp_provider( | |
| backends: Sequence[str] = None, | |
| base_url: str = None, | |
| write_token: str = None, | |
| span_consumers: Optional[Sequence[SpanConsumer]] = None, | |
| **kwargs | |
| ) -> None: | |
| """Configure the OTLP provider. | |
| Args: | |
| backend: The backend to use. | |
| write_token: The write token to use. | |
| **kwargs: Additional keyword arguments to pass to the provider. | |
| """ | |
| from aworld.metrics.opentelemetry.opentelemetry_adapter import build_otel_resource | |
| backends = backends or ["logfire"] | |
| processor = SynchronousMultiSpanProcessor() | |
| processor.add_span_processor(BatchSpanProcessor( | |
| SpanConsumerExporter(span_consumers))) | |
| for backend in backends: | |
| if backend == "logfire": | |
| span_exporter = _configure_logfire_exporter( | |
| write_token=write_token, base_url=base_url, **kwargs) | |
| processor.add_span_processor(BatchSpanProcessor(span_exporter)) | |
| elif backend == "console": | |
| from opentelemetry.sdk.trace.export import ConsoleSpanExporter | |
| processor.add_span_processor( | |
| BatchSpanProcessor(ConsoleSpanExporter())) | |
| elif backend == "file": | |
| timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") | |
| file_path = kwargs.get("file_path", f"traces_{timestamp}.json") | |
| processor.add_span_processor( | |
| BatchSpanProcessor(FileSpanExporter(file_path))) | |
| elif backend == "memory": | |
| logger.info("Using in-memory storage for traces.") | |
| storage = kwargs.get( | |
| "storage", InMemoryStorage()) | |
| processor.add_span_processor( | |
| SimpleSpanProcessor(InMemorySpanExporter(storage=storage))) | |
| server_enabled = kwargs.get("server_enabled") or os.getenv( | |
| "START_TRACE_SERVER") or "true" | |
| server_port = kwargs.get("server_port") or 7079 | |
| if (server_enabled.lower() == "true"): | |
| logger.info(f"Starting trace server on port {server_port}.") | |
| set_trace_server(storage=storage, port=int( | |
| server_port), start_server=True) | |
| else: | |
| logger.info("Trace server is not started.") | |
| set_trace_server(storage=storage, port=int( | |
| server_port), start_server=False) | |
| else: | |
| span_exporter = _configure_otlp_exporter( | |
| base_url=base_url, **kwargs) | |
| processor.add_span_processor(BatchSpanProcessor(span_exporter)) | |
| set_tracer_provider(OTLPTraceProvider(SDKTracerProvider(active_span_processor=processor, | |
| resource=build_otel_resource()))) | |
| def _configure_logfire_exporter(write_token: str, base_url: str = None) -> None: | |
| """Configure the Logfire exporter. | |
| Args: | |
| write_token: The write token to use. | |
| base_url: The base URL to use. | |
| **kwargs: Additional keyword arguments to pass to the exporter. | |
| """ | |
| from opentelemetry.exporter.otlp.proto.http import Compression | |
| from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter | |
| base_url = base_url or "https://logfire-us.pydantic.dev" | |
| headers = {'User-Agent': f'logfire/3.14.0', 'Authorization': write_token} | |
| session = requests.Session() | |
| session.headers.update(headers) | |
| return OTLPSpanExporter( | |
| endpoint=urljoin(base_url, '/v1/traces'), | |
| session=session, | |
| compression=Compression.Gzip, | |
| ) | |
| def _configure_otlp_exporter(base_url: str = None, **kwargs) -> None: | |
| """Configure the OTLP exporter. | |
| Args: | |
| write_token: The write token to use. | |
| base_url: The base URL to use. | |
| **kwargs: Additional keyword arguments to pass to the exporter. | |
| """ | |
| import requests | |
| from opentelemetry.exporter.otlp.proto.http import Compression | |
| from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter | |
| otlp_traces_endpoint = os.getenv("OTLP_TRACES_ENDPOINT") | |
| base_url = base_url or otlp_traces_endpoint | |
| session = requests.Session() | |
| return OTLPSpanExporter( | |
| endpoint=base_url, | |
| session=session, | |
| compression=Compression.Gzip, | |
| ) | |
| def is_valid_attribute_value(k, v): | |
| valid = True | |
| if not v: | |
| valid = False | |
| valid = isinstance(v, (str, bool, int, float)) or \ | |
| (isinstance(v, Sequence) and | |
| all(isinstance(i, (str, bool, int, float)) for i in v)) | |
| if not valid: | |
| logger.warning(f"value of attribute[{k}] is invalid: {v}") | |
| return valid | |