From 44cf4d345bf3a99b2f90b03e8b545c6f108c3937 Mon Sep 17 00:00:00 2001 From: CI-DEV <154627941+IlumCI@users.noreply.github.com> Date: Wed, 12 Nov 2025 19:05:16 +0200 Subject: [PATCH] Implement OpenTelemetry integration for Swarms framework This file implements OpenTelemetry integration for the Swarms framework, enabling distributed tracing, metrics, and logging capabilities. It includes configuration options via environment variables and provides functions for tracing, logging, and metrics recording. --- swarms/telemetry/opentelemetry_intergation.py | 376 ++++++++++++++++++ 1 file changed, 376 insertions(+) create mode 100644 swarms/telemetry/opentelemetry_intergation.py diff --git a/swarms/telemetry/opentelemetry_intergation.py b/swarms/telemetry/opentelemetry_intergation.py new file mode 100644 index 00000000..8c67c3fb --- /dev/null +++ b/swarms/telemetry/opentelemetry_intergation.py @@ -0,0 +1,376 @@ +""" +OpenTelemetry integration for Swarms framework. + +Provides distributed tracing, metrics, and logging capabilities across +agents and multi-agent structures using OpenTelemetry standards. + +Configuration via environment variables: + OTEL_SERVICE_NAME: Service name (default: "swarms") + OTEL_EXPORTER_OTLP_ENDPOINT: OTLP endpoint URL + OTEL_EXPORTER_OTLP_HEADERS: Headers for OTLP exporter (JSON format) + OTEL_TRACES_EXPORTER: Traces exporter (default: "otlp") + OTEL_METRICS_EXPORTER: Metrics exporter (default: "otlp") + OTEL_LOGS_EXPORTER: Logs exporter (default: "otlp") + OTEL_ENABLED: Enable/disable OpenTelemetry (default: "true") + OTEL_SDK_DISABLED: Disable OpenTelemetry SDK (default: "false") +""" + +import os +import time +from contextlib import contextmanager +from typing import Any, Callable, Dict, Optional +from functools import wraps + +from loguru import logger + +_otel_available = False +_tracer = None +_meter = None +_logger = None + +try: + from opentelemetry import trace, metrics, _logs + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import BatchSpanProcessor + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader + from opentelemetry.sdk._logs import LoggerProvider + from opentelemetry.sdk._logs.export import BatchLogRecordProcessor + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, + ) + from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( + OTLPMetricExporter, + ) + from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( + OTLPLogExporter, + ) + from opentelemetry.sdk.resources import Resource + from opentelemetry.trace.propagation.tracecontext import ( + TraceContextTextMapPropagator, + ) + + _otel_available = True +except ImportError: + pass + + +def _is_otel_enabled() -> bool: + """Check if OpenTelemetry is enabled via environment variables.""" + if os.getenv("OTEL_SDK_DISABLED", "false").lower() == "true": + return False + return os.getenv("OTEL_ENABLED", "true").lower() == "true" + + +def _parse_headers(headers_str: str) -> Dict[str, str]: + """Parse headers from JSON string or key=value format.""" + import json + + try: + return json.loads(headers_str) + except (json.JSONDecodeError, TypeError): + headers = {} + for pair in headers_str.split(","): + if "=" in pair: + key, value = pair.split("=", 1) + headers[key.strip()] = value.strip() + return headers + + +def _initialize_otel(): + """Initialize OpenTelemetry SDK with configuration from environment variables.""" + global _tracer, _meter, _logger + + if not _otel_available or not _is_otel_enabled(): + return False + + try: + service_name = os.getenv("OTEL_SERVICE_NAME", "swarms") + resource = Resource.create({"service.name": service_name}) + + trace_provider = TracerProvider(resource=resource) + otlp_endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT") + headers = {} + + if otlp_endpoint: + headers = _parse_headers( + os.getenv("OTEL_EXPORTER_OTLP_HEADERS", "{}") + ) + span_exporter = OTLPSpanExporter( + endpoint=otlp_endpoint, + headers=headers, + ) + trace_provider.add_span_processor(BatchSpanProcessor(span_exporter)) + + trace.set_tracer_provider(trace_provider) + _tracer = trace.get_tracer(__name__) + + if otlp_endpoint: + meter_provider = MeterProvider( + resource=resource, + metric_readers=[ + PeriodicExportingMetricReader( + OTLPMetricExporter( + endpoint=otlp_endpoint, + headers=headers, + ) + ) + ], + ) + metrics.set_meter_provider(meter_provider) + _meter = metrics.get_meter(__name__) + + logger_provider = LoggerProvider(resource=resource) + log_exporter = OTLPLogExporter( + endpoint=otlp_endpoint, + headers=headers, + ) + logger_provider.add_log_record_processor( + BatchLogRecordProcessor(log_exporter) + ) + _logs.set_logger_provider(logger_provider) + _logger = _logs.get_logger(__name__) + + return True + + except Exception as e: + logger.debug(f"Failed to initialize OpenTelemetry: {e}") + return False + + +if _otel_available and _is_otel_enabled(): + _initialize_otel() + + +def get_tracer(name: Optional[str] = None): + """Get OpenTelemetry tracer instance.""" + if not _otel_available or not _is_otel_enabled(): + return None + return trace.get_tracer(name or __name__) + + +def get_meter(name: Optional[str] = None): + """Get OpenTelemetry meter instance.""" + if not _otel_available or not _is_otel_enabled(): + return None + return metrics.get_meter(name or __name__) + + +@contextmanager +def trace_span( + name: str, + attributes: Optional[Dict[str, Any]] = None, + kind: Optional[Any] = None, +): + """ + Context manager for creating a trace span. + + Args: + name: Span name + attributes: Dictionary of span attributes + kind: Span kind (INTERNAL, SERVER, CLIENT, etc.) + """ + if not _otel_available or not _is_otel_enabled(): + yield None + return + + tracer = get_tracer() + if not tracer: + yield None + return + + span_kind = kind if kind is not None else trace.SpanKind.INTERNAL + span = tracer.start_span(name=name, kind=span_kind) + + if attributes: + for key, value in attributes.items(): + try: + span.set_attribute(key, str(value)) + except Exception: + pass + + try: + with trace.use_span(span): + yield span + except Exception as e: + try: + span.record_exception(e) + span.set_status(trace.Status(trace.StatusCode.ERROR, str(e))) + except Exception: + pass + raise + finally: + try: + span.end() + except Exception: + pass + + +def trace_function( + span_name: Optional[str] = None, + attributes: Optional[Dict[str, Any]] = None, + capture_args: bool = True, +): + """ + Decorator to trace function execution. + + Args: + span_name: Custom span name (defaults to function name) + attributes: Additional attributes to add to span + capture_args: Whether to capture function arguments as attributes + """ + + def decorator(func: Callable): + @wraps(func) + def wrapper(*args, **kwargs): + if not _otel_available or not _is_otel_enabled(): + return func(*args, **kwargs) + + name = span_name or f"{func.__module__}.{func.__name__}" + span_attrs = (attributes or {}).copy() + + if capture_args: + import inspect + + try: + sig = inspect.signature(func) + bound = sig.bind(*args, **kwargs) + bound.apply_defaults() + + for param_name, param_value in bound.arguments.items(): + if param_name != "self": + try: + span_attrs[ + f"function.{param_name}" + ] = str(param_value)[:200] + except Exception: + pass + except Exception: + pass + + with trace_span(name, span_attrs): + start_time = time.time() + try: + result = func(*args, **kwargs) + execution_time = time.time() - start_time + + record_metric( + "function.execution.time", + execution_time, + {"function": func.__name__}, + ) + return result + except Exception as e: + record_metric( + "function.execution.errors", + 1, + { + "function": func.__name__, + "error_type": type(e).__name__, + }, + ) + raise + + return wrapper + + return decorator + + +def record_metric( + name: str, + value: float, + attributes: Optional[Dict[str, str]] = None, + metric_type: str = "histogram", +): + """ + Record a metric value. + + Args: + name: Metric name + value: Metric value + attributes: Metric attributes/labels + metric_type: Type of metric ("counter", "gauge", "histogram") + """ + if not _otel_available or not _is_otel_enabled() or not _meter: + return + + try: + attrs = attributes or {} + + if metric_type == "counter": + counter = _meter.create_counter(name) + counter.add(value, attributes=attrs) + elif metric_type == "gauge": + gauge = _meter.create_up_down_counter(name) + gauge.add(value, attributes=attrs) + elif metric_type == "histogram": + histogram = _meter.create_histogram(name) + histogram.record(value, attributes=attrs) + except Exception: + pass + + +def get_current_trace_context() -> Optional[Dict[str, str]]: + """Get current trace context for propagation.""" + if not _otel_available or not _is_otel_enabled(): + return None + + try: + propagator = TraceContextTextMapPropagator() + context_dict = {} + propagator.inject(context_dict) + return context_dict + except Exception: + return None + + +def set_trace_context(context: Dict[str, str]): + """Set trace context from external source (for distributed tracing).""" + if not _otel_available or not _is_otel_enabled(): + return + + try: + propagator = TraceContextTextMapPropagator() + propagator.extract(context) + except Exception: + pass + + +def log_event( + message: str, + level: str = "INFO", + attributes: Optional[Dict[str, Any]] = None, +): + """ + Log an event with OpenTelemetry logging. + + Args: + message: Log message + level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL) + attributes: Additional attributes + """ + if not _otel_available or not _is_otel_enabled() or not _logger: + logger.log(level, message) + return + + try: + from opentelemetry._logs import SeverityNumber + + severity_map = { + "DEBUG": SeverityNumber.DEBUG, + "INFO": SeverityNumber.INFO, + "WARNING": SeverityNumber.WARNING, + "ERROR": SeverityNumber.ERROR, + "CRITICAL": SeverityNumber.CRITICAL, + } + + _logger.emit( + _logs.LogRecord( + body=message, + severity_number=severity_map.get(level, SeverityNumber.INFO), + attributes=attributes or {}, + ) + ) + except Exception: + logger.log(level, message) +