From bd5b497e4580aeda3849d30dbea911fd0cacda9a Mon Sep 17 00:00:00 2001 From: CI-DEV <154627941+IlumCI@users.noreply.github.com> Date: Wed, 12 Nov 2025 19:03:54 +0200 Subject: [PATCH] Integrate OpenTelemetry for telemetry support Added OpenTelemetry integration for tracing and metrics. --- swarms/structs/swarm_router.py | 116 ++++++++++++++++++++++++++++----- 1 file changed, 100 insertions(+), 16 deletions(-) diff --git a/swarms/structs/swarm_router.py b/swarms/structs/swarm_router.py index 84256d8f..32f2ed04 100644 --- a/swarms/structs/swarm_router.py +++ b/swarms/structs/swarm_router.py @@ -25,6 +25,19 @@ from swarms.structs.mixture_of_agents import MixtureOfAgents from swarms.structs.multi_agent_router import MultiAgentRouter from swarms.structs.sequential_workflow import SequentialWorkflow from swarms.telemetry.log_executions import log_execution + +try: + from swarms.telemetry.opentelemetry_integration import ( + trace_span, + record_metric, + get_current_trace_context, + ) + _OTEL_AVAILABLE = True +except ImportError: + _OTEL_AVAILABLE = False + trace_span = None + record_metric = lambda *args, **kwargs: None + get_current_trace_context = lambda: None from swarms.utils.generate_keys import generate_api_key from swarms.utils.loguru_logger import initialize_logger from swarms.utils.output_types import OutputType @@ -660,27 +673,74 @@ class SwarmRouter: Raises: Exception: If an error occurs during task execution. """ - self.swarm = self._create_swarm(task, *args, **kwargs) - - log_execution( - swarm_id=self.id, - status="start", - swarm_config=self.to_dict(), - swarm_architecture="swarm_router", - enabled_on=self.telemetry_enabled, + span_attributes = {} + if self.telemetry_enabled and _OTEL_AVAILABLE and trace_span: + span_attributes = { + "swarm_router.id": self.id, + "swarm_router.name": self.name, + "swarm_type": str(self.swarm_type), + "task.length": len(str(task)) if task else 0, + } + + span_context = ( + trace_span( + f"swarm_router.run.{self.name}", + attributes=span_attributes, + ) + if (self.telemetry_enabled and _OTEL_AVAILABLE and trace_span) + else None ) - args = {} - - if tasks is not None: - args["tasks"] = tasks + if span_context: + span_manager = span_context.__enter__() else: - args["task"] = task - - if img is not None: - args["img"] = img + from contextlib import nullcontext + span_manager = nullcontext() try: + self.swarm = self._create_swarm(task, *args, **kwargs) + + log_execution( + swarm_id=self.id, + status="start", + swarm_config=self.to_dict(), + swarm_architecture="swarm_router", + enabled_on=self.telemetry_enabled, + ) + + if self.telemetry_enabled and _OTEL_AVAILABLE and record_metric: + record_metric( + "swarm_router.executions.total", + 1, + { + "swarm_type": str(self.swarm_type), + "router_name": self.name, + }, + metric_type="counter", + ) + + if ( + self.telemetry_enabled + and _OTEL_AVAILABLE + and get_current_trace_context + ): + context = get_current_trace_context() + if context and hasattr(self.swarm, "set_trace_context"): + try: + self.swarm.set_trace_context(context) + except Exception: + pass + + args = {} + + if tasks is not None: + args["tasks"] = tasks + else: + args["task"] = task + + if img is not None: + args["img"] = img + if self.swarm_type == "BatchedGridWorkflow": result = self.swarm.run(**args, **kwargs) else: @@ -694,8 +754,32 @@ class SwarmRouter: enabled_on=self.telemetry_enabled, ) + if span_context: + try: + span_context.__exit__(None, None, None) + except Exception: + pass + return result except SwarmRouterRunError as e: + if self.telemetry_enabled and _OTEL_AVAILABLE: + if record_metric: + record_metric( + "swarm_router.executions.errors", + 1, + { + "swarm_type": str(self.swarm_type), + "router_name": self.name, + "error_type": type(e).__name__, + }, + metric_type="counter", + ) + if span_context: + try: + span_context.__exit__(type(e), e, None) + except Exception: + pass + logger.error( f"\n[SwarmRouter ERROR] '{self.name}' failed to execute the task on the selected swarm.\n" f"Reason: {str(e)}\n"