diff --git a/pyproject.toml b/pyproject.toml
index d947301a..a9b59f1b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -76,6 +76,9 @@
 httpx = "*"
 mcp = "*"
 aiohttp = "*"
 schedule = "*"
+opentelemetry-api = ">=1.20.0"
+opentelemetry-sdk = ">=1.20.0"
+opentelemetry-exporter-otlp = ">=1.20.0"
 uvloop = {version = "*", markers = "sys_platform == 'linux' or sys_platform == 'darwin'"}
 winloop = {version = "*", markers = "sys_platform == 'win32'"}
diff --git a/requirements.txt b/requirements.txt
index 279d5538..1e7d59bb 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -26,3 +26,6 @@ numpy
 schedule
 uvloop; sys_platform == 'linux' or sys_platform == 'darwin' # linux or macos only
 winloop; sys_platform == 'win32' # windows only
+opentelemetry-api>=1.20.0
+opentelemetry-sdk>=1.20.0
+opentelemetry-exporter-otlp>=1.20.0
diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py
index 32687894..0c302a2d 100644
--- a/swarms/structs/agent.py
+++ b/swarms/structs/agent.py
@@ -75,6 +75,19 @@ from swarms.structs.transforms import (
     handle_transforms,
 )
 from swarms.telemetry.main import log_agent_data
+
+try:
+    from swarms.telemetry.opentelemetry_integration import (
+        trace_span,
+        record_metric,
+        log_event,
+    )
+    _OTEL_AVAILABLE = True
+except ImportError:
+    _OTEL_AVAILABLE = False
+    trace_span = None
+    record_metric = lambda *args, **kwargs: None
+    log_event = lambda *args, **kwargs: None
 from swarms.tools.base_tool import BaseTool
 from swarms.tools.mcp_client_tools import (
     execute_multiple_tools_on_multiple_mcp_servers_sync,
@@ -472,6 +485,7 @@ class Agent:
         mode: Literal["interactive", "fast", "standard"] = "standard",
         publish_to_marketplace: bool = False,
         use_cases: Optional[List[Dict[str, Any]]] = None,
+        enable_telemetry: Optional[bool] = None,
         *args,
         **kwargs,
     ):
@@ -626,6 +640,15 @@ class Agent:
         self.mode = mode
         self.publish_to_marketplace = publish_to_marketplace
 
+        if enable_telemetry is None:
+            import os
+            self.enable_telemetry = (
+                _OTEL_AVAILABLE
+                and os.getenv("OTEL_ENABLED", "true").lower() == "true"
+            )
+        else:
+            self.enable_telemetry = enable_telemetry and _OTEL_AVAILABLE
+
         # Initialize transforms
         if transforms is None:
             self.transforms = None
@@ -1218,7 +1241,42 @@
             agent(task="Summarize this document.", img="path/to/image.jpg")
             agent(task="Analyze this image.", img="path/to/image.jpg", is_last=True)
         """
+        span_attributes = {}
+        if self.enable_telemetry and trace_span:
+            span_attributes = {
+                "agent.id": self.id,
+                "agent.name": self.agent_name,
+                "agent.model": self.get_current_model(),
+                "agent.max_loops": str(self.max_loops),
+                "task.length": len(str(task)) if task else 0,
+            }
+
+        span_context = (
+            trace_span(
+                f"agent.run.{self.agent_name}",
+                attributes=span_attributes,
+            )
+            if (self.enable_telemetry and trace_span)
+            else None
+        )
+
+        if span_context:
+            span_manager = span_context.__enter__()
+        else:
+            from contextlib import nullcontext
+            span_manager = nullcontext()
+
         try:
+            if self.enable_telemetry and record_metric:
+                record_metric(
+                    "agent.executions.total",
+                    1,
+                    {
+                        "agent_name": self.agent_name,
+                        "model": self.get_current_model(),
+                    },
+                    metric_type="counter",
+                )
 
             self.check_if_no_prompt_then_autogenerate(task)
@@ -1253,6 +1311,17 @@
         ):
             loop_count += 1
 
+            if self.enable_telemetry and record_metric:
+                record_metric(
+                    "agent.loops",
+                    1,
+                    {
+                        "agent_name": self.agent_name,
+                        "loop_number": str(loop_count),
+                    },
+                    metric_type="counter",
+                )
+
             # Handle RAG query every loop
             if (
                 self.long_term_memory is not None
@@ -1462,15 +1531,63 @@ class Agent:
                 self.save()
 
-            # Output formatting based on output_type
-            return history_output_formatter(
+            if self.enable_telemetry and record_metric:
+                record_metric(
+                    "agent.executions.success",
+                    1,
+                    {"agent_name": self.agent_name},
+                    metric_type="counter",
+                )
+
+            result = history_output_formatter(
                 self.short_memory, type=self.output_type
             )
 
+            if span_context:
+                try:
+                    span_context.__exit__(None, None, None)
+                except Exception:
+                    pass
+
+            return result
+
         except Exception as error:
+            if self.enable_telemetry:
+                if record_metric:
+                    record_metric(
+                        "agent.executions.errors",
+                        1,
+                        {
+                            "agent_name": self.agent_name,
+                            "error_type": type(error).__name__,
+                        },
+                        metric_type="counter",
+                    )
+                if log_event:
+                    log_event(
+                        f"Agent {self.agent_name} execution failed",
+                        level="ERROR",
+                        attributes={
+                            "agent_id": self.id,
+                            "error_type": type(error).__name__,
+                            "error_message": str(error)[:200],
+                        },
+                    )
+
+            if span_context:
+                try:
+                    span_context.__exit__(type(error), error, None)
+                except Exception:
+                    pass
+
             self._handle_run_error(error)
 
         except KeyboardInterrupt as error:
+            if span_context:
+                try:
+                    span_context.__exit__(type(error), error, None)
+                except Exception:
+                    pass
             self._handle_run_error(error)
 
     def _handle_run_error(self, error: any):
@@ -2570,6 +2687,33 @@ class Agent:
         if "is_last" in kwargs:
             del kwargs["is_last"]
 
+        span_attrs = {}
+        if self.enable_telemetry and trace_span:
+            span_attrs = {
+                "agent.name": self.agent_name,
+                "agent.model": self.get_current_model(),
+                "agent.loop": str(current_loop),
+                "task.length": len(task),
+                "has_image": str(img is not None),
+            }
+
+        llm_span_context = (
+            trace_span(
+                "agent.llm.call",
+                attributes=span_attrs,
+            )
+            if (self.enable_telemetry and trace_span)
+            else None
+        )
+
+        if llm_span_context:
+            llm_span_manager = llm_span_context.__enter__()
+        else:
+            from contextlib import nullcontext
+            llm_span_manager = nullcontext()
+
+        start_time = time.time()
+
         try:
             if self.stream and hasattr(self.llm, "stream"):
                 original_stream = self.llm.stream
@@ -2780,11 +2924,57 @@ class Agent:
                     # Restore original stream setting
                     self.llm.stream = original_stream
 
-                    # Return the complete response for further processing
+                    if self.enable_telemetry and record_metric:
+                        execution_time = time.time() - start_time
+                        record_metric(
+                            "agent.llm.call.duration",
+                            execution_time,
+                            {
+                                "agent_name": self.agent_name,
+                                "model": self.get_current_model(),
+                            },
+                        )
+                        record_metric(
+                            "agent.llm.calls.total",
+                            1,
+                            {"model": self.get_current_model()},
+                            metric_type="counter",
+                        )
+
+                    if llm_span_context:
+                        try:
+                            llm_span_context.__exit__(None, None, None)
+                        except Exception:
+                            pass
+
                     return complete_response
                 else:
                     # Restore original stream setting
                     self.llm.stream = original_stream
+
+                    if self.enable_telemetry and record_metric:
+                        execution_time = time.time() - start_time
+                        record_metric(
+                            "agent.llm.call.duration",
+                            execution_time,
+                            {
+                                "agent_name": self.agent_name,
+                                "model": self.get_current_model(),
+                            },
+                        )
+                        record_metric(
+                            "agent.llm.calls.total",
+                            1,
+                            {"model": self.get_current_model()},
+                            metric_type="counter",
+                        )
+
+                    if llm_span_context:
+                        try:
+                            llm_span_context.__exit__(None, None, None)
+                        except Exception:
+                            pass
+
                     return streaming_response
             else:
                 args = {
@@ -2796,6 +2986,29 @@ class Agent:
 
                 out = self.llm.run(**args, **kwargs)
 
+                if self.enable_telemetry and record_metric:
+                    execution_time = time.time() - start_time
+                    record_metric(
+                        "agent.llm.call.duration",
+                        execution_time,
+                        {
+                            "agent_name": self.agent_name,
+                            "model": self.get_current_model(),
+                        },
+                    )
+                    record_metric(
+                        "agent.llm.calls.total",
+                        1,
+                        {"model": self.get_current_model()},
+                        metric_type="counter",
+                    )
+
+                if llm_span_context:
+                    try:
+                        llm_span_context.__exit__(None, None, None)
+                    except Exception:
+                        pass
+
                 return out
 
         except (
@@ -2805,6 +3018,24 @@ class Agent:
             AuthenticationError,
             Exception,
         ) as e:
+            if self.enable_telemetry:
+                if record_metric:
+                    record_metric(
+                        "agent.llm.call.errors",
+                        1,
+                        {
+                            "agent_name": self.agent_name,
+                            "model": self.get_current_model(),
+                            "error_type": type(e).__name__,
+                        },
+                        metric_type="counter",
+                    )
+                if llm_span_context:
+                    try:
+                        llm_span_context.__exit__(type(e), e, None)
+                    except Exception:
+                        pass
+
             logger.error(
                 f"Error calling LLM with model '{self.get_current_model()}': {e}. "
                 f"Task: {task}, Args: {args}, Kwargs: {kwargs} Traceback: {traceback.format_exc()}"
diff --git a/swarms/structs/swarm_router.py b/swarms/structs/swarm_router.py
index 57a67a12..5e739e8c 100644
--- a/swarms/structs/swarm_router.py
+++ b/swarms/structs/swarm_router.py
@@ -34,6 +34,19 @@ from swarms.structs.mixture_of_agents import MixtureOfAgents
 from swarms.structs.multi_agent_router import MultiAgentRouter
 from swarms.structs.sequential_workflow import SequentialWorkflow
 from swarms.telemetry.log_executions import log_execution
+
+try:
+    from swarms.telemetry.opentelemetry_integration import (
+        trace_span,
+        record_metric,
+        get_current_trace_context,
+    )
+    _OTEL_AVAILABLE = True
+except ImportError:
+    _OTEL_AVAILABLE = False
+    trace_span = None
+    record_metric = lambda *args, **kwargs: None
+    get_current_trace_context = lambda: None
 from swarms.utils.generate_keys import generate_api_key
 from swarms.utils.loguru_logger import initialize_logger
 from swarms.utils.output_types import OutputType
@@ -702,27 +715,74 @@
 
         Raises:
             Exception: If an error occurs during task execution.
         """
-        self.swarm = self._create_swarm(task, *args, **kwargs)
-
-        log_execution(
-            swarm_id=self.id,
-            status="start",
-            swarm_config=self.to_dict(),
-            swarm_architecture="swarm_router",
-            enabled_on=self.telemetry_enabled,
+        span_attributes = {}
+        if self.telemetry_enabled and _OTEL_AVAILABLE and trace_span:
+            span_attributes = {
+                "swarm_router.id": self.id,
+                "swarm_router.name": self.name,
+                "swarm_type": str(self.swarm_type),
+                "task.length": len(str(task)) if task else 0,
+            }
+
+        span_context = (
+            trace_span(
+                f"swarm_router.run.{self.name}",
+                attributes=span_attributes,
+            )
+            if (self.telemetry_enabled and _OTEL_AVAILABLE and trace_span)
+            else None
         )
 
-        args = {}
-
-        if tasks is not None:
-            args["tasks"] = tasks
+        if span_context:
+            span_manager = span_context.__enter__()
         else:
-            args["task"] = task
-
-        if img is not None:
-            args["img"] = img
+            from contextlib import nullcontext
+            span_manager = nullcontext()
 
         try:
+            self.swarm = self._create_swarm(task, *args, **kwargs)
+
+            log_execution(
+                swarm_id=self.id,
+                status="start",
+                swarm_config=self.to_dict(),
+                swarm_architecture="swarm_router",
+                enabled_on=self.telemetry_enabled,
+            )
+
+            if self.telemetry_enabled and _OTEL_AVAILABLE and record_metric:
+                record_metric(
+                    "swarm_router.executions.total",
+                    1,
+                    {
+                        "swarm_type": str(self.swarm_type),
+                        "router_name": self.name,
+                    },
+                    metric_type="counter",
+                )
+
+            if (
+                self.telemetry_enabled
+                and _OTEL_AVAILABLE
+                and get_current_trace_context
+            ):
+                context = get_current_trace_context()
+                if context and hasattr(self.swarm, "set_trace_context"):
+                    try:
+                        self.swarm.set_trace_context(context)
+                    except Exception:
+                        pass
+
+            args = {}
+
+            if tasks is not None:
+                args["tasks"] = tasks
+            else:
+                args["task"] = task
+
+            if img is not None:
+                args["img"] = img
+
             if self.swarm_type == "BatchedGridWorkflow":
                 result = self.swarm.run(**args, **kwargs)
             else:
@@ -736,8 +796,32 @@ class SwarmRouter:
                 enabled_on=self.telemetry_enabled,
             )
 
+            if span_context:
+                try:
+                    span_context.__exit__(None, None, None)
+                except Exception:
+                    pass
+
             return result
         except SwarmRouterRunError as e:
+            if self.telemetry_enabled and _OTEL_AVAILABLE:
+                if record_metric:
+                    record_metric(
+                        "swarm_router.executions.errors",
+                        1,
+                        {
+                            "swarm_type": str(self.swarm_type),
+                            "router_name": self.name,
+                            "error_type": type(e).__name__,
+                        },
+                        metric_type="counter",
+                    )
+                if span_context:
+                    try:
+                        span_context.__exit__(type(e), e, None)
+                    except Exception:
+                        pass
+
             logger.error(
                 f"\n[SwarmRouter ERROR] '{self.name}' failed to execute the task on the selected swarm.\n"
                 f"Reason: {str(e)}\n"
diff --git a/swarms/telemetry/__init__.py b/swarms/telemetry/__init__.py
index a7f92a78..5a13025d 100644
--- a/swarms/telemetry/__init__.py
+++ b/swarms/telemetry/__init__.py
@@ -5,9 +5,35 @@ from swarms.telemetry.main import (
     log_agent_data,
 )
 
-__all__ = [
-    "generate_user_id",
-    "get_machine_id",
-    "get_comprehensive_system_info",
-    "log_agent_data",
-]
+try:
+    from swarms.telemetry.opentelemetry_integration import (
+        get_tracer,
+        get_meter,
+        trace_span,
+        trace_function,
+        record_metric,
+        get_current_trace_context,
+        set_trace_context,
+        log_event,
+    )
+    __all__ = [
+        "generate_user_id",
+        "get_machine_id",
+        "get_comprehensive_system_info",
+        "log_agent_data",
+        "get_tracer",
+        "get_meter",
+        "trace_span",
+        "trace_function",
+        "record_metric",
+        "get_current_trace_context",
+        "set_trace_context",
+        "log_event",
+    ]
+except ImportError:
+    __all__ = [
+        "generate_user_id",
+        "get_machine_id",
+        "get_comprehensive_system_info",
+        "log_agent_data",
+    ]
diff --git a/swarms/telemetry/opentelemetry_integration.py b/swarms/telemetry/opentelemetry_integration.py
new file mode 100644
index 00000000..8c67c3fb
--- /dev/null
+++ b/swarms/telemetry/opentelemetry_integration.py
@@ -0,0 +1,376 @@
+"""
+OpenTelemetry integration for Swarms framework.
+
+Provides distributed tracing, metrics, and logging capabilities across
+agents and multi-agent structures using OpenTelemetry standards.
+
+Configuration via environment variables:
+    OTEL_SERVICE_NAME: Service name (default: "swarms")
+    OTEL_EXPORTER_OTLP_ENDPOINT: OTLP endpoint URL
+    OTEL_EXPORTER_OTLP_HEADERS: Headers for OTLP exporter (JSON format)
+    OTEL_TRACES_EXPORTER: Traces exporter (default: "otlp")
+    OTEL_METRICS_EXPORTER: Metrics exporter (default: "otlp")
+    OTEL_LOGS_EXPORTER: Logs exporter (default: "otlp")
+    OTEL_ENABLED: Enable/disable OpenTelemetry (default: "true")
+    OTEL_SDK_DISABLED: Disable OpenTelemetry SDK (default: "false")
+"""
+
+import os
+import time
+from contextlib import contextmanager
+from typing import Any, Callable, Dict, Optional
+from functools import wraps
+
+from loguru import logger
+
+_otel_available = False
+_tracer = None
+_meter = None
+_logger = None
+
+try:
+    from opentelemetry import trace, metrics, _logs
+    from opentelemetry.sdk.trace import TracerProvider
+    from opentelemetry.sdk.trace.export import BatchSpanProcessor
+    from opentelemetry.sdk.metrics import MeterProvider
+    from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
+    from opentelemetry.sdk._logs import LoggerProvider
+    from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
+    from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
+        OTLPSpanExporter,
+    )
+    from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
+        OTLPMetricExporter,
+    )
+    from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
+        OTLPLogExporter,
+    )
+    from opentelemetry.sdk.resources import Resource
+    from opentelemetry.trace.propagation.tracecontext import (
+        TraceContextTextMapPropagator,
+    )
+
+    _otel_available = True
+except ImportError:
+    pass
+
+
+def _is_otel_enabled() -> bool:
+    """Check if OpenTelemetry is enabled via environment variables."""
+    if os.getenv("OTEL_SDK_DISABLED", "false").lower() == "true":
+        return False
+    return os.getenv("OTEL_ENABLED", "true").lower() == "true"
+
+
+def _parse_headers(headers_str: str) -> Dict[str, str]:
+    """Parse headers from JSON string or key=value format."""
+    import json
+
+    try:
+        return json.loads(headers_str)
+    except (json.JSONDecodeError, TypeError):
+        headers = {}
+        for pair in headers_str.split(","):
+            if "=" in pair:
+                key, value = pair.split("=", 1)
+                headers[key.strip()] = value.strip()
+        return headers
+
+
+def _initialize_otel():
+    """Initialize OpenTelemetry SDK with configuration from environment variables."""
+    global _tracer, _meter, _logger
+
+    if not _otel_available or not _is_otel_enabled():
+        return False
+
+    try:
+        service_name = os.getenv("OTEL_SERVICE_NAME", "swarms")
+        resource = Resource.create({"service.name": service_name})
+
+        trace_provider = TracerProvider(resource=resource)
+        otlp_endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
+        headers = {}
+
+        if otlp_endpoint:
+            headers = _parse_headers(
+                os.getenv("OTEL_EXPORTER_OTLP_HEADERS", "{}")
+            )
+            span_exporter = OTLPSpanExporter(
+                endpoint=otlp_endpoint,
+                headers=headers,
+            )
+            trace_provider.add_span_processor(BatchSpanProcessor(span_exporter))
+
+        trace.set_tracer_provider(trace_provider)
+        _tracer = trace.get_tracer(__name__)
+
+        if otlp_endpoint:
+            meter_provider = MeterProvider(
+                resource=resource,
+                metric_readers=[
+                    PeriodicExportingMetricReader(
+                        OTLPMetricExporter(
+                            endpoint=otlp_endpoint,
+                            headers=headers,
+                        )
+                    )
+                ],
+            )
+            metrics.set_meter_provider(meter_provider)
+            _meter = metrics.get_meter(__name__)
+
+            logger_provider = LoggerProvider(resource=resource)
+            log_exporter = OTLPLogExporter(
+                endpoint=otlp_endpoint,
+                headers=headers,
+            )
+            logger_provider.add_log_record_processor(
+                BatchLogRecordProcessor(log_exporter)
+            )
+            _logs.set_logger_provider(logger_provider)
+            _logger = _logs.get_logger(__name__)
+
+        return True
+
+    except Exception as e:
+        logger.debug(f"Failed to initialize OpenTelemetry: {e}")
+        return False
+
+
+if _otel_available and _is_otel_enabled():
+    _initialize_otel()
+
+
+def get_tracer(name: Optional[str] = None):
+    """Get OpenTelemetry tracer instance."""
+    if not _otel_available or not _is_otel_enabled():
+        return None
+    return trace.get_tracer(name or __name__)
+
+
+def get_meter(name: Optional[str] = None):
+    """Get OpenTelemetry meter instance."""
+    if not _otel_available or not _is_otel_enabled():
+        return None
+    return metrics.get_meter(name or __name__)
+
+
+@contextmanager
+def trace_span(
+    name: str,
+    attributes: Optional[Dict[str, Any]] = None,
+    kind: Optional[Any] = None,
+):
+    """
+    Context manager for creating a trace span.
+
+    Args:
+        name: Span name
+        attributes: Dictionary of span attributes
+        kind: Span kind (INTERNAL, SERVER, CLIENT, etc.)
+    """
+    if not _otel_available or not _is_otel_enabled():
+        yield None
+        return
+
+    tracer = get_tracer()
+    if not tracer:
+        yield None
+        return
+
+    span_kind = kind if kind is not None else trace.SpanKind.INTERNAL
+    span = tracer.start_span(name=name, kind=span_kind)
+
+    if attributes:
+        for key, value in attributes.items():
+            try:
+                span.set_attribute(key, str(value))
+            except Exception:
+                pass
+
+    try:
+        with trace.use_span(span):
+            yield span
+    except Exception as e:
+        try:
+            span.record_exception(e)
+            span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
+        except Exception:
+            pass
+        raise
+    finally:
+        try:
+            span.end()
+        except Exception:
+            pass
+
+
+def trace_function(
+    span_name: Optional[str] = None,
+    attributes: Optional[Dict[str, Any]] = None,
+    capture_args: bool = True,
+):
+    """
+    Decorator to trace function execution.
+
+    Args:
+        span_name: Custom span name (defaults to function name)
+        attributes: Additional attributes to add to span
+        capture_args: Whether to capture function arguments as attributes
+    """
+
+    def decorator(func: Callable):
+        @wraps(func)
+        def wrapper(*args, **kwargs):
+            if not _otel_available or not _is_otel_enabled():
+                return func(*args, **kwargs)
+
+            name = span_name or f"{func.__module__}.{func.__name__}"
+            span_attrs = (attributes or {}).copy()
+
+            if capture_args:
+                import inspect
+
+                try:
+                    sig = inspect.signature(func)
+                    bound = sig.bind(*args, **kwargs)
+                    bound.apply_defaults()
+
+                    for param_name, param_value in bound.arguments.items():
+                        if param_name != "self":
+                            try:
+                                span_attrs[
+                                    f"function.{param_name}"
+                                ] = str(param_value)[:200]
+                            except Exception:
+                                pass
+                except Exception:
+                    pass
+
+            with trace_span(name, span_attrs):
+                start_time = time.time()
+                try:
+                    result = func(*args, **kwargs)
+                    execution_time = time.time() - start_time
+
+                    record_metric(
+                        "function.execution.time",
+                        execution_time,
+                        {"function": func.__name__},
+                    )
+                    return result
+                except Exception as e:
+                    record_metric(
+                        "function.execution.errors",
+                        1,
+                        {
+                            "function": func.__name__,
+                            "error_type": type(e).__name__,
+                        },
+                    )
+                    raise
+
+        return wrapper
+
+    return decorator
+
+
+def record_metric(
+    name: str,
+    value: float,
+    attributes: Optional[Dict[str, str]] = None,
+    metric_type: str = "histogram",
+):
+    """
+    Record a metric value.
+
+    Args:
+        name: Metric name
+        value: Metric value
+        attributes: Metric attributes/labels
+        metric_type: Type of metric ("counter", "gauge", "histogram")
+    """
+    if not _otel_available or not _is_otel_enabled() or not _meter:
+        return
+
+    try:
+        attrs = attributes or {}
+
+        if metric_type == "counter":
+            counter = _meter.create_counter(name)
+            counter.add(value, attributes=attrs)
+        elif metric_type == "gauge":
+            gauge = _meter.create_up_down_counter(name)
+            gauge.add(value, attributes=attrs)
+        elif metric_type == "histogram":
+            histogram = _meter.create_histogram(name)
+            histogram.record(value, attributes=attrs)
+    except Exception:
+        pass
+
+
+def get_current_trace_context() -> Optional[Dict[str, str]]:
+    """Get current trace context for propagation."""
+    if not _otel_available or not _is_otel_enabled():
+        return None
+
+    try:
+        propagator = TraceContextTextMapPropagator()
+        context_dict = {}
+        propagator.inject(context_dict)
+        return context_dict
+    except Exception:
+        return None
+
+
+def set_trace_context(context: Dict[str, str]):
+    """Set trace context from external source (for distributed tracing)."""
+    if not _otel_available or not _is_otel_enabled():
+        return
+
+    try:
+        from opentelemetry.context import attach
+
+        propagator = TraceContextTextMapPropagator()
+        # Attach the extracted context so spans started afterwards join the
+        # propagated trace; extract() alone has no side effect.
+        attach(propagator.extract(context))
+    except Exception:
+        pass
+
+
+def log_event(
+    message: str,
+    level: str = "INFO",
+    attributes: Optional[Dict[str, Any]] = None,
+):
+    """
+    Log an event with OpenTelemetry logging.
+
+    Args:
+        message: Log message
+        level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
+        attributes: Additional attributes
+    """
+    if not _otel_available or not _is_otel_enabled() or not _logger:
+        logger.log(level, message)
+        return
+
+    try:
+        from opentelemetry._logs import SeverityNumber
+
+        # SeverityNumber has no WARNING/CRITICAL members; the OTel enum uses
+        # WARN and FATAL for those levels.
+        severity_map = {
+            "DEBUG": SeverityNumber.DEBUG,
+            "INFO": SeverityNumber.INFO,
+            "WARNING": SeverityNumber.WARN,
+            "ERROR": SeverityNumber.ERROR,
+            "CRITICAL": SeverityNumber.FATAL,
+        }
+
+        _logger.emit(
+            _logs.LogRecord(
+                body=message,
+                severity_number=severity_map.get(level, SeverityNumber.INFO),
+                attributes=attributes or {},
+            )
+        )
+    except Exception:
+        logger.log(level, message)
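
Example usage (a minimal sketch, not part of the diff): enabling the new telemetry on an Agent. The OTLP endpoint, agent name, and task below are placeholder values, and any model or tool configuration the Agent normally needs is omitted.

import os

# Point the exporters at a collector before importing swarms, since the SDK
# is initialized at import time from these environment variables.
os.environ.setdefault("OTEL_SERVICE_NAME", "swarms")
os.environ.setdefault("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")  # placeholder
os.environ.setdefault("OTEL_ENABLED", "true")

from swarms.structs.agent import Agent

# enable_telemetry=None (the default) follows OTEL_ENABLED; passing True or
# False overrides it, and telemetry stays off when the OpenTelemetry packages
# are not installed.
agent = Agent(
    agent_name="research-agent",  # placeholder name
    max_loops=1,
    enable_telemetry=True,
    # ... model and other kwargs as usual ...
)
agent.run(task="Summarize this document.")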
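The helpers in swarms/telemetry/opentelemetry_integration.py can also be used directly; a small sketch of the public API added above (the span, metric, and event names are arbitrary examples):

from swarms.telemetry.opentelemetry_integration import (
    log_event,
    record_metric,
    trace_function,
    trace_span,
)


@trace_function(span_name="demo.process", capture_args=True)
def process(batch_size: int) -> int:
    # Runs inside an automatically created span; arguments are attached as
    # "function.*" attributes and duration is recorded as a histogram.
    return batch_size * 2


with trace_span("demo.pipeline", attributes={"pipeline.stage": "ingest"}):
    items = process(8)
    record_metric(
        "demo.items.processed",
        items,
        {"stage": "ingest"},
        metric_type="counter",
    )
    log_event("ingest stage finished", attributes={"items": str(items)})

Every call degrades to a no-op when OpenTelemetry is unavailable or disabled: trace_span yields None, record_metric returns early, and log_event falls back to the loguru logger.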
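For distributed tracing across processes or services, get_current_trace_context and set_trace_context carry the W3C trace headers produced by TraceContextTextMapPropagator. A sketch under the assumption of some user-defined transport (send_to_worker here is hypothetical):

from swarms.telemetry.opentelemetry_integration import (
    get_current_trace_context,
    set_trace_context,
    trace_span,
)

# Producer side: capture the current trace context as a plain dict of headers.
with trace_span("swarm_router.dispatch"):
    carrier = get_current_trace_context() or {}
    # send_to_worker(task, headers=carrier)  # hypothetical transport


# Worker side: restore the context before doing traced work so the worker's
# spans join the originating trace.
def handle(task: str, headers: dict) -> None:
    set_trace_context(headers)
    with trace_span("worker.handle", attributes={"task.length": len(task)}):
        ...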