pull/1200/merge
CI-DEV 4 weeks ago committed by GitHub
commit aec3e56aed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -76,6 +76,9 @@ httpx = "*"
mcp = "*"
aiohttp = "*"
schedule = "*"
opentelemetry-api = ">=1.20.0"
opentelemetry-sdk = ">=1.20.0"
opentelemetry-exporter-otlp = ">=1.20.0"
uvloop = {version = "*", markers = "sys_platform == 'linux' or sys_platform == 'darwin'"}
winloop = {version = "*", markers = "sys_platform == 'win32'"}

@ -26,3 +26,6 @@ numpy
schedule
uvloop; sys_platform == 'linux' or sys_platform == 'darwin' # linux or macos only
winloop; sys_platform == 'win32' # windows only
opentelemetry-api>=1.20.0
opentelemetry-sdk>=1.20.0
opentelemetry-exporter-otlp>=1.20.0

@ -75,6 +75,19 @@ from swarms.structs.transforms import (
handle_transforms,
)
from swarms.telemetry.main import log_agent_data
# OpenTelemetry support is optional. When the integration module cannot be
# imported, record the fact in _OTEL_AVAILABLE and bind stand-ins so the rest
# of this module can reference the names unconditionally.
try:
    from swarms.telemetry.opentelemetry_integration import (
        trace_span,
        record_metric,
        log_event,
    )

    _OTEL_AVAILABLE = True
except ImportError:
    _OTEL_AVAILABLE = False
    # No span factory available; call sites test `trace_span` for truthiness
    # before opening a span.
    trace_span = None
    # No-op stand-ins that accept any arguments and return None.
    record_metric = lambda *args, **kwargs: None
    log_event = lambda *args, **kwargs: None
from swarms.tools.base_tool import BaseTool
from swarms.tools.mcp_client_tools import (
execute_multiple_tools_on_multiple_mcp_servers_sync,
@ -472,6 +485,7 @@ class Agent:
mode: Literal["interactive", "fast", "standard"] = "standard",
publish_to_marketplace: bool = False,
use_cases: Optional[List[Dict[str, Any]]] = None,
enable_telemetry: Optional[bool] = None,
*args,
**kwargs,
):
@ -626,6 +640,15 @@ class Agent:
self.mode = mode
self.publish_to_marketplace = publish_to_marketplace
if enable_telemetry is None:
import os
self.enable_telemetry = (
_OTEL_AVAILABLE
and os.getenv("OTEL_ENABLED", "true").lower() == "true"
)
else:
self.enable_telemetry = enable_telemetry and _OTEL_AVAILABLE
# Initialize transforms
if transforms is None:
self.transforms = None
@ -1218,7 +1241,42 @@ class Agent:
agent(task="Summarize this document.", img="path/to/image.jpg")
agent(task="Analyze this image.", img="path/to/image.jpg", is_last=True)
"""
span_attributes = {}
if self.enable_telemetry and trace_span:
span_attributes = {
"agent.id": self.id,
"agent.name": self.agent_name,
"agent.model": self.get_current_model(),
"agent.max_loops": str(self.max_loops),
"task.length": len(str(task)) if task else 0,
}
span_context = (
trace_span(
f"agent.run.{self.agent_name}",
attributes=span_attributes,
)
if (self.enable_telemetry and trace_span)
else None
)
if span_context:
span_manager = span_context.__enter__()
else:
from contextlib import nullcontext
span_manager = nullcontext()
try:
if self.enable_telemetry and record_metric:
record_metric(
"agent.executions.total",
1,
{
"agent_name": self.agent_name,
"model": self.get_current_model(),
},
metric_type="counter",
)
self.check_if_no_prompt_then_autogenerate(task)
@ -1253,6 +1311,17 @@ class Agent:
):
loop_count += 1
if self.enable_telemetry and record_metric:
record_metric(
"agent.loops",
1,
{
"agent_name": self.agent_name,
"loop_number": str(loop_count),
},
metric_type="counter",
)
# Handle RAG query every loop
if (
self.long_term_memory is not None
@ -1462,15 +1531,63 @@ class Agent:
self.save()
# Output formatting based on output_type
return history_output_formatter(
if self.enable_telemetry and record_metric:
record_metric(
"agent.executions.success",
1,
{"agent_name": self.agent_name},
metric_type="counter",
)
result = history_output_formatter(
self.short_memory, type=self.output_type
)
if span_context:
try:
span_context.__exit__(None, None, None)
except Exception:
pass
return result
except Exception as error:
if self.enable_telemetry:
if record_metric:
record_metric(
"agent.executions.errors",
1,
{
"agent_name": self.agent_name,
"error_type": type(error).__name__,
},
metric_type="counter",
)
if log_event:
log_event(
f"Agent {self.agent_name} execution failed",
level="ERROR",
attributes={
"agent_id": self.id,
"error_type": type(error).__name__,
"error_message": str(error)[:200],
},
)
if span_context:
try:
span_context.__exit__(type(error), error, None)
except Exception:
pass
self._handle_run_error(error)
except KeyboardInterrupt as error:
if span_context:
try:
span_context.__exit__(type(error), error, None)
except Exception:
pass
self._handle_run_error(error)
def _handle_run_error(self, error: any):
@ -2570,6 +2687,33 @@ class Agent:
if "is_last" in kwargs:
del kwargs["is_last"]
span_attrs = {}
if self.enable_telemetry and trace_span:
span_attrs = {
"agent.name": self.agent_name,
"agent.model": self.get_current_model(),
"agent.loop": str(current_loop),
"task.length": len(task),
"has_image": str(img is not None),
}
llm_span_context = (
trace_span(
"agent.llm.call",
attributes=span_attrs,
)
if (self.enable_telemetry and trace_span)
else None
)
if llm_span_context:
llm_span_manager = llm_span_context.__enter__()
else:
from contextlib import nullcontext
llm_span_manager = nullcontext()
start_time = time.time()
try:
if self.stream and hasattr(self.llm, "stream"):
original_stream = self.llm.stream
@ -2780,11 +2924,57 @@ class Agent:
# Restore original stream setting
self.llm.stream = original_stream
# Return the complete response for further processing
if self.enable_telemetry and record_metric:
execution_time = time.time() - start_time
record_metric(
"agent.llm.call.duration",
execution_time,
{
"agent_name": self.agent_name,
"model": self.get_current_model(),
},
)
record_metric(
"agent.llm.calls.total",
1,
{"model": self.get_current_model()},
metric_type="counter",
)
if llm_span_context:
try:
llm_span_context.__exit__(None, None, None)
except Exception:
pass
return complete_response
else:
# Restore original stream setting
self.llm.stream = original_stream
if self.enable_telemetry and record_metric:
execution_time = time.time() - start_time
record_metric(
"agent.llm.call.duration",
execution_time,
{
"agent_name": self.agent_name,
"model": self.get_current_model(),
},
)
record_metric(
"agent.llm.calls.total",
1,
{"model": self.get_current_model()},
metric_type="counter",
)
if llm_span_context:
try:
llm_span_context.__exit__(None, None, None)
except Exception:
pass
return streaming_response
else:
args = {
@ -2796,6 +2986,29 @@ class Agent:
out = self.llm.run(**args, **kwargs)
if self.enable_telemetry and record_metric:
execution_time = time.time() - start_time
record_metric(
"agent.llm.call.duration",
execution_time,
{
"agent_name": self.agent_name,
"model": self.get_current_model(),
},
)
record_metric(
"agent.llm.calls.total",
1,
{"model": self.get_current_model()},
metric_type="counter",
)
if llm_span_context:
try:
llm_span_context.__exit__(None, None, None)
except Exception:
pass
return out
except (
@ -2805,6 +3018,24 @@ class Agent:
AuthenticationError,
Exception,
) as e:
if self.enable_telemetry:
if record_metric:
record_metric(
"agent.llm.call.errors",
1,
{
"agent_name": self.agent_name,
"model": self.get_current_model(),
"error_type": type(e).__name__,
},
metric_type="counter",
)
if llm_span_context:
try:
llm_span_context.__exit__(type(e), e, None)
except Exception:
pass
logger.error(
f"Error calling LLM with model '{self.get_current_model()}': {e}. "
f"Task: {task}, Args: {args}, Kwargs: {kwargs} Traceback: {traceback.format_exc()}"

@ -34,6 +34,19 @@ from swarms.structs.mixture_of_agents import MixtureOfAgents
from swarms.structs.multi_agent_router import MultiAgentRouter
from swarms.structs.sequential_workflow import SequentialWorkflow
from swarms.telemetry.log_executions import log_execution
# OpenTelemetry support is optional. When the integration module cannot be
# imported, record the fact in _OTEL_AVAILABLE and bind stand-ins so the
# router code can reference the names unconditionally.
try:
    from swarms.telemetry.opentelemetry_integration import (
        trace_span,
        record_metric,
        get_current_trace_context,
    )

    _OTEL_AVAILABLE = True
except ImportError:
    _OTEL_AVAILABLE = False
    # No span factory available; call sites test `trace_span` for truthiness.
    trace_span = None
    # No-op stand-in that accepts any arguments and returns None.
    record_metric = lambda *args, **kwargs: None
    # Stand-in that reports "no active trace context".
    get_current_trace_context = lambda: None
from swarms.utils.generate_keys import generate_api_key
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.output_types import OutputType
@ -702,27 +715,74 @@ class SwarmRouter:
Raises:
Exception: If an error occurs during task execution.
"""
self.swarm = self._create_swarm(task, *args, **kwargs)
log_execution(
swarm_id=self.id,
status="start",
swarm_config=self.to_dict(),
swarm_architecture="swarm_router",
enabled_on=self.telemetry_enabled,
span_attributes = {}
if self.telemetry_enabled and _OTEL_AVAILABLE and trace_span:
span_attributes = {
"swarm_router.id": self.id,
"swarm_router.name": self.name,
"swarm_type": str(self.swarm_type),
"task.length": len(str(task)) if task else 0,
}
span_context = (
trace_span(
f"swarm_router.run.{self.name}",
attributes=span_attributes,
)
if (self.telemetry_enabled and _OTEL_AVAILABLE and trace_span)
else None
)
args = {}
if tasks is not None:
args["tasks"] = tasks
if span_context:
span_manager = span_context.__enter__()
else:
args["task"] = task
if img is not None:
args["img"] = img
from contextlib import nullcontext
span_manager = nullcontext()
try:
self.swarm = self._create_swarm(task, *args, **kwargs)
log_execution(
swarm_id=self.id,
status="start",
swarm_config=self.to_dict(),
swarm_architecture="swarm_router",
enabled_on=self.telemetry_enabled,
)
if self.telemetry_enabled and _OTEL_AVAILABLE and record_metric:
record_metric(
"swarm_router.executions.total",
1,
{
"swarm_type": str(self.swarm_type),
"router_name": self.name,
},
metric_type="counter",
)
if (
self.telemetry_enabled
and _OTEL_AVAILABLE
and get_current_trace_context
):
context = get_current_trace_context()
if context and hasattr(self.swarm, "set_trace_context"):
try:
self.swarm.set_trace_context(context)
except Exception:
pass
args = {}
if tasks is not None:
args["tasks"] = tasks
else:
args["task"] = task
if img is not None:
args["img"] = img
if self.swarm_type == "BatchedGridWorkflow":
result = self.swarm.run(**args, **kwargs)
else:
@ -736,8 +796,32 @@ class SwarmRouter:
enabled_on=self.telemetry_enabled,
)
if span_context:
try:
span_context.__exit__(None, None, None)
except Exception:
pass
return result
except SwarmRouterRunError as e:
if self.telemetry_enabled and _OTEL_AVAILABLE:
if record_metric:
record_metric(
"swarm_router.executions.errors",
1,
{
"swarm_type": str(self.swarm_type),
"router_name": self.name,
"error_type": type(e).__name__,
},
metric_type="counter",
)
if span_context:
try:
span_context.__exit__(type(e), e, None)
except Exception:
pass
logger.error(
f"\n[SwarmRouter ERROR] '{self.name}' failed to execute the task on the selected swarm.\n"
f"Reason: {str(e)}\n"

@ -5,9 +5,35 @@ from swarms.telemetry.main import (
log_agent_data,
)
__all__ = [
"generate_user_id",
"get_machine_id",
"get_comprehensive_system_info",
"log_agent_data",
]
# Names exported whether or not the OpenTelemetry helpers can be imported.
__all__ = [
    "generate_user_id",
    "get_machine_id",
    "get_comprehensive_system_info",
    "log_agent_data",
]

try:
    from swarms.telemetry.opentelemetry_integration import (
        get_tracer,
        get_meter,
        trace_span,
        trace_function,
        record_metric,
        get_current_trace_context,
        set_trace_context,
        log_event,
    )
except ImportError:
    # The integration module is not importable: export only the base names.
    pass
else:
    # The import succeeded, so advertise the OpenTelemetry helpers as well.
    __all__ += [
        "get_tracer",
        "get_meter",
        "trace_span",
        "trace_function",
        "record_metric",
        "get_current_trace_context",
        "set_trace_context",
        "log_event",
    ]

@ -0,0 +1,376 @@
"""
OpenTelemetry integration for Swarms framework.
Provides distributed tracing, metrics, and logging capabilities across
agents and multi-agent structures using OpenTelemetry standards.
Configuration via environment variables:
OTEL_SERVICE_NAME: Service name (default: "swarms")
OTEL_EXPORTER_OTLP_ENDPOINT: OTLP endpoint URL
OTEL_EXPORTER_OTLP_HEADERS: Headers for OTLP exporter (JSON object or comma-separated key=value pairs)
OTEL_TRACES_EXPORTER: Traces exporter (default: "otlp")
OTEL_METRICS_EXPORTER: Metrics exporter (default: "otlp")
OTEL_LOGS_EXPORTER: Logs exporter (default: "otlp")
OTEL_ENABLED: Enable/disable OpenTelemetry (default: "true")
OTEL_SDK_DISABLED: Disable OpenTelemetry SDK (default: "false")
"""
import os
import time
from contextlib import contextmanager
from typing import Any, Callable, Dict, Optional
from functools import wraps
from loguru import logger
# Module-level OpenTelemetry state. Everything starts "off"; the guarded
# import below flips _otel_available, and _initialize_otel() assigns the rest.
_otel_available = False  # True once every opentelemetry import has succeeded
_tracer = None  # tracer obtained by _initialize_otel()
_meter = None  # meter read by record_metric(); None means metrics are dropped
_logger = None  # OTel logger read by log_event(); None means loguru is used
try:
from opentelemetry import trace, metrics, _logs
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
OTLPMetricExporter,
)
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
OTLPLogExporter,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.trace.propagation.tracecontext import (
TraceContextTextMapPropagator,
)
_otel_available = True
except ImportError:
pass
def _is_otel_enabled() -> bool:
"""Check if OpenTelemetry is enabled via environment variables."""
if os.getenv("OTEL_SDK_DISABLED", "false").lower() == "true":
return False
return os.getenv("OTEL_ENABLED", "true").lower() == "true"
def _parse_headers(headers_str: str) -> Dict[str, str]:
"""Parse headers from JSON string or key=value format."""
import json
try:
return json.loads(headers_str)
except (json.JSONDecodeError, TypeError):
headers = {}
for pair in headers_str.split(","):
if "=" in pair:
key, value = pair.split("=", 1)
headers[key.strip()] = value.strip()
return headers
def _initialize_otel():
    """Initialize OpenTelemetry SDK with configuration from environment variables.

    Reads ``OTEL_SERVICE_NAME`` (default ``"swarms"``),
    ``OTEL_EXPORTER_OTLP_ENDPOINT`` and ``OTEL_EXPORTER_OTLP_HEADERS``, installs
    the global tracer provider, and stores the resulting handles in the
    module-level ``_tracer``, ``_meter`` and ``_logger``.

    Returns:
        bool: True when setup ran to completion; False when the OpenTelemetry
        packages are not importable, telemetry is disabled via environment,
        or any step raised (the error is logged at debug level).
    """
    # NOTE(review): the nesting below was reconstructed from a copy that lost
    # its indentation -- confirm which statements belong under each
    # `if otlp_endpoint:` before relying on it.
    global _tracer, _meter, _logger
    if not _otel_available or not _is_otel_enabled():
        return False
    try:
        service_name = os.getenv("OTEL_SERVICE_NAME", "swarms")
        # Every signal shares one resource so they carry the same service.name.
        resource = Resource.create({"service.name": service_name})
        trace_provider = TracerProvider(resource=resource)
        otlp_endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
        headers = {}
        if otlp_endpoint:
            # Spans are exported only when an OTLP endpoint is configured.
            headers = _parse_headers(
                os.getenv("OTEL_EXPORTER_OTLP_HEADERS", "{}")
            )
            span_exporter = OTLPSpanExporter(
                endpoint=otlp_endpoint,
                headers=headers,
            )
            trace_provider.add_span_processor(BatchSpanProcessor(span_exporter))
        trace.set_tracer_provider(trace_provider)
        _tracer = trace.get_tracer(__name__)
        if otlp_endpoint:
            # Metrics pipeline: periodic reader pushing to the same endpoint.
            meter_provider = MeterProvider(
                resource=resource,
                metric_readers=[
                    PeriodicExportingMetricReader(
                        OTLPMetricExporter(
                            endpoint=otlp_endpoint,
                            headers=headers,
                        )
                    )
                ],
            )
            metrics.set_meter_provider(meter_provider)
            _meter = metrics.get_meter(__name__)
            # Logs pipeline: batch processor pushing to the same endpoint.
            logger_provider = LoggerProvider(resource=resource)
            log_exporter = OTLPLogExporter(
                endpoint=otlp_endpoint,
                headers=headers,
            )
            logger_provider.add_log_record_processor(
                BatchLogRecordProcessor(log_exporter)
            )
            _logs.set_logger_provider(logger_provider)
            _logger = _logs.get_logger(__name__)
        return True
    except Exception as e:
        # Telemetry is best-effort: a setup failure must not break the import.
        logger.debug(f"Failed to initialize OpenTelemetry: {e}")
        return False
# Run the one-time setup at import, but only when the OpenTelemetry packages
# were importable and the environment has not disabled telemetry.
if _otel_available and _is_otel_enabled():
    _initialize_otel()
def get_tracer(name: Optional[str] = None):
    """Return a tracer for ``name``, or None when tracing cannot be used.

    Args:
        name: Tracer name; falls back to this module's ``__name__`` when
            omitted or empty.

    Returns:
        The tracer from ``trace.get_tracer``, or None when the OpenTelemetry
        packages are missing or telemetry is disabled via environment.
    """
    if _otel_available and _is_otel_enabled():
        return trace.get_tracer(name or __name__)
    return None
def get_meter(name: Optional[str] = None):
    """Return a meter for ``name``, or None when metrics cannot be used.

    Args:
        name: Meter name; falls back to this module's ``__name__`` when
            omitted or empty.

    Returns:
        The meter from ``metrics.get_meter``, or None when the OpenTelemetry
        packages are missing or telemetry is disabled via environment.
    """
    if _otel_available and _is_otel_enabled():
        return metrics.get_meter(name or __name__)
    return None
@contextmanager
def trace_span(
    name: str,
    attributes: Optional[Dict[str, Any]] = None,
    kind: Optional[Any] = None,
):
    """
    Context manager for creating a trace span.

    The span is started, made the current span for the duration of the
    ``with`` body, and always ended on exit. An ``Exception`` escaping the
    body is recorded on the span exactly once, the span status is set to
    ERROR with ``str(exc)`` as description, and the exception is re-raised.

    Args:
        name: Span name
        attributes: Dictionary of span attributes; every value is stored as
            ``str(value)``
        kind: Span kind (INTERNAL, SERVER, CLIENT, etc.); defaults to
            ``SpanKind.INTERNAL``

    Yields:
        The started span, or None when OpenTelemetry is unavailable or
        disabled (the body still runs, untraced).
    """
    # get_tracer() already returns None when the SDK is missing or disabled.
    tracer = get_tracer()
    if not tracer:
        yield None
        return

    span_kind = kind if kind is not None else trace.SpanKind.INTERNAL
    span = tracer.start_span(name=name, kind=span_kind)

    for key, value in (attributes or {}).items():
        try:
            span.set_attribute(key, str(value))
        except Exception as attr_error:
            # A bad attribute must never prevent the traced code from running.
            logger.debug(f"Could not set span attribute {key!r}: {attr_error}")

    try:
        # use_span() records exceptions and sets ERROR status by default.
        # That is switched off here because the handler below does it;
        # leaving both on would attach every exception to the span twice.
        with trace.use_span(
            span,
            end_on_exit=False,
            record_exception=False,
            set_status_on_exception=False,
        ):
            yield span
    except Exception as e:
        try:
            span.record_exception(e)
            span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
        except Exception as record_error:
            logger.debug(f"Could not record exception on span: {record_error}")
        raise
    finally:
        try:
            span.end()
        except Exception as end_error:
            logger.debug(f"Could not end span {name!r}: {end_error}")
def trace_function(
    span_name: Optional[str] = None,
    attributes: Optional[Dict[str, Any]] = None,
    capture_args: bool = True,
):
    """
    Decorator to trace function execution.

    When OpenTelemetry is unavailable or disabled the wrapped function is
    called directly. Otherwise the call runs inside ``trace_span``; on
    success the elapsed time is recorded as the ``function.execution.time``
    histogram, and on an ``Exception`` the ``function.execution.errors``
    counter is incremented before the exception is re-raised.

    Args:
        span_name: Custom span name (defaults to ``<module>.<function>``)
        attributes: Additional attributes to add to span
        capture_args: Whether to capture function arguments as attributes.
            Each argument except ``self`` is stored as
            ``function.<param>`` = ``str(value)[:200]``.

    Returns:
        A decorator that preserves the wrapped function's metadata.
    """

    def decorator(func: Callable):
        import inspect

        # Resolve the signature once at decoration time instead of per call.
        signature = None
        if capture_args:
            try:
                signature = inspect.signature(func)
            except (TypeError, ValueError):
                # Some callables expose no introspectable signature.
                signature = None

        @wraps(func)
        def wrapper(*args, **kwargs):
            if not _otel_available or not _is_otel_enabled():
                return func(*args, **kwargs)

            name = span_name or f"{func.__module__}.{func.__name__}"
            span_attrs = dict(attributes or {})

            if signature is not None:
                try:
                    bound = signature.bind(*args, **kwargs)
                    bound.apply_defaults()
                    for param_name, param_value in bound.arguments.items():
                        if param_name == "self":
                            continue
                        try:
                            span_attrs[f"function.{param_name}"] = str(
                                param_value
                            )[:200]
                        except Exception:
                            # A failing __str__ only costs us that attribute.
                            pass
                except Exception:
                    # Binding problems surface when func itself is called;
                    # argument capture is best-effort.
                    pass

            with trace_span(name, span_attrs):
                # perf_counter is monotonic, so clock adjustments cannot
                # distort the measured duration.
                started = time.perf_counter()
                try:
                    result = func(*args, **kwargs)
                except Exception as e:
                    # Errors are counted, matching the other error metrics
                    # in this codebase; they are not histogram samples.
                    record_metric(
                        "function.execution.errors",
                        1,
                        {
                            "function": func.__name__,
                            "error_type": type(e).__name__,
                        },
                        metric_type="counter",
                    )
                    raise
                record_metric(
                    "function.execution.time",
                    time.perf_counter() - started,
                    {"function": func.__name__},
                )
                return result

        return wrapper

    return decorator
# Instruments already created by record_metric(), keyed by
# (id of the meter, metric_type, name) so each one is registered only once.
_instrument_cache: Dict[Any, Any] = {}


def record_metric(
    name: str,
    value: float,
    attributes: Optional[Dict[str, str]] = None,
    metric_type: str = "histogram",
):
    """
    Record a metric value.

    Does nothing when OpenTelemetry is unavailable, disabled, or no meter
    has been initialized. Failures are logged at debug level and never
    raised, so recording a metric cannot break the caller.

    Args:
        name: Metric name
        value: Metric value
        attributes: Metric attributes/labels
        metric_type: Type of metric ("counter", "gauge", "histogram").
            "gauge" is backed by an up-down counter, so ``value`` is added
            to the running total instead of replacing it. Any other value
            is ignored.
    """
    if not _otel_available or not _is_otel_enabled() or not _meter:
        return
    try:
        cache_key = (id(_meter), metric_type, name)
        instrument = _instrument_cache.get(cache_key)
        if instrument is None:
            # Create the instrument once and reuse it; asking the meter for
            # the same instrument on every call repeats the registration
            # (some SDK versions log a warning for each duplicate).
            if metric_type == "counter":
                instrument = _meter.create_counter(name)
            elif metric_type == "gauge":
                instrument = _meter.create_up_down_counter(name)
            elif metric_type == "histogram":
                instrument = _meter.create_histogram(name)
            else:
                logger.debug(
                    f"Ignoring metric {name!r}: unknown metric_type {metric_type!r}"
                )
                return
            _instrument_cache[cache_key] = instrument

        attrs = attributes or {}
        if metric_type == "histogram":
            instrument.record(value, attributes=attrs)
        else:
            instrument.add(value, attributes=attrs)
    except Exception as e:
        logger.debug(f"Failed to record metric {name!r}: {e}")
def get_current_trace_context() -> Optional[Dict[str, str]]:
    """Return the active trace context as a carrier dict for propagation.

    Returns:
        The dict filled in by ``TraceContextTextMapPropagator.inject``, or
        None when OpenTelemetry is unavailable or disabled, or when the
        injection raises.
    """
    if not (_otel_available and _is_otel_enabled()):
        return None
    try:
        carrier = {}
        TraceContextTextMapPropagator().inject(carrier)
    except Exception:
        return None
    return carrier
def set_trace_context(context: Dict[str, str]):
    """Set trace context from external source (for distributed tracing).

    Extracts the trace context from ``context`` and attaches it as the
    current OpenTelemetry context, so spans started afterwards become
    children of the remote span.

    Args:
        context: Carrier dict, such as the one returned by
            ``get_current_trace_context()``.

    Returns:
        The token returned by ``opentelemetry.context.attach`` (pass it to
        ``opentelemetry.context.detach`` to restore the previous context),
        or None when OpenTelemetry is unavailable or disabled, ``context``
        is empty, or attaching failed.
    """
    if not _otel_available or not _is_otel_enabled():
        return None
    if not context:
        return None
    try:
        # Imported locally under an alias because the parameter is itself
        # named ``context``.
        from opentelemetry import context as otel_context

        # extract() only returns a new Context object; without attach() the
        # extracted context would be discarded and nothing would change.
        extracted = TraceContextTextMapPropagator().extract(carrier=context)
        return otel_context.attach(extracted)
    except Exception as e:
        logger.debug(f"Failed to set trace context: {e}")
        return None
def log_event(
    message: str,
    level: str = "INFO",
    attributes: Optional[Dict[str, Any]] = None,
):
    """
    Log an event with OpenTelemetry logging.

    When OpenTelemetry is unavailable, disabled, or no OTel logger has been
    initialized, or when emitting the record fails, the message is written
    through loguru at ``level`` instead (``attributes`` are not forwarded
    on that fallback path).

    Args:
        message: Log message
        level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL); names not
            in this list are emitted with INFO severity
        attributes: Additional attributes
    """
    if not _otel_available or not _is_otel_enabled() or not _logger:
        logger.log(level, message)
        return
    try:
        from opentelemetry._logs import SeverityNumber

        # OpenTelemetry names these severities WARN and FATAL; the enum has
        # no WARNING or CRITICAL members, so the Python-style level names
        # must be mapped explicitly.
        severity_map = {
            "DEBUG": SeverityNumber.DEBUG,
            "INFO": SeverityNumber.INFO,
            "WARNING": SeverityNumber.WARN,
            "ERROR": SeverityNumber.ERROR,
            "CRITICAL": SeverityNumber.FATAL,
        }
        # NOTE(review): this builds the API-level LogRecord; confirm the
        # configured SDK exporter accepts it for the pinned SDK version.
        _logger.emit(
            _logs.LogRecord(
                body=message,
                severity_text=str(level),
                severity_number=severity_map.get(
                    str(level).upper(), SeverityNumber.INFO
                ),
                attributes=attributes or {},
            )
        )
    except Exception:
        # Never lose the message: fall back to the local logger.
        logger.log(level, message)
Loading…
Cancel
Save