[DOCS][SwarmRouter]

pull/1221/head
Kye Gomez 1 week ago
parent 129a10a872
commit da9618df0e

@ -4,13 +4,13 @@ The `SwarmRouter` class is a flexible routing system designed to manage differen
Full Path: `from swarms.structs.swarm_router`
## Initialization Parameters
Main class for routing tasks to different swarm types.
| Attribute | Type | Description |
| --- | --- | --- |
| `id` | str | Unique identifier for the SwarmRouter instance (auto-generated if not provided) |
| `name` | str | Name of the SwarmRouter instance |
| `description` | str | Description of the SwarmRouter's purpose |
| `max_loops` | int | Maximum number of loops to perform |
@ -24,35 +24,84 @@ Main class for routing tasks to different swarm types.
| `rules` | str | Rules to inject into every agent |
| `documents` | List[str] | List of document file paths |
| `output_type` | OutputType | Output format type (e.g., "string", "dict", "list", "json", "yaml", "xml") |
| `no_cluster_ops` | bool | Flag to disable cluster operations |
| `speaker_fn` | callable | Speaker function for GroupChat swarm type |
| `load_agents_from_csv` | bool | Flag to enable/disable loading agents from CSV |
| `csv_file_path` | str | Path to the CSV file for loading agents |
| `return_entire_history` | bool | Flag to enable/disable returning the entire conversation history |
| `multi_agent_collab_prompt` | bool | Whether to enable multi-agent collaboration prompts |
| `list_all_agents` | bool | Flag to enable/disable listing all agents to each other |
| `conversation` | Any | Conversation object for managing agent interactions |
| `agents_config` | Optional[Dict[Any, Any]] | Configuration dictionary for agents |
| `speaker_function` | str | Speaker function name for InteractiveGroupChat swarm type |
| `heavy_swarm_loops_per_agent` | int | Number of loops per agent for HeavySwarm (default: 1) |
| `heavy_swarm_question_agent_model_name` | str | Model name for the question agent in HeavySwarm (default: "gpt-4.1") |
| `heavy_swarm_worker_model_name` | str | Model name for worker agents in HeavySwarm (default: "gpt-4.1") |
| `heavy_swarm_swarm_show_output` | bool | Flag to show output for HeavySwarm (default: True) |
| `telemetry_enabled` | bool | Flag to enable/disable telemetry logging (default: False) |
| `council_judge_model_name` | str | Model name for the judge in CouncilAsAJudge (default: "gpt-4o-mini") |
| `verbose` | bool | Flag to enable/disable verbose logging (default: False) |
| `worker_tools` | List[Callable] | List of tools available to worker agents |
| `aggregation_strategy` | str | Aggregation strategy for HeavySwarm (default: "synthesis") |
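Most of these attributes can be supplied as constructor keyword arguments. A minimal construction sketch, assuming the parameter names in the table above map directly onto `__init__` keywords (the two agents and their settings are illustrative placeholders, not taken from this documentation's examples):
```python
from swarms import Agent
from swarms.structs.swarm_router import SwarmRouter

# Two illustrative agents; the Agent arguments used here (agent_name, system_prompt,
# model_name, max_loops) are commonly used ones and may differ in your version.
research_agent = Agent(
    agent_name="Researcher",
    system_prompt="Research the given topic and report key findings.",
    model_name="gpt-4o-mini",
    max_loops=1,
)
writer_agent = Agent(
    agent_name="Writer",
    system_prompt="Turn research notes into a concise summary.",
    model_name="gpt-4o-mini",
    max_loops=1,
)

router = SwarmRouter(
    name="docs-router",
    description="Routes research and writing tasks",
    max_loops=1,
    agents=[research_agent, writer_agent],
    swarm_type="SequentialWorkflow",  # any value from the swarm type table below
    output_type="dict",               # controls the format returned by run()
    verbose=False,
)
```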
### Methods
| Method | Parameters | Description |
| --- | --- | --- |
| `__init__` | `name: str = "swarm-router", description: str = "Routes your task to the desired swarm", max_loops: int = 1, agents: List[Union[Agent, Callable]] = [], swarm_type: SwarmType = "SequentialWorkflow", autosave: bool = False, rearrange_flow: str = None, return_json: bool = False, auto_generate_prompts: bool = False, shared_memory_system: Any = None, rules: str = None, documents: List[str] = [], output_type: OutputType = "dict", no_cluster_ops: bool = False, speaker_fn: callable = None, load_agents_from_csv: bool = False, csv_file_path: str = None, return_entire_history: bool = True, multi_agent_collab_prompt: bool = True` | Initialize the SwarmRouter |
| `setup` | None | Set up the SwarmRouter by activating APE and handling shared memory and rules |
| `activate_shared_memory` | None | Activate shared memory with all agents |
| `handle_rules` | None | Inject rules to every agent |
| `activate_ape` | None | Activate automatic prompt engineering for agents that support it |
| `reliability_check` | None | Perform reliability checks on the SwarmRouter configuration |
| `_create_swarm` | `task: str = None, *args, **kwargs` | Create and return the specified swarm type |
| `update_system_prompt_for_agent_in_swarm` | None | Update system prompts for all agents with collaboration prompts |
| `_log` | `level: str, message: str, task: str = "", metadata: Dict[str, Any] = None` | Create a log entry |
| `_run` | `task: str, img: Optional[str] = None, model_response: Optional[str] = None, *args, **kwargs` | Run the specified task on the selected swarm type |
| `run` | `task: str, img: Optional[str] = None, model_response: Optional[str] = None, *args, **kwargs` | Execute a task on the selected swarm type |
| `__call__` | `task: str, *args, **kwargs` | Make the SwarmRouter instance callable |
| `batch_run` | `tasks: List[str], *args, **kwargs` | Execute multiple tasks in sequence |
| `async_run` | `task: str, *args, **kwargs` | Execute a task asynchronously |
| `get_logs` | None | Retrieve all logged entries |
| `concurrent_run` | `task: str, *args, **kwargs` | Execute a task using concurrent execution |
| `concurrent_batch_run` | `tasks: List[str], *args, **kwargs` | Execute multiple tasks concurrently |
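The logging helpers are easy to overlook. A minimal sketch of reading back log entries after a run, assuming `get_logs` returns the logged entries described above (the exact shape of each entry is not specified here, so the sketch simply prints them):
```python
# Reusing the `router` constructed in the sketch above
result = router.run("Summarize the key risks in the attached filings")

# Retrieve every log entry the router recorded during execution
for entry in router.get_logs():
    print(entry)
```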
#### `run()`
Execute a task on the selected swarm type.
**Input Parameters:**
| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| `task` | `Optional[str]` | No | `None` | The task to be executed by the swarm |
| `img` | `Optional[str]` | No | `None` | Path to an image file for vision tasks |
| `tasks` | `Optional[List[str]]` | No | `None` | List of tasks (used for BatchedGridWorkflow) |
| `*args` | `Any` | No | - | Variable length argument list |
| `**kwargs` | `Any` | No | - | Arbitrary keyword arguments |
**Output:**
| Type | Description |
| --- | --- |
| `Any` | The result of the swarm's execution. The exact type depends on the `output_type` configuration (e.g., `str`, `dict`, `list`, `json`, `yaml`, `xml`) |
**Example:**
```python
result = router.run(
task="Analyze the market trends and provide recommendations",
img="chart.png" # Optional
)
```
---
#### `batch_run()`
Execute multiple tasks in sequence on the selected swarm type.
**Input Parameters:**
| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| `tasks` | `List[str]` | Yes | - | List of tasks to be executed sequentially |
| `img` | `Optional[str]` | No | `None` | Path to an image file for vision tasks |
| `imgs` | `Optional[List[str]]` | No | `None` | List of image file paths for vision tasks |
| `*args` | `Any` | No | - | Variable length argument list |
| `**kwargs` | `Any` | No | - | Arbitrary keyword arguments |
**Output:**
| Type | Description |
| --- | --- |
| `List[Any]` | A list of results from the swarm's execution, one result per task. Each result type depends on the `output_type` configuration |
**Example:**
```python
tasks = ["Analyze Q1 report", "Summarize competitor landscape", "Evaluate market trends"]
results = router.batch_run(tasks, img="report.png") # Optional img parameter
```
## Available Swarm Types
@ -62,7 +111,6 @@ The `SwarmRouter` supports many various multi-agent architectures for various ap
|------------|-------------|
| `AgentRearrange` | Optimizes agent arrangement for task execution |
| `MixtureOfAgents` | Combines multiple agent types for diverse tasks |
| `SpreadSheetSwarm` | Uses spreadsheet-like operations for task management |
| `SequentialWorkflow` | Executes tasks sequentially |
| `ConcurrentWorkflow` | Executes tasks in parallel |
| `GroupChat` | Facilitates communication among agents in a group chat format |
@ -73,10 +121,10 @@ The `SwarmRouter` supports many various multi-agent architectures for various ap
| `MALT` | Multi-Agent Language Tasks |
| `CouncilAsAJudge` | Council-based judgment system |
| `InteractiveGroupChat` | Interactive group chat with user participation |
| `HeavySwarm` | Heavy swarm architecture with question and worker agents |
| `BatchedGridWorkflow` | Batched grid workflow for parallel task processing |
| `auto` | Automatically selects best swarm type via embedding search |
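With the `auto` entry above, the router chooses an architecture itself. A minimal sketch, assuming `swarm_type="auto"` is accepted exactly as listed in the table and reusing the placeholder agents from the construction sketch earlier:
```python
# The router inspects the task and selects what it considers the best-matching
# swarm type via embedding search, as described in the table above.
auto_router = SwarmRouter(
    name="auto-router",
    description="Let the router pick the architecture",
    agents=[research_agent, writer_agent],
    swarm_type="auto",
)

result = auto_router.run("Draft a one-page brief on recent LLM inference optimizations")
print(result)
```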
## Basic Usage
```python
@ -129,9 +177,13 @@ router = SwarmRouter(
if __name__ == "__main__":
    # Run a comprehensive private equity document analysis task
    result = router.run(
        task="Where is the best place to find template term sheets for series A startups? Provide links and references",
        img=None  # Optional: provide image path for vision tasks
    )
    print(result)

    # For BatchedGridWorkflow, you can pass multiple tasks:
    # result = router.run(tasks=["Task 1", "Task 2", "Task 3"])
```
## Advanced Usage
@ -225,22 +277,6 @@ mixture_router = SwarmRouter(
result = mixture_router.run("Evaluate the potential acquisition of TechStartup Inc.")
```
### SpreadSheetSwarm
Use Case: Collaborative data processing and analysis.
```python
spreadsheet_router = SwarmRouter(
name="DataProcessor",
description="Collaborative data processing and analysis",
max_loops=1,
agents=[data_cleaner, statistical_analyzer, visualizer],
swarm_type="SpreadSheetSwarm"
)
result = spreadsheet_router.run("Process and visualize customer churn data")
```
### SequentialWorkflow
Use Case: Step-by-step document analysis and report generation.
@ -379,6 +415,47 @@ result = interactive_chat_router.run("Discuss the market trends and provide inte
The InteractiveGroupChat allows for dynamic interaction between agents and users, enabling real-time participation in group discussions and decision-making processes. This is particularly useful for scenarios requiring human input or validation during the conversation flow.
### HeavySwarm
Use Case: Complex task decomposition with question and worker agents.
```python
heavy_swarm_router = SwarmRouter(
name="HeavySwarm",
description="Complex task decomposition and execution",
swarm_type="HeavySwarm",
heavy_swarm_loops_per_agent=2,
heavy_swarm_question_agent_model_name="gpt-4.1",
heavy_swarm_worker_model_name="gpt-4.1",
heavy_swarm_swarm_show_output=True,
worker_tools=[tool1, tool2],
aggregation_strategy="synthesis",
output_type="string"
)
result = heavy_swarm_router.run("Analyze market trends and provide comprehensive recommendations")
```
HeavySwarm uses a question agent to decompose complex tasks and worker agents to execute subtasks, making it ideal for complex problem-solving scenarios.
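The `tool1` and `tool2` names in the example above are placeholders. Because `worker_tools` is documented as `List[Callable]`, plain functions with descriptive docstrings are one plausible shape for such tools; the helpers below are hypothetical illustrations, not part of the SwarmRouter API:
```python
# Hypothetical worker tools: ordinary callables with docstrings describing what they do.
# Pass them as worker_tools=[fetch_market_summary, compute_moving_average] in the example above.
def fetch_market_summary(ticker: str) -> str:
    """Return a short, canned market summary for the given ticker (stub)."""
    return f"{ticker}: replace this stub with a call to a real data source."


def compute_moving_average(prices: list, window: int = 5) -> float:
    """Compute a simple moving average over the last `window` prices."""
    recent = prices[-window:]
    return sum(recent) / len(recent)
```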
### BatchedGridWorkflow
Use Case: Parallel processing of multiple tasks in a batched grid format.
```python
batched_grid_router = SwarmRouter(
name="BatchedGridWorkflow",
description="Process multiple tasks in parallel batches",
max_loops=1,
agents=[agent1, agent2, agent3],
swarm_type="BatchedGridWorkflow"
)
result = batched_grid_router.run(tasks=["Task 1", "Task 2", "Task 3"])
```
BatchedGridWorkflow is designed for efficiently processing multiple tasks in parallel batches, optimizing resource utilization.
## Advanced Features
### Processing Documents
@ -402,15 +479,7 @@ To process multiple tasks in a batch:
```python
tasks = ["Analyze Q1 report", "Summarize competitor landscape", "Evaluate market trends"]
results = router.batch_run(tasks, img="image.png")  # Optional: img parameter for image tasks
```
### Asynchronous Execution
For asynchronous task execution:
```python
result = await router.async_run("Generate financial projections")
```
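The `await` above only works inside an async function. A minimal sketch of driving it from a regular script, assuming `async_run` is a coroutine as its name and the methods table suggest:
```python
import asyncio

async def main():
    # Await the router's asynchronous execution of a single task
    result = await router.async_run("Generate financial projections")
    print(result)

asyncio.run(main())
```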
### Concurrent Execution
@ -418,16 +487,7 @@ result = await router.async_run("Generate financial projections")
To run a single task concurrently:
```python
result = router.concurrent_run("Analyze multiple data streams", img="image.png")  # Optional: img parameter
```
### Concurrent Batch Processing
To process multiple tasks concurrently:
```python
tasks = ["Task 1", "Task 2", "Task 3"]
results = router.concurrent_batch_run(tasks)
```
### Using the SwarmRouter as a Callable
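A minimal sketch, assuming `__call__` simply forwards the task to `run()` as the methods table indicates:
```python
# Calling the router instance directly routes the task just like router.run(task)
result = router("Compare the three proposed vendor contracts and flag unusual clauses")
print(result)
```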

@ -1,4 +1,3 @@
from swarms import Agent
@ -12,7 +11,7 @@ agent = Agent(
dynamic_context_window=True,
streaming_on=False,
top_p=None,
stream=True,
)
out = agent.run(
@ -20,4 +19,5 @@ out = agent.run(
n=1,
)
for token in out:
print(token, end="", flush=True)

@ -84,4 +84,3 @@ print(history)
# Get the final refined answer
final_answer = strategy_debate.get_final_answer()
print(final_answer)

@ -80,4 +80,3 @@ print(result)
# Get the final refined answer
final_answer = debate_system.get_final_answer()
print(final_answer)

@ -68,4 +68,3 @@ results = architecture_debate.batched_run(architecture_questions)
# Display results
for result in results:
print(result)

@ -2584,7 +2584,9 @@ class Agent:
task=task, *args, **kwargs
)
if hasattr(
streaming_response, "__iter__"
) and not isinstance(streaming_response, str):
complete_response = ""
token_count = 0
final_chunk = None
@ -2594,7 +2596,10 @@ class Agent:
if first_chunk is None:
first_chunk = chunk
if (
hasattr(chunk, "choices")
and chunk.choices[0].delta.content
):
content = chunk.choices[0].delta.content
complete_response += content
token_count += 1
@ -2602,20 +2607,46 @@ class Agent:
# Schema per token outputted
token_info = {
"token_index": token_count,
"model": getattr(
chunk,
"model",
self.get_current_model(),
),
"id": getattr(chunk, "id", ""),
"created": getattr(
chunk, "created", int(time.time())
),
"object": getattr(
chunk,
"object",
"chat.completion.chunk",
),
"token": content,
"system_fingerprint": getattr(
chunk, "system_fingerprint", ""
),
"finish_reason": chunk.choices[
0
].finish_reason,
"citations": getattr(
chunk, "citations", None
),
"provider_specific_fields": getattr(
chunk,
"provider_specific_fields",
None,
),
"service_tier": getattr(
chunk, "service_tier", "default"
),
"obfuscation": getattr(
chunk, "obfuscation", None
),
"usage": getattr(
chunk, "usage", None
),
"logprobs": chunk.choices[0].logprobs,
"timestamp": time.time(),
}
print(f"ResponseStream {token_info}")
@ -2625,43 +2656,50 @@ class Agent:
final_chunk = chunk
# Final ModelResponse to stream
if (
final_chunk
and hasattr(final_chunk, "usage")
and final_chunk.usage
):
usage = final_chunk.usage
print(
f"ModelResponseStream(id='{getattr(final_chunk, 'id', 'N/A')}', "
f"created={getattr(final_chunk, 'created', 'N/A')}, "
f"model='{getattr(final_chunk, 'model', self.get_current_model())}', "
f"object='{getattr(final_chunk, 'object', 'chat.completion.chunk')}', "
f"system_fingerprint='{getattr(final_chunk, 'system_fingerprint', 'N/A')}', "
f"choices=[StreamingChoices(finish_reason='{final_chunk.choices[0].finish_reason}', "
f"index=0, delta=Delta(provider_specific_fields=None, content=None, role=None, "
f"function_call=None, tool_calls=None, audio=None), logprobs=None)], "
f"provider_specific_fields=None, "
f"usage=Usage(completion_tokens={usage.completion_tokens}, "
f"prompt_tokens={usage.prompt_tokens}, "
f"total_tokens={usage.total_tokens}, "
f"completion_tokens_details=CompletionTokensDetailsWrapper("
f"accepted_prediction_tokens={usage.completion_tokens_details.accepted_prediction_tokens}, "
f"audio_tokens={usage.completion_tokens_details.audio_tokens}, "
f"reasoning_tokens={usage.completion_tokens_details.reasoning_tokens}, "
f"rejected_prediction_tokens={usage.completion_tokens_details.rejected_prediction_tokens}, "
f"text_tokens={usage.completion_tokens_details.text_tokens}), "
f"prompt_tokens_details=PromptTokensDetailsWrapper("
f"audio_tokens={usage.prompt_tokens_details.audio_tokens}, "
f"cached_tokens={usage.prompt_tokens_details.cached_tokens}, "
f"text_tokens={usage.prompt_tokens_details.text_tokens}, "
f"image_tokens={usage.prompt_tokens_details.image_tokens})))"
)
else:
print(
f"ModelResponseStream(id='{getattr(final_chunk, 'id', 'N/A')}', "
f"created={getattr(final_chunk, 'created', 'N/A')}, "
f"model='{getattr(final_chunk, 'model', self.get_current_model())}', "
f"object='{getattr(final_chunk, 'object', 'chat.completion.chunk')}', "
f"system_fingerprint='{getattr(final_chunk, 'system_fingerprint', 'N/A')}', "
f"choices=[StreamingChoices(finish_reason='{final_chunk.choices[0].finish_reason}', "
f"index=0, delta=Delta(provider_specific_fields=None, content=None, role=None, "
f"function_call=None, tool_calls=None, audio=None), logprobs=None)], "
f"provider_specific_fields=None)"
)
self.llm.stream = original_stream
return complete_response

@ -2,7 +2,16 @@ import concurrent.futures
import json
import os
import traceback
from typing import (
Any,
Callable,
Dict,
List,
Literal,
Optional,
Union,
get_args,
)
from pydantic import BaseModel, Field
