[FEATS][ run_agents_concurrently,

run_agents_concurrently_async,
    run_single_agent,
    run_agents_concurrently_multiprocess,
    run_agents_sequentially,
    run_agents_with_different_tasks,
    run_agent_with_timeout,
    run_agents_with_resource_monitoring,] + [CLEANUP]
pull/622/head
Your Name 2 months ago
parent 512ffe8a2f
commit 5ef4897f07

@ -0,0 +1,299 @@
from loguru import logger
import os
from typing import List
from pydantic import BaseModel, Field
from swarm_models import OpenAIFunctionCaller, OpenAIChat
from swarms.structs.agent import Agent
from swarms.structs.swarm_router import SwarmRouter
class AgentConfig(BaseModel):
"""Configuration for an individual agent in a swarm"""
name: str = Field(
description="The name of the agent", example="Research-Agent"
)
description: str = Field(
description="A description of the agent's purpose and capabilities",
example="Agent responsible for researching and gathering information",
)
system_prompt: str = Field(
description="The system prompt that defines the agent's behavior",
example="You are a research agent. Your role is to gather and analyze information...",
)
max_loops: int = Field(
description="Maximum number of reasoning loops the agent can perform",
example=3,
)
class SwarmConfig(BaseModel):
"""Configuration for a swarm of cooperative agents"""
name: str = Field(
description="The name of the swarm",
example="Research-Writing-Swarm",
)
description: str = Field(
description="The description of the swarm's purpose and capabilities",
example="A swarm of agents that work together to research topics and write articles",
)
agents: List[AgentConfig] = Field(
description="The list of agents that make up the swarm",
example=[
AgentConfig(
name="Research-Agent",
description="Gathers information",
system_prompt="You are a research agent...",
max_loops=2,
),
AgentConfig(
name="Writing-Agent",
description="Writes content",
system_prompt="You are a writing agent...",
max_loops=1,
),
],
)
max_loops: int = Field(
description="The maximum number of loops to run the swarm",
example=1,
)
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
openai_api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
)
BOSS_SYSTEM_PROMPT = """
Manage a swarm of worker agents to efficiently serve the user by deciding whether to create new agents or delegate tasks. Ensure operations are efficient and effective.
### Instructions:
1. **Task Assignment**:
- Analyze available worker agents when a task is presented.
- Delegate tasks to existing agents with clear, direct, and actionable instructions if an appropriate agent is available.
- If no suitable agent exists, create a new agent with a fitting system prompt to handle the task.
2. **Agent Creation**:
- Name agents according to the task they are intended to perform (e.g., "Twitter Marketing Agent").
- Provide each new agent with a concise and clear system prompt that includes its role, objectives, and any tools it can utilize.
3. **Efficiency**:
- Minimize redundancy and maximize task completion speed.
- Avoid unnecessary agent creation if an existing agent can fulfill the task.
4. **Communication**:
- Be explicit in task delegation instructions to avoid ambiguity and ensure effective task execution.
- Require agents to report back on task completion or encountered issues.
5. **Reasoning and Decisions**:
- Offer brief reasoning when selecting or creating agents to maintain transparency.
- Avoid using an agent if unnecessary, with a clear explanation if no agents are suitable for a task.
# Output Format
Present your plan in clear, bullet-point format or short concise paragraphs, outlining task assignment, agent creation, efficiency strategies, and communication protocols.
# Notes
- Preserve transparency by always providing reasoning for task-agent assignments and creation.
- Ensure instructions to agents are unambiguous to minimize error.
"""
class AutoSwarmBuilder:
"""A class that automatically builds and manages swarms of AI agents.
This class handles the creation, coordination and execution of multiple AI agents working
together as a swarm to accomplish complex tasks. It uses a boss agent to delegate work
and create new specialized agents as needed.
Args:
name (str): The name of the swarm
description (str): A description of the swarm's purpose
verbose (bool, optional): Whether to output detailed logs. Defaults to True.
max_loops (int, optional): Maximum number of execution loops. Defaults to 1.
"""
def __init__(
self,
name: str = None,
description: str = None,
verbose: bool = True,
max_loops: int = 1,
):
self.name = name
self.description = description
self.verbose = verbose
self.max_loops = max_loops
self.agents_pool = []
logger.info(
f"Initialized AutoSwarmBuilder: {name} {description}"
)
# @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def run(self, task: str, image_url: str = None, *args, **kwargs):
"""Run the swarm on a given task.
Args:
task (str): The task to be accomplished
image_url (str, optional): URL of an image input if needed. Defaults to None.
*args: Variable length argument list
**kwargs: Arbitrary keyword arguments
Returns:
The output from the swarm's execution
"""
logger.info(f"Running swarm on task: {task}")
agents = self._create_agents(task, image_url, *args, **kwargs)
logger.info(f"Agents created {len(agents)}")
logger.info("Routing task through swarm")
output = self.swarm_router(agents, task, image_url)
logger.info(f"Swarm execution complete with output: {output}")
return output
# @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def _create_agents(self, task: str, *args, **kwargs):
"""Create the necessary agents for a task.
Args:
task (str): The task to create agents for
*args: Variable length argument list
**kwargs: Arbitrary keyword arguments
Returns:
list: List of created agents
"""
logger.info("Creating agents for task")
model = OpenAIFunctionCaller(
system_prompt=BOSS_SYSTEM_PROMPT,
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.1,
base_model=SwarmConfig,
)
agents_dictionary = model.run(task)
logger.info(f"Agents dictionary: {agents_dictionary}")
# Convert dictionary to SwarmConfig if needed
if isinstance(agents_dictionary, dict):
agents_dictionary = SwarmConfig(**agents_dictionary)
# Set swarm config
self.name = agents_dictionary.name
self.description = agents_dictionary.description
self.max_loops = getattr(
agents_dictionary, "max_loops", 1
) # Default to 1 if not set
logger.info(
f"Swarm config: {self.name}, {self.description}, {self.max_loops}"
)
# Create agents from config
agents = []
for agent_config in agents_dictionary.agents:
# Convert dict to AgentConfig if needed
if isinstance(agent_config, dict):
agent_config = AgentConfig(**agent_config)
agent = self.build_agent(
agent_name=agent_config.name,
agent_description=agent_config.description,
agent_system_prompt=agent_config.system_prompt,
max_loops=agent_config.max_loops,
)
agents.append(agent)
return agents
def build_agent(
self,
agent_name: str,
agent_description: str,
agent_system_prompt: str,
max_loops: int = 1,
):
"""Build a single agent with the given specifications.
Args:
agent_name (str): Name of the agent
agent_description (str): Description of the agent's purpose
agent_system_prompt (str): The system prompt for the agent
Returns:
Agent: The constructed agent instance
"""
logger.info(f"Building agent: {agent_name}")
agent = Agent(
agent_name=agent_name,
description=agent_description,
system_prompt=agent_system_prompt,
llm=model,
max_loops=max_loops,
autosave=True,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path=f"{agent_name}.json",
user_name="swarms_corp",
retry_attempts=1,
context_length=200000,
return_step_meta=False,
output_type="str", # "json", "dict", "csv" OR "string" soon "yaml" and
streaming_on=False,
auto_generate_prompt=True,
)
return agent
# @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def swarm_router(
self,
agents: List[Agent],
task: str,
image_url: str = None,
*args,
**kwargs,
):
"""Route tasks between agents in the swarm.
Args:
agents (List[Agent]): List of available agents
task (str): The task to route
image_url (str, optional): URL of an image input if needed. Defaults to None.
*args: Variable length argument list
**kwargs: Arbitrary keyword arguments
Returns:
The output from the routed task execution
"""
logger.info("Routing task through swarm")
swarm_router_instance = SwarmRouter(
agents=agents,
swarm_type="auto",
max_loops=1,
)
return swarm_router_instance.run(
self.name + " " + self.description + " " + task,
)
example = AutoSwarmBuilder()
print(
example.run(
"Write multiple blog posts about the latest advancements in swarm intelligence all at once"
)
)

@ -0,0 +1,162 @@
import os
from dotenv import load_dotenv
from swarms import Agent
from swarm_models import OpenAIChat
from swarms.structs.swarm_router import SwarmRouter
load_dotenv()
# Get the OpenAI API key from the environment variable
api_key = os.getenv("GROQ_API_KEY")
# Model
model = OpenAIChat(
openai_api_base="https://api.groq.com/openai/v1",
openai_api_key=api_key,
model_name="llama-3.1-70b-versatile",
temperature=0.1,
)
# Define specialized system prompts for each agent
DATA_EXTRACTOR_PROMPT = """You are a highly specialized private equity agent focused on data extraction from various documents. Your expertise includes:
1. Extracting key financial metrics (revenue, EBITDA, growth rates, etc.) from financial statements and reports
2. Identifying and extracting important contract terms from legal documents
3. Pulling out relevant market data from industry reports and analyses
4. Extracting operational KPIs from management presentations and internal reports
5. Identifying and extracting key personnel information from organizational charts and bios
Provide accurate, structured data extracted from various document types to support investment analysis."""
SUMMARIZER_PROMPT = """You are an expert private equity agent specializing in summarizing complex documents. Your core competencies include:
1. Distilling lengthy financial reports into concise executive summaries
2. Summarizing legal documents, highlighting key terms and potential risks
3. Condensing industry reports to capture essential market trends and competitive dynamics
4. Summarizing management presentations to highlight key strategic initiatives and projections
5. Creating brief overviews of technical documents, emphasizing critical points for non-technical stakeholders
Deliver clear, concise summaries that capture the essence of various documents while highlighting information crucial for investment decisions."""
FINANCIAL_ANALYST_PROMPT = """You are a specialized private equity agent focused on financial analysis. Your key responsibilities include:
1. Analyzing historical financial statements to identify trends and potential issues
2. Evaluating the quality of earnings and potential adjustments to EBITDA
3. Assessing working capital requirements and cash flow dynamics
4. Analyzing capital structure and debt capacity
5. Evaluating financial projections and underlying assumptions
Provide thorough, insightful financial analysis to inform investment decisions and valuation."""
MARKET_ANALYST_PROMPT = """You are a highly skilled private equity agent specializing in market analysis. Your expertise covers:
1. Analyzing industry trends, growth drivers, and potential disruptors
2. Evaluating competitive landscape and market positioning
3. Assessing market size, segmentation, and growth potential
4. Analyzing customer dynamics, including concentration and loyalty
5. Identifying potential regulatory or macroeconomic impacts on the market
Deliver comprehensive market analysis to assess the attractiveness and risks of potential investments."""
OPERATIONAL_ANALYST_PROMPT = """You are an expert private equity agent focused on operational analysis. Your core competencies include:
1. Evaluating operational efficiency and identifying improvement opportunities
2. Analyzing supply chain and procurement processes
3. Assessing sales and marketing effectiveness
4. Evaluating IT systems and digital capabilities
5. Identifying potential synergies in merger or add-on acquisition scenarios
Provide detailed operational analysis to uncover value creation opportunities and potential risks."""
# Initialize specialized agents
data_extractor_agent = Agent(
agent_name="Data-Extractor",
system_prompt=DATA_EXTRACTOR_PROMPT,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="data_extractor_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
summarizer_agent = Agent(
agent_name="Document-Summarizer",
system_prompt=SUMMARIZER_PROMPT,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="summarizer_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
financial_analyst_agent = Agent(
agent_name="Financial-Analyst",
system_prompt=FINANCIAL_ANALYST_PROMPT,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="financial_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
market_analyst_agent = Agent(
agent_name="Market-Analyst",
system_prompt=MARKET_ANALYST_PROMPT,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="market_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
operational_analyst_agent = Agent(
agent_name="Operational-Analyst",
system_prompt=OPERATIONAL_ANALYST_PROMPT,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="operational_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
# Initialize the SwarmRouter
router = SwarmRouter(
name="pe-document-analysis-swarm",
description="Analyze documents for private equity due diligence and investment decision-making",
max_loops=1,
agents=[
data_extractor_agent,
summarizer_agent,
# financial_analyst_agent,
# market_analyst_agent,
# operational_analyst_agent,
],
swarm_type="auto", # or "SequentialWorkflow" or "ConcurrentWorkflow" or
auto_generate_prompts=True,
)
# Example usage
if __name__ == "__main__":
# Run a comprehensive private equity document analysis task
result = router.run(
"Where is the best place to find template term sheets for series A startups. Provide links and references"
)
print(result)
# Retrieve and print logs
for log in router.get_logs():
print(f"{log.timestamp} - {log.level}: {log.message}")

@ -176,6 +176,7 @@ nav:
- ForestSwarm: "swarms/structs/forest_swarm.md" - ForestSwarm: "swarms/structs/forest_swarm.md"
- SwarmRouter: "swarms/structs/swarm_router.md" - SwarmRouter: "swarms/structs/swarm_router.md"
- TaskQueueSwarm: "swarms/structs/taskqueue_swarm.md" - TaskQueueSwarm: "swarms/structs/taskqueue_swarm.md"
- Various Execution Methods: "swarms/structs/various_execution_methods.md"
- Workflows: - Workflows:
- ConcurrentWorkflow: "swarms/structs/concurrentworkflow.md" - ConcurrentWorkflow: "swarms/structs/concurrentworkflow.md"
- SequentialWorkflow: "swarms/structs/sequential_workflow.md" - SequentialWorkflow: "swarms/structs/sequential_workflow.md"

@ -0,0 +1,168 @@
# Concurrent Agents API Reference
This documentation covers the API for running multiple agents concurrently using various execution strategies. The implementation uses `asyncio` with `uvloop` for enhanced performance and `ThreadPoolExecutor` for handling CPU-bound operations.
## Table of Contents
- [Core Functions](#core-functions)
- [Advanced Functions](#advanced-functions)
- [Utility Functions](#utility-functions)
- [Resource Monitoring](#resource-monitoring)
- [Usage Examples](#usage-examples)
## Core Functions
### run_agents_concurrently()
Primary function for running multiple agents concurrently with optimized performance using both uvloop and ThreadPoolExecutor.
#### Arguments
| Parameter | Type | Required | Default | Description |
|-------------|----------------|----------|----------------|-------------|
| agents | List[AgentType]| Yes | - | List of Agent instances to run concurrently |
| task | str | Yes | - | Task string to execute |
| batch_size | int | No | CPU count | Number of agents to run in parallel in each batch |
| max_workers | int | No | CPU count * 2 | Maximum number of threads in the executor |
#### Returns
`List[Any]`: List of outputs from each agent
#### Flow Diagram
```mermaid
graph TD
A[Start] --> B[Initialize ThreadPoolExecutor]
B --> C[Split Agents into Batches]
C --> D[Process Batch]
D --> E{More Batches?}
E -->|Yes| D
E -->|No| F[Combine Results]
F --> G[Return Results]
subgraph "Batch Processing"
D --> H[Run Agents Async]
H --> I[Wait for Completion]
I --> J[Collect Batch Results]
end
```
### run_agents_sequentially()
Runs multiple agents sequentially for baseline comparison or simple use cases.
#### Arguments
| Parameter | Type | Required | Default | Description |
|-----------|----------------|----------|---------|-------------|
| agents | List[AgentType]| Yes | - | List of Agent instances to run |
| task | str | Yes | - | Task string to execute |
#### Returns
`List[Any]`: List of outputs from each agent
## Advanced Functions
### run_agents_with_different_tasks()
Runs multiple agents with different tasks concurrently.
#### Arguments
| Parameter | Type | Required | Default | Description |
|-----------------|-------------------------------|----------|----------------|-------------|
| agent_task_pairs| List[tuple[AgentType, str]] | Yes | - | List of (agent, task) tuples |
| batch_size | int | No | CPU count | Number of agents to run in parallel |
| max_workers | int | No | CPU count * 2 | Maximum number of threads |
### run_agents_with_timeout()
Runs multiple agents concurrently with timeout limits.
#### Arguments
| Parameter | Type | Required | Default | Description |
|-------------|----------------|----------|----------------|-------------|
| agents | List[AgentType]| Yes | - | List of Agent instances |
| task | str | Yes | - | Task string to execute |
| timeout | float | Yes | - | Timeout in seconds for each agent |
| batch_size | int | No | CPU count | Number of agents to run in parallel |
| max_workers | int | No | CPU count * 2 | Maximum number of threads |
## Usage Examples
```python
from swarms.structs.agent import Agent
from your_module import run_agents_concurrently
# Initialize agents
agents = [
Agent(
agent_name=f"Analysis-Agent-{i}",
system_prompt="You are a financial analysis expert",
llm=model,
max_loops=1
)
for i in range(5)
]
# Basic concurrent execution
task = "Analyze the impact of rising interest rates on tech stocks"
outputs = run_agents_concurrently(agents, task)
# Running with timeout
outputs_with_timeout = run_agents_with_timeout(
agents=agents,
task=task,
timeout=30.0,
batch_size=2
)
# Running different tasks
task_pairs = [
(agents[0], "Analyze tech stocks"),
(agents[1], "Analyze energy stocks"),
(agents[2], "Analyze retail stocks")
]
different_outputs = run_agents_with_different_tasks(task_pairs)
```
## Resource Monitoring
### ResourceMetrics
A dataclass for system resource metrics.
#### Properties
| Property | Type | Description |
|----------------|-------|-------------|
| cpu_percent | float | Current CPU usage percentage |
| memory_percent | float | Current memory usage percentage |
| active_threads | int | Number of active threads |
### run_agents_with_resource_monitoring()
Runs agents with system resource monitoring and adaptive batch sizing.
#### Arguments
| Parameter | Type | Required | Default | Description |
|------------------|----------------|----------|---------|-------------|
| agents | List[AgentType]| Yes | - | List of Agent instances |
| task | str | Yes | - | Task string to execute |
| cpu_threshold | float | No | 90.0 | Max CPU usage percentage |
| memory_threshold | float | No | 90.0 | Max memory usage percentage |
| check_interval | float | No | 1.0 | Resource check interval in seconds |
## Performance Considerations
- All functions are decorated with `@profile_func` for performance monitoring
- Default batch sizes and worker counts are optimized based on CPU cores
- Resource monitoring helps prevent system overload
- Using `uvloop` provides better performance than standard `asyncio`
## Error Handling
- Functions handle asyncio event loop creation/retrieval
- Timeout mechanism prevents infinite waiting
- Resource monitoring allows for adaptive performance adjustment

@ -35,7 +35,7 @@ agent = Agent(
# output_type="json", # output_type="json",
output_type="json", # "json", "dict", "csv" OR "string" soon "yaml" and output_type="json", # "json", "dict", "csv" OR "string" soon "yaml" and
streaming_on=False, streaming_on=False,
# auto_generate_prompt=True, auto_generate_prompt=True,
) )

@ -3,7 +3,9 @@ from swarm_models import OpenAIChat
import os import os
model = OpenAIChat( model = OpenAIChat(
api_key=os.getenv("OPENAI_API_KEY"), model_name="gpt-4o-mini", temperature=0.1 api_key=os.getenv("OPENAI_API_KEY"),
model_name="gpt-4o-mini",
temperature=0.1,
) )
# Aggregator system prompt # Aggregator system prompt
@ -25,4 +27,4 @@ prompt_generator_sys_prompt = Prompt(
llm=model, llm=model,
) )
print(prompt_generator_sys_prompt.get_prompt()) # print(prompt_generator_sys_prompt.get_prompt())

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry] [tool.poetry]
name = "swarms" name = "swarms"
version = "5.8.7" version = "5.9.1"
description = "Swarms - Pytorch" description = "Swarms - Pytorch"
license = "MIT" license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"] authors = ["Kye Gomez <kye@apac.ai>"]

@ -14,8 +14,6 @@ from swarms.agents.tool_agent import ToolAgent
from swarms.agents.create_agents_from_yaml import ( from swarms.agents.create_agents_from_yaml import (
create_agents_from_yaml, create_agents_from_yaml,
) )
from swarms.agents.prompt_generator_agent import PromptGeneratorAgent
__all__ = [ __all__ = [
"ToolAgent", "ToolAgent",
@ -30,5 +28,4 @@ __all__ = [
"check_exit", "check_exit",
"check_end", "check_end",
"create_agents_from_yaml", "create_agents_from_yaml",
"PromptGeneratorAgent",
] ]

@ -1,84 +0,0 @@
import os
from swarms import Agent
from swarm_models import OpenAIChat
from swarms.prompts.prompt_generator_optimizer import (
prompt_generator_sys_prompt,
)
from dotenv import load_dotenv
from swarms.agents.prompt_generator_agent import PromptGeneratorAgent
from yaml import dump
load_dotenv()
def generate_prompt(
num_loops: int = 1,
autosave: bool = True,
save_to_yaml: bool = False,
prompt: str = None,
save_format: str = "yaml",
) -> None:
"""
This function creates and runs a prompt generator agent with default settings for number of loops and autosave.
Args:
num_loops (int, optional): The number of loops to run the agent. Defaults to 1.
autosave (bool, optional): Whether to autosave the agent's state. Defaults to True.
save_to_yaml (bool, optional): Whether to save the agent's configuration to a YAML file. Defaults to False.
prompt (str): The prompt to generate.
save_format (str, optional): The format in which to save the generated prompt. Defaults to "yaml".
Returns:
None
"""
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
openai_api_key=api_key,
model_name="gpt-4o-mini",
temperature=0.1,
max_tokens=2000,
)
# Initialize the agent
agent = Agent(
agent_name="Prompt-Optimizer",
system_prompt=prompt_generator_sys_prompt.get_prompt(),
llm=model,
max_loops=num_loops,
autosave=autosave,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="optimizer_agent.json",
user_name="swarms_corp",
retry_attempts=1,
context_length=200000,
return_step_meta=False,
output_type="string",
)
# Main Class
prompt_generator = PromptGeneratorAgent(agent)
# Run the agent
prompt_generator.run(prompt, save_format)
if save_to_yaml:
with open("agent_config.yaml", "w") as file:
dump(agent.config, file)
# # Example usage
# if __name__ == "__main__":
# try:
# create_and_run_agent(
# num_loops=1,
# autosave=True,
# save_to_yaml=True,
# prompt="Generate an amazing prompt for analyzing healthcare insurance documents",
# )
# except Exception as e:
# logger.error(f"An error occurred: {e}")

@ -1,92 +0,0 @@
import json
import time
import uuid
import yaml
from dotenv import load_dotenv
from loguru import logger
from swarms.structs.agent import Agent
from swarms.prompts.prompt_generator_optimizer import (
prompt_generator_sys_prompt,
)
load_dotenv()
class PromptGeneratorAgent:
"""
A class representing a prompt generator agent.
Attributes:
----------
agent : Agent
The underlying agent instance.
"""
def __init__(self, agent: Agent):
"""
Initializes the PromptGeneratorAgent instance.
Args:
----
agent : Agent
The agent instance to be used for prompt generation.
"""
self.agent = agent
def run(self, task: str, format: str = "json") -> str:
"""
Runs the prompt generator agent with the given task description and saves the generated prompt with the given metadata in the specified format.
Args:
----
task : str
The task description to be used for prompt generation.
metadata : Dict[str, Any]
The metadata to be saved along with the prompt.
format : str, optional
The format in which the prompt should be saved (default is "json").
Returns:
-------
str
The generated prompt.
"""
prompt = self.agent.run(task)
self.save_prompt(prompt, format)
return prompt
def save_prompt(
self,
prompt: str,
format: str = "yaml",
):
"""
Saves the generated prompt with the given metadata in the specified format using the prompt generator sys prompt model dump.
Args:
----
prompt : str
The generated prompt to be saved.
metadata : Dict[str, Any]
The metadata to be saved along with the prompt.
format : str, optional
The format in which the prompt should be saved (default is "json").
"""
data = {
"prompt_history": prompt_generator_sys_prompt.model_dump(),
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"prompt": prompt,
}
if format == "json":
with open(f"prompt_{uuid.uuid4()}.json", "w") as f:
json.dump(data, f, indent=4)
elif format == "yaml":
with open(f"prompt_{uuid.uuid4()}.yaml", "w") as f:
yaml.dump(data, f)
else:
logger.error(
"Invalid format. Only 'json' and 'yaml' are supported."
)

@ -8,7 +8,6 @@ from swarms.cli.onboarding_process import OnboardingProcess
from swarms.agents.create_agents_from_yaml import ( from swarms.agents.create_agents_from_yaml import (
create_agents_from_yaml, create_agents_from_yaml,
) )
from swarms.agents.cli_prompt_generator_func import generate_prompt
import subprocess import subprocess
console = Console() console = Console()
@ -46,6 +45,7 @@ def show_help():
[bold white]run-agents[/bold white] : Run your Agents from your specified yaml file. Specify the yaml file with path the `--yaml-file` arg. Example: `--yaml-file agents.yaml` [bold white]run-agents[/bold white] : Run your Agents from your specified yaml file. Specify the yaml file with path the `--yaml-file` arg. Example: `--yaml-file agents.yaml`
[bold white]generate-prompt[/bold white] : Generate a prompt through automated prompt engineering. Requires an OPENAI Key in your `.env` Example: --prompt "Generate a prompt for an agent to analyze legal docs" [bold white]generate-prompt[/bold white] : Generate a prompt through automated prompt engineering. Requires an OPENAI Key in your `.env` Example: --prompt "Generate a prompt for an agent to analyze legal docs"
[bold white]auto-upgrade[/bold white] : Automatically upgrades Swarms to the latest version [bold white]auto-upgrade[/bold white] : Automatically upgrades Swarms to the latest version
[bold white]book-call[/bold white] : Book a strategy session with our team to discuss your use case and get personalized guidance
For more details, visit: https://docs.swarms.world For more details, visit: https://docs.swarms.world
""" """
@ -81,6 +81,18 @@ def redirect_to_docs():
time.sleep(2) time.sleep(2)
# Redirect to docs
def redirect_to_call():
console.print(
"[bold yellow]Opening the Call page...[/bold yellow]"
)
# Simulating API key retrieval process by opening the website
import webbrowser
webbrowser.open("https://cal.com/swarms/swarms-strategy-session")
time.sleep(2)
# Check and start cache (login system simulation) # Check and start cache (login system simulation)
def check_login(): def check_login():
cache_file = "cache.txt" cache_file = "cache.txt"
@ -155,7 +167,8 @@ def main():
"check-login", "check-login",
"run-agents", "run-agents",
"generate-prompt", # Added new command for generating prompts "generate-prompt", # Added new command for generating prompts
"auto-upgrade", # Added new command for auto-upgrade "auto-upgrade", # Added new command for auto-upgrade,
"book-call",
], ],
help="Command to run", help="Command to run",
) )
@ -204,22 +217,24 @@ def main():
create_agents_from_yaml( create_agents_from_yaml(
yaml_file=args.yaml_file, return_type="tasks" yaml_file=args.yaml_file, return_type="tasks"
) )
elif args.command == "generate-prompt": # elif args.command == "generate-prompt":
if ( # if (
args.prompt # args.prompt
): # Corrected from args.prompt_task to args.prompt # ): # Corrected from args.prompt_task to args.prompt
generate_prompt( # generate_prompt(
num_loops=args.num_loops, # num_loops=args.num_loops,
autosave=args.autosave, # autosave=args.autosave,
save_to_yaml=args.save_to_yaml, # save_to_yaml=args.save_to_yaml,
prompt=args.prompt, # Corrected from args.prompt_task to args.prompt # prompt=args.prompt, # Corrected from args.prompt_task to args.prompt
) # )
else: # else:
console.print( # console.print(
"[bold red]Please specify a task for generating a prompt using '--prompt'.[/bold red]" # "[bold red]Please specify a task for generating a prompt using '--prompt'.[/bold red]"
) # )
elif args.command == "auto-upgrade": elif args.command == "auto-upgrade":
check_and_upgrade_version() check_and_upgrade_version()
elif args.command == "book-call":
redirect_to_call()
else: else:
console.print( console.print(
"[bold red]Unknown command! Type 'help' for usage.[/bold red]" "[bold red]Unknown command! Type 'help' for usage.[/bold red]"

@ -13,9 +13,11 @@ from pydantic.v1 import validator
from swarms_cloud.utils.log_to_swarms_database import log_agent_data from swarms_cloud.utils.log_to_swarms_database import log_agent_data
from swarms_cloud.utils.capture_system_data import capture_system_data from swarms_cloud.utils.capture_system_data import capture_system_data
from swarms.tools.base_tool import BaseTool from swarms.tools.base_tool import BaseTool
# from swarms.agents.ape_agent import auto_generate_prompt # from swarms.agents.ape_agent import auto_generate_prompt
from typing import Any from typing import Any
class Prompt(BaseModel): class Prompt(BaseModel):
""" """
A class representing a prompt with content, edit history, and version control. A class representing a prompt with content, edit history, and version control.
@ -91,13 +93,13 @@ class Prompt(BaseModel):
values["content"] values["content"]
] # Store initial version in history ] # Store initial version in history
return v return v
def __init__(self, **data): def __init__(self, **data):
super().__init__(**data) super().__init__(**data)
if self.autosave: if self.autosave:
self._autosave() self._autosave()
if self.auto_generate_prompt and self.llm: if self.auto_generate_prompt and self.llm:
self.auto_generate_prompt() self.auto_generate_prompt()
@ -252,7 +254,7 @@ class Prompt(BaseModel):
with open(file_path, "w") as file: with open(file_path, "w") as file:
json.dump(self.model_dump(), file) json.dump(self.model_dump(), file)
logger.info(f"Autosaved prompt {self.id} to {file_path}.") logger.info(f"Autosaved prompt {self.id} to {file_path}.")
# def auto_generate_prompt(self): # def auto_generate_prompt(self):
# logger.info(f"Auto-generating prompt for {self.name}") # logger.info(f"Auto-generating prompt for {self.name}")
# task = self.name + " " + self.description + " " + self.content # task = self.name + " " + self.description + " " + self.content
@ -260,7 +262,7 @@ class Prompt(BaseModel):
# logger.info("Generated prompt successfully, updating content") # logger.info("Generated prompt successfully, updating content")
# self.edit_prompt(prompt) # self.edit_prompt(prompt)
# logger.info("Prompt content updated") # logger.info("Prompt content updated")
# return "Prompt auto-generated successfully." # return "Prompt auto-generated successfully."
class Config: class Config:

@ -66,5 +66,5 @@ prompt_generator_sys_prompt.edit_prompt(
""" """
) )
print(prompt_generator_sys_prompt.get_prompt()) # print(prompt_generator_sys_prompt.get_prompt())
print(prompt_generator_sys_prompt.model_dump_json(indent=4)) # print(prompt_generator_sys_prompt.model_dump_json(indent=4))

@ -67,6 +67,17 @@ from swarms.structs.yaml_model import (
) )
from swarms.structs.swarm_router import SwarmRouter, SwarmType from swarms.structs.swarm_router import SwarmRouter, SwarmType
from swarms.structs.swarm_arange import SwarmRearrange from swarms.structs.swarm_arange import SwarmRearrange
from swarms.structs.multi_agent_exec import (
run_agents_concurrently,
run_agents_concurrently_async,
run_single_agent,
run_agents_concurrently_multiprocess,
run_agents_sequentially,
run_agents_with_different_tasks,
run_agent_with_timeout,
run_agents_with_resource_monitoring,
)
__all__ = [ __all__ = [
"Agent", "Agent",
@ -130,4 +141,12 @@ __all__ = [
"SwarmRouter", "SwarmRouter",
"SwarmType", "SwarmType",
"SwarmRearrange", "SwarmRearrange",
"run_agents_concurrently",
"run_agents_concurrently_async",
"run_single_agent",
"run_agents_concurrently_multiprocess",
"run_agents_sequentially",
"run_agents_with_different_tasks",
"run_agent_with_timeout",
"run_agents_with_resource_monitoring",
] ]

@ -57,7 +57,7 @@ from clusterops import (
execute_with_cpu_cores, execute_with_cpu_cores,
) )
from swarms.agents.ape_agent import auto_generate_prompt from swarms.agents.ape_agent import auto_generate_prompt
import yaml
# Utils # Utils
# Custom stopping condition # Custom stopping condition
@ -1014,7 +1014,9 @@ class Agent:
elif self.output_type == "dict": elif self.output_type == "dict":
return self.agent_output.model_dump() return self.agent_output.model_dump()
elif self.output_type == "yaml": elif self.output_type == "yaml":
return yaml.safe_dump(self.agent_output.model_dump(), sort_keys=False) return yaml.safe_dump(
self.agent_output.model_dump(), sort_keys=False
)
else: else:
raise ValueError( raise ValueError(
f"Invalid output type: {self.output_type}" f"Invalid output type: {self.output_type}"

@ -1,20 +0,0 @@
from typing import List, Any, Dict, Optional
class AutoSwarmBuilder:
def __init__(self, task: str, num_agents: int = 10, batch_size: int = 10):
self.task = task
self.num_agents = num_agents
self.batch_size = batch_size
def run(self, task: str, image_url: str = None, *args, **kwargs):
pass
def _create_swarm(self):
pass
def _create_agents(self):
pass
def _run_agents(self):
pass

@ -0,0 +1,344 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
import psutil
from dataclasses import dataclass
import threading
from typing import List, Union, Any, Callable
from multiprocessing import cpu_count
import uvloop
from swarms.structs.agent import Agent
from swarms.utils.calculate_func_metrics import profile_func
# Use uvloop for faster asyncio event loop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# Type definitions
AgentType = Union[Agent, Callable]
def run_single_agent(agent: AgentType, task: str) -> Any:
"""Run a single agent synchronously"""
return agent.run(task)
async def run_agent_async(
agent: AgentType, task: str, executor: ThreadPoolExecutor
) -> Any:
"""
Run an agent asynchronously using a thread executor.
Args:
agent: Agent instance to run
task: Task string to execute
executor: ThreadPoolExecutor instance for handling CPU-bound operations
Returns:
Agent execution result
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
executor, run_single_agent, agent, task
)
async def run_agents_concurrently_async(
agents: List[AgentType], task: str, executor: ThreadPoolExecutor
) -> List[Any]:
"""
Run multiple agents concurrently using asyncio and thread executor.
Args:
agents: List of Agent instances to run concurrently
task: Task string to execute
executor: ThreadPoolExecutor for CPU-bound operations
Returns:
List of outputs from each agent
"""
results = await asyncio.gather(
*(run_agent_async(agent, task, executor) for agent in agents)
)
return results
@profile_func
def run_agents_concurrently(
agents: List[AgentType],
task: str,
batch_size: int = None,
max_workers: int = None,
) -> List[Any]:
"""
Optimized concurrent agent runner using both uvloop and ThreadPoolExecutor.
Args:
agents: List of Agent instances to run concurrently
task: Task string to execute
batch_size: Number of agents to run in parallel in each batch (defaults to CPU count)
max_workers: Maximum number of threads in the executor (defaults to CPU count * 2)
Returns:
List of outputs from each agent
"""
# Optimize defaults based on system resources
cpu_cores = cpu_count()
batch_size = batch_size or cpu_cores
max_workers = max_workers or cpu_cores * 2
results = []
# Get or create event loop
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Create a shared thread pool executor with optimal worker count
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Process agents in batches
for i in range(0, len(agents), batch_size):
batch = agents[i : i + batch_size]
batch_results = loop.run_until_complete(
run_agents_concurrently_async(batch, task, executor)
)
results.extend(batch_results)
return results
@profile_func
def run_agents_concurrently_multiprocess(
agents: List[Agent], task: str, batch_size: int = cpu_count()
) -> List[Any]:
"""
Manage and run multiple agents concurrently in batches, with optimized performance.
Args:
agents (List[Agent]): List of Agent instances to run concurrently.
task (str): The task string to execute by all agents.
batch_size (int, optional): Number of agents to run in parallel in each batch.
Defaults to the number of CPU cores.
Returns:
List[Any]: A list of outputs from each agent.
"""
results = []
loop = asyncio.get_event_loop()
# Process agents in batches to avoid overwhelming system resources
for i in range(0, len(agents), batch_size):
batch = agents[i : i + batch_size]
batch_results = loop.run_until_complete(
run_agents_concurrently_async(batch, task)
)
results.extend(batch_results)
return results
@profile_func
def run_agents_sequentially(agents: List[AgentType], task: str) -> List[Any]:
"""
Run multiple agents sequentially for baseline comparison.
Args:
agents: List of Agent instances to run
task: Task string to execute
Returns:
List of outputs from each agent
"""
return [run_single_agent(agent, task) for agent in agents]
@profile_func
def run_agents_with_different_tasks(
agent_task_pairs: List[tuple[AgentType, str]],
batch_size: int = None,
max_workers: int = None
) -> List[Any]:
"""
Run multiple agents with different tasks concurrently.
Args:
agent_task_pairs: List of (agent, task) tuples
batch_size: Number of agents to run in parallel
max_workers: Maximum number of threads
Returns:
List of outputs from each agent
"""
async def run_pair_async(pair: tuple[AgentType, str], executor: ThreadPoolExecutor) -> Any:
agent, task = pair
return await run_agent_async(agent, task, executor)
cpu_cores = cpu_count()
batch_size = batch_size or cpu_cores
max_workers = max_workers or cpu_cores * 2
results = []
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for i in range(0, len(agent_task_pairs), batch_size):
batch = agent_task_pairs[i : i + batch_size]
batch_results = loop.run_until_complete(
asyncio.gather(*(run_pair_async(pair, executor) for pair in batch))
)
results.extend(batch_results)
return results
async def run_agent_with_timeout(
agent: AgentType,
task: str,
timeout: float,
executor: ThreadPoolExecutor
) -> Any:
"""
Run an agent with a timeout limit.
Args:
agent: Agent instance to run
task: Task string to execute
timeout: Timeout in seconds
executor: ThreadPoolExecutor instance
Returns:
Agent execution result or None if timeout occurs
"""
try:
return await asyncio.wait_for(
run_agent_async(agent, task, executor),
timeout=timeout
)
except asyncio.TimeoutError:
return None
@profile_func
def run_agents_with_timeout(
agents: List[AgentType],
task: str,
timeout: float,
batch_size: int = None,
max_workers: int = None
) -> List[Any]:
"""
Run multiple agents concurrently with a timeout for each agent.
Args:
agents: List of Agent instances
task: Task string to execute
timeout: Timeout in seconds for each agent
batch_size: Number of agents to run in parallel
max_workers: Maximum number of threads
Returns:
List of outputs (None for timed out agents)
"""
cpu_cores = cpu_count()
batch_size = batch_size or cpu_cores
max_workers = max_workers or cpu_cores * 2
results = []
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for i in range(0, len(agents), batch_size):
batch = agents[i : i + batch_size]
batch_results = loop.run_until_complete(
asyncio.gather(
*(run_agent_with_timeout(agent, task, timeout, executor)
for agent in batch)
)
)
results.extend(batch_results)
return results
@dataclass
class ResourceMetrics:
cpu_percent: float
memory_percent: float
active_threads: int
def get_system_metrics() -> ResourceMetrics:
"""Get current system resource usage"""
return ResourceMetrics(
cpu_percent=psutil.cpu_percent(),
memory_percent=psutil.virtual_memory().percent,
active_threads=threading.active_count()
)
@profile_func
def run_agents_with_resource_monitoring(
agents: List[AgentType],
task: str,
cpu_threshold: float = 90.0,
memory_threshold: float = 90.0,
check_interval: float = 1.0
) -> List[Any]:
"""
Run agents with system resource monitoring and adaptive batch sizing.
Args:
agents: List of Agent instances
task: Task string to execute
cpu_threshold: Max CPU usage percentage
memory_threshold: Max memory usage percentage
check_interval: Resource check interval in seconds
Returns:
List of outputs from each agent
"""
async def monitor_resources():
while True:
metrics = get_system_metrics()
if metrics.cpu_percent > cpu_threshold or metrics.memory_percent > memory_threshold:
# Reduce batch size or pause execution
pass
await asyncio.sleep(check_interval)
# Implementation details...
# # Example usage:
# # Initialize your agents with the same model to avoid re-creating it
# agents = [
# Agent(
# agent_name=f"Financial-Analysis-Agent_parallel_swarm{i}",
# system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
# llm=model,
# max_loops=1,
# autosave=True,
# dashboard=False,
# verbose=False,
# dynamic_temperature_enabled=False,
# saved_state_path=f"finance_agent_{i}.json",
# user_name="swarms_corp",
# retry_attempts=1,
# context_length=200000,
# return_step_meta=False,
# )
# for i in range(5) # Assuming you want 10 agents
# ]
# task = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria"
# outputs = run_agents_concurrently(agents, task)
# for i, output in enumerate(outputs):
# print(f"Output from agent {i+1}:\n{output}")

@ -1,5 +1,5 @@
from multiprocessing import Manager, Pool, cpu_count from multiprocessing import Manager, Pool, cpu_count
from typing import Sequence from typing import Sequence, Union, Callable
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
from swarms.structs.base_workflow import BaseWorkflow from swarms.structs.base_workflow import BaseWorkflow
@ -49,7 +49,7 @@ class MultiProcessWorkflow(BaseWorkflow):
self, self,
max_workers: int = 5, max_workers: int = 5,
autosave: bool = True, autosave: bool = True,
agents: Sequence[Agent] = None, agents: Sequence[Union[Agent, Callable]] = None,
*args, *args,
**kwargs, **kwargs,
): ):

@ -1,120 +0,0 @@
import os
# from swarms.structs. import OpenAIChat
import asyncio
from swarms.utils.calculate_func_metrics import profile_func
# Function to run a single agent on the task (synchronous)
def run_single_agent(agent, task):
return agent.run(task)
# Asynchronous wrapper for agent tasks
async def run_agent_async(agent, task):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None, run_single_agent, agent, task
)
# Asynchronous function to run agents concurrently
async def run_agents_concurrently_async(agents, task: str):
"""
Run multiple agents concurrently on the same task with optimized performance.
:param agents: List of Agent instances to run concurrently.
:param task: The task string to execute by all agents.
:return: A list of outputs from each agent.
"""
# Run all agents asynchronously using asyncio.gather
results = await asyncio.gather(
*(run_agent_async(agent, task) for agent in agents)
)
return results
# Function to manage the overall process and batching
@profile_func
def run_agents_concurrently(agents, task: str, batch_size: int = 5):
"""
Manage and run multiple agents concurrently in batches, with optimized performance.
:param agents: List of Agent instances to run concurrently.
:param task: The task string to execute by all agents.
:param batch_size: Number of agents to run in parallel in each batch.
:return: A list of outputs from each agent.
"""
results = []
loop = asyncio.get_event_loop()
batch_size = (
os.cpu_count() if batch_size > os.cpu_count() else batch_size
)
# Process agents in batches to avoid overwhelming system resources
for i in range(0, len(agents), batch_size):
batch = agents[i : i + batch_size]
batch_results = loop.run_until_complete(
run_agents_concurrently_async(batch, task)
)
results.extend(batch_results)
return results
# # Example usage:
# # Initialize your agents with the same model to avoid re-creating it
# agents = [
# Agent(
# agent_name=f"Financial-Analysis-Agent_parallel_swarm{i}",
# system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
# llm=model,
# max_loops=1,
# autosave=True,
# dashboard=False,
# verbose=False,
# dynamic_temperature_enabled=False,
# saved_state_path=f"finance_agent_{i}.json",
# user_name="swarms_corp",
# retry_attempts=1,
# context_length=200000,
# return_step_meta=False,
# )
# for i in range(5) # Assuming you want 10 agents
# ]
# task = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria"
# outputs = run_agents_concurrently(agents, task)
# for i, output in enumerate(outputs):
# print(f"Output from agent {i+1}:\n{output}")
# # Output from agent 2:
# # execution_time=12.89196228981018 memory_usage=-294.9375 cpu_usage=-10.3 io_operations=23309 function_calls=1
# # execution_time=11.810921907424927 memory_usage=-242.734375 cpu_usage=-26.4 io_operations=10752 function_calls=1
# # Parallel
# # execution_time=18.79391312599182 memory_usage=-342.9375 cpu_usage=-2.5 io_operations=59518 function_calls=1
# # # Multiprocess
# # 2024-08-22T14:49:33.986491-0400 Function metrics: {
# # "execution_time": 24.783875942230225,
# # "memory_usage": -286.734375,
# # "cpu_usage": -24.6,
# # "io_operations": 17961,
# # "function_calls": 1
# # }
# # Latest
# # Analysis-Agent_parallel_swarm4_state.json
# # 2024-08-22T15:43:11.800970-0400 Function metrics: {
# # "execution_time": 11.062030792236328,
# # "memory_usage": -249.5625,
# # "cpu_usage": -15.700000000000003,
# # "io_operations": 13439,
# # "function_calls": 1
# # }

@ -1,130 +0,0 @@
import asyncio
from typing import List, Any
from swarms import Agent
from multiprocessing import cpu_count
from swarms.utils.calculate_func_metrics import profile_func
# Use uvloop for faster asyncio event loop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
def run_single_agent(agent: Agent, task: str) -> Any:
"""
Run a single agent on the given task.
Args:
agent (Agent): The agent to run.
task (str): The task for the agent to perform.
Returns:
Any: The result of the agent's execution.
"""
return agent.run(task)
async def run_agent_async(agent: Agent, task: str) -> Any:
"""
Asynchronous wrapper for agent tasks.
Args:
agent (Agent): The agent to run asynchronously.
task (str): The task for the agent to perform.
Returns:
Any: The result of the agent's execution.
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None, run_single_agent, agent, task
)
async def run_agents_concurrently_async(
agents: List[Agent], task: str
) -> List[Any]:
"""
Run multiple agents concurrently on the same task with optimized performance.
Args:
agents (List[Agent]): List of Agent instances to run concurrently.
task (str): The task string to execute by all agents.
Returns:
List[Any]: A list of outputs from each agent.
"""
results = await asyncio.gather(
*(run_agent_async(agent, task) for agent in agents)
)
return results
@profile_func
def run_agents_concurrently_multiprocess(
agents: List[Agent], task: str, batch_size: int = cpu_count()
) -> List[Any]:
"""
Manage and run multiple agents concurrently in batches, with optimized performance.
Args:
agents (List[Agent]): List of Agent instances to run concurrently.
task (str): The task string to execute by all agents.
batch_size (int, optional): Number of agents to run in parallel in each batch.
Defaults to the number of CPU cores.
Returns:
List[Any]: A list of outputs from each agent.
"""
results = []
loop = asyncio.get_event_loop()
# Process agents in batches to avoid overwhelming system resources
for i in range(0, len(agents), batch_size):
batch = agents[i : i + batch_size]
batch_results = loop.run_until_complete(
run_agents_concurrently_async(batch, task)
)
results.extend(batch_results)
return results
# # # Example usage:
# # Initialize your agents with the same model to avoid re-creating it
# agents = [
# Agent(
# agent_name=f"Financial-Analysis-Agent_new_parallel_swarm_test{i}",
# system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
# llm=model,
# max_loops=1,
# autosave=True,
# dashboard=False,
# verbose=False,
# dynamic_temperature_enabled=False,
# saved_state_path=f"finance_agent_{i}.json",
# user_name="swarms_corp",
# retry_attempts=1,
# context_length=200000,
# return_step_meta=False,
# )
# for i in range(5) # Assuming you want 10 agents
# ]
# task = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria"
# outputs = run_agents_concurrently_multiprocess(
# agents,
# task,
# )
# for i, output in enumerate(outputs):
# print(f"Output from agent {i+1}:\n{output}")
# # execution_time=15.958749055862427 memory_usage=-328.046875 cpu_usage=-2.5999999999999943 io_operations=81297 function_calls=1
# # Analysis-Agent_new_parallel_swarm_test1_state.json
# # 2024-08-22T15:42:12.463246-0400 Function metrics: {
# # "execution_time": 15.958749055862427,
# # "memory_usage": -328.046875,
# # "cpu_usage": -2.5999999999999943,
# # "io_operations": 81297,
# "function_calls": 1
# }

@ -14,6 +14,9 @@ class SequentialWorkflow(BaseSwarm):
max_loops (int, optional): The maximum number of loops to execute the workflow. Defaults to 1. max_loops (int, optional): The maximum number of loops to execute the workflow. Defaults to 1.
*args: Variable length argument list. *args: Variable length argument list.
**kwargs: Arbitrary keyword arguments. **kwargs: Arbitrary keyword arguments.
Raises:
ValueError: If agents list is None or empty, or if max_loops is 0
""" """
def __init__( def __init__(
@ -25,26 +28,40 @@ class SequentialWorkflow(BaseSwarm):
*args, *args,
**kwargs, **kwargs,
): ):
super().__init__( if agents is None or len(agents) == 0:
name=name, raise ValueError("Agents list cannot be None or empty")
description=description,
agents=agents, if max_loops == 0:
*args, raise ValueError("max_loops cannot be 0")
**kwargs,
) try:
self.name = name super().__init__(
self.description = description name=name,
self.agents = agents description=description,
self.flow = " -> ".join(agent.agent_name for agent in agents) agents=agents,
self.agent_rearrange = AgentRearrange( *args,
name=name, **kwargs,
description=description, )
agents=agents, self.name = name
flow=self.flow, self.description = description
max_loops=max_loops, self.agents = agents
*args, self.flow = " -> ".join(
**kwargs, agent.agent_name for agent in agents
) )
self.agent_rearrange = AgentRearrange(
name=name,
description=description,
agents=agents,
flow=self.flow,
max_loops=max_loops,
*args,
**kwargs,
)
except Exception as e:
logger.error(
f"Error initializing SequentialWorkflow: {str(e)}"
)
raise
def run(self, task: str) -> str: def run(self, task: str) -> str:
""" """
@ -55,7 +72,14 @@ class SequentialWorkflow(BaseSwarm):
Returns: Returns:
str: The final result after processing through all agents. str: The final result after processing through all agents.
Raises:
ValueError: If task is None or empty
Exception: If any error occurs during task execution
""" """
if not task or not isinstance(task, str):
raise ValueError("Task must be a non-empty string")
try: try:
logger.info( logger.info(
f"Running task with dynamic flow: {self.flow}" f"Running task with dynamic flow: {self.flow}"

@ -1,107 +1,338 @@
from typing import List, Tuple, Optional # from typing import List, Tuple, Optional
import numpy as np # import numpy as np
import torch # import torch
from transformers import AutoTokenizer, AutoModel # from transformers import AutoTokenizer, AutoModel
# from pydantic import BaseModel, Field
# from loguru import logger
# import json
# from tenacity import retry, stop_after_attempt, wait_exponential
# from uuid import uuid4
# class SwarmType(BaseModel):
# name: str
# description: str
# embedding: Optional[List[float]] = Field(
# default=None, exclude=True
# )
# class SwarmMatcherConfig(BaseModel):
# model_name: str = "sentence-transformers/all-MiniLM-L6-v2"
# embedding_dim: int = (
# 512 # Dimension of the sentence-transformers model
# )
# class SwarmMatcher:
# """
# A class for matching tasks to swarm types based on their descriptions.
# It utilizes a transformer model to generate embeddings for task and swarm type descriptions,
# and then calculates the dot product to find the best match.
# """
# def __init__(self, config: SwarmMatcherConfig):
# """
# Initializes the SwarmMatcher with a configuration.
# Args:
# config (SwarmMatcherConfig): The configuration for the SwarmMatcher.
# """
# logger.add("swarm_matcher_debug.log", level="DEBUG")
# logger.debug("Initializing SwarmMatcher")
# try:
# self.config = config
# self.tokenizer = AutoTokenizer.from_pretrained(
# config.model_name
# )
# self.model = AutoModel.from_pretrained(config.model_name)
# self.swarm_types: List[SwarmType] = []
# logger.debug("SwarmMatcher initialized successfully")
# except Exception as e:
# logger.error(f"Error initializing SwarmMatcher: {str(e)}")
# raise
# @retry(
# stop=stop_after_attempt(3),
# wait=wait_exponential(multiplier=1, min=4, max=10),
# )
# def get_embedding(self, text: str) -> np.ndarray:
# """
# Generates an embedding for a given text using the configured model.
# Args:
# text (str): The text for which to generate an embedding.
# Returns:
# np.ndarray: The embedding vector for the text.
# """
# logger.debug(f"Getting embedding for text: {text[:50]}...")
# try:
# inputs = self.tokenizer(
# text,
# return_tensors="pt",
# padding=True,
# truncation=True,
# max_length=512,
# )
# with torch.no_grad():
# outputs = self.model(**inputs)
# embedding = (
# outputs.last_hidden_state.mean(dim=1)
# .squeeze()
# .numpy()
# )
# logger.debug("Embedding generated successfully")
# return embedding
# except Exception as e:
# logger.error(f"Error generating embedding: {str(e)}")
# raise
# def add_swarm_type(self, swarm_type: SwarmType):
# """
# Adds a swarm type to the list of swarm types, generating an embedding for its description.
# Args:
# swarm_type (SwarmType): The swarm type to add.
# """
# logger.debug(f"Adding swarm type: {swarm_type.name}")
# try:
# embedding = self.get_embedding(swarm_type.description)
# swarm_type.embedding = embedding.tolist()
# self.swarm_types.append(swarm_type)
# logger.info(f"Added swarm type: {swarm_type.name}")
# except Exception as e:
# logger.error(
# f"Error adding swarm type {swarm_type.name}: {str(e)}"
# )
# raise
# def find_best_match(self, task: str) -> Tuple[str, float]:
# """
# Finds the best match for a given task among the registered swarm types.
# Args:
# task (str): The task for which to find the best match.
# Returns:
# Tuple[str, float]: A tuple containing the name of the best matching swarm type and the score.
# """
# logger.debug(f"Finding best match for task: {task[:50]}...")
# try:
# task_embedding = self.get_embedding(task)
# best_match = None
# best_score = -float("inf")
# for swarm_type in self.swarm_types:
# score = np.dot(
# task_embedding, np.array(swarm_type.embedding)
# )
# if score > best_score:
# best_score = score
# best_match = swarm_type
# logger.info(
# f"Best match for task: {best_match.name} (score: {best_score})"
# )
# return best_match.name, float(best_score)
# except Exception as e:
# logger.error(
# f"Error finding best match for task: {str(e)}"
# )
# raise
# def auto_select_swarm(self, task: str) -> str:
# """
# Automatically selects the best swarm type for a given task based on their descriptions.
# Args:
# task (str): The task for which to select a swarm type.
# Returns:
# str: The name of the selected swarm type.
# """
# logger.debug(f"Auto-selecting swarm for task: {task[:50]}...")
# best_match, score = self.find_best_match(task)
# logger.info(f"Task: {task}")
# logger.info(f"Selected Swarm Type: {best_match}")
# logger.info(f"Confidence Score: {score:.2f}")
# return best_match
# def run_multiple(self, tasks: List[str], *args, **kwargs) -> str:
# swarms = []
# for task in tasks:
# output = self.auto_select_swarm(task)
# # Append
# swarms.append(output)
# return swarms
# def save_swarm_types(self, filename: str):
# """
# Saves the registered swarm types to a JSON file.
# Args:
# filename (str): The name of the file to which to save the swarm types.
# """
# try:
# with open(filename, "w") as f:
# json.dump([st.dict() for st in self.swarm_types], f)
# logger.info(f"Saved swarm types to {filename}")
# except Exception as e:
# logger.error(f"Error saving swarm types: {str(e)}")
# raise
# def load_swarm_types(self, filename: str):
# """
# Loads swarm types from a JSON file.
# Args:
# filename (str): The name of the file from which to load the swarm types.
# """
# try:
# with open(filename, "r") as f:
# swarm_types_data = json.load(f)
# self.swarm_types = [
# SwarmType(**st) for st in swarm_types_data
# ]
# logger.info(f"Loaded swarm types from {filename}")
# except Exception as e:
# logger.error(f"Error loading swarm types: {str(e)}")
# raise
# def initialize_swarm_types(matcher: SwarmMatcher):
# logger.debug("Initializing swarm types")
# swarm_types = [
# SwarmType(
# name="AgentRearrange",
# description="Optimize agent order and rearrange flow for multi-step tasks, ensuring efficient task allocation and minimizing bottlenecks. Keywords: orchestration, coordination, pipeline optimization, task scheduling, resource allocation, workflow management, agent organization, process optimization",
# ),
# SwarmType(
# name="MixtureOfAgents",
# description="Combine diverse expert agents for comprehensive analysis, fostering a collaborative approach to problem-solving and leveraging individual strengths. Keywords: multi-agent system, expert collaboration, distributed intelligence, collective problem solving, agent specialization, team coordination, hybrid approaches, knowledge synthesis",
# ),
# SwarmType(
# name="SpreadSheetSwarm",
# description="Collaborative data processing and analysis in a spreadsheet-like environment, facilitating real-time data sharing and visualization. Keywords: data analysis, tabular processing, collaborative editing, data transformation, spreadsheet operations, data visualization, real-time collaboration, structured data",
# ),
# SwarmType(
# name="SequentialWorkflow",
# description="Execute tasks in a step-by-step, sequential process workflow, ensuring a logical and methodical approach to task execution. Keywords: linear processing, waterfall methodology, step-by-step execution, ordered tasks, sequential operations, process flow, systematic approach, staged execution",
# ),
# SwarmType(
# name="ConcurrentWorkflow",
# description="Process multiple tasks or data sources concurrently in parallel, maximizing productivity and reducing processing time. Keywords: parallel processing, multi-threading, asynchronous execution, distributed computing, concurrent operations, simultaneous tasks, parallel workflows, scalable processing",
# ),
# # SwarmType(
# # name="HierarchicalSwarm",
# # description="Organize agents in a hierarchical structure with clear reporting lines and delegation of responsibilities. Keywords: management hierarchy, organizational structure, delegation, supervision, chain of command, tiered organization, structured coordination",
# # ),
# # SwarmType(
# # name="AdaptiveSwarm",
# # description="Dynamically adjust agent behavior and swarm configuration based on task requirements and performance feedback. Keywords: dynamic adaptation, self-optimization, feedback loops, learning systems, flexible configuration, responsive behavior, adaptive algorithms",
# # ),
# # SwarmType(
# # name="ConsensusSwarm",
# # description="Achieve group decisions through consensus mechanisms and voting protocols among multiple agents. Keywords: group decision making, voting systems, collective intelligence, agreement protocols, democratic processes, collaborative decisions",
# # ),
# ]
# for swarm_type in swarm_types:
# matcher.add_swarm_type(swarm_type)
# logger.debug("Swarm types initialized")
# def swarm_matcher(task: str, *args, **kwargs):
# """
# Runs the SwarmMatcher example with predefined tasks and swarm types.
# """
# config = SwarmMatcherConfig()
# matcher = SwarmMatcher(config)
# initialize_swarm_types(matcher)
# # matcher.save_swarm_types(f"swarm_logs/{uuid4().hex}.json")
# swarm_type = matcher.auto_select_swarm(task)
# logger.info(f"{swarm_type}")
# return swarm_type
from typing import List, Tuple, Dict
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from loguru import logger from loguru import logger
from uuid import uuid4
import chromadb
import json import json
from tenacity import retry, stop_after_attempt, wait_exponential from tenacity import retry, stop_after_attempt, wait_exponential
# Ensure you have the necessary libraries installed:
# pip install torch transformers pydantic loguru tenacity
class SwarmType(BaseModel): class SwarmType(BaseModel):
"""A swarm type with its name, description and optional metadata"""
id: str = Field(default_factory=lambda: str(uuid4()))
name: str name: str
description: str description: str
embedding: Optional[List[float]] = Field( metadata: Dict = Field(default_factory=dict)
default=None, exclude=True
)
class SwarmMatcherConfig(BaseModel): class SwarmMatcherConfig(BaseModel):
model_name: str = "sentence-transformers/all-MiniLM-L6-v2" """Configuration for the SwarmMatcher"""
embedding_dim: int = (
512 # Dimension of the sentence-transformers model collection_name: str = "swarm_types"
distance_metric: str = "cosine" # or "l2" or "ip"
embedding_function: str = (
"sentence-transformers/all-mpnet-base-v2" # Better model than MiniLM
) )
persist_directory: str = "./chroma_db"
class SwarmMatcher: class SwarmMatcher:
""" """
A class for matching tasks to swarm types based on their descriptions. An improved swarm matcher that uses ChromaDB for better vector similarity search.
It utilizes a transformer model to generate embeddings for task and swarm type descriptions, Features:
and then calculates the dot product to find the best match. - Persistent storage of embeddings
- Better vector similarity search with multiple distance metrics
- Improved embedding model
- Metadata filtering capabilities
- Batch operations support
""" """
def __init__(self, config: SwarmMatcherConfig): def __init__(self, config: SwarmMatcherConfig):
""" """Initialize the improved swarm matcher"""
Initializes the SwarmMatcher with a configuration. logger.add("swarm_matcher.log", rotation="100 MB")
self.config = config
Args:
config (SwarmMatcherConfig): The configuration for the SwarmMatcher.
"""
logger.add("swarm_matcher_debug.log", level="DEBUG")
logger.debug("Initializing SwarmMatcher")
try:
self.config = config
self.tokenizer = AutoTokenizer.from_pretrained(
config.model_name
)
self.model = AutoModel.from_pretrained(config.model_name)
self.swarm_types: List[SwarmType] = []
logger.debug("SwarmMatcher initialized successfully")
except Exception as e:
logger.error(f"Error initializing SwarmMatcher: {str(e)}")
raise
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
)
def get_embedding(self, text: str) -> np.ndarray:
"""
Generates an embedding for a given text using the configured model.
Args: # Initialize ChromaDB client with persistence
text (str): The text for which to generate an embedding. self.chroma_client = chromadb.Client()
Returns: # Get or create collection
np.ndarray: The embedding vector for the text.
"""
logger.debug(f"Getting embedding for text: {text[:50]}...")
try: try:
inputs = self.tokenizer( self.collection = self.chroma_client.get_collection(
text, name=config.collection_name,
return_tensors="pt",
padding=True,
truncation=True,
max_length=512,
) )
with torch.no_grad(): except ValueError:
outputs = self.model(**inputs) self.collection = self.chroma_client.create_collection(
embedding = ( name=config.collection_name,
outputs.last_hidden_state.mean(dim=1) metadata={"hnsw:space": config.distance_metric},
.squeeze()
.numpy()
) )
logger.debug("Embedding generated successfully")
return embedding
except Exception as e:
logger.error(f"Error generating embedding: {str(e)}")
raise
def add_swarm_type(self, swarm_type: SwarmType): logger.info(
""" f"Initialized SwarmMatcher with collection '{config.collection_name}'"
Adds a swarm type to the list of swarm types, generating an embedding for its description. )
Args: def add_swarm_type(self, swarm_type: SwarmType) -> None:
swarm_type (SwarmType): The swarm type to add. """Add a single swarm type to the collection"""
"""
logger.debug(f"Adding swarm type: {swarm_type.name}")
try: try:
embedding = self.get_embedding(swarm_type.description) self.collection.add(
swarm_type.embedding = embedding.tolist() ids=[swarm_type.id],
self.swarm_types.append(swarm_type) documents=[swarm_type.description],
metadatas=[
{"name": swarm_type.name, **swarm_type.metadata}
],
)
logger.info(f"Added swarm type: {swarm_type.name}") logger.info(f"Added swarm type: {swarm_type.name}")
except Exception as e: except Exception as e:
logger.error( logger.error(
@ -109,142 +340,239 @@ class SwarmMatcher:
) )
raise raise
def find_best_match(self, task: str) -> Tuple[str, float]: def add_swarm_types(self, swarm_types: List[SwarmType]) -> None:
""" """Add multiple swarm types in batch"""
Finds the best match for a given task among the registered swarm types.
Args:
task (str): The task for which to find the best match.
Returns:
Tuple[str, float]: A tuple containing the name of the best matching swarm type and the score.
"""
logger.debug(f"Finding best match for task: {task[:50]}...")
try: try:
task_embedding = self.get_embedding(task) self.collection.add(
best_match = None ids=[st.id for st in swarm_types],
best_score = -float("inf") documents=[st.description for st in swarm_types],
for swarm_type in self.swarm_types: metadatas=[
score = np.dot( {"name": st.name, **st.metadata}
task_embedding, np.array(swarm_type.embedding) for st in swarm_types
) ],
if score > best_score:
best_score = score
best_match = swarm_type
logger.info(
f"Best match for task: {best_match.name} (score: {best_score})"
) )
return best_match.name, float(best_score) logger.info(f"Added {len(swarm_types)} swarm types")
except Exception as e: except Exception as e:
logger.error( logger.error(
f"Error finding best match for task: {str(e)}" f"Error adding swarm types in batch: {str(e)}"
) )
raise raise
def auto_select_swarm(self, task: str) -> str: @retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
)
def find_best_matches(
self,
task: str,
n_results: int = 3,
score_threshold: float = 0.7,
) -> List[Tuple[str, float]]:
""" """
Automatically selects the best swarm type for a given task based on their descriptions. Find the best matching swarm types for a given task
Returns multiple matches with their scores
Args:
task (str): The task for which to select a swarm type.
Returns:
str: The name of the selected swarm type.
""" """
logger.debug(f"Auto-selecting swarm for task: {task[:50]}...") try:
best_match, score = self.find_best_match(task) results = self.collection.query(
logger.info(f"Task: {task}") query_texts=[task],
logger.info(f"Selected Swarm Type: {best_match}") n_results=n_results,
logger.info(f"Confidence Score: {score:.2f}") include=["metadatas", "distances"],
return best_match )
def run_multiple(self, tasks: List[str], *args, **kwargs) -> str:
swarms = []
for task in tasks: matches = []
output = self.auto_select_swarm(task) for metadata, distance in zip(
results["metadatas"][0], results["distances"][0]
):
# Convert distance to similarity score (1 - normalized_distance)
score = 1 - (
distance / 2
) # Normalize cosine distance to [0,1]
if score >= score_threshold:
matches.append((metadata["name"], score))
# Append logger.info(f"Found {len(matches)} matches for task")
swarms.append(output) return matches
return swarms except Exception as e:
logger.error(f"Error finding matches for task: {str(e)}")
raise
def save_swarm_types(self, filename: str): def auto_select_swarm(self, task: str) -> str:
""" """
Saves the registered swarm types to a JSON file. Automatically select the best swarm type for a task
Returns only the top match
Args:
filename (str): The name of the file to which to save the swarm types.
""" """
matches = self.find_best_matches(task, n_results=1)
if not matches:
logger.warning("No suitable matches found for task")
return "SequentialWorkflow" # Default fallback
best_match, score = matches[0]
logger.info(
f"Selected swarm type '{best_match}' with confidence {score:.3f}"
)
return best_match
def run_multiple(self, tasks: List[str]) -> List[str]:
"""Process multiple tasks in batch"""
return [self.auto_select_swarm(task) for task in tasks]
def save_swarm_types(self, filename: str) -> None:
"""Export swarm types to JSON"""
try: try:
all_data = self.collection.get(
include=["metadatas", "documents"]
)
swarm_types = [
SwarmType(
id=id_,
name=metadata["name"],
description=document,
metadata={
k: v
for k, v in metadata.items()
if k != "name"
},
)
for id_, metadata, document in zip(
all_data["ids"],
all_data["metadatas"],
all_data["documents"],
)
]
with open(filename, "w") as f: with open(filename, "w") as f:
json.dump([st.dict() for st in self.swarm_types], f) json.dump(
[st.dict() for st in swarm_types], f, indent=2
)
logger.info(f"Saved swarm types to {filename}") logger.info(f"Saved swarm types to {filename}")
except Exception as e: except Exception as e:
logger.error(f"Error saving swarm types: {str(e)}") logger.error(f"Error saving swarm types: {str(e)}")
raise raise
def load_swarm_types(self, filename: str): def load_swarm_types(self, filename: str) -> None:
""" """Import swarm types from JSON"""
Loads swarm types from a JSON file.
Args:
filename (str): The name of the file from which to load the swarm types.
"""
try: try:
with open(filename, "r") as f: with open(filename, "r") as f:
swarm_types_data = json.load(f) swarm_types_data = json.load(f)
self.swarm_types = [ swarm_types = [SwarmType(**st) for st in swarm_types_data]
SwarmType(**st) for st in swarm_types_data self.add_swarm_types(swarm_types)
]
logger.info(f"Loaded swarm types from {filename}") logger.info(f"Loaded swarm types from {filename}")
except Exception as e: except Exception as e:
logger.error(f"Error loading swarm types: {str(e)}") logger.error(f"Error loading swarm types: {str(e)}")
raise raise
def initialize_swarm_types(matcher: SwarmMatcher): def initialize_default_swarm_types(matcher: SwarmMatcher) -> None:
logger.debug("Initializing swarm types") """Initialize the matcher with default swarm types"""
swarm_types = [ swarm_types = [
SwarmType( SwarmType(
name="AgentRearrange", name="AgentRearrange",
description="Optimize agent order and rearrange flow for multi-step tasks, ensuring efficient task allocation and minimizing bottlenecks", description="""
Optimize agent order and rearrange flow for multi-step tasks, ensuring efficient task allocation
and minimizing bottlenecks. Specialized in orchestration, coordination, pipeline optimization,
task scheduling, resource allocation, workflow management, agent organization, and process optimization.
Best for tasks requiring complex agent interactions and workflow optimization.
""",
metadata={
"category": "optimization",
"complexity": "high",
},
), ),
SwarmType( SwarmType(
name="MixtureOfAgents", name="MixtureOfAgents",
description="Combine diverse expert agents for comprehensive analysis, fostering a collaborative approach to problem-solving and leveraging individual strengths", description="""
Combine diverse expert agents for comprehensive analysis, fostering a collaborative approach
to problem-solving and leveraging individual strengths. Focuses on multi-agent systems,
expert collaboration, distributed intelligence, collective problem solving, agent specialization,
team coordination, hybrid approaches, and knowledge synthesis. Ideal for complex problems
requiring multiple areas of expertise.
""",
metadata={
"category": "collaboration",
"complexity": "high",
},
), ),
SwarmType( SwarmType(
name="SpreadSheetSwarm", name="SpreadSheetSwarm",
description="Collaborative data processing and analysis in a spreadsheet-like environment, facilitating real-time data sharing and visualization", description="""
Collaborative data processing and analysis in a spreadsheet-like environment, facilitating
real-time data sharing and visualization. Specializes in data analysis, tabular processing,
collaborative editing, data transformation, spreadsheet operations, data visualization,
real-time collaboration, and structured data handling. Perfect for data-intensive tasks
requiring structured analysis.
""",
metadata={
"category": "data_processing",
"complexity": "medium",
},
), ),
SwarmType( SwarmType(
name="SequentialWorkflow", name="SequentialWorkflow",
description="Execute tasks in a step-by-step, sequential process workflow, ensuring a logical and methodical approach to task execution", description="""
Execute tasks in a step-by-step, sequential process workflow, ensuring a logical and methodical
approach to task execution. Focuses on linear processing, waterfall methodology, step-by-step
execution, ordered tasks, sequential operations, process flow, systematic approach, and staged
execution. Best for tasks requiring strict order and dependencies.
""",
metadata={"category": "workflow", "complexity": "low"},
), ),
SwarmType( SwarmType(
name="ConcurrentWorkflow", name="ConcurrentWorkflow",
description="Process multiple tasks or data sources concurrently in parallel, maximizing productivity and reducing processing time", description="""
Process multiple tasks or data sources concurrently in parallel, maximizing productivity
and reducing processing time. Specializes in parallel processing, multi-threading,
asynchronous execution, distributed computing, concurrent operations, simultaneous tasks,
parallel workflows, and scalable processing. Ideal for independent tasks that can be
processed simultaneously.
""",
metadata={"category": "workflow", "complexity": "medium"},
), ),
] ]
for swarm_type in swarm_types: matcher.add_swarm_types(swarm_types)
matcher.add_swarm_type(swarm_type) logger.info("Initialized default swarm types")
logger.debug("Swarm types initialized")
def swarm_matcher(task: str, *args, **kwargs): def create_swarm_matcher(
""" persist_dir: str = "./chroma_db",
Runs the SwarmMatcher example with predefined tasks and swarm types. collection_name: str = "swarm_types",
""" ) -> SwarmMatcher:
config = SwarmMatcherConfig() """Convenience function to create and initialize a swarm matcher"""
config = SwarmMatcherConfig(
persist_directory=persist_dir, collection_name=collection_name
)
matcher = SwarmMatcher(config) matcher = SwarmMatcher(config)
initialize_swarm_types(matcher) initialize_default_swarm_types(matcher)
return matcher
matcher.save_swarm_types("swarm_types.json")
swarm_type = matcher.auto_select_swarm(task) # Example usage
def swarm_matcher(task: str) -> str:
# Create and initialize matcher
matcher = create_swarm_matcher()
logger.info(f"{swarm_type}") swarm_type = matcher.auto_select_swarm(task)
print(f"Task: {task}\nSelected Swarm: {swarm_type}\n")
return swarm_type return swarm_type
# # Example usage
# if __name__ == "__main__":
# # Create and initialize matcher
# matcher = create_swarm_matcher()
# # Example tasks
# tasks = [
# "Analyze this spreadsheet of sales data and create visualizations",
# "Coordinate multiple AI agents to solve a complex problem",
# "Process these tasks one after another in a specific order",
# "Write multiple blog posts about the latest advancements in swarm intelligence all at once",
# "Write a blog post about the latest advancements in swarm intelligence",
# ]
# # Process tasks
# for task in tasks:
# swarm_type = matcher.auto_select_swarm(task)
# print(f"Task: {task}\nSelected Swarm: {swarm_type}\n")

@ -13,6 +13,7 @@ from swarms.structs.sequential_workflow import SequentialWorkflow
from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm
from tenacity import retry, stop_after_attempt, wait_fixed from tenacity import retry, stop_after_attempt, wait_fixed
from swarms.structs.swarm_matcher import swarm_matcher from swarms.structs.swarm_matcher import swarm_matcher
from swarms.prompts.ag_prompt import aggregator_system_prompt
SwarmType = Literal[ SwarmType = Literal[
"AgentRearrange", "AgentRearrange",
@ -57,6 +58,7 @@ class SwarmRouter:
swarm (Union[AgentRearrange, MixtureOfAgents, SpreadSheetSwarm, SequentialWorkflow, ConcurrentWorkflow]): swarm (Union[AgentRearrange, MixtureOfAgents, SpreadSheetSwarm, SequentialWorkflow, ConcurrentWorkflow]):
The instantiated swarm object. The instantiated swarm object.
logs (List[SwarmLog]): A list of log entries captured during operations. logs (List[SwarmLog]): A list of log entries captured during operations.
auto_generate_prompt (bool): A flag to enable/disable auto generation of prompts.
Available Swarm Types: Available Swarm Types:
- AgentRearrange: Rearranges agents for optimal task execution. - AgentRearrange: Rearranges agents for optimal task execution.
@ -99,6 +101,37 @@ class SwarmRouter:
f"SwarmRouter initialized with swarm type: {swarm_type}", f"SwarmRouter initialized with swarm type: {swarm_type}",
) )
self.activate_ape()
def activate_ape(self):
"""Activate automatic prompt engineering for agents that support it"""
try:
logger.info("Activating automatic prompt engineering...")
activated_count = 0
for agent in self.agents:
if hasattr(agent, "auto_generate_prompt"):
agent.auto_generate_prompt = (
self.auto_generate_prompts
)
activated_count += 1
logger.debug(
f"Activated APE for agent: {agent.name if hasattr(agent, 'name') else 'unnamed'}"
)
logger.info(
f"Successfully activated APE for {activated_count} agents"
)
self._log(
"info",
f"Activated automatic prompt engineering for {activated_count} agents",
)
except Exception as e:
error_msg = f"Error activating automatic prompt engineering: {str(e)}"
logger.error(error_msg)
self._log("error", error_msg)
raise RuntimeError(error_msg) from e
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1)) @retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
def reliability_check(self): def reliability_check(self):
logger.info("Logger initializing checks") logger.info("Logger initializing checks")
@ -137,7 +170,9 @@ class SwarmRouter:
ValueError: If an invalid swarm type is provided. ValueError: If an invalid swarm type is provided.
""" """
if self.swarm_type == "auto": if self.swarm_type == "auto":
self.swarm_type = swarm_matcher(task) self.swarm_type = str(swarm_matcher(task))
self._create_swarm(self.swarm_type)
elif self.swarm_type == "AgentRearrange": elif self.swarm_type == "AgentRearrange":
return AgentRearrange( return AgentRearrange(
@ -154,8 +189,9 @@ class SwarmRouter:
return MixtureOfAgents( return MixtureOfAgents(
name=self.name, name=self.name,
description=self.description, description=self.description,
agents=self.agents, reference_agents=self.agents,
aggregator_agent=[self.agents[-1]], aggregator_system_prompt=aggregator_system_prompt.get_prompt(),
aggregator_agent=self.agents[-1],
layers=self.max_loops, layers=self.max_loops,
*args, *args,
**kwargs, **kwargs,

@ -1,39 +1,19 @@
import os
import uuid import uuid
from collections import Counter from collections import Counter
from datetime import datetime from datetime import datetime
from typing import Any, List, Optional from typing import Any, List, Optional
from dotenv import load_dotenv
from loguru import logger from loguru import logger
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from sentence_transformers import SentenceTransformer, util from sentence_transformers import SentenceTransformer, util
from swarm_models import OpenAIChat
from swarms import Agent from swarms import Agent
load_dotenv()
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Pretrained model for embeddings # Pretrained model for embeddings
embedding_model = SentenceTransformer( embedding_model = SentenceTransformer(
"all-MiniLM-L6-v2" "all-MiniLM-L6-v2"
) # A small, fast model for embedding ) # A small, fast model for embedding
# Get the OpenAI API key from the environment variable
api_key = os.getenv("GROQ_API_KEY")
# Model
model = OpenAIChat(
openai_api_base="https://api.groq.com/openai/v1",
openai_api_key=api_key,
model_name="llama-3.1-70b-versatile",
temperature=0.1,
)
# Pydantic Models for Logging # Pydantic Models for Logging
class AgentLogInput(BaseModel): class AgentLogInput(BaseModel):
@ -83,14 +63,18 @@ class TreeAgent(Agent):
def __init__( def __init__(
self, self,
name: str = None,
description: str = None,
system_prompt: str = None, system_prompt: str = None,
llm: callable = model, llm: callable = None,
agent_name: Optional[str] = None, agent_name: Optional[str] = None,
*args, *args,
**kwargs, **kwargs,
): ):
agent_name = agent_name agent_name = agent_name
super().__init__( super().__init__(
name=name,
description=description,
system_prompt=system_prompt, system_prompt=system_prompt,
llm=llm, llm=llm,
agent_name=agent_name, agent_name=agent_name,

@ -11,10 +11,13 @@ def bootup():
"""Bootup swarms""" """Bootup swarms"""
logging.disable(logging.CRITICAL) logging.disable(logging.CRITICAL)
os.environ["WANDB_SILENT"] = "true" os.environ["WANDB_SILENT"] = "true"
# Dynamically set WORKSPACE_DIR based on the current directory
os.environ["WORKSPACE_DIR"] = os.path.join( # Auto set workspace directory
os.getcwd(), "agent_workspace" workspace_dir = os.path.join(os.getcwd(), "agent_workspace")
) if not os.path.exists(workspace_dir):
os.makedirs(workspace_dir)
os.environ["WORKSPACE_DIR"] = workspace_dir
warnings.filterwarnings("ignore", category=DeprecationWarning) warnings.filterwarnings("ignore", category=DeprecationWarning)
# Use ThreadPoolExecutor to run disable_logging and auto_update concurrently # Use ThreadPoolExecutor to run disable_logging and auto_update concurrently

Loading…
Cancel
Save