pull/1029/head
Devdatta Talele 2 months ago
parent a2ccaab260
commit c7d32ac6e8

@ -1,172 +0,0 @@
# Fix for Issue #936: Agent Tool Usage with Streaming Enabled
## Problem Summary
- The agent's tool-usage callable does not work when streaming is enabled
- The same configuration works correctly without streaming
- Tool execution logging disappeared
## Root Cause Analysis
When streaming is enabled, the LLM response chunks are collected as plain text strings, losing the structured API response format that contains tool call metadata. The `tool_struct.execute_function_calls_from_api_response()` method expects structured responses with `tool_calls` attributes, but streaming responses only contained concatenated text content.
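The failure mode can be illustrated with a minimal sketch. The chunk objects below are simplified stand-ins for OpenAI-style streaming deltas, not the actual swarms types:

```python
from types import SimpleNamespace

# Simplified stand-ins for OpenAI-style streaming chunks: one carries text,
# the other carries a tool call in its delta.
chunks = [
    SimpleNamespace(choices=[SimpleNamespace(
        delta=SimpleNamespace(content="Hello", tool_calls=None))]),
    SimpleNamespace(choices=[SimpleNamespace(
        delta=SimpleNamespace(content=None, tool_calls=[{"name": "get_weather"}]))]),
]

# Buggy accumulation: only the text survives; tool call metadata is dropped.
text_only = "".join(c.choices[0].delta.content or "" for c in chunks)

# Fixed accumulation: collect tool calls alongside the text.
tool_calls = []
for c in chunks:
    if c.choices[0].delta.tool_calls:
        tool_calls.extend(c.choices[0].delta.tool_calls)

print(text_only)   # "Hello" - no trace of the tool call
print(tool_calls)  # the tool call is preserved for execution
```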
## Solution Implementation
### 1. Created StreamingToolResponse Class
**File**: `swarms/structs/agent.py` (lines 91-104)
```python
class StreamingToolResponse:
    """
    Response wrapper that preserves both content and tool calls from streaming responses.
    This enables tool execution when streaming is enabled.
    """

    def __init__(self, content: str, tool_calls: List[Any] = None):
        self.content = content
        self.tool_calls = tool_calls or []

    def __str__(self):
        return self.content

    def __repr__(self):
        return (
            f"StreamingToolResponse(content='{self.content[:50]}...', "
            f"tool_calls={len(self.tool_calls)} calls)"
        )
```
**Purpose**: Replace dynamic type creation with a proper class that preserves both content and tool calls from streaming responses.
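A quick illustration of the wrapper's behavior (the class is redefined here so the snippet is self-contained, and the tool-call dict is a placeholder):

```python
from typing import Any, List

class StreamingToolResponse:
    """Minimal copy of the wrapper, for illustration only."""
    def __init__(self, content: str, tool_calls: List[Any] = None):
        self.content = content
        self.tool_calls = tool_calls or []

    def __str__(self):
        return self.content

resp = StreamingToolResponse("The weather is sunny.", [{"name": "get_weather"}])

# Code that treats the response as plain text keeps working...
print(str(resp))             # The weather is sunny.

# ...while tool-execution code can still see the structured calls.
print(len(resp.tool_calls))  # 1
```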
### 2. Enhanced Streaming Chunk Processing
**File**: `swarms/structs/agent.py` (lines 2600-2690)
**Modified all three streaming paths in `call_llm` method**:
#### A. Streaming with Callback (lines 2615-2625)
```python
# Preserve tool calls from streaming chunks
try:
    if (
        hasattr(chunk, "choices")
        and len(chunk.choices) > 0
        and hasattr(chunk.choices[0], "delta")
        and hasattr(chunk.choices[0].delta, "tool_calls")
        and chunk.choices[0].delta.tool_calls
    ):
        tool_calls.extend(chunk.choices[0].delta.tool_calls)
except (AttributeError, IndexError) as e:
    logger.debug(f"Could not extract tool calls from chunk: {e}")
```
#### B. Silent Streaming (lines 2636-2646)
- Same tool call preservation logic as above
- Maintains streaming behavior while capturing tool calls
#### C. Streaming Panel (lines 2658-2688)
```python
# Create a tool-aware streaming processor to preserve tool calls
def tool_aware_streaming(stream_response):
    for chunk in stream_response:
        # Preserve tool calls from streaming chunks with error handling
        try:
            if (
                hasattr(chunk, "choices")
                and len(chunk.choices) > 0
                and hasattr(chunk.choices[0], "delta")
                and hasattr(chunk.choices[0].delta, "tool_calls")
                and chunk.choices[0].delta.tool_calls
            ):
                tool_calls.extend(chunk.choices[0].delta.tool_calls)
        except (AttributeError, IndexError) as e:
            logger.debug(f"Could not extract tool calls from chunk: {e}")
        yield chunk
```
**Purpose**: Prevents iterator consumption bug while preserving tool calls.
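The iterator issue is a general Python pitfall: a streaming response is a one-shot iterator, so inspecting it for tool calls before display would exhaust it. A generator wrapper lets both happen in a single pass; this sketch uses a plain list of fake chunks standing in for the stream:

```python
def tool_aware_streaming(stream, sink):
    # Inspect each chunk as it passes through, without consuming
    # the stream ahead of the downstream consumer.
    for chunk in stream:
        if isinstance(chunk, dict) and "tool_call" in chunk:
            sink.append(chunk["tool_call"])
        yield chunk

stream = iter(["Hello, ", {"tool_call": "get_weather"}, "world"])
collected = []
displayed = list(tool_aware_streaming(stream, collected))

print(displayed)  # every chunk still reached the consumer
print(collected)  # tool calls were captured in the same pass
```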
### 3. Enhanced Tool Execution Logging
**File**: `swarms/structs/agent.py` (lines 3109-3133)
```python
# Add tool execution logging
logger.info(
    f"Starting tool execution for agent '{self.agent_name}' in loop {loop_count}"
)

# Enhanced retry logic with logging
output = None  # initialized first so the retry branch can test it safely
try:
    output = self.tool_struct.execute_function_calls_from_api_response(response)
except Exception as e:
    logger.warning(f"First attempt at tool execution failed: {e}. Retrying...")
    try:
        output = self.tool_struct.execute_function_calls_from_api_response(response)
    except Exception as retry_error:
        logger.error(f"Tool execution failed after retry: {retry_error}")
        if output is None:
            raise retry_error

# Log successful tool execution
if output is not None:
    logger.info(
        f"Tool execution successful for agent '{self.agent_name}' in loop {loop_count}. "
        f"Output length: {len(str(output))}"
    )
else:
    logger.warning(
        f"Tool execution completed but returned None output for agent "
        f"'{self.agent_name}' in loop {loop_count}"
    )
```
**Purpose**: Restore missing tool execution logging with comprehensive status reporting.
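The retry-and-log pattern above can be reduced to a small generic sketch; `flaky` below is a hypothetical callable used only to exercise the retry path:

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def run_with_one_retry(fn):
    """Call fn, retrying once on failure and logging each stage."""
    output = None  # initialized first so the retry branch is safe
    try:
        output = fn()
    except Exception as e:
        logger.warning(f"First attempt failed: {e}. Retrying...")
        try:
            output = fn()
        except Exception as retry_error:
            logger.error(f"Failed after retry: {retry_error}")
            raise
    logger.info(f"Succeeded; output length: {len(str(output))}")
    return output

# A hypothetical flaky call: fails once, then succeeds.
attempts = {"n": 0}
def flaky():
    attempts["n"] += 1
    if attempts["n"] == 1:
        raise RuntimeError("transient error")
    return "tool output"

result = run_with_one_retry(flaky)
print(result)  # tool output
```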
## Key Improvements
### 1. **Robust Error Handling**
- Added try-catch blocks around tool call extraction
- Graceful handling of malformed chunks
- Protection against `AttributeError` and `IndexError`
### 2. **Iterator Safety**
- Fixed streaming iterator consumption bug
- Proper generator pattern to avoid iterator exhaustion
### 3. **Comprehensive Logging**
- Tool execution start/success/failure logging
- Retry attempt logging
- Debug-level logging for chunk processing errors
### 4. **Backward Compatibility**
- No changes to existing non-streaming behavior
- Maintains all existing API contracts
- Falls back gracefully when no tool calls present
## Testing
Created two test files:
### 1. `test_streaming_tools.py`
- Tests streaming behavior with and without tools
- Validates tool execution occurs with streaming enabled
- Checks memory history for tool execution evidence
### 2. `test_original_issue.py`
- Reproduces exact code from GitHub issue #936
- Uses original function signatures and agent configuration
- Validates the specific use case reported in the issue
## Files Modified
1. **`swarms/structs/agent.py`**
- Added `StreamingToolResponse` class
- Enhanced streaming chunk processing in `call_llm` method
- Improved tool execution logging in `execute_tools` method
2. **Created Test Files**
- `test_streaming_tools.py` - Comprehensive streaming + tool tests
- `test_original_issue.py` - Reproduction of original issue scenario
## Verification
The solution addresses both reported issues:
- **Tool Usage with Streaming**: Tool calls are now preserved and executed when streaming is enabled
- **Tool Execution Logging**: Comprehensive logging is now present throughout the tool execution process
## Edge Cases Handled
1. **Malformed Chunks**: Graceful error handling prevents crashes
2. **Empty Tool Calls**: Proper validation before processing
3. **Iterator Consumption**: Safe streaming processing without iterator exhaustion
4. **Mixed Content**: Handles chunks with both content and tool calls
5. **Multiple Tool Calls**: Supports multiple tool calls in single or multiple chunks
## Performance Impact
- **Minimal**: Only additional memory for tool call arrays during streaming
- **Efficient**: Tool call extraction only occurs when chunks contain them
- **Scalable**: Handles multiple concurrent streaming agents safely

@ -1,173 +0,0 @@
#!/usr/bin/env python3
"""
Reproduce the exact code from GitHub issue #936 to test the fix.
"""
from typing import List
import http.client
import json
from swarms import Agent
import os
def get_realtor_data_from_one_source(location: str):
    """
    Fetch rental property data from the Realtor API for a specified location.

    Args:
        location (str): The location to search for rental properties (e.g., "Menlo Park, CA")

    Returns:
        str: JSON-formatted string containing rental property data

    Raises:
        http.client.HTTPException: If the API request fails
        json.JSONDecodeError: If the response cannot be parsed as JSON
    """
    # Mock implementation since we don't have an API key
    return json.dumps({
        "properties": [
            {
                "name": f"Sample Property in {location}",
                "address": f"123 Main St, {location}",
                "price": 2800,
                "bedrooms": 2,
                "bathrooms": 1
            }
        ]
    }, indent=2)
def get_realtor_data_from_multiple_sources(locations: List[str]):
    """
    Fetch rental property data from multiple sources for the specified locations.

    Args:
        locations (List[str]): List of locations to search for rental properties
            (e.g., ["Menlo Park, CA", "Palo Alto, CA"])
    """
    output = []
    for location in locations:
        data = get_realtor_data_from_one_source(location)
        output.append(data)
    return output
def test_original_issue():
    """Test the exact scenario from the GitHub issue"""
    print("🧪 Testing Original Issue #936 Code...")

    agent = Agent(
        agent_name="Rental-Property-Specialist",
        system_prompt="""
You are an expert rental property specialist with deep expertise in real estate analysis and tenant matching. Your core responsibilities include:
1. Property Analysis & Evaluation
- Analyze rental property features and amenities
- Evaluate location benefits and drawbacks
- Assess property condition and maintenance needs
- Compare rental rates with market standards
- Review lease terms and conditions
- Identify potential red flags or issues
2. Location Assessment
- Analyze neighborhood safety and demographics
- Evaluate proximity to amenities (schools, shopping, transit)
- Research local market trends and development plans
- Consider noise levels and traffic patterns
- Assess parking availability and restrictions
- Review zoning regulations and restrictions
3. Financial Analysis
- Calculate price-to-rent ratios
- Analyze utility costs and included services
- Evaluate security deposit requirements
- Consider additional fees (pet rent, parking, etc.)
- Compare with similar properties in the area
- Assess potential for rent increases
4. Tenant Matching
- Match properties to tenant requirements
- Consider commute distances
- Evaluate pet policies and restrictions
- Assess lease term flexibility
- Review application requirements
- Consider special accommodations needed
5. Documentation & Compliance
- Review lease agreement terms
- Verify property certifications
- Check compliance with local regulations
- Assess insurance requirements
- Review maintenance responsibilities
- Document property condition
When analyzing properties, always consider:
- Value for money
- Location quality
- Property condition
- Lease terms fairness
- Safety and security
- Maintenance and management quality
- Future market potential
- Tenant satisfaction factors
Provide clear, objective analysis while maintaining professional standards and ethical considerations.""",
        model_name="claude-3-sonnet-20240229",
        max_loops=2,
        verbose=True,
        streaming_on=True,  # THIS WAS CAUSING THE ISSUE
        print_on=True,
        tools=[get_realtor_data_from_multiple_sources],
        api_key=os.getenv("ANTHROPIC_API_KEY"),  # Use the appropriate API key
    )

    task = "What are the best properties in Menlo Park and Palo Alto for rent under $3,000?"
    try:
        print(f"📝 Running task: {task}")
        print("🔄 With streaming=True and tools enabled...")
        result = agent.run(task)
        print(f"\n✅ Result: {result}")

        # Check if the tool was executed
        memory_history = agent.short_memory.return_history_as_string()
        if "Tool Executor" in memory_history:
            print("\n✅ SUCCESS: Tool was executed successfully with streaming enabled!")
            print("🎉 Issue #936 appears to be FIXED!")
            return True
        else:
            print("\n❌ FAILURE: Tool execution was not detected")
            print("🚨 Issue #936 is NOT fixed yet")
            print("\nMemory History:")
            print(memory_history)
            return False
    except Exception as e:
        print(f"\n❌ FAILURE: Exception occurred: {e}")
        import traceback
        traceback.print_exc()
        return False
if __name__ == "__main__":
    print("🔧 Testing the Exact Code from GitHub Issue #936")
    print("=" * 60)

    # Check if an API key is available
    if not os.getenv("ANTHROPIC_API_KEY") and not os.getenv("OPENAI_API_KEY"):
        print("⚠️ Warning: Neither ANTHROPIC_API_KEY nor OPENAI_API_KEY is set.")
        print("Setting a dummy key for testing purposes...")
        os.environ["ANTHROPIC_API_KEY"] = "dummy-key-for-testing"

    success = test_original_issue()
    print("\n" + "=" * 60)
    if success:
        print("🎉 SUCCESS: The original issue appears to be RESOLVED!")
        print("✅ Agent tool usage now works with streaming enabled")
        print("✅ Tool execution logging is now present")
    else:
        print("❌ FAILURE: The original issue is NOT fully resolved yet")
        print("🔧 Additional fixes may be needed")

@ -1,164 +0,0 @@
#!/usr/bin/env python3
"""
Test script to reproduce and verify the fix for issue #936:
Agent tool usage fails when streaming is enabled.
"""
from typing import List
import json
from swarms import Agent
import os
import logging
# Set up logging to see the tool execution logs
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def simple_calculator_tool(operation: str, num1: float, num2: float) -> str:
    """
    Simple calculator tool for testing.

    Args:
        operation: The operation to perform (add, subtract, multiply, divide)
        num1: First number
        num2: Second number

    Returns:
        str: Result of the calculation
    """
    logger.info(f"Calculator tool called: {operation}({num1}, {num2})")

    if operation == "add":
        result = num1 + num2
    elif operation == "subtract":
        result = num1 - num2
    elif operation == "multiply":
        result = num1 * num2
    elif operation == "divide":
        if num2 == 0:
            return "Error: Division by zero"
        result = num1 / num2
    else:
        return f"Error: Unknown operation {operation}"
    return f"The result of {operation}({num1}, {num2}) is {result}"
def test_agent_streaming_with_tools():
    """Test that the agent can use tools when streaming is enabled"""
    print("🧪 Testing Agent with Streaming + Tools...")

    # Create agent with streaming enabled
    agent = Agent(
        agent_name="Calculator-Agent",
        system_prompt="""
        You are a helpful calculator assistant. When asked to perform calculations,
        use the simple_calculator_tool to compute the result.

        Available tool:
        - simple_calculator_tool(operation, num1, num2): Performs basic calculations

        Always use the tool for calculations instead of doing them yourself.
        """,
        model_name="gpt-3.5-turbo",  # Using a common model for testing
        max_loops=2,  # Allow for tool execution + response
        verbose=True,
        streaming_on=True,  # THIS IS THE KEY - streaming enabled
        print_on=True,
        tools=[simple_calculator_tool],
        # Add any necessary API keys from environment
        api_key=os.getenv("OPENAI_API_KEY"),
    )

    # Test task that should trigger tool usage
    task = "Please calculate 25 + 17 for me using the calculator tool"
    print(f"\n📝 Task: {task}")
    print("\n🔄 Running agent with streaming + tools...")

    try:
        result = agent.run(task)
        print(f"\n✅ Result: {result}")

        # Check if the tool was actually executed by looking at memory
        memory_history = agent.short_memory.return_history_as_string()
        if "Tool Executor" in memory_history:
            print("✅ SUCCESS: Tool execution found in memory history!")
            return True
        else:
            print("❌ FAILURE: No tool execution found in memory history")
            print("Memory history:")
            print(memory_history)
            return False
    except Exception as e:
        print(f"❌ FAILURE: Exception occurred: {e}")
        import traceback
        traceback.print_exc()
        return False
def test_agent_streaming_without_tools():
    """Test that the agent works normally when streaming is enabled but no tools are needed"""
    print("\n🧪 Testing Agent with Streaming (No Tools)...")

    agent = Agent(
        agent_name="Simple-Agent",
        system_prompt="You are a helpful assistant.",
        model_name="gpt-3.5-turbo",
        max_loops=1,
        verbose=True,
        streaming_on=True,  # Streaming enabled
        print_on=True,
        api_key=os.getenv("OPENAI_API_KEY"),
    )

    task = "What is the capital of France?"
    print(f"\n📝 Task: {task}")
    print("\n🔄 Running agent with streaming (no tools)...")

    try:
        result = agent.run(task)
        print(f"\n✅ Result: {result}")
        if "Paris" in str(result):
            print("✅ SUCCESS: Agent responded correctly without tools")
            return True
        else:
            print("❌ FAILURE: Agent didn't provide the expected response")
            return False
    except Exception as e:
        print(f"❌ FAILURE: Exception occurred: {e}")
        import traceback
        traceback.print_exc()
        return False
if __name__ == "__main__":
    print("🔧 Testing Fix for Issue #936: Agent Tool Usage with Streaming")
    print("=" * 60)

    # Check if an API key is available
    if not os.getenv("OPENAI_API_KEY"):
        print("⚠️ Warning: OPENAI_API_KEY not set. Tests may fail.")
        print("Please set the OPENAI_API_KEY environment variable to run the tests.")

    # Run tests
    test1_passed = test_agent_streaming_without_tools()
    test2_passed = test_agent_streaming_with_tools()

    print("\n" + "=" * 60)
    print("📊 Test Results:")
    print(f"  Test 1 (Streaming without tools): {'✅ PASSED' if test1_passed else '❌ FAILED'}")
    print(f"  Test 2 (Streaming with tools): {'✅ PASSED' if test2_passed else '❌ FAILED'}")

    if test1_passed and test2_passed:
        print("\n🎉 ALL TESTS PASSED! The fix appears to be working correctly.")
    else:
        print("\n⚠️ SOME TESTS FAILED! The fix may need additional work.")