pull/938/merge
harshalmore31 committed 1 month ago (via GitHub)
commit d6d1c4b162

@@ -46,6 +46,71 @@ response = agent.run("Tell me a detailed story about humanity colonizing the sta
print(response)
```
## Streaming with Tool Execution
Swarms also supports real-time streaming while executing tools, providing immediate feedback on both the agent's reasoning and the tool execution results:
```python
from swarms import Agent


def get_weather(location: str, units: str = "celsius") -> str:
    """
    Get the current weather for a location.

    Args:
        location (str): The city/location to get weather for
        units (str): Temperature units (celsius or fahrenheit)

    Returns:
        str: Weather information
    """
    # Simulated weather data
    weather_data = {
        "New York": {"temperature": "22°C", "condition": "sunny", "humidity": "65%"},
        "London": {"temperature": "15°C", "condition": "cloudy", "humidity": "80%"},
        "Tokyo": {"temperature": "28°C", "condition": "rainy", "humidity": "90%"},
    }

    location_key = location.title()
    if location_key in weather_data:
        data = weather_data[location_key]
        temp = data["temperature"]
        if units == "fahrenheit" and "°C" in temp:
            celsius = int(temp.replace("°C", ""))
            fahrenheit = (celsius * 9/5) + 32
            temp = f"{fahrenheit}°F"
        return f"Weather in {location}: {temp}, {data['condition']}, humidity: {data['humidity']}"
    else:
        return f"Weather data not available for {location}"


# Create an agent with streaming and tool support
agent = Agent(
    model_name="gpt-4o",
    max_loops=1,
    verbose=True,
    streaming_on=True,  # Enable streaming
    print_on=True,      # Enable pretty printing
    tools=[get_weather],  # Add tools
)

# This will stream both the reasoning and the tool execution results
agent.run("What is the weather in Tokyo?")
```
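Because `get_weather` also accepts a `units` parameter, the same streaming agent can be asked to answer in Fahrenheit. A minimal follow-up sketch reusing the `agent` defined above (which tool arguments the model actually chooses can vary by model and prompt):

```python
# The model is expected to invoke get_weather(location="New York", units="fahrenheit");
# the tool then converts the stored 22°C to 71.6°F before returning its string.
agent.run("What is the weather in New York, in fahrenheit?")
```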
### Key Features of Streaming with Tools:

- **Real-time tool execution**: See tool calls happen as they're invoked
- **Streaming responses**: Get immediate feedback on the agent's reasoning
- **Tool result integration**: Watch how tool results are incorporated into the final response
- **Interactive debugging**: Monitor the complete workflow from thought to action
### Best Practices:

1. **Set appropriate max_loops**: Use `max_loops=1` for simple tasks and higher values for complex multi-step operations
2. **Enable verbose mode**: Use `verbose=True` to see detailed tool execution logs
3. **Use print_on for UI**: Enable `print_on=True` for a better visual streaming experience, or set `print_on=False` to collect the stream silently (see the sketch below)
4. **Monitor performance**: Streaming with tools may be slower due to real-time processing
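A minimal sketch of silent streaming, assuming the `get_weather` tool from the example above: with `streaming_on=True` and `print_on=False`, the agent still consumes the stream internally, but nothing is printed and `run` returns the collected text.

```python
from swarms import Agent

agent = Agent(
    model_name="gpt-4o",
    max_loops=1,
    streaming_on=True,   # stream tokens from the model
    print_on=False,      # collect chunks silently instead of rendering a panel
    tools=[get_weather],  # get_weather as defined in the example above
)

# The stream is consumed internally; only the final collected text is returned.
response = agent.run("What is the weather in London?")
print(response)
```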
## Connect With Us
If you'd like technical support, join our Discord below and follow us on Twitter for updates!

@@ -0,0 +1,44 @@
from swarms import Agent


def get_weather(location: str, units: str = "celsius") -> str:
    """
    Get the current weather for a location.

    Args:
        location (str): The city/location to get weather for
        units (str): Temperature units (celsius or fahrenheit)

    Returns:
        str: Weather information
    """
    # Simulated weather data
    weather_data = {
        "New York": {"temperature": "22°C", "condition": "sunny", "humidity": "65%"},
        "London": {"temperature": "15°C", "condition": "cloudy", "humidity": "80%"},
        "Tokyo": {"temperature": "28°C", "condition": "rainy", "humidity": "90%"},
    }

    location_key = location.title()
    if location_key in weather_data:
        data = weather_data[location_key]
        temp = data["temperature"]
        if units == "fahrenheit" and "°C" in temp:
            # Convert to Fahrenheit for demo
            celsius = int(temp.replace("°C", ""))
            fahrenheit = (celsius * 9/5) + 32
            temp = f"{fahrenheit}°F"
        return f"Weather in {location}: {temp}, {data['condition']}, humidity: {data['humidity']}"
    else:
        return f"Weather data not available for {location}"


agent = Agent(
    model_name="gpt-4o",
    max_loops=1,
    verbose=True,
    streaming_on=True,
    print_on=True,
    tools=[get_weather],
)

agent.run("What is the weather in Tokyo?")

@@ -1174,20 +1174,24 @@ class Agent:
                **kwargs,
            )

            # If streaming is enabled, then don't print the response
            # Handle streaming response with tools
            if self.streaming_on and exists(self.tools_list_dictionary) and hasattr(response, "__iter__") and not isinstance(response, str):
                response = self.tool_struct.handle_streaming_with_tools(
                    response=response,
                    llm=self.llm,
                    agent_name=self.agent_name,
                    print_on=self.print_on
                )
            else:
                # Parse the response from the agent with the output type
                if exists(self.tools_list_dictionary):
                    if isinstance(response, BaseModel):
                        response = response.model_dump()

                # Parse the response from the agent with the output type
                response = self.parse_llm_output(response)

            self.short_memory.add(
                role=self.agent_name,
                content=response,
            )
            if isinstance(response, str) and response.strip():
                self.short_memory.add(role=self.agent_name, content=response)

            # Print
            if self.print_on is True:
@@ -1197,13 +1201,18 @@ class Agent:
                        f"[Structured Output] [Time: {time.strftime('%H:%M:%S')}] \n\n {json.dumps(response, indent=4)}",
                        loop_count,
                    )
                elif self.streaming_on:
                    pass
                else:
                elif self.streaming_on and isinstance(response, dict) and response.get("choices"):
                    # Handle streaming tool calls structured output
                    tool_calls = response.get("choices", [{}])[0].get("message", {}).get("tool_calls", [])
                    if tool_calls:
                        self.pretty_print(
                            f"[Structured Output] [Time: {time.strftime('%H:%M:%S')}] \n\n {json.dumps(tool_calls, indent=4)}",
                            loop_count,
                        )
                elif not self.streaming_on:
                    self.pretty_print(
                        response, loop_count
                    )

                # Check and execute callable tools
                if exists(self.tools):
                    self.tool_execution_retry(
@@ -2207,7 +2216,15 @@ class Agent:
            raise ValueError("Response is required.")

        try:
            # Stream and print the response token by token
            # Use centralized string streaming from wrapper
            if hasattr(self.llm, "handle_string_streaming"):
                self.llm.handle_string_streaming(
                    response=response,
                    print_on=self.print_on,
                    delay=delay,
                )
            else:
                # Fallback to original implementation if wrapper doesn't support it
                for token in response.split():
                    print(token, end=" ", flush=True)
                    time.sleep(delay)
@@ -2424,86 +2441,32 @@ class Agent:
                del kwargs["is_last"]

        try:
            # Set streaming parameter in LLM if streaming is enabled
            if self.streaming_on and hasattr(self.llm, "stream"):
                original_stream = self.llm.stream
            # Special handling for streaming with tools - need raw stream for parsing
            if self.streaming_on and exists(self.tools_list_dictionary):
                original_stream = getattr(self.llm, 'stream', False)
                self.llm.stream = True
                try:
                    if img is not None:
                        streaming_response = self.llm.run(
                            task=task, img=img, *args, **kwargs
                        )
                        stream_response = self.llm.run(task=task, img=img, *args, **kwargs)
                    else:
                        streaming_response = self.llm.run(
                            task=task, *args, **kwargs
                        )

                # If we get a streaming response, handle it with the new streaming panel
                if hasattr(
                    streaming_response, "__iter__"
                ) and not isinstance(streaming_response, str):
                    # Check if streaming_callback is provided (for ConcurrentWorkflow dashboard integration)
                    if streaming_callback is not None:
                        # Real-time callback streaming for dashboard integration
                        chunks = []
                        for chunk in streaming_response:
                            if (
                                hasattr(chunk, "choices")
                                and chunk.choices[0].delta.content
                            ):
                                content = chunk.choices[0].delta.content
                                chunks.append(content)
                                # Call the streaming callback with the new chunk
                                streaming_callback(content)
                        complete_response = "".join(chunks)
                    # Check print_on parameter for different streaming behaviors
                    elif self.print_on is False:
                        # Silent streaming - no printing, just collect chunks
                        chunks = []
                        for chunk in streaming_response:
                            if (
                                hasattr(chunk, "choices")
                                and chunk.choices[0].delta.content
                            ):
                                content = chunk.choices[0].delta.content
                                chunks.append(content)
                        complete_response = "".join(chunks)
                    else:
                        # Collect chunks for conversation saving
                        collected_chunks = []

                        def on_chunk_received(chunk: str):
                            """Callback to collect chunks as they arrive"""
                            collected_chunks.append(chunk)
                            # Optional: Save each chunk to conversation in real-time
                            # This creates a more detailed conversation history
                            if self.verbose:
                                logger.debug(
                                    f"Streaming chunk received: {chunk[:50]}..."
                                )

                        # Use the streaming panel to display and collect the response
                        complete_response = formatter.print_streaming_panel(
                            streaming_response,
                            title=f"🤖 Agent: {self.agent_name} Loops: {current_loop}",
                            style=None,  # Use random color like non-streaming approach
                            collect_chunks=True,
                            on_chunk_callback=on_chunk_received,
                        )

                    # Restore original stream setting
                        stream_response = self.llm.run(task=task, *args, **kwargs)
                    return stream_response
                finally:
                    self.llm.stream = original_stream

                    # Return the complete response for further processing
                    return complete_response
                else:
                    # Restore original stream setting
                    self.llm.stream = original_stream
                    return streaming_response

            # Use centralized streaming logic from wrapper if streaming is enabled (no tools)
            elif self.streaming_on and hasattr(self.llm, "run_with_streaming"):
                return self.llm.run_with_streaming(
                    task=task,
                    img=img,
                    streaming_callback=streaming_callback,
                    title=f"Agent: {self.agent_name} Loops: {current_loop}",
                    print_on=self.print_on,
                    verbose=self.verbose,
                    *args,
                    **kwargs,
                )
            else:
                args = {
                    "task": task,
@@ -2908,9 +2871,18 @@ class Agent:
        try:
            temp_llm = self.temp_llm_instance_for_tool_summary()
            summary = temp_llm.run(
                task=self.short_memory.get_str()
            # Use centralized streaming logic for MCP tool summary
            if self.streaming_on:
                summary = temp_llm.run_with_streaming(
                    task=self.short_memory.get_str(),
                    title=f"Agent: {self.agent_name} - MCP Tool Summary",
                    style="cyan",
                    print_on=self.print_on,
                    verbose=self.verbose,
                )
            else:
                summary = temp_llm.run(task=self.short_memory.get_str())
        except Exception as e:
            logger.error(
                f"Error calling LLM after MCP tool execution: {e}"
@@ -2918,7 +2890,7 @@ class Agent:
            # Fallback: provide a default summary
            summary = "I successfully executed the MCP tool and retrieved the information above."

        if self.print_on is True:
        if self.print_on and not self.streaming_on:
            self.pretty_print(summary, loop_count=current_loop)

        # Add to the memory
@@ -2935,7 +2907,7 @@ class Agent:
            temperature=self.temperature,
            max_tokens=self.max_tokens,
            system_prompt=self.system_prompt,
            stream=False,  # Always disable streaming for tool summaries
            stream=self.streaming_on,
            tools_list_dictionary=None,
            parallel_tool_calls=False,
            base_url=self.llm_base_url,
@@ -3000,12 +2972,26 @@ class Agent:
            """
        )

        # Use centralized streaming logic for tool summary
        if self.streaming_on:
            tool_response = temp_llm.run_tool_summary_with_streaming(
                tool_results=str(output),
                agent_name=self.agent_name,
                print_on=self.print_on,
                verbose=self.verbose,
            )
        else:
            tool_response = temp_llm.run(
                f"Please analyze and summarize the following tool execution output:\n\n{output}"
            )

        # Add the tool response to memory
        self.short_memory.add(
            role=self.agent_name,
            content=tool_response,
        )

        if self.print_on is True:
        if self.print_on and not self.streaming_on:
            self.pretty_print(
                tool_response,
                loop_count,

@@ -16,6 +16,7 @@ from swarms.tools.pydantic_to_json import (
)
from swarms.tools.tool_parse_exec import parse_and_execute_json
from swarms.utils.loguru_logger import initialize_logger
from loguru import logger as loguru_logger

logger = initialize_logger(log_folder="base_tool")
@@ -3063,3 +3064,77 @@ class BaseTool(BaseModel):
        )

        return function_calls

    def handle_streaming_with_tools(
        self,
        response: Any,
        llm: Any,
        agent_name: str = "agent",
        print_on: bool = True
    ) -> Union[str, Dict[str, Any]]:
        """
        Simplified streaming response handler with tool support.

        Args:
            response: Streaming response object
            llm: Language model instance
            agent_name: Name of the agent
            print_on: Whether to print streaming output

        Returns:
            Union[str, Dict[str, Any]]: Processed response (text or tool calls)
        """
        # Validate response
        if not response:
            logger.warning("Empty streaming response received")
            return ""
        if not hasattr(response, "__iter__"):
            logger.warning("Non-iterable response received for streaming")
            return str(response) if response else ""

        if hasattr(llm, 'parse_streaming_chunks_with_tools'):
            text_response, tool_calls = llm.parse_streaming_chunks_with_tools(
                stream=response,
                agent_name=agent_name,
                print_on=print_on,
                verbose=self.verbose
            )
            if tool_calls:
                formatted_calls = []
                for tc in tool_calls:
                    if tc and tc.get("name"):
                        args = tc.get("input") or tc.get("arguments", {})
                        if isinstance(args, str):
                            try:
                                args = json.loads(args)
                            except json.JSONDecodeError as e:
                                print(f"Warning: Failed to parse tool arguments for {tc.get('name')}: {e}")
                                args = {"error": f"JSON parse failed: {e}", "raw": args}
                        formatted_calls.append({
                            "type": "function",
                            "function": {"name": tc["name"], "arguments": json.dumps(args)},
                            "id": tc.get("id")
                        })
                return {"choices": [{"message": {"tool_calls": formatted_calls}}]} if formatted_calls else text_response
            return text_response
        else:
            # Simple fallback streaming
            chunks = []
            try:
                for chunk in response:
                    if hasattr(chunk, "choices") and chunk.choices and chunk.choices[0].delta.content:
                        content = chunk.choices[0].delta.content
                        chunks.append(content)
                        if print_on:
                            print(content, end="", flush=True)
                if print_on and chunks:
                    print()
                return "".join(chunks)
            except Exception as e:
                logger.error(f"Error in fallback streaming for agent {agent_name}: {e}")
                return "".join(chunks) if chunks else ""

@@ -1,3 +1,5 @@
import traceback
from typing import Optional, Callable
import asyncio
import base64
import traceback
@@ -340,6 +342,7 @@ class LiteLLM:
            # Store other types of runtime_args for debugging
            completion_params["runtime_args"] = runtime_args

    def output_for_tools(self, response: any):
        if self.mcp_call is True:
            out = response.choices[0].message.tool_calls[0].function
@@ -648,6 +651,338 @@ class LiteLLM:
                f"Model {self.model_name} does not support vision"
            )

    def _collect_streaming_chunks(self, streaming_response, callback=None):
        """Helper method to collect chunks from streaming response."""
        chunks = []
        for chunk in streaming_response:
            if hasattr(chunk, "choices") and chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                chunks.append(content)
                if callback:
                    callback(content)
        return "".join(chunks)

    def _handle_streaming_response(
        self,
        streaming_response,
        title: str = "LLM Response",
        style: Optional[str] = None,
        streaming_callback: Optional[Callable[[str], None]] = None,
        print_on: bool = True,
        verbose: bool = False,
    ) -> str:
        """
        Centralized streaming response handler for all streaming scenarios.

        Args:
            streaming_response: The streaming response object
            title: Title for the streaming panel
            style: Style for the panel (optional)
            streaming_callback: Callback for real-time streaming
            print_on: Whether to print the streaming output
            verbose: Whether to enable verbose logging

        Returns:
            str: The complete response string
        """
        # Non-streaming response - return as is
        if not (hasattr(streaming_response, "__iter__") and not isinstance(streaming_response, str)):
            return streaming_response

        # Handle callback streaming
        if streaming_callback is not None:
            return self._collect_streaming_chunks(streaming_response, streaming_callback)

        # Handle silent streaming
        if not print_on:
            return self._collect_streaming_chunks(streaming_response)

        # Handle formatted streaming with panel
        from swarms.utils.formatter import formatter
        from loguru import logger

        collected_chunks = []

        def on_chunk_received(chunk: str):
            collected_chunks.append(chunk)
            if verbose:
                logger.debug(f"Streaming chunk received: {chunk[:50]}...")

        return formatter.print_streaming_panel(
            streaming_response,
            title=title,
            style=style,
            collect_chunks=True,
            on_chunk_callback=on_chunk_received,
        )

    def run_with_streaming(
        self,
        task: str,
        img: Optional[str] = None,
        audio: Optional[str] = None,
        streaming_callback: Optional[Callable[[str], None]] = None,
        title: str = "LLM Response",
        style: Optional[str] = None,
        print_on: bool = True,
        verbose: bool = False,
        *args,
        **kwargs,
    ) -> str:
        """
        Run LLM with centralized streaming handling.

        Args:
            task: The task/prompt to send to the LLM
            img: Optional image input
            audio: Optional audio input
            streaming_callback: Callback for real-time streaming
            title: Title for streaming panel
            style: Style for streaming panel
            print_on: Whether to print streaming output
            verbose: Whether to enable verbose logging

        Returns:
            str: The complete response
        """
        original_stream = self.stream
        self.stream = True
        try:
            # Build kwargs for run method
            run_kwargs = {"task": task, **kwargs}
            if img is not None:
                run_kwargs["img"] = img
            if audio is not None:
                run_kwargs["audio"] = audio

            response = self.run(*args, **run_kwargs)

            return self._handle_streaming_response(
                response,
                title=title,
                style=style,
                streaming_callback=streaming_callback,
                print_on=print_on,
                verbose=verbose,
            )
        finally:
            self.stream = original_stream

    def run_tool_summary_with_streaming(
        self,
        tool_results: str,
        agent_name: str = "Agent",
        print_on: bool = True,
        verbose: bool = False,
        *args,
        **kwargs,
    ) -> str:
        """
        Run tool summary with streaming support.

        Args:
            tool_results: The tool execution results to summarize
            agent_name: Name of the agent for the panel title
            print_on: Whether to print streaming output
            verbose: Whether to enable verbose logging

        Returns:
            str: The complete summary response
        """
        return self.run_with_streaming(
            task=f"Please analyze and summarize the following tool execution output:\n\n{tool_results}",
            title=f"Agent: {agent_name} - Tool Summary",
            style="green",
            print_on=print_on,
            verbose=verbose,
            *args,
            **kwargs,
        )

    def handle_string_streaming(
        self,
        response: str,
        print_on: bool = True,
        delay: float = 0.01,
    ) -> None:
        """
        Handle streaming for string responses by simulating streaming output.

        Args:
            response: The string response to stream
            print_on: Whether to print the streaming output
            delay: Delay between characters for streaming effect
        """
        if not (print_on and response):
            return

        import time

        for char in response:
            print(char, end="", flush=True)
            if delay > 0:
                time.sleep(delay)
        print()  # Newline at the end

    def _process_anthropic_chunk(self, chunk, current_tool_call, tool_call_buffer, tool_calls_in_stream, print_on, verbose):
        """Process Anthropic-style streaming chunks."""
        import json
        from loguru import logger

        chunk_type = getattr(chunk, 'type', None)
        full_text_response = ""

        if chunk_type == 'content_block_start' and hasattr(chunk, 'content_block') and chunk.content_block.type == 'tool_use':
            tool_name = chunk.content_block.name
            if print_on:
                print(f"\nTool Call: {tool_name}...", flush=True)
            current_tool_call = {"id": chunk.content_block.id, "name": tool_name, "input": ""}
            tool_call_buffer = ""
        elif chunk_type == 'content_block_delta' and hasattr(chunk, 'delta'):
            if chunk.delta.type == 'input_json_delta':
                tool_call_buffer += chunk.delta.partial_json
            elif chunk.delta.type == 'text_delta':
                text_chunk = chunk.delta.text
                full_text_response += text_chunk
                if print_on:
                    print(text_chunk, end="", flush=True)
        elif chunk_type == 'content_block_stop' and current_tool_call:
            try:
                tool_input = json.loads(tool_call_buffer)
                current_tool_call["input"] = tool_input
                tool_calls_in_stream.append(current_tool_call)
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse tool arguments: {tool_call_buffer}. Error: {e}")
                # Store the raw buffer for debugging
                current_tool_call["input"] = {"raw_buffer": tool_call_buffer, "error": str(e)}
                tool_calls_in_stream.append(current_tool_call)
            current_tool_call = None
            tool_call_buffer = ""

        return full_text_response, current_tool_call, tool_call_buffer

    def _process_openai_chunk(self, chunk, tool_calls_in_stream, print_on, verbose):
        """Process OpenAI-style streaming chunks."""
        import json

        full_text_response = ""

        if not (hasattr(chunk, 'choices') and chunk.choices):
            return full_text_response

        choice = chunk.choices[0]
        if not (hasattr(choice, 'delta') and choice.delta):
            return full_text_response

        delta = choice.delta

        # Handle text content
        if hasattr(delta, 'content') and delta.content:
            text_chunk = delta.content
            full_text_response += text_chunk
            if print_on:
                print(text_chunk, end="", flush=True)

        # Handle tool calls in streaming chunks
        if hasattr(delta, 'tool_calls') and delta.tool_calls:
            for tool_call in delta.tool_calls:
                tool_index = getattr(tool_call, 'index', 0)

                # Ensure we have enough slots in the list
                while len(tool_calls_in_stream) <= tool_index:
                    tool_calls_in_stream.append(None)

                if hasattr(tool_call, 'function') and tool_call.function:
                    func = tool_call.function

                    # Create new tool call if slot is empty and we have a function name
                    if tool_calls_in_stream[tool_index] is None and hasattr(func, 'name') and func.name:
                        if print_on:
                            print(f"\nTool Call: {func.name}...", flush=True)
                        tool_calls_in_stream[tool_index] = {
                            "id": getattr(tool_call, 'id', f"call_{tool_index}"),
                            "name": func.name,
                            "arguments": ""
                        }

                    # Accumulate arguments
                    if tool_calls_in_stream[tool_index] and hasattr(func, 'arguments') and func.arguments is not None:
                        tool_calls_in_stream[tool_index]["arguments"] += func.arguments
                        if verbose:
                            logger.debug(f"Accumulated arguments for {tool_calls_in_stream[tool_index].get('name', 'unknown')}: '{tool_calls_in_stream[tool_index]['arguments']}'")

                        # Try to parse if we have complete JSON
                        try:
                            args_dict = json.loads(tool_calls_in_stream[tool_index]["arguments"])
                            tool_calls_in_stream[tool_index]["input"] = args_dict
                            tool_calls_in_stream[tool_index]["arguments_complete"] = True
                            if verbose:
                                logger.info(f"Complete tool call for {tool_calls_in_stream[tool_index]['name']} with args: {args_dict}")
                        except json.JSONDecodeError:
                            # Continue accumulating - JSON might be incomplete
                            if verbose:
                                logger.debug(f"Incomplete JSON for {tool_calls_in_stream[tool_index].get('name', 'unknown')}: {tool_calls_in_stream[tool_index]['arguments'][:100]}...")

        return full_text_response

    def parse_streaming_chunks_with_tools(
        self,
        stream,
        agent_name: str = "Agent",
        print_on: bool = True,
        verbose: bool = False,
    ) -> tuple:
        """
        Parse streaming chunks and extract both text and tool calls.

        Args:
            stream: The streaming response object
            agent_name: Name of the agent for printing
            print_on: Whether to print streaming output
            verbose: Whether to enable verbose logging

        Returns:
            tuple: (full_text_response, tool_calls_list)
        """
        full_text_response = ""
        tool_calls_in_stream = []
        current_tool_call = None
        tool_call_buffer = ""

        if print_on:
            print(f"{agent_name}: ", end="", flush=True)

        # Process streaming chunks in real-time
        try:
            for chunk in stream:
                if verbose:
                    logger.debug(f"Processing streaming chunk: {type(chunk)}")

                # Try Anthropic-style processing first
                anthropic_result = self._process_anthropic_chunk(
                    chunk, current_tool_call, tool_call_buffer, tool_calls_in_stream, print_on, verbose
                )
                if anthropic_result[0]:  # If text was processed
                    text_chunk, current_tool_call, tool_call_buffer = anthropic_result
                    full_text_response += text_chunk
                    continue

                # If not Anthropic, try OpenAI-style processing
                openai_text = self._process_openai_chunk(chunk, tool_calls_in_stream, print_on, verbose)
                if openai_text:
                    full_text_response += openai_text
        except Exception as e:
            logger.error(f"Error processing streaming chunks: {e}")
            if print_on:
                print(f"\n[Streaming Error: {e}]")
            return full_text_response, tool_calls_in_stream

        if print_on:
            print()  # Newline after streaming text

        return full_text_response, tool_calls_in_stream

    def run(
        self,
        task: str,
@@ -840,6 +1175,11 @@ class LiteLLM:
                        .message.tool_calls[0]
                        .function.arguments
                    )

            # Standard completion
            response = await acompletion(**completion_params)
            print(response)
            return response

        elif self.return_all is True:
            return response.model_dump()
        elif "gemini" in self.model_name.lower():
