[FEATS][RoundRobinSwarm]

pull/468/head
Kye 8 months ago
parent 860df09bc1
commit b9cd77fbdd

@ -154,6 +154,7 @@ nav:
- SwarmNetwork: "swarms/structs/swarmnetwork.md" - SwarmNetwork: "swarms/structs/swarmnetwork.md"
- MajorityVoting: "swarms/structs/majorityvoting.md" - MajorityVoting: "swarms/structs/majorityvoting.md"
- AgentRearrange: "swarms/structs/agent_rearrange.md" - AgentRearrange: "swarms/structs/agent_rearrange.md"
- RoundRobin: "swarms/structs/round_robin_swarm.md"
- swarms.memory: - swarms.memory:
- Building Custom Vector Memory Databases with the BaseVectorDatabase Class: "swarms/memory/diy_memory.md" - Building Custom Vector Memory Databases with the BaseVectorDatabase Class: "swarms/memory/diy_memory.md"
- ShortTermMemory: "swarms/memory/short_term_memory.md" - ShortTermMemory: "swarms/memory/short_term_memory.md"

@ -0,0 +1,127 @@
# RoundRobin: Round-Robin Task Execution in a Swarm
## Introduction
The `RoundRobinSwarm` class is designed to manage and execute tasks among multiple agents in a round-robin fashion. This approach ensures that each agent in a swarm receives an equal opportunity to execute tasks, which promotes fairness and efficiency in distributed systems. It is particularly useful in environments where collaborative, sequential task execution is needed among various agents.
## Conceptual Overview
### What is Round-Robin?
Round-robin is a scheduling technique commonly used in computing for managing processes in shared systems. It involves assigning a fixed time slot to each process and cycling through all processes in a circular order without prioritization. In the context of swarms of agents, this method ensures equitable distribution of tasks and resource usage among all agents.
### Application in Swarms
In swarms, `RoundRobinSwarm` utilizes the round-robin scheduling to manage tasks among agents like software components, autonomous robots, or virtual entities. This strategy is beneficial where tasks are interdependent or require sequential processing.
## Class Attributes
- `agents (List[Agent])`: List of agents participating in the swarm.
- `verbose (bool)`: Enables or disables detailed logging of swarm operations.
- `max_loops (int)`: Limits the number of times the swarm cycles through all agents.
- `index (int)`: Maintains the current position in the agent list to ensure round-robin execution.
## Methods
### `__init__`
Initializes the swarm with the provided list of agents, verbosity setting, and operational parameters.
**Parameters:**
- `agents`: Optional list of agents in the swarm.
- `verbose`: Boolean flag for detailed logging.
- `max_loops`: Maximum number of execution cycles.
- `callback`: Optional function called after each loop.
### `run`
Executes a specified task across all agents in a round-robin manner, cycling through each agent repeatedly for the number of specified loops.
**Conceptual Behavior:**
- Distribute the task sequentially among all agents starting from the current index.
- Each agent processes the task and potentially modifies it or produces new output.
- After an agent completes its part of the task, the index moves to the next agent.
- This cycle continues until the specified maximum number of loops is completed.
- Optionally, a callback function can be invoked after each loop to handle intermediate results or perform additional actions.
## Examples
### Example 1: Load Balancing Among Servers
In this example, `RoundRobinSwarm` is used to distribute network requests evenly among a group of servers. This is common in scenarios where load balancing is crucial for maintaining system responsiveness and scalability.
```python
from swarms.structs.round_robin import RoundRobinSwarm
from swarms import Agent
# Define server agents
server1 = Agent(agent_name="Server1", system_prompt="Handle network requests")
server2 = Agent(agent_name="Server2", system_prompt="Handle network requests")
server3 = Agent(agent_name="Server3", system_prompt="Handle network requests")
# Initialize the swarm with server agents
network_load_balancer = RoundRobinSwarm(agents=[server1, server2, server3], verbose=True)
# Define a network request task
task = "Process incoming network request"
# Simulate processing of multiple requests
for _ in range(10): # Assume 10 incoming requests
results = network_load_balancer.run(task)
print("Request processed:", results)
```
### Example 2: Document Review Process
This example demonstrates how `RoundRobinSwarm` can be used to distribute parts of a document among different reviewers in a sequential manner, ensuring that each part of the document is reviewed by different agents.
```python
from swarms.structs.round_robin import RoundRobinSwarm
from swarms import Agent
# Define reviewer agents
reviewer1 = Agent(agent_name="Reviewer1", system_prompt="Review document section")
reviewer2 = Agent(agent_name="Reviewer2", system_prompt="Review document section")
reviewer3 = Agent(agent_name="Reviewer3", system_prompt="Review document section")
# Initialize the swarm with reviewer agents
document_review_swarm = RoundRobinSwarm(agents=[reviewer1, reviewer2, reviewer3], verbose=True)
# Define a document review task
task = "Review section of the document"
# Distribute sections of the document to different reviewers
for section in range(5): # Assume the document has 5 sections
results = document_review_swarm.run(task)
print(f"Section {section + 1} reviewed:", results)
```
### Example 3: Multi-Stage Data Processing
In this scenario, `RoundRobinSwarm` facilitates a multi-stage data processing pipeline where data passes through multiple agents, each performing a specific type of data processing in sequence.
```python
from swarms.structs.round_robin import RoundRobinSwarm
from swarms import Agent
# Define data processing agents
preprocessor = Agent(agent_name="Preprocessor", system_prompt="Preprocess data")
analyzer = Agent(agent_name="Analyzer", system_prompt="Analyze data")
summarizer = Agent(agent_name="Summarizer", system_prompt="Summarize analysis results")
# Initialize the swarm with data processing agents
data_processing_swarm = RoundRobinSwarm(agents=[preprocessor, analyzer, summarizer], verbose=True)
# Define a data processing task
task = "Initial raw data"
# Run the data through the processing pipeline
results = data_processing_swarm.run(task)
print("Final results from data processing:", results)
```
These examples provide a glimpse into how the `RoundRobinSwarm` class can be adapted to various domains and applications, showcasing its versatility in managing tasks and resources in a distributed environment.
```
## Conclusion
The RoundRobinSwarm class provides a robust and flexible framework for managing tasks among multiple agents in a fair and efficient manner. This class is especially useful in environments where tasks need to be distributed evenly among a group of agents, ensuring that all tasks are handled timely and effectively. Through the round-robin algorithm, each agent in the swarm is guaranteed an equal opportunity to contribute to the overall task, promoting efficiency and collaboration.

@ -0,0 +1,58 @@
from swarms import Agent, Anthropic
from swarms.structs.round_robin import RoundRobinSwarm
# Initialize the director agent
director = Agent(
agent_name="Director",
system_prompt="Directs the tasks for the workers",
llm=Anthropic(),
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="director.json",
)
# Initialize worker 1
worker1 = Agent(
agent_name="Worker1",
system_prompt="Generates a transcript for a youtube video on what swarms are",
llm=Anthropic(),
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="worker1.json",
)
# Initialize worker 2
worker2 = Agent(
agent_name="Worker2",
system_prompt="Summarizes the transcript generated by Worker1",
llm=Anthropic(),
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="worker2.json",
)
# Round Robin
round_table = RoundRobinSwarm(
agents=[director, worker1, worker2],
verbose=True,
max_loops=1,
callback=None,
)
# Run the task and get the results
task = "Create a format to express and communicate swarms of llms in a structured manner for youtube"
results = round_table.run(task)
print("Round Robin Results:", results)

@ -84,6 +84,7 @@ from swarms.structs.yaml_model import (
YamlModel, YamlModel,
) )
from swarms.structs.message_pool import MessagePool from swarms.structs.message_pool import MessagePool
from swarms.structs.round_robin import RoundRobinSwarm
__all__ = [ __all__ = [
"Agent", "Agent",
@ -156,4 +157,5 @@ __all__ = [
"YamlModel", "YamlModel",
"MessagePool", "MessagePool",
"rearrange", "rearrange",
"RoundRobinSwarm",
] ]

@ -0,0 +1,87 @@
from swarms.structs.base_swarm import BaseSwarm
from typing import List
from swarms.structs.agent import Agent
from swarms.utils.loguru_logger import logger
class RoundRobinSwarm(BaseSwarm):
"""
A swarm implementation that executes tasks in a round-robin fashion.
Args:
agents (List[Agent], optional): List of agents in the swarm. Defaults to None.
verbose (bool, optional): Flag to enable verbose mode. Defaults to False.
max_loops (int, optional): Maximum number of loops to run. Defaults to 1.
callback (callable, optional): Callback function to be called after each loop. Defaults to None.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Attributes:
agents (List[Agent]): List of agents in the swarm.
verbose (bool): Flag to enable verbose mode.
max_loops (int): Maximum number of loops to run.
index (int): Current index of the agent being executed.
Methods:
run(task: str, *args, **kwargs) -> Any: Executes the given task on the agents in a round-robin fashion.
"""
def __init__(
self,
agents: List[Agent] = None,
verbose: bool = False,
max_loops: int = 1,
callback: callable = None,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.agents = agents
self.verbose = verbose
self.max_loops = max_loops
self.callback = callback
self.index = 0
def run(self, task: str, *args, **kwargs):
"""
Executes the given task on the agents in a round-robin fashion.
Args:
task (str): The task to be executed.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Returns:
Any: The result of the task execution.
Raises:
Exception: If an exception occurs during task execution.
"""
try:
result = task
n = len(self.agents)
logger.info(f"Running the task {task} on {n} agents.")
for loop in range(self.max_loops):
for _ in range(n):
current_agent = self.agents[self.index]
try:
logger.info(f"Running Agent {current_agent.agent_name} on task {result}")
result = current_agent.run(result, *args, **kwargs)
except Exception as e:
logger.error(
f"Handling an exception for {current_agent.name}: {e}"
)
raise e
finally:
self.index = (
self.index + 1
) % n # Increment and wrap around the index
if self.callback:
logger.info(f"Calling the callback function for loop {loop}")
self.callback(loop, result)
return result
except Exception as e:
logger.error(f"An error occurred: {e}")
return e

@ -0,0 +1,20 @@
import pytest
from swarms.structs.round_robin import RoundRobinSwarm
from swarms.structs.agent import Agent
@pytest.fixture
def round_robin_swarm():
agents = [Agent(name=f"Agent{i}") for i in range(3)]
return RoundRobinSwarm(agents=agents, verbose=True, max_loops=2)
def test_init(round_robin_swarm):
assert isinstance(round_robin_swarm, RoundRobinSwarm)
assert round_robin_swarm.verbose == True
assert round_robin_swarm.max_loops == 2
assert len(round_robin_swarm.agents) == 3
def test_run(round_robin_swarm):
task = "test_task"
result = round_robin_swarm.run(task)
assert result == task
assert round_robin_swarm.index == 0
Loading…
Cancel
Save