AgentRearrange Class
The AgentRearrange class represents a swarm of agents for rearranging tasks. It allows you to create a swarm of agents, add or remove agents from the swarm, and run the swarm to process tasks based on a specified flow pattern. The class now includes sequential awareness features that allow agents to know about the agents ahead and behind them in sequential flows.
Attributes
Attribute | Type | Description |
---|---|---|
id | str | Unique identifier for the swarm |
name | str | Name of the swarm |
description | str | Description of the swarm's purpose |
agents | dict | Dictionary mapping agent names to Agent objects |
flow | str | Flow pattern defining task execution order |
max_loops | int | Maximum number of execution loops |
verbose | bool | Whether to enable verbose logging |
memory_system | BaseVectorDatabase | Memory system for storing agent interactions |
human_in_the_loop | bool | Whether human intervention is enabled |
custom_human_in_the_loop | Callable | Custom function for human intervention |
output_type | OutputType | Format of output ("all", "final", "list", or "dict") |
autosave | bool | Whether to automatically save agent data |
rules | str | Custom rules to add to the conversation |
team_awareness | bool | Whether to enable team awareness and sequential flow information |
time_enabled | bool | Whether to enable timestamps in conversation |
message_id_on | bool | Whether to enable message IDs in conversation |
Methods
__init__(self, id: str = swarm_id(), name: str = "AgentRearrange", description: str = "A swarm of agents for rearranging tasks.", agents: List[Union[Agent, Callable]] = None, flow: str = None, max_loops: int = 1, verbose: bool = True, memory_system: Any = None, human_in_the_loop: bool = False, custom_human_in_the_loop: Optional[Callable[[str], str]] = None, output_type: OutputType = "all", autosave: bool = True, rules: str = None, team_awareness: bool = False, time_enabled: bool = False, message_id_on: bool = False, *args, **kwargs)
Initializes the AgentRearrange object with enhanced sequential awareness capabilities.
Parameter | Type | Description |
---|---|---|
id | str (optional) | Unique identifier for the swarm. Defaults to auto-generated UUID. |
name | str (optional) | Name of the swarm. Defaults to "AgentRearrange". |
description | str (optional) | Description of the swarm's purpose. Defaults to "A swarm of agents for rearranging tasks.". |
agents | List[Union[Agent, Callable]] (optional) | A list of Agent objects or callables. Defaults to None. |
flow | str (optional) | The flow pattern of the tasks. Defaults to None. |
max_loops | int (optional) | The maximum number of loops for the agents to run. Defaults to 1. |
verbose | bool (optional) | Whether to enable verbose logging or not. Defaults to True. |
memory_system | Any (optional) | Memory system for storing agent interactions. Defaults to None. |
human_in_the_loop | bool (optional) | Whether human intervention is enabled. Defaults to False. |
custom_human_in_the_loop | Callable[[str], str] (optional) | Custom function for human intervention. Defaults to None. |
output_type | OutputType (optional) | Format of output. Defaults to "all". |
autosave | bool (optional) | Whether to automatically save agent data. Defaults to True. |
rules | str (optional) | Custom rules to add to the conversation. Defaults to None. |
team_awareness | bool (optional) | Whether to enable team awareness and sequential flow information. Defaults to False. |
time_enabled | bool (optional) | Whether to enable timestamps in conversation. Defaults to False. |
message_id_on | bool (optional) | Whether to enable message IDs in conversation. Defaults to False. |
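The custom_human_in_the_loop parameter is typed Callable[[str], str], so any function that takes a string and returns a string fits the signature. The following is a minimal sketch of such a callable. What text the library passes in and how it uses the return value are assumptions here, and the names make_reviewer and review are hypothetical, not part of swarms.

```python
from typing import Callable


def make_reviewer(ask: Callable[[str], str] = input) -> Callable[[str], str]:
    """Build a Callable[[str], str] that could be passed as custom_human_in_the_loop.

    `ask` is injectable so the reviewer can be exercised without a terminal.
    """

    def review(current_text: str) -> str:
        reply = ask(f"Current output:\n{current_text}\nFeedback (blank to accept): ")
        # Assumption: a blank reply means the human accepts the text unchanged.
        return reply.strip() or current_text

    return review


# Non-interactive demonstration with canned replies instead of input().
feedback = make_reviewer(ask=lambda prompt: "  Tighten the summary.  ")("Draft report")
accepted = make_reviewer(ask=lambda prompt: "")("Draft report")
```

A callable built this way could then be supplied alongside human_in_the_loop=True when constructing the swarm.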
add_agent(self, agent: Agent)
Adds an agent to the swarm.
Parameter | Type | Description |
---|---|---|
agent | Agent | The agent to be added. |
remove_agent(self, agent_name: str)
Removes an agent from the swarm.
Parameter | Type | Description |
---|---|---|
agent_name | str | The name of the agent to be removed. |
add_agents(self, agents: List[Agent])
Adds multiple agents to the swarm.
Parameter | Type | Description |
---|---|---|
agents | List[Agent] | A list of Agent objects. |
validate_flow(self)
Validates the flow pattern.
Raises:
ValueError: If the flow pattern is incorrectly formatted or contains duplicate agent names.
Returns:
bool: True if the flow pattern is valid.
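To make the validation rules concrete, here is a rough sketch of the kind of checks described above. It is not the library's implementation: the function names parse_flow and validate_flow_sketch are hypothetical, and the requirement that a flow contain "->" is an assumption about what "incorrectly formatted" means. Only the "is not registered" message is taken from this document.

```python
from typing import Dict, List


def parse_flow(flow: str) -> List[List[str]]:
    """Split 'A -> B, C' into steps: [['A'], ['B', 'C']]."""
    return [[name.strip() for name in step.split(",")] for step in flow.split("->")]


def validate_flow_sketch(flow: str, agents: Dict[str, object]) -> bool:
    """Return True for a valid flow, otherwise raise ValueError."""
    if "->" not in flow:
        # Assumption: a flow needs at least one arrow to be well formed.
        raise ValueError("Flow must include '->' to denote the direction of the task.")
    seen = set()
    for step in parse_flow(flow):
        for name in step:
            if name not in agents:
                raise ValueError(f"Agent '{name}' is not registered.")
            if name in seen:
                raise ValueError(f"Duplicate agent name in flow: '{name}'")
            seen.add(name)
    return True


registered = {"Director": object(), "Worker1": object(), "Worker2": object()}
ok = validate_flow_sketch("Director -> Worker1, Worker2", registered)

try:
    validate_flow_sketch("Director -> Worker1, Worker2 -> Worker3", registered)
    error_message = ""
except ValueError as exc:
    error_message = str(exc)
```

The second call fails because Worker3 was never added to the registered agents.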
Sequential Awareness Methods
get_agent_sequential_awareness(self, agent_name: str) -> str
Gets the sequential awareness information for a specific agent, showing which agents come before and after in the sequence.
Parameter | Type | Description |
---|---|---|
agent_name | str | The name of the agent to get awareness for. |
Returns:
str: A string describing the agents ahead and behind in the sequence.
Example:
awareness = agent_system.get_agent_sequential_awareness("Agent2")
# Returns: "Sequential awareness: Agent ahead: Agent1 | Agent behind: Agent3"
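The returned string can be understood as a lookup of the agent's neighbours in the flow. The sketch below reproduces the format shown in the example for a purely sequential flow (no comma-separated groups). The function sequential_awareness is hypothetical and stands in for the method. For the first agent it returns only the "Agent behind" part, and how the library treats that case is not shown here.

```python
def sequential_awareness(flow: str, agent_name: str) -> str:
    """Describe the neighbours of agent_name in a purely sequential flow."""
    steps = [name.strip() for name in flow.split("->")]
    if agent_name not in steps:
        return ""
    position = steps.index(agent_name)
    parts = []
    if position > 0:
        # "Ahead" is the agent that ran immediately before this one.
        parts.append(f"Agent ahead: {steps[position - 1]}")
    if position < len(steps) - 1:
        # "Behind" is the agent that receives this agent's output next.
        parts.append(f"Agent behind: {steps[position + 1]}")
    if not parts:
        return ""
    return "Sequential awareness: " + " | ".join(parts)


middle = sequential_awareness("Agent1 -> Agent2 -> Agent3", "Agent2")
last = sequential_awareness("Agent1 -> Agent2 -> Agent3", "Agent3")
```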
get_sequential_flow_structure(self) -> str
Gets the overall sequential flow structure information showing the complete workflow with relationships between agents.
Returns:
str: A string describing the complete sequential flow structure.
Example:
flow_structure = agent_system.get_sequential_flow_structure()
# Returns: "Sequential Flow Structure:
# Step 1: Agent1
# Step 2: Agent2 (follows: Agent1) (leads to: Agent3)
# Step 3: Agent3 (follows: Agent2)"
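A structure string of this shape can be derived by walking the flow once and annotating each step with its neighbours. This is a sketch for purely sequential flows, using a hypothetical function sequential_flow_structure. It annotates every step that has a successor, so its first step line may differ from the example output above, and the library's exact format may differ as well.

```python
def sequential_flow_structure(flow: str) -> str:
    """Build a step-by-step description of a purely sequential flow."""
    steps = [name.strip() for name in flow.split("->")]
    lines = ["Sequential Flow Structure:"]
    for index, name in enumerate(steps):
        line = f"Step {index + 1}: {name}"
        if index > 0:
            line += f" (follows: {steps[index - 1]})"
        if index < len(steps) - 1:
            line += f" (leads to: {steps[index + 1]})"
        lines.append(line)
    return "\n".join(lines)


structure = sequential_flow_structure("Agent1 -> Agent2 -> Agent3")
structure_lines = structure.splitlines()
```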
run(self, task: str = None, img: str = None, *args, **kwargs)
Executes the agent rearrangement task with specified compute resources.
Parameter | Type | Description |
---|---|---|
task | str (optional) | The task to execute. Defaults to None. |
img | str (optional) | Path to input image if required. Defaults to None. |
*args | - | Additional positional arguments passed to _run(). |
**kwargs | - | Additional keyword arguments passed to _run(). |
Returns:
- The result from executing the task through the cluster operations wrapper.
batch_run(self, tasks: List[str], img: Optional[List[str]] = None, batch_size: int = 10, *args, **kwargs)
Process multiple tasks in batches.
Parameter | Type | Description |
---|---|---|
tasks | List[str] | List of tasks to process |
img | List[str] (optional) | Optional list of images corresponding to tasks |
batch_size | int | Number of tasks to process simultaneously |
*args | - | Additional positional arguments |
**kwargs | - | Additional keyword arguments |
Returns:
List[str]: List of results corresponding to input tasks
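The batching behaviour can be pictured as slicing the task list into chunks of batch_size and collecting results in input order. The sketch below uses a hypothetical batch_run_sketch with a stand-in runner, fake_run, in place of a real swarm. It runs the tasks inside each batch one after another, which is a simplification and not a statement about how the library schedules a batch.

```python
from typing import Callable, List, Optional


def batch_run_sketch(
    run_one: Callable[[str, Optional[str]], str],
    tasks: List[str],
    img: Optional[List[str]] = None,
    batch_size: int = 10,
) -> List[str]:
    """Process tasks in slices of batch_size, preserving input order."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    if img is not None and len(img) != len(tasks):
        raise ValueError("img must have one entry per task")
    results: List[str] = []
    for start in range(0, len(tasks), batch_size):
        batch_tasks = tasks[start:start + batch_size]
        if img is not None:
            batch_imgs = img[start:start + batch_size]
        else:
            batch_imgs = [None] * len(batch_tasks)
        for task, image in zip(batch_tasks, batch_imgs):
            results.append(run_one(task, image))
    return results


def fake_run(task: str, image: Optional[str] = None) -> str:
    """Stand-in for running the swarm on a single task."""
    return f"done: {task}"


batch_results = batch_run_sketch(fake_run, ["t1", "t2", "t3"], batch_size=2)
```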
concurrent_run(self, tasks: List[str], img: Optional[List[str]] = None, max_workers: Optional[int] = None, *args, **kwargs)
Process multiple tasks concurrently using ThreadPoolExecutor.
Parameter | Type | Description |
---|---|---|
tasks | List[str] | List of tasks to process |
img | List[str] (optional) | Optional list of images corresponding to tasks |
max_workers | int (optional) | Maximum number of worker threads |
*args | - | Additional positional arguments |
**kwargs | - | Additional keyword arguments |
Returns:
List[str]: List of results corresponding to input tasks
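As a rough illustration of the ThreadPoolExecutor pattern mentioned above, the sketch below fans tasks out to worker threads and returns results in the same order as the inputs. The names concurrent_run_sketch and shout are hypothetical. A real call would run the swarm for each task instead of the stand-in function.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional


def concurrent_run_sketch(
    run_one: Callable[[str, Optional[str]], str],
    tasks: List[str],
    img: Optional[List[str]] = None,
    max_workers: Optional[int] = None,
) -> List[str]:
    """Run one call per task on a thread pool, keeping results in input order."""
    images = img if img is not None else [None] * len(tasks)
    if len(images) != len(tasks):
        raise ValueError("img must have one entry per task")
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in the order the inputs were submitted.
        return list(pool.map(run_one, tasks, images))


def shout(task: str, image: Optional[str] = None) -> str:
    """Stand-in for running the swarm on a single task."""
    return task.upper()


concurrent_results = concurrent_run_sketch(
    shout, ["task a", "task b", "task c"], max_workers=2
)
```

Using Executor.map rather than collecting futures as they complete keeps each result aligned with its input task.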
Sequential Awareness Feature
The AgentRearrange class now includes a powerful sequential awareness feature that enhances agent collaboration in sequential workflows. When agents are executed sequentially, they automatically receive information about:
- Agent ahead: The agent that completed their task before them
- Agent behind: The agent that will receive their output next
When team_awareness is enabled, this information is added for sequential flows without further configuration. It gives agents context about their position in the workflow, improving coordination and task understanding.
How It Works
- Automatic Detection: The system automatically detects when agents are running sequentially vs. in parallel
- Context Injection: Before each sequential agent runs, awareness information is added to the conversation
- Enhanced Collaboration: Agents can reference previous agents' work and prepare output for the next agent
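The three steps above can be sketched with plain functions standing in for agents. Everything here is illustrative: run_with_awareness, the (role, content) message tuples, and the "System" role are hypothetical, and the first step receives no awareness message, following the walkthrough in this document. The library's internals may differ.

```python
from typing import Callable, Dict, List, Tuple

Message = Tuple[str, str]  # (role, content)


def run_with_awareness(
    flow: str, agents: Dict[str, Callable[[str], str]], task: str
) -> List[Message]:
    """Run a flow and inject awareness messages before sequential steps."""
    steps = [[n.strip() for n in step.split(",")] for step in flow.split("->")]
    conversation: List[Message] = [("User", task)]
    for index, step in enumerate(steps):
        # Automatic detection: a step with one agent is sequential.
        if len(step) == 1 and index > 0:
            parts = ["Agent ahead: " + ", ".join(steps[index - 1])]
            if index < len(steps) - 1:
                parts.append("Agent behind: " + ", ".join(steps[index + 1]))
            # Context injection: the message is added before the agent runs.
            conversation.append(("System", "Sequential awareness: " + " | ".join(parts)))
        history = "\n".join(f"{role}: {content}" for role, content in conversation)
        # Each agent sees the history, including any awareness message.
        conversation.extend((name, agents[name](history)) for name in step)
    return conversation


stub_agents = {
    "Researcher": lambda history: "notes",
    "Writer": lambda history: "draft",
    "Editor": lambda history: "final",
}
conversation = run_with_awareness(
    "Researcher -> Writer -> Editor", stub_agents, "Write about AI"
)
```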
Example with Sequential Awareness
from swarms import Agent, AgentRearrange
# Create agents
agent1 = Agent(agent_name="Researcher", system_prompt="Research the topic")
agent2 = Agent(agent_name="Writer", system_prompt="Write based on research")
agent3 = Agent(agent_name="Editor", system_prompt="Edit the written content")
# Create sequential workflow
workflow = AgentRearrange(
    agents=[agent1, agent2, agent3],
    flow="Researcher -> Writer -> Editor",
    team_awareness=True,  # Enables sequential awareness
)
# Run the workflow
result = workflow.run("Research and write about artificial intelligence")
What happens automatically:
- Researcher runs first (no awareness info needed)
- Writer receives: "Sequential awareness: Agent ahead: Researcher | Agent behind: Editor"
- Editor receives: "Sequential awareness: Agent ahead: Writer"
Documentation for rearrange Function
======================================
The rearrange function is a helper function that rearranges the given list of agents based on the specified flow.
Parameters
Parameter | Type | Description |
---|---|---|
name | str (optional) | Name for the agent system. Defaults to None. |
description | str (optional) | Description for the agent system. Defaults to None. |
agents | List[Agent] | The list of agents to be rearranged. |
flow | str | The flow used for rearranging the agents. |
task | str (optional) | The task to be performed during rearrangement. Defaults to None. |
img | str (optional) | Path to input image if required. Defaults to None. |
*args | - | Additional positional arguments. |
**kwargs | - | Additional keyword arguments. |
Returns
The result of running the agent system with the specified task.
Example
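agents = [agent1, agent2, agent3]
# Names in the flow refer to agent names, as in the other examples in this document
flow = "agent1 -> agent2, agent3"
task = "Perform a task"
# Use keyword arguments: name and description come before agents in the parameter list
rearrange(agents=agents, flow=flow, task=task)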
Example Usage
Here's an example of how to use the AgentRearrange class and the rearrange function with the new sequential awareness features:
from swarms import Agent, AgentRearrange

# NOTE: Anthropic below is an LLM wrapper whose import is not shown in this example;
# import it from the model provider package you use before running the code.

# Initialize the director agent
director = Agent(
    agent_name="Accounting Director",
    system_prompt="Directs the accounting tasks for the workers",
    llm=Anthropic(),
    max_loops=1,
    dashboard=False,
    streaming_on=True,
    verbose=True,
    stopping_token="<DONE>",
    state_save_file_type="json",
    saved_state_path="accounting_director.json",
)
# Initialize worker 1
worker1 = Agent(
    agent_name="Accountant 1",
    system_prompt="Processes financial transactions and prepares financial statements",
    llm=Anthropic(),
    max_loops=1,
    dashboard=False,
    streaming_on=True,
    verbose=True,
    stopping_token="<DONE>",
    state_save_file_type="json",
    saved_state_path="accountant1.json",
)
# Initialize worker 2
worker2 = Agent(
    agent_name="Accountant 2",
    system_prompt="Performs audits and ensures compliance with financial regulations",
    llm=Anthropic(),
    max_loops=1,
    dashboard=False,
    streaming_on=True,
    verbose=True,
    stopping_token="<DONE>",
    state_save_file_type="json",
    saved_state_path="accountant2.json",
)
# Create a list of agents
agents = [director, worker1, worker2]
# Define the flow pattern (names must match each agent's agent_name)
flow = "Accounting Director -> Accountant 1 -> Accountant 2"
# Using AgentRearrange class with sequential awareness
agent_system = AgentRearrange(
    agents=agents,
    flow=flow,
    team_awareness=True,  # Enables sequential awareness
    time_enabled=True,  # Enable timestamps
    message_id_on=True,  # Enable message IDs
)
# Get sequential flow information
flow_structure = agent_system.get_sequential_flow_structure()
print("Flow Structure:", flow_structure)
# Get awareness for specific agents
worker1_awareness = agent_system.get_agent_sequential_awareness("Accountant 1")
print("Worker1 Awareness:", worker1_awareness)
# Run the workflow
output = agent_system.run("Process monthly financial statements")
print(output)
In this example, we first initialize three agents: director, worker1, and worker2. Then, we create a list of these agents and define the flow pattern "Accounting Director -> Accountant 1 -> Accountant 2", which refers to each agent by its agent_name.
The new sequential awareness features provide:
- Automatic context: Each agent knows who came before and who comes after
- Better coordination: Agents can reference previous work and prepare for next steps
- Flow visualization: You can see the complete workflow structure
- Enhanced logging: Better tracking of agent interactions
Error Handling
The AgentRearrange class includes error handling mechanisms to validate the flow pattern. If the flow pattern is incorrectly formatted or contains duplicate agent names, a ValueError will be raised with an appropriate error message.
Example:
# Invalid flow pattern
invalid_flow = "Director->Worker1,Worker2->Worker3"
agent_system = AgentRearrange(agents=agents, flow=invalid_flow)
output = agent_system.run("Some task")
This will raise a ValueError with the message "Agent 'Worker3' is not registered.".
Parallel and Sequential Processing
The AgentRearrange class supports both parallel and sequential processing of tasks based on the specified flow pattern. Steps are separated by "->". If a step lists multiple agents separated by commas (e.g., "agent1, agent2"), those agents will be executed in parallel, and their outputs will be concatenated. If a step contains a single agent, it will be executed sequentially with enhanced awareness.
# Parallel processing
parallel_flow = "Worker1, Worker2 -> Director"
# Sequential processing with awareness
sequential_flow = "Worker1 -> Worker2 -> Director"
In the parallel_flow example, Worker1 and Worker2 will be executed in parallel, and their outputs will be concatenated and passed to Director.
In the sequential_flow example, Worker1 will be executed first, then Worker2 will receive awareness that Worker1 came before and Director comes after, and finally Director will receive awareness that Worker2 came before.
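The difference between the two flows can be sketched with plain functions in place of agents. This is an illustration only: run_flow_sketch is hypothetical, the "; " separator used to concatenate parallel outputs is an assumption, and awareness messages are left out to keep the focus on execution order.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict


def run_flow_sketch(flow: str, agents: Dict[str, Callable[[str], str]], task: str) -> str:
    """Execute a flow: comma-separated agents run in parallel, arrows run in order."""
    current = task
    for step in flow.split("->"):
        names = [name.strip() for name in step.split(",")]
        if len(names) == 1:
            # Sequential step: the single agent consumes the previous output.
            current = agents[names[0]](current)
        else:
            # Parallel step: every agent in the group gets the same input.
            step_input = current
            with ThreadPoolExecutor(max_workers=len(names)) as pool:
                outputs = list(pool.map(lambda n: agents[n](step_input), names))
            # The concatenated outputs become the input of the next step.
            current = "; ".join(outputs)
    return current


stub_agents = {
    "Worker1": lambda text: f"W1({text})",
    "Worker2": lambda text: f"W2({text})",
    "Director": lambda text: f"D[{text}]",
}
parallel_result = run_flow_sketch("Worker1, Worker2 -> Director", stub_agents, "books")
sequential_result = run_flow_sketch("Worker1 -> Worker2 -> Director", stub_agents, "books")
```

In the parallel case Director sees both workers' outputs at once, while in the sequential case each agent sees only the output of the agent before it.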
Logging and Monitoring
The AgentRearrange class includes comprehensive logging capabilities using the loguru library. The new sequential awareness features add enhanced logging:
2023-05-08 10:30:15.456 | INFO | agent_rearrange:__init__:34 - Adding agent Director to the swarm.
2023-05-08 10:30:15.457 | INFO | agent_rearrange:__init__:34 - Adding agent Worker1 to the swarm.
2023-05-08 10:30:15.457 | INFO | agent_rearrange:__init__:34 - Adding agent Worker2 to the swarm.
2023-05-08 10:30:15.458 | INFO | agent_rearrange:run:118 - Running agents in parallel: ['Worker1', 'Worker2']
2023-05-08 10:30:15.459 | INFO | agent_rearrange:run:121 - Running agents sequentially: ['Director']
2023-05-08 10:30:15.460 | INFO | agent_rearrange:run:125 - Added sequential awareness for Worker2: Sequential awareness: Agent ahead: Worker1 | Agent behind: Director
Additional Parameters
The AgentRearrange class now accepts additional parameters for enhanced functionality:
agent_system = AgentRearrange(
    agents=agents,
    flow=flow,
    team_awareness=True,  # Enable sequential awareness
    time_enabled=True,  # Enable conversation timestamps
    message_id_on=True,  # Enable message IDs
    verbose=True,  # Enable detailed logging
)
Customization
The AgentRearrange class and the rearrange function can be customized and extended to suit specific use cases. The new sequential awareness features provide a foundation for building more sophisticated agent coordination systems.
Limitations
It's important to note that the AgentRearrange class and the rearrange function rely on the individual agents to process tasks correctly. The quality of the output will depend on the capabilities and configurations of the agents used in the swarm.
The sequential awareness feature works best with agents that can understand and utilize context about their position in the workflow.
Conclusion
The AgentRearrange class and the rearrange function provide a flexible and extensible framework for orchestrating swarms of agents to process tasks based on a specified flow pattern. The new sequential awareness features significantly enhance agent collaboration by providing context about workflow relationships.
By combining the capabilities of individual agents with enhanced awareness of their position in the workflow, you can create more intelligent and coordinated multi-agent systems that understand not just their individual tasks, but also their role in the larger workflow.
Whether you're working on natural language processing tasks, data analysis, or any other domain where agent-based systems can be beneficial, the enhanced AgentRearrange class provides a solid foundation for building sophisticated swarm-based solutions with improved coordination and context awareness.