[FEAT][SwarmNetwork] [Task][handle_scheduled_task, triggers, actions, conditions, ]

pull/336/head
Kye 1 year ago
parent c11a2f467c
commit 70d55e94bc

@ -555,6 +555,67 @@ def interactive_conversation(llm):
# Replace with your LLM instance
interactive_conversation(llm)
```
### `SwarmNetwork`
- Efficient Task Management: SwarmNetwork's intelligent agent pool and task queue management system ensures tasks are distributed evenly across agents. This leads to efficient use of resources and faster task completion.
- Scalability: SwarmNetwork can dynamically scale the number of agents based on the number of pending tasks. This means it can handle an increase in workload by adding more agents, and conserve resources when the workload is low by reducing the number of agents.
- Versatile Deployment Options: With SwarmNetwork, each agent can be run on its own thread, process, container, machine, or even cluster. This provides a high degree of flexibility and allows for deployment that best suits the user's needs and infrastructure.
```python
import os
from dotenv import load_dotenv
# Import the OpenAIChat model and the Agent struct
from swarms.models import OpenAIChat
from swarms.structs import Agent
from swarms.structs.swarm_net import SwarmNetwork
# Load the environment variables
load_dotenv()
# Get the API key from the environment
api_key = os.environ.get("OPENAI_API_KEY")
# Initialize the language model
llm = OpenAIChat(
temperature=0.5,
openai_api_key=api_key,
)
## Initialize the workflow
agent = Agent(llm=llm, max_loops=1, agent_name="Social Media Manager")
agent2 = Agent(llm=llm, max_loops=1, agent_name=" Product Manager")
agent3 = Agent(llm=llm, max_loops=1, agent_name="SEO Manager")
# Load the swarmnet with the agents
swarmnet = SwarmNetwork(
agents=[agent, agent2, agent3],
)
# List the agents in the swarm network
out = swarmnet.list_agents()
print(out)
# Run the workflow on a task
out = swarmnet.run_single_agent(
agent2.id, "Generate a 10,000 word blog on health and wellness."
)
print(out)
# Run all the agents in the swarm network on a task
out = swarmnet.run_many_agents(
"Generate a 10,000 word blog on health and wellness."
)
print(out)
```
---

@ -1,12 +0,0 @@
- pdf_to_text: "swarms/utils/pdf_to_text.md"
- load_model_torch: "swarms/utils/load_model_torch.md"
- metrics_decorator: "swarms/utils/metrics_decorator.md"
- prep_torch_inference: "swarms/utils/prep_torch_inference.md"
- find_image_path: "swarms/utils/find_image_path.md"
- print_class_parameters: "swarms/utils/print_class_parameters.md"
- extract_code_from_markdown: "swarms/utils/extract_code_from_markdown.md"
- check_device: "swarms/utils/check_device.md"
- display_markdown_message: "swarms/utils/display_markdown_message.md"
- phoenix_tracer: "swarms/utils/phoenix_tracer.md"
- limit_tokens_from_string: "swarms/utils/limit_tokens_from_string.md"
- math_eval: "swarms/utils/math_eval.md"

@ -22,7 +22,9 @@ def generate_file_list(directory, output_file):
# Remove the file extension
file_name, _ = os.path.splitext(file)
# Write the file name and path to the output file
f.write(f'- {file_name}: "swarms/utils/{file_path}"\n')
f.write(
f'- {file_name}: "swarms/utils/{file_path}"\n'
)
# Use the function to generate the file list

@ -0,0 +1,48 @@
import os
from dotenv import load_dotenv
# Import the OpenAIChat model and the Agent struct
from swarms.models import OpenAIChat
from swarms.structs import Agent
from swarms.structs.swarm_net import SwarmNetwork
# Load the environment variables
load_dotenv()
# Get the API key from the environment
api_key = os.environ.get("OPENAI_API_KEY")
# Initialize the language model
llm = OpenAIChat(
temperature=0.5,
openai_api_key=api_key,
)
## Initialize the workflow
agent = Agent(llm=llm, max_loops=1, agent_name="Social Media Manager")
agent2 = Agent(llm=llm, max_loops=1, agent_name=" Product Manager")
agent3 = Agent(llm=llm, max_loops=1, agent_name="SEO Manager")
# Load the swarmnet with the agents
swarmnet = SwarmNetwork(
agents=[agent, agent2, agent3],
)
# List the agents in the swarm network
out = swarmnet.list_agents()
print(out)
# Run the workflow on a task
out = swarmnet.run_single_agent(
agent2.id, "Generate a 10,000 word blog on health and wellness."
)
print(out)
# Run all the agents in the swarm network on a task
out = swarmnet.run_many_agents(
"Generate a 10,000 word blog on health and wellness."
)
print(out)

@ -1,6 +1,4 @@
from swarms.utils.disable_logging import disable_logging
disable_logging()
# disable_logging()
from swarms.agents import * # noqa: E402, F403
from swarms.swarms import * # noqa: E402, F403

@ -8,6 +8,7 @@ from swarms.structs.schemas import (
ArtifactUpload,
StepInput,
)
from swarms.structs.swarm_net import SwarmNetwork
__all__ = [
"Agent",
@ -18,4 +19,5 @@ __all__ = [
"Artifact",
"ArtifactUpload",
"StepInput",
"SwarmNetwork",
]

@ -63,98 +63,95 @@ class Agent:
* Ability to provide a loop interval
Args:
id (str): The id of the agent
llm (Any): The language model to use
template (Optional[str]): The template to use
max_loops (int): The maximum number of loops
stopping_condition (Optional[Callable[[str], bool]]): The stopping condition
template (str): The template to use
max_loops (int): The maximum number of loops to run
stopping_condition (Callable): The stopping condition to use
loop_interval (int): The loop interval
retry_attempts (int): The retry attempts
retry_attempts (int): The number of retry attempts
retry_interval (int): The retry interval
return_history (bool): Return the history
stopping_token (str): The stopping token
dynamic_loops (Optional[bool]): Dynamic loops
interactive (bool): Interactive mode
dashboard (bool): Dashboard mode
dynamic_loops (bool): Enable dynamic loops
interactive (bool): Enable interactive mode
dashboard (bool): Enable dashboard
agent_name (str): The name of the agent
agent_description (str): The description of the agent
system_prompt (str): The system prompt
tools (List[BaseTool]): The tools
dynamic_temperature_enabled (Optional[bool]): Dynamic temperature enabled
sop (Optional[str]): The standard operating procedure
sop_list (Optional[List[str]]): The standard operating procedure list
saved_state_path (Optional[str]): The saved state path
autosave (Optional[bool]): Autosave
context_length (Optional[int]): The context length
tools (List[BaseTool]): The tools to use
dynamic_temperature_enabled (bool): Enable dynamic temperature
sop (str): The standard operating procedure
sop_list (List[str]): The standard operating procedure list
saved_state_path (str): The path to the saved state
autosave (bool): Autosave the state
context_length (int): The context length
user_name (str): The user name
self_healing_enabled (Optional[bool]): Self healing enabled
code_interpreter (Optional[bool]): Code interpreter
multi_modal (Optional[bool]): Multi modal
pdf_path (Optional[str]): The pdf path
list_of_pdf (Optional[str]): The list of pdf
tokenizer (Optional[Any]): The tokenizer
memory (Optional[VectorDatabase]): The memory
preset_stopping_token (Optional[bool]): Preset stopping token
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
self_healing_enabled (bool): Enable self healing
code_interpreter (bool): Enable code interpreter
multi_modal (bool): Enable multimodal
pdf_path (str): The path to the pdf
list_of_pdf (str): The list of pdf
tokenizer (Any): The tokenizer
memory (VectorDatabase): The memory
preset_stopping_token (bool): Enable preset stopping token
traceback (Any): The traceback
traceback_handlers (Any): The traceback handlers
streaming_on (bool): Enable streaming
Methods:
run(task: str, **kwargs: Any): Run the agent on a task
run_concurrent(tasks: List[str], **kwargs: Any): Run the agent on a list of tasks concurrently
bulk_run(inputs: List[Dict[str, Any]]): Run the agent on a list of inputs
from_llm_and_template(llm: Any, template: str): Create AgentStream from LLM and a string template.
from_llm_and_template_file(llm: Any, template_file: str): Create AgentStream from LLM and a template file.
save(file_path): Save the agent history to a file
load(file_path): Load the agent history from a file
validate_response(response: str): Validate the response based on certain criteria
print_history_and_memory(): Print the entire history and memory of the agent
step(task: str, **kwargs): Executes a single step in the agent interaction, generating a response from the language model based on the given input text.
graceful_shutdown(): Gracefully shutdown the system saving the state
run_with_timeout(task: str, timeout: int): Run the loop but stop if it takes longer than the timeout
analyze_feedback(): Analyze the feedback for issues
undo_last(): Response the last response and return the previous state
add_response_filter(filter_word: str): Add a response filter to filter out certain words from the response
apply_reponse_filters(response: str): Apply the response filters to the response
filtered_run(task: str): Filtered run
interactive_run(max_loops: int): Interactive run mode
streamed_generation(prompt: str): Stream the generation of the response
get_llm_params(): Extracts and returns the parameters of the llm object for serialization.
save_state(file_path: str): Saves the current state of the agent to a JSON file, including the llm parameters.
load_state(file_path: str): Loads the state of the agent from a json file and restores the configuration and memory.
retry_on_failure(function, retries: int = 3, retry_delay: int = 1): Retry wrapper for LLM calls.
run_code(response: str): Run the code in the response
construct_dynamic_prompt(): Construct the dynamic prompt
extract_tool_commands(text: str): Extract the tool commands from the text
parse_and_execute_tools(response: str): Parse and execute the tools
execute_tools(tool_name, params): Execute the tool with the provided params
truncate_history(): Take the history and truncate it to fit into the model context length
add_task_to_memory(task: str): Add the task to the memory
add_message_to_memory(message: str): Add the message to the memory
add_message_to_memory_and_truncate(message: str): Add the message to the memory and truncate
print_dashboard(task: str): Print dashboard
activate_autonomous_agent(): Print the autonomous agent activation message
dynamic_temperature(): Dynamically change the temperature
_check_stopping_condition(response: str): Check if the stopping condition is met
format_prompt(template, **kwargs: Any): Format the template with the provided kwargs using f-string interpolation.
get_llm_init_params(): Get LLM init params
get_tool_description(): Get the tool description
find_tool_by_name(name: str): Find a tool by name
Example:
run: Run the agent
run_concurrent: Run the agent concurrently
bulk_run: Run the agent in bulk
save: Save the agent
load: Load the agent
validate_response: Validate the response
print_history_and_memory: Print the history and memory
step: Step through the agent
graceful_shutdown: Gracefully shutdown the agent
run_with_timeout: Run the agent with a timeout
analyze_feedback: Analyze the feedback
undo_last: Undo the last response
add_response_filter: Add a response filter
apply_response_filters: Apply the response filters
filtered_run: Run the agent with filtered responses
interactive_run: Run the agent in interactive mode
streamed_generation: Stream the generation of the response
get_llm_params: Get the llm parameters
save_state: Save the state
load_state: Load the state
get_llm_init_params: Get the llm init parameters
get_tool_description: Get the tool description
find_tool_by_name: Find a tool by name
extract_tool_commands: Extract the tool commands
execute_tools: Execute the tools
parse_and_execute_tools: Parse and execute the tools
truncate_history: Truncate the history
add_task_to_memory: Add the task to the memory
add_message_to_memory: Add the message to the memory
add_message_to_memory_and_truncate: Add the message to the memory and truncate
parse_tool_docs: Parse the tool docs
print_dashboard: Print the dashboard
loop_count_print: Print the loop count
streaming: Stream the content
_history: Generate the history
_dynamic_prompt_setup: Setup the dynamic prompt
agent_system_prompt_2: Agent system prompt 2
run_async: Run the agent asynchronously
run_async_concurrent: Run the agent asynchronously and concurrently
run_async_concurrent: Run the agent asynchronously and concurrently
construct_dynamic_prompt: Construct the dynamic prompt
construct_dynamic_prompt: Construct the dynamic prompt
Examples:
>>> from swarms.models import OpenAIChat
>>> from swarms.structs import Agent
>>> llm = OpenAIChat(
... openai_api_key=api_key,
... temperature=0.5,
... )
>>> agent = Agent(
... llm=llm, max_loops=5,
... #system_prompt=SYSTEM_PROMPT,
... #retry_interval=1,
... )
>>> agent.run("Generate a 10,000 word blog")
>>> agent.save("path/agent.yaml")
>>> llm = OpenAIChat()
>>> agent = Agent(llm=llm, max_loops=1)
>>> response = agent.run("Generate a report on the financials.")
>>> print(response)
>>> # Generate a report on the financials.
"""
def __init__(
@ -172,7 +169,7 @@ class Agent:
dynamic_loops: Optional[bool] = False,
interactive: bool = False,
dashboard: bool = False,
agent_name: str = "Autonomous-Agent-XYZ1B",
agent_name: str = None,
agent_description: str = None,
system_prompt: str = AGENT_SYSTEM_PROMPT_3,
tools: List[BaseTool] = None,

@ -0,0 +1,320 @@
import asyncio
import logging
import queue
import threading
from typing import List, Optional
from fastapi import FastAPI
from swarms.structs.agent import Agent
from swarms.structs.base import BaseStructure
class SwarmNetwork(BaseStructure):
"""
SwarmNetwork class
The SwarmNetwork class is responsible for managing the agents pool
and the task queue. It also monitors the health of the agents and
scales the pool up or down based on the number of pending tasks
and the current load of the agents.
For example, if the number of pending tasks is greater than the
number of agents in the pool, the SwarmNetwork will scale up the
pool by adding new agents. If the number of pending tasks is less
than the number of agents in the pool, the SwarmNetwork will scale
down the pool by removing agents.
The SwarmNetwork class also provides a simple API for interacting
with the agents pool. The API is implemented using the Flask
framework and is enabled by default. The API can be disabled by
setting the `api_enabled` parameter to False.
Features:
- Agent pool management
- Task queue management
- Agent health monitoring
- Agent pool scaling
- Simple API for interacting with the agent pool
- Simple API for interacting with the task queue
- Simple API for interacting with the agent health monitor
- Simple API for interacting with the agent pool scaler
- Create APIs for each agent in the pool (optional)
- Run each agent on it's own thread
- Run each agent on it's own process
- Run each agent on it's own container
- Run each agent on it's own machine
- Run each agent on it's own cluster
"""
def __init__(
self,
idle_threshold: float = 0.2,
busy_threshold: float = 0.7,
agents: List[Agent] = None,
api_enabled: Optional[bool] = False,
logging_enabled: Optional[bool] = False,
*args,
**kwargs,
):
self.task_queue = queue.Queue()
self.idle_threshold = idle_threshold
self.busy_threshold = busy_threshold
self.lock = threading.Lock()
self.agents = agents
self.api_enabled = api_enabled
self.logging_enabled = logging_enabled
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
if api_enabled:
self.api = FastAPI()
self.agent_pool = []
def add_task(self, task):
"""Add task to the task queue
Args:
task (_type_): _description_
Example:
>>> from swarms.structs.agent import Agent
>>> from swarms.structs.swarm_net import SwarmNetwork
>>> agent = Agent()
>>> swarm = SwarmNetwork(agents=[agent])
>>> swarm.add_task("task")
"""
self.logger.info(f"Adding task {task} to queue")
try:
self.task_queue.put(task)
self.logger.info(f"Task {task} added to queue")
except Exception as error:
print(
f"Error adding task to queue: {error} try again with"
" a new task"
)
raise error
async def async_add_task(self, task):
"""Add task to the task queue
Args:
task (_type_): _description_
Example:
>>> from swarms.structs.agent import Agent
>>> from swarms.structs.swarm_net import SwarmNetwork
>>> agent = Agent()
>>> swarm = SwarmNetwork(agents=[agent])
>>> swarm.add_task("task")
"""
self.logger.info(
f"Adding task {task} to queue asynchronously"
)
try:
# Add task to queue asynchronously with asyncio
loop = asyncio.get_running_loop()
await loop.run_in_executor(
None, self.task_queue.put, task
)
self.logger.info(f"Task {task} added to queue")
except Exception as error:
print(
f"Error adding task to queue: {error} try again with"
" a new task"
)
raise error
def run_single_agent(
self, agent_id, task: Optional[str] = None, *args, **kwargs
):
"""Run agent the task on the agent id
Args:
agent_id (_type_): _description_
task (str, optional): _description_. Defaults to None.
Raises:
ValueError: _description_
Returns:
_type_: _description_
"""
self.logger.info(f"Running task {task} on agent {agent_id}")
try:
for agent in self.agents_pool:
if agent.id == agent_id:
return agent.run(task, *args, **kwargs)
self.logger.info(f"No agent found with ID {agent_id}")
raise ValueError(f"No agent found with ID {agent_id}")
except Exception as error:
print(f"Error running task on agent: {error}")
raise error
def run_many_agents(
self, task: Optional[str] = None, *args, **kwargs
) -> List:
"""Run the task on all agents
Args:
task (str, optional): _description_. Defaults to None.
Returns:
List: _description_
"""
self.logger.info(f"Running task {task} on all agents")
try:
return [
agent.run(task, *args, **kwargs)
for agent in self.agents_pool
]
except Exception as error:
print(f"Error running task on agents: {error}")
raise error
def list_agents(self):
"""List all agents
Returns:
List: _description_
"""
self.logger.info("[Listing all active agents]")
try:
# return [agent.id for agent in self.agents_pool]
for agent in self.agents:
num_agents = len(self.agents)
self.logger.info(
f"[Number of active agents: {num_agents}]"
)
return self.logger.info(
f"[Agent] [ID: {agent.id}] [Name:"
f" {agent.agent_name}] [Description:"
f" {agent.agent_description}] [Status] [Running]"
)
except Exception as error:
print(f"Error listing agents: {error}")
raise error
def get_agent(self, agent_id):
"""Get agent by id
Args:
agent_id (_type_): _description_
Returns:
_type_: _description_
"""
self.logger.info(f"Getting agent {agent_id}")
try:
for agent in self.agents_pool:
if agent.id == agent_id:
return agent
raise ValueError(f"No agent found with ID {agent_id}")
except Exception as error:
self.logger.error(f"Error getting agent: {error}")
raise error
def add_agent(self, agent):
"""Add agent to the agent pool
Args:
agent (_type_): _description_
"""
self.logger.info(f"Adding agent {agent} to pool")
try:
self.agents_pool.append(agent)
except Exception as error:
print(f"Error adding agent to pool: {error}")
raise error
def remove_agent(self, agent_id):
"""Remove agent from the agent pool
Args:
agent_id (_type_): _description_
"""
self.logger.info(f"Removing agent {agent_id} from pool")
try:
for agent in self.agents_pool:
if agent.id == agent_id:
self.agents_pool.remove(agent)
return
raise ValueError(f"No agent found with ID {agent_id}")
except Exception as error:
print(f"Error removing agent from pool: {error}")
raise error
async def async_remove_agent(self, agent_id):
"""Remove agent from the agent pool
Args:
agent_id (_type_): _description_
"""
self.logger.info(f"Removing agent {agent_id} from pool")
try:
# Remove agent from pool asynchronously with asyncio
loop = asyncio.get_running_loop()
await loop.run_in_executor(
None, self.remove_agent, agent_id
)
except Exception as error:
print(f"Error removing agent from pool: {error}")
raise error
def scale_up(self, num_agents: int = 1):
"""Scale up the agent pool
Args:
num_agents (int, optional): _description_. Defaults to 1.
"""
self.logger.info(f"Scaling up agent pool by {num_agents}")
try:
for _ in range(num_agents):
self.agents_pool.append(Agent())
except Exception as error:
print(f"Error scaling up agent pool: {error}")
raise error
def scale_down(self, num_agents: int = 1):
"""Scale down the agent pool
Args:
num_agents (int, optional): _description_. Defaults to 1.
"""
for _ in range(num_agents):
self.agents_pool.pop()
# - Create APIs for each agent in the pool (optional) with fastapi
def create_apis_for_agents(self):
"""Create APIs for each agent in the pool (optional) with fastapi
Returns:
_type_: _description_
"""
self.apis = []
for agent in self.agents:
self.api.get(f"/{agent.id}")
def run_agent(task: str, *args, **kwargs):
return agent.run(task, *args, **kwargs)
self.apis.append(self.api)
def run(self):
"""run the swarm network"""
# Observe all agents in the pool
self.logger.info("Starting the SwarmNetwork")
for agent in self.agents:
self.logger.info(f"Starting agent {agent.id}")
self.logger.info(
f"[Agent][{agent.id}] [Status] [Running] [Awaiting"
" Task]"
)

@ -1,4 +1,7 @@
import sched
import time
from dataclasses import dataclass, field
from datetime import datetime
from typing import (
Any,
Callable,
@ -10,23 +13,36 @@ from typing import (
from swarms.structs.agent import Agent
# Define a generic Task that can handle different types of callable objects
@dataclass
class Task:
"""
Task class for running a task in a sequential workflow.
Args:
description (str): The description of the task.
agent (Union[Callable, Agent]): The model or agent to execute the task.
args (List[Any]): Additional arguments to pass to the task execution.
kwargs (Dict[str, Any]): Additional keyword arguments to pass to the task execution.
result (Any): The result of the task execution.
history (List[Any]): The history of the task execution.
Attributes:
description (str): Description of the task.
agent (Union[Callable, Agent]): Agent or callable object to run the task.
args (List[Any]): Arguments to pass to the agent or callable object.
kwargs (Dict[str, Any]): Keyword arguments to pass to the agent or callable object.
result (Any): Result of the task.
history (List[Any]): History of the task.
schedule_time (datetime): Time to schedule the task.
scheduler (sched.scheduler): Scheduler to schedule the task.
trigger (Callable): Trigger to run the task.
action (Callable): Action to run the task.
condition (Callable): Condition to run the task.
priority (int): Priority of the task.
dependencies (List[Task]): List of tasks that need to be completed before this task can be executed.
Methods:
execute: Execute the task.
execute: Execute the task by calling the agent or model with the arguments and keyword arguments.
handle_scheduled_task: Handles the execution of a scheduled task.
set_trigger: Sets the trigger for the task.
set_action: Sets the action for the task.
set_condition: Sets the condition for the task.
is_completed: Checks whether the task has been completed.
add_dependency: Adds a task to the list of dependencies.
set_priority: Sets the priority of the task.
check_dependency_completion: Checks whether all the dependencies have been completed.
Examples:
@ -45,34 +61,134 @@ class Task:
kwargs: Dict[str, Any] = field(default_factory=dict)
result: Any = None
history: List[Any] = field(default_factory=list)
# logger = logging.getLogger(__name__)
schedule_time: datetime = None
scheduler = sched.scheduler(time.time, time.sleep)
trigger: Callable = None
action: Callable = None
condition: Callable = None
priority: int = 0
dependencies: List["Task"] = field(default_factory=list)
def execute(self):
"""
Execute the task.
Execute the task by calling the agent or model with the arguments and
keyword arguments.
Examples:
>>> from swarms.structs import Task, Agent
>>> from swarms.models import OpenAIChat
>>> agent = Agent(llm=OpenAIChat(openai_api_key=""), max_loops=1, dashboard=False)
>>> task = Task(description="What's the weather in miami", agent=agent)
>>> task.execute()
>>> task.result
Raises:
ValueError: If a Agent instance is used as a task and the 'task' argument is not provided.
"""
if isinstance(self.agent, Agent):
# Add a prompt to notify the Agent of the sequential workflow
if "prompt" in self.kwargs:
self.kwargs["prompt"] += (
f"\n\nPrevious output: {self.result}"
if self.result
else ""
)
else:
self.kwargs["prompt"] = (
f"Main task: {self.description}"
+ (
f"\n\nPrevious output: {self.result}"
if self.result
else ""
try:
if isinstance(self.agent, Agent):
if self.condition is None or self.condition():
self.result = self.agent.run(
*self.args, **self.kwargs
)
self.history.append(self.result)
if self.action is not None:
self.action()
else:
self.result = self.agent.run(
*self.args, **self.kwargs
)
self.result = self.agent.run(*self.args, **self.kwargs)
else:
self.result = self.agent(*self.args, **self.kwargs)
self.history.append(self.result)
self.history.append(self.result)
except Exception as error:
print(f"[ERROR][Task] {error}")
def handle_scheduled_task(self):
"""
Handles the execution of a scheduled task.
If the schedule time is not set or has already passed, the task is executed immediately.
Otherwise, the task is scheduled to be executed at the specified schedule time.
"""
try:
if (
self.schedule_time is None
or self.schedule_time <= datetime.now()
):
self.execute()
else:
delay = (
self.schedule_time - datetime.now()
).total_seconds()
self.scheduler.enter(delay, 1, self.execute)
self.scheduler_run()
except Exception as error:
print(f"[ERROR][Task] {error}")
def set_trigger(self, trigger: Callable):
"""
Sets the trigger for the task.
Args:
trigger (Callable): The trigger to set.
"""
self.trigger = trigger
def set_action(self, action: Callable):
"""
Sets the action for the task.
Args:
action (Callable): The action to set.
"""
self.action = action
def set_condition(self, condition: Callable):
"""
Sets the condition for the task.
Args:
condition (Callable): The condition to set.
"""
self.condition = condition
def is_completed(self):
"""Is the task completed?
Returns:
_type_: _description_
"""
return self.result is not None
def add_dependency(self, task):
"""Adds a task to the list of dependencies.
Args:
task (_type_): _description_
"""
self.dependencies.append(task)
def set_priority(self, priority: int):
"""Sets the priority of the task.
Args:
priority (int): _description_
"""
self.priority = priority
def check_dependency_completion(self):
"""
Checks whether all the dependencies have been completed.
Returns:
bool: True if all the dependencies have been completed, False otherwise.
"""
try:
for task in self.dependencies:
if not task.is_completed():
return False
except Exception as error:
print(
f"[ERROR][Task][check_dependency_completion] {error}"
)

@ -0,0 +1,95 @@
import json
from typing import List, Optional
from pydantic.v1 import BaseModel, Field, Json, root_validator
from swarms.structs.agent import Agent
from swarms.structs.task import Task
class Team(BaseModel):
"""
Class that represents a group of agents, how they should work together and
their tasks.
Attributes:
tasks (Optional[List[Task]]): List of tasks.
agents (Optional[List[Agent]]): List of agents in this Team.
architecture (str): Architecture that the Team will follow. Default is "sequential".
verbose (bool): Verbose mode for the Agent Execution. Default is False.
config (Optional[Json]): Configuration of the Team. Default is None.
"""
tasks: Optional[List[Task]] = Field(description="List of tasks")
agents: Optional[List[Agent]] = Field(description="List of agents in this Team.")
architecture = Field(
description="architecture that the Team will follow.", default="sequential"
)
verbose: bool = Field(
description="Verbose mode for the Agent Execution", default=False
)
config: Optional[Json] = Field(
description="Configuration of the Team.", default=None
)
@root_validator(pre=True)
def check_config(_cls, values):
if not values.get("config") and (
not values.get("agents") and not values.get("tasks")
):
raise ValueError("Either agents and task need to be set or config.")
if values.get("config"):
config = json.loads(values.get("config"))
if not config.get("agents") or not config.get("tasks"):
raise ValueError("Config should have agents and tasks.")
values["agents"] = [Agent(**agent) for agent in config["agents"]]
tasks = []
for task in config["tasks"]:
task_agent = [
agt for agt in values["agents"] if agt.role == task["agent"]
][0]
del task["agent"]
tasks.append(Task(**task, agent=task_agent))
values["tasks"] = tasks
return values
def run(self) -> str:
"""
Kickoff the Team to work on its tasks.
Returns:
output (List[str]): Output of the Team for each task.
"""
if self.architecture == "sequential":
return self.__sequential_loop()
def __sequential_loop(self) -> str:
"""
Loop that executes the sequential architecture.
Returns:
output (str): Output of the Team.
"""
task_outcome = None
for task in self.tasks:
# Add delegation tools to the task if the agent allows it
# if task.agent.allow_delegation:
# tools = AgentTools(agents=self.agents).tools()
# task.tools += tools
self.__log(f"\nWorking Agent: {task.agent.role}")
self.__log(f"Starting Task: {task.description} ...")
task_outcome = task.execute(task_outcome)
self.__log(f"Task output: {task_outcome}")
return task_outcome
def __log(self, message):
if self.verbose:
print(message)

@ -0,0 +1,50 @@
import pytest
from unittest.mock import Mock, patch
from swarms.structs.swarm_net import SwarmNetwork
from swarms.structs.agent import Agent
@pytest.fixture
def swarm_network():
agents = [Agent(id=f"Agent_{i}") for i in range(5)]
return SwarmNetwork(agents=agents)
def test_swarm_network_init(swarm_network):
assert isinstance(swarm_network.agents, list)
assert len(swarm_network.agents) == 5
@patch("swarms.structs.swarm_net.SwarmNetwork.logger")
def test_run(mock_logger, swarm_network):
swarm_network.run()
assert (
mock_logger.info.call_count == 10
) # 2 log messages per agent
def test_run_with_mocked_agents(mocker, swarm_network):
mock_agents = [Mock(spec=Agent) for _ in range(5)]
mocker.patch.object(swarm_network, "agents", mock_agents)
swarm_network.run()
for mock_agent in mock_agents:
assert mock_agent.run.called
def test_swarm_network_with_no_agents():
swarm_network = SwarmNetwork(agents=[])
assert swarm_network.agents == []
def test_swarm_network_add_agent(swarm_network):
new_agent = Agent(id="Agent_5")
swarm_network.add_agent(new_agent)
assert len(swarm_network.agents) == 6
assert swarm_network.agents[-1] == new_agent
def test_swarm_network_remove_agent(swarm_network):
agent_to_remove = swarm_network.agents[0]
swarm_network.remove_agent(agent_to_remove)
assert len(swarm_network.agents) == 4
assert agent_to_remove not in swarm_network.agents

@ -9,6 +9,8 @@ from swarms.prompts.multi_modal_autonomous_instruction_prompt import (
)
from swarms.structs.agent import Agent
from swarms.structs.task import Task
import datetime
from datetime import timedelta
load_dotenv()
@ -163,3 +165,119 @@ def test_execute():
task = Task(id="5", task="Task5", result=None, agents=[agent])
# Assuming execute method returns True on successful execution
assert task.execute() is True
def test_task_execute_with_agent(mocker):
mock_agent = mocker.Mock(spec=Agent)
mock_agent.run.return_value = "result"
task = Task(description="Test task", agent=mock_agent)
task.execute()
assert task.result == "result"
assert task.history == ["result"]
def test_task_execute_with_callable(mocker):
mock_callable = mocker.Mock()
mock_callable.run.return_value = "result"
task = Task(description="Test task", agent=mock_callable)
task.execute()
assert task.result == "result"
assert task.history == ["result"]
def test_task_execute_with_condition(mocker):
mock_agent = mocker.Mock(spec=Agent)
mock_agent.run.return_value = "result"
condition = mocker.Mock(return_value=True)
task = Task(
description="Test task", agent=mock_agent, condition=condition
)
task.execute()
assert task.result == "result"
assert task.history == ["result"]
def test_task_execute_with_condition_false(mocker):
mock_agent = mocker.Mock(spec=Agent)
mock_agent.run.return_value = "result"
condition = mocker.Mock(return_value=False)
task = Task(
description="Test task", agent=mock_agent, condition=condition
)
task.execute()
assert task.result is None
assert task.history == []
def test_task_execute_with_action(mocker):
mock_agent = mocker.Mock(spec=Agent)
mock_agent.run.return_value = "result"
action = mocker.Mock()
task = Task(
description="Test task", agent=mock_agent, action=action
)
task.execute()
assert task.result == "result"
assert task.history == ["result"]
action.assert_called_once()
def test_task_handle_scheduled_task_now(mocker):
mock_agent = mocker.Mock(spec=Agent)
mock_agent.run.return_value = "result"
task = Task(
description="Test task",
agent=mock_agent,
schedule_time=datetime.now(),
)
task.handle_scheduled_task()
assert task.result == "result"
assert task.history == ["result"]
def test_task_handle_scheduled_task_future(mocker):
mock_agent = mocker.Mock(spec=Agent)
mock_agent.run.return_value = "result"
task = Task(
description="Test task",
agent=mock_agent,
schedule_time=datetime.now() + timedelta(days=1),
)
with mocker.patch.object(
task.scheduler, "enter"
) as mock_enter, mocker.patch.object(
task.scheduler, "run"
) as mock_run:
task.handle_scheduled_task()
mock_enter.assert_called_once()
mock_run.assert_called_once()
def test_task_set_trigger():
task = Task(description="Test task", agent=Agent())
def trigger():
return True
task.set_trigger(trigger)
assert task.trigger == trigger
def test_task_set_action():
task = Task(description="Test task", agent=Agent())
def action():
return True
task.set_action(action)
assert task.action == action
def test_task_set_condition():
task = Task(description="Test task", agent=Agent())
def condition():
return True
task.set_condition(condition)
assert task.condition == condition

Loading…
Cancel
Save