Integrate OpenTelemetry for telemetry support

Added OpenTelemetry integration for tracing and metrics.
pull/1200/head
CI-DEV 1 month ago committed by GitHub
parent 243df297ef
commit bd5b497e45
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -25,6 +25,19 @@ from swarms.structs.mixture_of_agents import MixtureOfAgents
from swarms.structs.multi_agent_router import MultiAgentRouter
from swarms.structs.sequential_workflow import SequentialWorkflow
from swarms.telemetry.log_executions import log_execution
# OpenTelemetry support is optional: when the integration module cannot be
# imported, fall back to inert stand-ins so the rest of the module can call
# the telemetry helpers without guarding every call against a missing import.
try:
    from swarms.telemetry.opentelemetry_integration import (
        trace_span,
        record_metric,
        get_current_trace_context,
    )

    _OTEL_AVAILABLE = True
except ImportError:
    _OTEL_AVAILABLE = False
    # Kept as None (falsy): call sites check `trace_span` for truthiness
    # before attempting to open a span.
    trace_span = None

    def record_metric(*args, **kwargs):
        """No-op stand-in used when OpenTelemetry is unavailable.

        Accepts and ignores any arguments; always returns None.
        """
        return None

    def get_current_trace_context():
        """No-op stand-in used when OpenTelemetry is unavailable.

        Always returns None, i.e. there is no trace context to propagate.
        """
        return None
from swarms.utils.generate_keys import generate_api_key
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.output_types import OutputType
@ -660,27 +673,74 @@ class SwarmRouter:
Raises:
Exception: If an error occurs during task execution.
"""
self.swarm = self._create_swarm(task, *args, **kwargs)
log_execution(
swarm_id=self.id,
status="start",
swarm_config=self.to_dict(),
swarm_architecture="swarm_router",
enabled_on=self.telemetry_enabled,
span_attributes = {}
if self.telemetry_enabled and _OTEL_AVAILABLE and trace_span:
span_attributes = {
"swarm_router.id": self.id,
"swarm_router.name": self.name,
"swarm_type": str(self.swarm_type),
"task.length": len(str(task)) if task else 0,
}
span_context = (
trace_span(
f"swarm_router.run.{self.name}",
attributes=span_attributes,
)
if (self.telemetry_enabled and _OTEL_AVAILABLE and trace_span)
else None
)
args = {}
if tasks is not None:
args["tasks"] = tasks
if span_context:
span_manager = span_context.__enter__()
else:
args["task"] = task
if img is not None:
args["img"] = img
from contextlib import nullcontext
span_manager = nullcontext()
try:
self.swarm = self._create_swarm(task, *args, **kwargs)
log_execution(
swarm_id=self.id,
status="start",
swarm_config=self.to_dict(),
swarm_architecture="swarm_router",
enabled_on=self.telemetry_enabled,
)
if self.telemetry_enabled and _OTEL_AVAILABLE and record_metric:
record_metric(
"swarm_router.executions.total",
1,
{
"swarm_type": str(self.swarm_type),
"router_name": self.name,
},
metric_type="counter",
)
if (
self.telemetry_enabled
and _OTEL_AVAILABLE
and get_current_trace_context
):
context = get_current_trace_context()
if context and hasattr(self.swarm, "set_trace_context"):
try:
self.swarm.set_trace_context(context)
except Exception:
pass
args = {}
if tasks is not None:
args["tasks"] = tasks
else:
args["task"] = task
if img is not None:
args["img"] = img
if self.swarm_type == "BatchedGridWorkflow":
result = self.swarm.run(**args, **kwargs)
else:
@ -694,8 +754,32 @@ class SwarmRouter:
enabled_on=self.telemetry_enabled,
)
if span_context:
try:
span_context.__exit__(None, None, None)
except Exception:
pass
return result
except SwarmRouterRunError as e:
if self.telemetry_enabled and _OTEL_AVAILABLE:
if record_metric:
record_metric(
"swarm_router.executions.errors",
1,
{
"swarm_type": str(self.swarm_type),
"router_name": self.name,
"error_type": type(e).__name__,
},
metric_type="counter",
)
if span_context:
try:
span_context.__exit__(type(e), e, None)
except Exception:
pass
logger.error(
f"\n[SwarmRouter ERROR] '{self.name}' failed to execute the task on the selected swarm.\n"
f"Reason: {str(e)}\n"

Loading…
Cancel
Save