diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py
index f8c84d73..b94bc44d 100644
--- a/swarms/structs/agent.py
+++ b/swarms/structs/agent.py
@@ -75,6 +75,19 @@ from swarms.structs.transforms import (
     handle_transforms,
 )
 from swarms.telemetry.main import log_agent_data
+
+try:
+    from swarms.telemetry.opentelemetry_integration import (
+        trace_span,
+        record_metric,
+        log_event,
+    )
+    _OTEL_AVAILABLE = True
+except ImportError:
+    _OTEL_AVAILABLE = False
+    trace_span = None
+    record_metric = lambda *args, **kwargs: None
+    log_event = lambda *args, **kwargs: None
 from swarms.tools.base_tool import BaseTool
 from swarms.tools.mcp_client_tools import (
     execute_multiple_tools_on_multiple_mcp_servers_sync,
@@ -470,6 +483,7 @@ class Agent:
         mode: Literal["interactive", "fast", "standard"] = "standard",
         publish_to_marketplace: bool = False,
         use_cases: Optional[List[Dict[str, Any]]] = None,
+        enable_telemetry: Optional[bool] = None,
         *args,
         **kwargs,
     ):
@@ -623,6 +637,15 @@ class Agent:
         self.mode = mode
         self.publish_to_marketplace = publish_to_marketplace
 
+        if enable_telemetry is None:
+            import os
+            self.enable_telemetry = (
+                _OTEL_AVAILABLE
+                and os.getenv("OTEL_ENABLED", "true").lower() == "true"
+            )
+        else:
+            self.enable_telemetry = enable_telemetry and _OTEL_AVAILABLE
+
         # Initialize transforms
         if transforms is None:
             self.transforms = None
@@ -1215,7 +1238,42 @@ class Agent:
         agent(task="Summarize this document.", img="path/to/image.jpg")
         agent(task="Analyze this image.", img="path/to/image.jpg", is_last=True)
         """
+        span_attributes = {}
+        if self.enable_telemetry and trace_span:
+            span_attributes = {
+                "agent.id": self.id,
+                "agent.name": self.agent_name,
+                "agent.model": self.get_current_model(),
+                "agent.max_loops": str(self.max_loops),
+                "task.length": len(str(task)) if task else 0,
+            }
+
+        span_context = (
+            trace_span(
+                f"agent.run.{self.agent_name}",
+                attributes=span_attributes,
+            )
+            if (self.enable_telemetry and trace_span)
+            else None
+        )
+
+        if span_context:
+            span_manager = span_context.__enter__()
+        else:
+            from contextlib import nullcontext
+            span_manager = nullcontext()
+
         try:
+            if self.enable_telemetry and record_metric:
+                record_metric(
+                    "agent.executions.total",
+                    1,
+                    {
+                        "agent_name": self.agent_name,
+                        "model": self.get_current_model(),
+                    },
+                    metric_type="counter",
+                )
 
             self.check_if_no_prompt_then_autogenerate(task)
 
@@ -1250,6 +1308,17 @@ class Agent:
             ):
                 loop_count += 1
 
+                if self.enable_telemetry and record_metric:
+                    record_metric(
+                        "agent.loops",
+                        1,
+                        {
+                            "agent_name": self.agent_name,
+                            "loop_number": str(loop_count),
+                        },
+                        metric_type="counter",
+                    )
+
                 # Handle RAG query every loop
                 if (
                     self.long_term_memory is not None
@@ -1457,15 +1526,63 @@ class Agent:
 
                 self.save()
 
-            # Output formatting based on output_type
-            return history_output_formatter(
+            if self.enable_telemetry and record_metric:
+                record_metric(
+                    "agent.executions.success",
+                    1,
+                    {"agent_name": self.agent_name},
+                    metric_type="counter",
+                )
+
+            result = history_output_formatter(
                 self.short_memory, type=self.output_type
             )
 
+            if span_context:
+                try:
+                    span_context.__exit__(None, None, None)
+                except Exception:
+                    pass
+
+            return result
+
         except Exception as error:
+            if self.enable_telemetry:
+                if record_metric:
+                    record_metric(
+                        "agent.executions.errors",
+                        1,
+                        {
+                            "agent_name": self.agent_name,
+                            "error_type": type(error).__name__,
+                        },
+                        metric_type="counter",
+                    )
+                if log_event:
+                    log_event(
+                        f"Agent {self.agent_name} execution failed",
+                        level="ERROR",
+                        attributes={
+                            "agent_id": self.id,
+                            "error_type": type(error).__name__,
+                            "error_message": str(error)[:200],
+                        },
+                    )
+
+            if span_context:
+                try:
+                    span_context.__exit__(type(error), error, None)
+                except Exception:
+                    pass
+
             self._handle_run_error(error)
 
         except KeyboardInterrupt as error:
+            if span_context:
+                try:
+                    span_context.__exit__(type(error), error, None)
+                except Exception:
+                    pass
             self._handle_run_error(error)
 
     def _handle_run_error(self, error: any):
@@ -2565,6 +2682,33 @@ class Agent:
         if "is_last" in kwargs:
             del kwargs["is_last"]
 
+        span_attrs = {}
+        if self.enable_telemetry and trace_span:
+            span_attrs = {
+                "agent.name": self.agent_name,
+                "agent.model": self.get_current_model(),
+                "agent.loop": str(current_loop),
+                "task.length": len(task),
+                "has_image": str(img is not None),
+            }
+
+        llm_span_context = (
+            trace_span(
+                "agent.llm.call",
+                attributes=span_attrs,
+            )
+            if (self.enable_telemetry and trace_span)
+            else None
+        )
+
+        if llm_span_context:
+            llm_span_manager = llm_span_context.__enter__()
+        else:
+            from contextlib import nullcontext
+            llm_span_manager = nullcontext()
+
+        start_time = time.time()
+
         try:
             # Set streaming parameter in LLM if streaming is enabled
             if self.streaming_on and hasattr(self.llm, "stream"):
@@ -2640,11 +2784,57 @@ class Agent:
                     # Restore original stream setting
                     self.llm.stream = original_stream
 
-                    # Return the complete response for further processing
+                    if self.enable_telemetry and record_metric:
+                        execution_time = time.time() - start_time
+                        record_metric(
+                            "agent.llm.call.duration",
+                            execution_time,
+                            {
+                                "agent_name": self.agent_name,
+                                "model": self.get_current_model(),
+                            },
+                        )
+                        record_metric(
+                            "agent.llm.calls.total",
+                            1,
+                            {"model": self.get_current_model()},
+                            metric_type="counter",
+                        )
+
+                    if llm_span_context:
+                        try:
+                            llm_span_context.__exit__(None, None, None)
+                        except Exception:
+                            pass
+
                     return complete_response
                 else:
                     # Restore original stream setting
                     self.llm.stream = original_stream
+
+                    if self.enable_telemetry and record_metric:
+                        execution_time = time.time() - start_time
+                        record_metric(
+                            "agent.llm.call.duration",
+                            execution_time,
+                            {
+                                "agent_name": self.agent_name,
+                                "model": self.get_current_model(),
+                            },
+                        )
+                        record_metric(
+                            "agent.llm.calls.total",
+                            1,
+                            {"model": self.get_current_model()},
+                            metric_type="counter",
+                        )
+
+                    if llm_span_context:
+                        try:
+                            llm_span_context.__exit__(None, None, None)
+                        except Exception:
+                            pass
+
                     return streaming_response
             else:
                 args = {
@@ -2656,6 +2846,29 @@ class Agent:
 
                 out = self.llm.run(**args, **kwargs)
 
+                if self.enable_telemetry and record_metric:
+                    execution_time = time.time() - start_time
+                    record_metric(
+                        "agent.llm.call.duration",
+                        execution_time,
+                        {
+                            "agent_name": self.agent_name,
+                            "model": self.get_current_model(),
+                        },
+                    )
+                    record_metric(
+                        "agent.llm.calls.total",
+                        1,
+                        {"model": self.get_current_model()},
+                        metric_type="counter",
+                    )
+
+                if llm_span_context:
+                    try:
+                        llm_span_context.__exit__(None, None, None)
+                    except Exception:
+                        pass
+
                 return out
 
         except (
@@ -2665,6 +2878,24 @@ class Agent:
             AuthenticationError,
             Exception,
         ) as e:
+            if self.enable_telemetry:
+                if record_metric:
+                    record_metric(
+                        "agent.llm.call.errors",
+                        1,
+                        {
+                            "agent_name": self.agent_name,
+                            "model": self.get_current_model(),
+                            "error_type": type(e).__name__,
+                        },
+                        metric_type="counter",
+                    )
+                if llm_span_context:
+                    try:
+                        llm_span_context.__exit__(type(e), e, None)
+                    except Exception:
+                        pass
+
             logger.error(
                 f"Error calling LLM with model '{self.get_current_model()}': {e}. "
                 f"Task: {task}, Args: {args}, Kwargs: {kwargs} Traceback: {traceback.format_exc()}"