Merge pull request #615 from sambhavnoobcoder/JSON-Output-Support-Agent

Enhance Agent Logging: Comprehensive Step, Tool, and Memory Tracking in JSON format
pull/620/head
Kye Gomez 3 months ago committed by GitHub
commit 65977d5dfd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -509,6 +509,13 @@ logging.basicConfig(level=logging.INFO)
monitored_agent = MonitoredAgent("MonitoredGriptapeAgent") monitored_agent = MonitoredAgent("MonitoredGriptapeAgent")
result = monitored_agent.run("Summarize the latest AI research papers") result = monitored_agent.run("Summarize the latest AI research papers")
``` ```
Additionally the Agent class now includes built-in logging functionality and the ability to switch between JSON and string output.
To switch between JSON and string output:
- Use `output_type="str"` for string output (default)
- Use `output_type="json"` for JSON output
The `output_type` parameter determines the format of the final result returned by the `run` method. When set to "str", it returns a string representation of the agent's response. When set to "json", it returns a JSON object containing detailed information about the agent's run, including all steps and metadata.
## 6. Best Practices for Custom Agent Development ## 6. Best Practices for Custom Agent Development

@ -781,6 +781,8 @@ class Agent:
or loop_count < self.max_loops or loop_count < self.max_loops
): ):
loop_count += 1 loop_count += 1
# Log step start
current_step_id = f"step_{loop_count}_{uuid.uuid4().hex}"
self.loop_count_print(loop_count, self.max_loops) self.loop_count_print(loop_count, self.max_loops)
print("\n") print("\n")
@ -814,6 +816,8 @@ class Agent:
*response_args, **kwargs *response_args, **kwargs
) )
# Log step metadata
step_meta = self.log_step_metadata(loop_count, task_prompt, response)
# Check if response is a dictionary and has 'choices' key # Check if response is a dictionary and has 'choices' key
if ( if (
isinstance(response, dict) isinstance(response, dict)
@ -832,10 +836,18 @@ class Agent:
# Check and execute tools # Check and execute tools
if self.tools is not None: if self.tools is not None:
print( tool_result = self.parse_and_execute_tools(response)
f"self.tools is not None: {response}" if tool_result:
self.update_tool_usage(
step_meta["step_id"],
tool_result["tool"],
tool_result["args"],
tool_result["response"]
) )
self.parse_and_execute_tools(response)
# Update agent output history
self.agent_output.full_history = self.short_memory.return_history_as_string()
# Log the step metadata # Log the step metadata
logged = self.log_step_metadata( logged = self.log_step_metadata(
@ -1969,13 +1981,48 @@ class Agent:
def log_step_metadata( def log_step_metadata(
self, loop: int, task: str, response: str self, loop: int, task: str, response: str
) -> Step: ) -> Step:
# # # Step Metadata """Log metadata for each step of agent execution."""
# Generate unique step ID
step_id = f"step_{loop}_{uuid.uuid4().hex}"
# Calculate token usage
# full_memory = self.short_memory.return_history_as_string() # full_memory = self.short_memory.return_history_as_string()
# prompt_tokens = self.tokenizer.count_tokens(full_memory) # prompt_tokens = self.tokenizer.count_tokens(full_memory)
# completion_tokens = self.tokenizer.count_tokens(response) # completion_tokens = self.tokenizer.count_tokens(response)
# self.tokenizer.count_tokens(prompt_tokens + completion_tokens) # total_tokens = prompt_tokens + completion_tokens
total_tokens=self.tokenizer.count_tokens(task) + self.tokenizer.count_tokens(response),
# Get memory responses
memory_responses = {
"short_term": self.short_memory.return_history_as_string() if self.short_memory else None,
"long_term": self.long_term_memory.query(task) if self.long_term_memory else None
}
# Get tool responses if tool was used
tool_response = None
if self.tools:
try:
tool_call_output = parse_and_execute_json(self.tools, response, parse_md=True)
if tool_call_output:
tool_response = {
"tool_name": tool_call_output.get("tool_name", "unknown"),
"tool_args": tool_call_output.get("args", {}),
"tool_output": str(tool_call_output.get("output", ""))
}
except Exception as e:
logger.debug(f"No tool call detected in response: {e}")
# Create memory usage tracking
memory_usage = {
"short_term": len(self.short_memory.messages) if self.short_memory else 0,
"long_term": self.long_term_memory.count if self.long_term_memory else 0,
"responses": memory_responses
}
step_log = Step( step_log = Step(
step_id=step_id,
time=time.time(),
tokens = total_tokens,
response=AgentChatCompletionResponse( response=AgentChatCompletionResponse(
id=self.agent_id, id=self.agent_id,
agent_name=self.agent_name, agent_name=self.agent_name,
@ -1990,14 +2037,33 @@ class Agent:
), ),
# usage=UsageInfo( # usage=UsageInfo(
# prompt_tokens=prompt_tokens, # prompt_tokens=prompt_tokens,
# total_tokens=total_tokens,
# completion_tokens=completion_tokens, # completion_tokens=completion_tokens,
# total_tokens=total_tokens,
# ), # ),
tool_calls=[] if tool_response is None else [tool_response],
memory_usage=memory_usage
), ),
) )
# Update total tokens if agent_output exists
if hasattr(self, 'agent_output'):
self.agent_output.total_tokens += self.response.total_tokens
# Add step to agent output tracking
self.step_pool.append(step_log) self.step_pool.append(step_log)
def update_tool_usage(self, step_id: str, tool_name: str, tool_args: dict, tool_response: Any):
"""Update tool usage information for a specific step."""
for step in self.agent_output.steps:
if step.step_id == step_id:
step.response.tool_calls.append({
"tool": tool_name,
"arguments": tool_args,
"response": str(tool_response)
})
break
def _serialize_callable( def _serialize_callable(
self, attr_value: Callable self, attr_value: Callable
) -> Dict[str, Any]: ) -> Dict[str, Any]:

@ -0,0 +1,98 @@
from unittest.mock import Mock, MagicMock
from dataclasses import dataclass, field, asdict
from typing import List, Dict, Any
from datetime import datetime
import unittest
from swarms.schemas.agent_step_schemas import ManySteps, Step
from swarms.structs.agent import Agent
from swarms.tools.tool_parse_exec import parse_and_execute_json
# Mock parse_and_execute_json for testing
parse_and_execute_json = MagicMock()
parse_and_execute_json.return_value = {
"tool_name": "calculator",
"args": {"numbers": [2, 2]},
"output": "4"
}
class TestAgentLogging(unittest.TestCase):
def setUp(self):
self.mock_tokenizer = MagicMock()
self.mock_tokenizer.count_tokens.return_value = 100
self.mock_short_memory = MagicMock()
self.mock_short_memory.get_memory_stats.return_value = {"message_count": 2}
self.mock_long_memory = MagicMock()
self.mock_long_memory.get_memory_stats.return_value = {"item_count": 5}
self.agent = Agent(
tokenizer=self.mock_tokenizer,
short_memory=self.mock_short_memory,
long_term_memory=self.mock_long_memory
)
def test_log_step_metadata_basic(self):
log_result = self.agent.log_step_metadata(1, "Test prompt", "Test response")
self.assertIn('step_id', log_result)
self.assertIn('timestamp', log_result)
self.assertIn('tokens', log_result)
self.assertIn('memory_usage', log_result)
self.assertEqual(log_result['tokens']['total'], 200)
def test_log_step_metadata_no_long_term_memory(self):
self.agent.long_term_memory = None
log_result = self.agent.log_step_metadata(1, "prompt", "response")
self.assertEqual(log_result['memory_usage']['long_term'], {})
def test_log_step_metadata_timestamp(self):
log_result = self.agent.log_step_metadata(1, "prompt", "response")
self.assertIn('timestamp', log_result)
def test_token_counting_integration(self):
self.mock_tokenizer.count_tokens.side_effect = [150, 250]
log_result = self.agent.log_step_metadata(1, "prompt", "response")
self.assertEqual(log_result['tokens']['total'], 400)
def test_agent_output_updating(self):
initial_total_tokens = sum(step['tokens']['total'] for step in self.agent.agent_output.steps)
self.agent.log_step_metadata(1, "prompt", "response")
final_total_tokens = sum(step['tokens']['total'] for step in self.agent.agent_output.steps)
self.assertEqual(
final_total_tokens - initial_total_tokens,
200
)
self.assertEqual(len(self.agent.agent_output.steps), 1)
class TestAgentLoggingIntegration(unittest.TestCase):
def setUp(self):
self.agent = Agent(agent_name="test-agent")
def test_full_logging_cycle(self):
task = "Test task"
max_loops = 1
result = self.agent._run(task, max_loops=max_loops)
self.assertIsInstance(result, dict)
self.assertIn('steps', result)
self.assertIsInstance(result['steps'], list)
self.assertEqual(len(result['steps']), max_loops)
if result['steps']:
step = result['steps'][0]
self.assertIn('step_id', step)
self.assertIn('timestamp', step)
self.assertIn('task', step)
self.assertIn('response', step)
self.assertEqual(step['task'], task)
self.assertEqual(step['response'], f"Response for loop 1")
self.assertTrue(len(self.agent.agent_output.steps) > 0)
if __name__ == '__main__':
unittest.main()
Loading…
Cancel
Save