pull/631/merge
Your Name 2 months ago
parent 02c77356fa
commit 0bc66981fc

1
.gitignore vendored

@ -12,6 +12,7 @@ dataframe/
static/generated
runs
Financial-Analysis-Agent_state.json
experimental
artifacts_five
encryption
errors

@ -0,0 +1,68 @@
import os
from swarms import Agent
from swarm_models import OpenAIChat
from swarms.structs.agents_available import showcase_available_agents
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
)
# Initialize the Claims Director agent
director_agent = Agent(
agent_name="ClaimsDirector",
agent_description="Oversees and coordinates the medical insurance claims processing workflow",
system_prompt="""You are the Claims Director responsible for managing the medical insurance claims process.
Assign and prioritize tasks between claims processors and auditors. Ensure claims are handled efficiently
and accurately while maintaining compliance with insurance policies and regulations.""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="director_agent.json",
)
# Initialize Claims Processor agent
processor_agent = Agent(
agent_name="ClaimsProcessor",
agent_description="Reviews and processes medical insurance claims, verifying coverage and eligibility",
system_prompt="""Review medical insurance claims for completeness and accuracy. Verify patient eligibility,
coverage details, and process claims according to policy guidelines. Flag any claims requiring special review.""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="processor_agent.json",
)
# Initialize Claims Auditor agent
auditor_agent = Agent(
agent_name="ClaimsAuditor",
agent_description="Audits processed claims for accuracy and compliance with policies and regulations",
system_prompt="""Audit processed insurance claims for accuracy and compliance. Review claim decisions,
identify potential fraud or errors, and ensure all processing follows established guidelines and regulations.""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="auditor_agent.json",
)
# Create a list of agents
agents = [director_agent, processor_agent, auditor_agent]
print(showcase_available_agents(agents=agents))

@ -1,125 +0,0 @@
import os
import json
from pydantic import BaseModel, Field
from swarm_models import OpenAIFunctionCaller
from dotenv import load_dotenv
from typing import Any, List
load_dotenv()
class Flow(BaseModel):
id: str = Field(
description="A unique identifier for the flow. This should be a short, descriptive name that captures the main purpose of the flow. Use - to separate words and make it lowercase."
)
plan: str = Field(
description="The comprehensive plan detailing how the flow will accomplish the given task. This should include the high-level strategy, key milestones, and expected outcomes. The plan should clearly articulate what the overall goal is, what success looks like, and how progress will be measured throughout execution."
)
failures_prediction: str = Field(
description="A thorough analysis of potential failure modes and mitigation strategies. This should identify technical risks, edge cases, error conditions, and possible points of failure in the flow. For each identified risk, include specific preventive measures, fallback approaches, and recovery procedures to ensure robustness and reliability."
)
rationale: str = Field(
description="The detailed reasoning and justification for why this specific flow design is optimal for the given task. This should explain the key architectural decisions, tradeoffs considered, alternatives evaluated, and why this approach best satisfies the requirements. Include both technical and business factors that influenced the design."
)
flow: str = Field(
description="The precise execution flow defining how agents interact and coordinate. Use -> to indicate sequential processing where one agent must complete before the next begins (e.g. agent1 -> agent2 -> agent3). Use , to indicate parallel execution where multiple agents can run simultaneously (e.g. agent1 -> agent2, agent3, agent4). The flow should clearly show the dependencies and parallelization opportunities between agents. You must only use the agent names provided in the task description do not make up new agent names and do not use any other formatting."
)
class AgentRearrangeBuilder(BaseModel):
name: str = Field(
description="The name of the swarm. This should be a short, descriptive name that captures the main purpose of the flow."
)
description: str = Field(
description="A brief description of the swarm. This should be a concise summary of the main purpose of the swarm."
)
flows: List[Flow] = Field(
description="A list of flows that are optimal for the given task. Each flow should be a detailed plan, failure prediction, rationale, and execution flow."
)
swarm_flow: str = Field(
description="The flow defining how each team should communicate and coordinate with eachother.Use -> to indicate sequential processing where one id must complete before the next begins (e.g. team1 -> team2 -> team3). Use , to indicate parallel execution where multiple teams can run simultaneously (e.g. team1 -> team2, team3, team4). The flow should clearly show the dependencies and parallelization opportunities between teams. You must only use the team names provided in the id do not make up new team names and do not use any other formatting."
)
# def flow_generator(task: str) -> Flow:
def setup_model(base_model: BaseModel = Flow):
model = OpenAIFunctionCaller(
system_prompt="""You are an expert flow architect specializing in designing multi-agent workflows. Your role is to analyze tasks and create optimal execution flows that coordinate multiple AI agents effectively.
When given a task, you will:
1. Develop a comprehensive plan breaking down the task into logical steps
2. Carefully consider potential failure modes and build in robust error handling
3. Provide clear rationale for your architectural decisions and agent coordination strategy
4. Design a precise flow showing both sequential dependencies and parallel execution opportunities
Your flows should maximize:
- Efficiency through smart parallelization
- Reliability through thorough error handling
- Clarity through well-structured agent interactions
- Effectiveness through strategic task decomposition
Format your flow using -> for sequential steps and , for parallel execution. Be specific about agent roles and interactions.
""",
base_model=base_model,
openai_api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.5,
)
return model
def generate_flow(task: str) -> Any:
model = setup_model()
flow = model.run(task)
print(json.dumps(flow, indent=4))
return flow
def generate_agent_rearrange(task: str) -> Any:
model = setup_model(base_model=AgentRearrangeBuilder)
flow = model.run(task)
print(json.dumps(flow, indent=4))
return flow
if __name__ == "__main__":
# Basic patient diagnosis flow
# generate_flow("Diagnose a patient's symptoms and create a treatment plan. You have 3 agents to use: Diagnostician, Specialist, CareCoordinator")
# # Complex multi-condition case
# generate_flow("""Handle a complex patient case with multiple chronic conditions requiring ongoing care coordination.
# The patient has diabetes, heart disease, and chronic pain.
# Create a comprehensive diagnosis and treatment plan.
# You have 3 agents to use: Diagnostician, Specialist, CareCoordinator""")
# # Emergency trauma case
# generate_flow("""Process an emergency trauma case requiring rapid diagnosis and immediate intervention.
# Patient presents with multiple injuries from a car accident.
# Develop immediate and long-term treatment plans.
# You have 3 agents to use: Diagnostician, Specialist, CareCoordinator""")
# # Long-term care planning
# generate_flow("""Design a 6-month care plan for an elderly patient with declining cognitive function.
# Include regular assessments, specialist consultations, and family coordination.
# You have 3 agents to use: Diagnostician, Specialist, CareCoordinator""")
# # Mental health assessment
# generate_flow("""Conduct a comprehensive mental health assessment and develop treatment strategy.
# Patient shows signs of depression and anxiety with possible underlying conditions.
# Create both immediate intervention and long-term support plans.
# You have 3 agents to use: Diagnostician, Specialist, CareCoordinator""")
generate_agent_rearrange(
"""Build a complete automated hedge fund system.
Design and implement a sophisticated trading strategy incorporating multiple asset classes,
risk management protocols, and automated execution systems.
The system should include:
- Market analysis and research capabilities
- Portfolio optimization and risk management
- Automated trade execution and settlement
- Compliance and regulatory monitoring
- Performance tracking and reporting
- Fund operations and administration
Create a comprehensive architecture that integrates all these components into a fully automated system."""
)

@ -16,51 +16,12 @@ model = OpenAIChat(
model_name="llama-3.1-70b-versatile",
temperature=0.1,
)
# Define specialized system prompts for each agent
DATA_EXTRACTOR_PROMPT = """You are a highly specialized private equity agent focused on data extraction from various documents. Your expertise includes:
1. Extracting key financial metrics (revenue, EBITDA, growth rates, etc.) from financial statements and reports
2. Identifying and extracting important contract terms from legal documents
3. Pulling out relevant market data from industry reports and analyses
4. Extracting operational KPIs from management presentations and internal reports
5. Identifying and extracting key personnel information from organizational charts and bios
Provide accurate, structured data extracted from various document types to support investment analysis."""
SUMMARIZER_PROMPT = """You are an expert private equity agent specializing in summarizing complex documents. Your core competencies include:
1. Distilling lengthy financial reports into concise executive summaries
2. Summarizing legal documents, highlighting key terms and potential risks
3. Condensing industry reports to capture essential market trends and competitive dynamics
4. Summarizing management presentations to highlight key strategic initiatives and projections
5. Creating brief overviews of technical documents, emphasizing critical points for non-technical stakeholders
Deliver clear, concise summaries that capture the essence of various documents while highlighting information crucial for investment decisions."""
FINANCIAL_ANALYST_PROMPT = """You are a specialized private equity agent focused on financial analysis. Your key responsibilities include:
1. Analyzing historical financial statements to identify trends and potential issues
2. Evaluating the quality of earnings and potential adjustments to EBITDA
3. Assessing working capital requirements and cash flow dynamics
4. Analyzing capital structure and debt capacity
5. Evaluating financial projections and underlying assumptions
Provide thorough, insightful financial analysis to inform investment decisions and valuation."""
MARKET_ANALYST_PROMPT = """You are a highly skilled private equity agent specializing in market analysis. Your expertise covers:
1. Analyzing industry trends, growth drivers, and potential disruptors
2. Evaluating competitive landscape and market positioning
3. Assessing market size, segmentation, and growth potential
4. Analyzing customer dynamics, including concentration and loyalty
5. Identifying potential regulatory or macroeconomic impacts on the market
Deliver comprehensive market analysis to assess the attractiveness and risks of potential investments."""
OPERATIONAL_ANALYST_PROMPT = """You are an expert private equity agent focused on operational analysis. Your core competencies include:
1. Evaluating operational efficiency and identifying improvement opportunities
2. Analyzing supply chain and procurement processes
3. Assessing sales and marketing effectiveness
4. Evaluating IT systems and digital capabilities
5. Identifying potential synergies in merger or add-on acquisition scenarios
Provide detailed operational analysis to uncover value creation opportunities and potential risks."""
# Initialize specialized agents
data_extractor_agent = Agent(
agent_name="Data-Extractor",
system_prompt=DATA_EXTRACTOR_PROMPT,
system_prompt="You are a data extraction specialist. Extract relevant information from provided content.",
llm=model,
max_loops=1,
autosave=True,
@ -75,7 +36,7 @@ data_extractor_agent = Agent(
summarizer_agent = Agent(
agent_name="Document-Summarizer",
system_prompt=SUMMARIZER_PROMPT,
system_prompt="You are a document summarization specialist. Provide clear and concise summaries.",
llm=model,
max_loops=1,
autosave=True,
@ -90,7 +51,7 @@ summarizer_agent = Agent(
financial_analyst_agent = Agent(
agent_name="Financial-Analyst",
system_prompt=FINANCIAL_ANALYST_PROMPT,
system_prompt="You are a financial analysis specialist. Analyze financial aspects of content.",
llm=model,
max_loops=1,
autosave=True,
@ -105,7 +66,7 @@ financial_analyst_agent = Agent(
market_analyst_agent = Agent(
agent_name="Market-Analyst",
system_prompt=MARKET_ANALYST_PROMPT,
system_prompt="You are a market analysis specialist. Analyze market-related aspects.",
llm=model,
max_loops=1,
autosave=True,
@ -120,7 +81,7 @@ market_analyst_agent = Agent(
operational_analyst_agent = Agent(
agent_name="Operational-Analyst",
system_prompt=OPERATIONAL_ANALYST_PROMPT,
system_prompt="You are an operational analysis specialist. Analyze operational aspects.",
llm=model,
max_loops=1,
autosave=True,
@ -141,12 +102,14 @@ router = SwarmRouter(
agents=[
data_extractor_agent,
summarizer_agent,
# financial_analyst_agent,
# market_analyst_agent,
# operational_analyst_agent,
financial_analyst_agent,
market_analyst_agent,
operational_analyst_agent,
],
swarm_type="auto", # or "SequentialWorkflow" or "ConcurrentWorkflow" or
# auto_generate_prompts=True,
swarm_type="SequentialWorkflow", # or "SequentialWorkflow" or "ConcurrentWorkflow" or
auto_generate_prompts=True,
output_type="all",
)
# Example usage
@ -156,7 +119,3 @@ if __name__ == "__main__":
"Where is the best place to find template term sheets for series A startups. Provide links and references"
)
print(result)
# Retrieve and print logs
for log in router.get_logs():
print(f"{log.timestamp} - {log.level}: {log.message}")

@ -0,0 +1,99 @@
import os
from swarm_models import OpenAIChat
from swarms import Agent, run_agents_with_tasks_concurrently
# Fetch the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
openai_api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
)
# Initialize agents for different roles
delaware_ccorp_agent = Agent(
agent_name="Delaware-CCorp-Hiring-Agent",
system_prompt="""
Create a comprehensive hiring description for a Delaware C Corporation,
including all relevant laws and regulations, such as the Delaware General
Corporation Law (DGCL) and the Delaware Corporate Law. Ensure the description
covers the requirements for hiring employees, contractors, and officers,
including the necessary paperwork, tax obligations, and benefits. Also,
outline the procedures for compliance with Delaware's employment laws,
including anti-discrimination laws, workers' compensation, and unemployment
insurance. Provide guidance on how to navigate the complexities of Delaware's
corporate law and ensure that all hiring practices are in compliance with
state and federal regulations.
""",
llm=model,
max_loops=1,
autosave=False,
dashboard=False,
verbose=True,
output_type="str",
artifacts_on=True,
artifacts_output_path="delaware_ccorp_hiring_description.md",
artifacts_file_extension=".md",
)
indian_foreign_agent = Agent(
agent_name="Indian-Foreign-Hiring-Agent",
system_prompt="""
Create a comprehensive hiring description for an Indian or foreign country,
including all relevant laws and regulations, such as the Indian Contract Act,
the Indian Labour Laws, and the Foreign Exchange Management Act (FEMA).
Ensure the description covers the requirements for hiring employees,
contractors, and officers, including the necessary paperwork, tax obligations,
and benefits. Also, outline the procedures for compliance with Indian and
foreign employment laws, including anti-discrimination laws, workers'
compensation, and unemployment insurance. Provide guidance on how to navigate
the complexities of Indian and foreign corporate law and ensure that all hiring
practices are in compliance with state and federal regulations. Consider the
implications of hiring foreign nationals and the requirements for obtaining
necessary visas and work permits.
""",
llm=model,
max_loops=1,
autosave=False,
dashboard=False,
verbose=True,
output_type="str",
artifacts_on=True,
artifacts_output_path="indian_foreign_hiring_description.md",
artifacts_file_extension=".md",
)
# List of agents and corresponding tasks
agents = [delaware_ccorp_agent, indian_foreign_agent]
tasks = [
"""
Create a comprehensive hiring description for an Agent Engineer, including
required skills and responsibilities. Ensure the description covers the
necessary technical expertise, such as proficiency in AI/ML frameworks,
programming languages, and data structures. Outline the key responsibilities,
including designing and developing AI agents, integrating with existing systems,
and ensuring scalability and performance.
""",
"""
Generate a detailed job description for a Prompt Engineer, including
required skills and responsibilities. Ensure the description covers the
necessary technical expertise, such as proficiency in natural language processing,
machine learning, and software development. Outline the key responsibilities,
including designing and optimizing prompts for AI systems, ensuring prompt
quality and consistency, and collaborating with cross-functional teams.
""",
]
# Run agents with tasks concurrently
results = run_agents_with_tasks_concurrently(
agents,
tasks,
all_cores=True,
device="cpu",
)
# Print the results
for result in results:
print(result)

@ -43,8 +43,7 @@ agent = Agent(
)
print(
agent.run(
"How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria. Create a report on this question."
)
agent.run(
"How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria. Create a report on this question.",
all_cores=True,
)

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "6.0.4"
version = "6.0.9"
description = "Swarms - Pytorch"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]
@ -78,6 +78,7 @@ swarm-models = "*"
clusterops = "*"
chromadb = "*"
reportlab = "*"
doc-master = "*"
[tool.poetry.scripts]
swarms = "swarms.cli.main:main"

@ -0,0 +1,119 @@
import os
from swarms import Agent, AgentRearrange
from swarm_models import OpenAIChat
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
)
# Initialize the boss agent (Director)
boss_agent = Agent(
agent_name="BossAgent",
system_prompt="""
You are the BossAgent responsible for managing and overseeing a swarm of agents analyzing company expenses.
Your job is to dynamically assign tasks, prioritize their execution, and ensure that all agents collaborate efficiently.
After receiving a report on the company's expenses, you will break down the work into smaller tasks,
assigning specific tasks to each agent, such as detecting recurring high costs, categorizing expenditures,
and identifying unnecessary transactions. Ensure the results are communicated back in a structured way
so the finance team can take actionable steps to cut off unproductive spending. You also monitor and
dynamically adapt the swarm to optimize their performance. Finally, you summarize their findings
into a coherent report.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="boss_agent.json",
)
# Initialize worker 1: Expense Analyzer
worker1 = Agent(
agent_name="ExpenseAnalyzer",
system_prompt="""
Your task is to carefully analyze the company's expense data provided to you.
You will focus on identifying high-cost recurring transactions, categorizing expenditures
(e.g., marketing, operations, utilities, etc.), and flagging areas where there seems to be excessive spending.
You will provide a detailed breakdown of each category, along with specific recommendations for cost-cutting.
Pay close attention to monthly recurring subscriptions, office supplies, and non-essential expenditures.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="worker1.json",
)
# Initialize worker 2: Summary Generator
worker2 = Agent(
agent_name="SummaryGenerator",
system_prompt="""
After receiving the detailed breakdown from the ExpenseAnalyzer,
your task is to create a concise summary of the findings. You will focus on the most actionable insights,
such as highlighting the specific transactions that can be immediately cut off and summarizing the areas
where the company is overspending. Your summary will be used by the BossAgent to generate the final report.
Be clear and to the point, emphasizing the urgency of cutting unnecessary expenses.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="worker2.json",
)
# Swarm-Level Prompt (Collaboration Prompt)
swarm_prompt = """
As a swarm, your collective goal is to analyze the company's expenses and identify transactions that should be cut off.
You will work collaboratively to break down the entire process of expense analysis into manageable steps.
The BossAgent will direct the flow and assign tasks dynamically to the agents. The ExpenseAnalyzer will first
focus on breaking down the expense report, identifying high-cost recurring transactions, categorizing them,
and providing recommendations for potential cost reduction. After the analysis, the SummaryGenerator will then
consolidate all the findings into an actionable summary that the finance team can use to immediately cut off unnecessary expenses.
Together, your collaboration is essential to streamlining and improving the companys financial health.
"""
# Create a list of agents
agents = [boss_agent, worker1, worker2]
# Define the flow pattern for the swarm
flow = "BossAgent -> ExpenseAnalyzer -> SummaryGenerator"
# Using AgentRearrange class to manage the swarm
agent_system = AgentRearrange(
agents=agents,
flow=flow,
return_json=False,
output_type="final",
max_loops=1,
docs=["SECURITY.md"],
)
# Input task for the swarm
task = f"""
{swarm_prompt}
The company has been facing a rising number of unnecessary expenses, and the finance team needs a detailed
analysis of recent transactions to identify which expenses can be cut off to improve profitability.
Analyze the provided transaction data and create a detailed report on cost-cutting opportunities,
focusing on recurring transactions and non-essential expenditures.
"""
# Run the swarm system with the task
output = agent_system.run(task)
print(output)

@ -35,3 +35,4 @@ aiofiles
swarm-models
clusterops
reportlab
doc-master

@ -0,0 +1,117 @@
import os
from dotenv import load_dotenv
from swarms import Agent, SequentialWorkflow
from swarm_models import OpenAIChat
load_dotenv()
# Get the OpenAI API key from the environment variable
api_key = os.getenv("GROQ_API_KEY")
# Model
model = OpenAIChat(
openai_api_base="https://api.groq.com/openai/v1",
openai_api_key=api_key,
model_name="llama-3.1-70b-versatile",
temperature=0.1,
)
# Initialize specialized agents
data_extractor_agent = Agent(
agent_name="Data-Extractor",
system_prompt=None,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="data_extractor_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
summarizer_agent = Agent(
agent_name="Document-Summarizer",
system_prompt=None,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="summarizer_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
financial_analyst_agent = Agent(
agent_name="Financial-Analyst",
system_prompt=None,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="financial_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
market_analyst_agent = Agent(
agent_name="Market-Analyst",
system_prompt=None,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="market_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
operational_analyst_agent = Agent(
agent_name="Operational-Analyst",
system_prompt=None,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="operational_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
# Initialize the SwarmRouter
router = SequentialWorkflow(
name="pe-document-analysis-swarm",
description="Analyze documents for private equity due diligence and investment decision-making",
max_loops=1,
agents=[
data_extractor_agent,
summarizer_agent,
financial_analyst_agent,
market_analyst_agent,
operational_analyst_agent,
],
output_type="all",
)
# Example usage
if __name__ == "__main__":
# Run a comprehensive private equity document analysis task
result = router.run(
"Where is the best place to find template term sheets for series A startups. Provide links and references"
)
print(result)

@ -115,6 +115,15 @@ def create_agents_from_yaml(
auto_generate_prompt=agent_config.get(
"auto_generate_prompt", "False"
),
artifacts_on=agent_config.get(
"artifacts_on", "False"
),
artifacts_file_extension=agent_config.get(
"artifacts_file_extension", ".md"
),
artifacts_output_path=agent_config.get(
"artifacts_output_path", ""
),
*args,
**kwargs,
)
@ -138,7 +147,7 @@ def create_agents_from_yaml(
flow=swarm_config.get("flow"),
autosave=swarm_config.get("autosave"),
return_json=swarm_config.get("return_json"),
*args,
rules=swarm_config.get("rules", "") * args,
**kwargs,
)
logger.info(

@ -74,7 +74,9 @@ from swarms.structs.multi_agent_exec import (
run_agents_with_different_tasks,
run_agent_with_timeout,
run_agents_with_resource_monitoring,
run_agents_with_tasks_concurrently,
)
from swarms.structs.agents_available import showcase_available_agents
__all__ = [
"Agent",
@ -143,4 +145,6 @@ __all__ = [
"run_agent_with_timeout",
"run_agents_with_resource_monitoring",
"swarm_router",
"run_agents_with_tasks_concurrently",
"showcase_available_agents",
]

@ -299,7 +299,6 @@ class Agent:
rules: str = None, # type: ignore
planning: Optional[str] = False,
planning_prompt: Optional[str] = None,
device: str = None,
custom_planning_prompt: str = None,
memory_chunk_size: int = 2000,
agent_ops_on: bool = False,
@ -331,6 +330,9 @@ class Agent:
artifacts_on: bool = False,
artifacts_output_path: str = None,
artifacts_file_extension: str = None,
device: str = "cpu",
all_cores: bool = True,
device_id: int = 0,
*args,
**kwargs,
):
@ -408,7 +410,6 @@ class Agent:
self.execute_tool = execute_tool
self.planning = planning
self.planning_prompt = planning_prompt
self.device = device
self.custom_planning_prompt = custom_planning_prompt
self.rules = rules
self.custom_tools_prompt = custom_tools_prompt
@ -441,6 +442,9 @@ class Agent:
self.artifacts_on = artifacts_on
self.artifacts_output_path = artifacts_output_path
self.artifacts_file_extension = artifacts_file_extension
self.device = device
self.all_cores = all_cores
self.device_id = device_id
# Initialize the short term memory
self.short_memory = Conversation(
@ -729,17 +733,19 @@ class Agent:
# Check parameters
def check_parameters(self):
if self.llm is None:
raise ValueError("Language model is not provided")
raise ValueError("Language model is not provided. Choose a model from the available models in swarm_models or create a class with a run(task: str) method and or a __call__ method.")
if self.max_loops is None:
if self.max_loops is None or self.max_loops == 0:
raise ValueError("Max loops is not provided")
if self.max_tokens == 0:
if self.max_tokens == 0 or self.max_tokens is None:
raise ValueError("Max tokens is not provided")
if self.context_length == 0:
if self.context_length == 0 or self.context_length is None:
raise ValueError("Context length is not provided")
# Main function
def _run(
self,
@ -2264,6 +2270,9 @@ class Agent:
ValueError: If an invalid device is specified.
Exception: If any other error occurs during execution.
"""
device = device or self.device
device_id = device_id or self.device_id
try:
logger.info(f"Attempting to run on device: {device}")
if device == "cpu":

@ -0,0 +1,93 @@
from typing import List, Any
from loguru import logger
from swarms.structs.agent import Agent
def get_agent_name(agent: Any) -> str:
"""Helper function to safely get agent name
Args:
agent (Any): The agent object to get name from
Returns:
str: The agent's name if found, 'Unknown' otherwise
"""
if isinstance(agent, Agent) and hasattr(agent, "agent_name"):
return agent.agent_name
return "Unknown"
def get_agent_description(agent: Any) -> str:
"""Helper function to get agent description or system prompt preview
Args:
agent (Any): The agent object
Returns:
str: Description or first 100 chars of system prompt
"""
if not isinstance(agent, Agent):
return "N/A"
if hasattr(agent, "description") and agent.description:
return agent.description
if hasattr(agent, "system_prompt") and agent.system_prompt:
return f"{agent.system_prompt[:150]}..."
return "N/A"
def showcase_available_agents(
name: str = None,
description: str = None,
agents: List[Agent] = [],
update_agents_on: bool = False,
) -> str:
"""
Generate a formatted string showcasing all available agents and their descriptions.
Args:
agents (List[Agent]): List of Agent objects to showcase.
update_agents_on (bool, optional): If True, updates each agent's system prompt with
the showcase information. Defaults to False.
Returns:
str: Formatted string containing agent information, including names, descriptions
and IDs for all available agents.
"""
logger.info(f"Showcasing {len(agents)} available agents")
formatted_agents = []
header = f"\n####### Agents available in the swarm: {name} ############\n"
header += f"{description}\n"
row_format = "{:<5} | {:<20} | {:<50}"
header_row = row_format.format("ID", "Agent Name", "Description")
separator = "-" * 80
formatted_agents.append(header)
formatted_agents.append(separator)
formatted_agents.append(header_row)
formatted_agents.append(separator)
for idx, agent in enumerate(agents):
if not isinstance(agent, Agent):
logger.warning(
f"Skipping non-Agent object: {type(agent)}"
)
continue
agent_name = get_agent_name(agent)
description = (
get_agent_description(agent)[:100] + "..."
if len(get_agent_description(agent)) > 100
else get_agent_description(agent)
)
formatted_agents.append(
row_format.format(idx + 1, agent_name, description)
)
showcase = "\n".join(formatted_agents)
return showcase

@ -5,10 +5,13 @@ from dataclasses import dataclass
import threading
from typing import List, Union, Any, Callable
from multiprocessing import cpu_count
import os
from swarms.structs.agent import Agent
from swarms.utils.calculate_func_metrics import profile_func
from swarms.utils.wrapper_clusterop import (
exec_callable_with_clusterops,
)
# Type definitions
@ -332,6 +335,103 @@ def run_agents_with_resource_monitoring(
# Implementation details...
@profile_func
def _run_agents_with_tasks_concurrently(
agents: List[AgentType],
tasks: List[str] = [],
batch_size: int = None,
max_workers: int = None,
) -> List[Any]:
"""
Run multiple agents with corresponding tasks concurrently.
Args:
agents: List of Agent instances to run
tasks: List of task strings to execute
batch_size: Number of agents to run in parallel
max_workers: Maximum number of threads
Returns:
List of outputs from each agent
"""
if len(agents) != len(tasks):
raise ValueError(
"The number of agents must match the number of tasks."
)
cpu_cores = os.cpu_count()
batch_size = batch_size or cpu_cores
max_workers = max_workers or cpu_cores * 2
results = []
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async def run_agent_task_pair(
agent: AgentType, task: str, executor: ThreadPoolExecutor
) -> Any:
return await run_agent_async(agent, task, executor)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for i in range(0, len(agents), batch_size):
batch_agents = agents[i : i + batch_size]
batch_tasks = tasks[i : i + batch_size]
batch_results = loop.run_until_complete(
asyncio.gather(
*(
run_agent_task_pair(agent, task, executor)
for agent, task in zip(
batch_agents, batch_tasks
)
)
)
)
results.extend(batch_results)
return results
def run_agents_with_tasks_concurrently(
agents: List[AgentType],
tasks: List[str] = [],
batch_size: int = None,
max_workers: int = None,
device: str = "cpu",
device_id: int = 0,
all_cores: bool = True,
) -> List[Any]:
"""
Executes a list of agents with their corresponding tasks concurrently on a specified device.
This function orchestrates the concurrent execution of a list of agents with their respective tasks on a specified device, either CPU or GPU. It leverages the `exec_callable_with_clusterops` function to manage the execution on the specified device.
Args:
agents (List[AgentType]): A list of Agent instances or callable functions to execute concurrently.
tasks (List[str], optional): A list of task strings to execute for each agent. Defaults to an empty list.
batch_size (int, optional): The number of agents to run in parallel. Defaults to None.
max_workers (int, optional): The maximum number of threads to use for execution. Defaults to None.
device (str, optional): The device to use for execution. Defaults to "cpu".
device_id (int, optional): The ID of the GPU to use if device is set to "gpu". Defaults to 0.
all_cores (bool, optional): If True, uses all available CPU cores. Defaults to True.
Returns:
List[Any]: A list of outputs from each agent execution.
"""
return exec_callable_with_clusterops(
device,
device_id,
all_cores,
_run_agents_with_tasks_concurrently,
agents,
tasks,
batch_size,
max_workers,
)
# # Example usage:
# # Initialize your agents with the same model to avoid re-creating it
# agents = [

@ -1,39 +1,52 @@
import threading
import traceback
import asyncio
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import Callable, Dict, List, Optional
from typing import Callable, Dict, List, Literal, Optional
from pydantic import BaseModel, Field
from swarms_memory import BaseVectorDatabase
from swarms.schemas.agent_step_schemas import ManySteps
from swarms.structs.agent import Agent
from swarms.structs.agents_available import showcase_available_agents
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.omni_agent_types import AgentType
from swarms.utils.add_docs_to_agents import handle_input_docs
from swarms.utils.loguru_logger import logger
from swarms.utils.wrapper_clusterop import (
exec_callable_with_clusterops,
)
from swarms.utils.swarm_reliability_checks import reliability_check
# Literal of output types
OutputType = Literal[
"all", "final", "list", "dict", ".json", ".md", ".txt", ".yaml", ".toml"
]
def swarm_id():
return uuid.uuid4().hex
class AgentRearrangeInput(BaseModel):
swarm_id: str
name: str
description: str
flow: str
max_loops: int
swarm_id: Optional[str] = None
name: Optional[str] = None
description: Optional[str] = None
flow: Optional[str] = None
max_loops: Optional[int] = None
time: str = Field(
default_factory=lambda: datetime.now().strftime(
"%Y-%m-%d %H:%M:%S"
),
description="The time the agent was created.",
)
def swarm_id():
return uuid.uuid4().hex
output_type: OutputType = Field(default="final")
class AgentRearrangeOutput(BaseModel):
Input: AgentRearrangeInput
outputs: List[ManySteps]
Input: Optional[AgentRearrangeInput] = None
outputs: Optional[List[ManySteps]] = None
time: str = Field(
default_factory=lambda: datetime.now().strftime(
"%Y-%m-%d %H:%M:%S"
@ -47,16 +60,38 @@ class AgentRearrange(BaseSwarm):
A class representing a swarm of agents for rearranging tasks.
Attributes:
agents (dict): A dictionary of agents, where the key is the agent's name and the value is the agent object.
flow (str): The flow pattern of the tasks.
id (str): Unique identifier for the swarm
name (str): Name of the swarm
description (str): Description of the swarm's purpose
agents (callable): Dictionary mapping agent names to Agent objects
flow (str): The flow pattern defining task execution order
max_loops (int): Maximum number of execution loops
verbose (bool): Whether to enable verbose logging
memory_system (BaseVectorDatabase): Memory system for storing agent interactions
human_in_the_loop (bool): Whether human intervention is enabled
custom_human_in_the_loop (Callable): Custom function for human intervention
return_json (bool): Whether to return output in JSON format
output_type (OutputType): Format of output ("all", "final", "list", or "dict")
swarm_history (dict): History of agent interactions
input_config (AgentRearrangeInput): Input configuration schema
output_schema (AgentRearrangeOutput): Output schema
Methods:
__init__(agents: List[Agent] = None, flow: str = None): Initializes the AgentRearrange object.
add_agent(agent: Agent): Adds an agent to the swarm.
remove_agent(agent_name: str): Removes an agent from the swarm.
add_agents(agents: List[Agent]): Adds multiple agents to the swarm.
validate_flow(): Validates the flow pattern.
run(task): Runs the swarm to rearrange the tasks.
__init__(): Initializes the AgentRearrange object
reliability_checks(): Validates swarm configuration
set_custom_flow(): Sets a custom flow pattern
add_agent(): Adds an agent to the swarm
track_history(): Records agent interaction history
remove_agent(): Removes an agent from the swarm
add_agents(): Adds multiple agents to the swarm
validate_flow(): Validates the flow pattern
run(): Executes the swarm's task processing
astream(): Runs the swarm with streaming output
batch_run(): Processes multiple tasks in batches
abatch_run(): Asynchronously processes multiple tasks in batches
concurrent_run(): Processes multiple tasks concurrently
handle_input_docs(): Adds document content to agent prompts
"""
def __init__(
@ -64,7 +99,7 @@ class AgentRearrange(BaseSwarm):
id: str = swarm_id(),
name: str = "AgentRearrange",
description: str = "A swarm of agents for rearranging tasks.",
agents: List[AgentType] = None,
agents: List[Agent] = None,
flow: str = None,
max_loops: int = 1,
verbose: bool = True,
@ -74,25 +109,28 @@ class AgentRearrange(BaseSwarm):
Callable[[str], str]
] = None,
return_json: bool = False,
output_type: OutputType = "final",
docs: List[str] = None,
doc_folder: str = None,
*args,
**kwargs,
):
"""
Initializes the AgentRearrange object.
Args:
agents (List[Agent], optional): A list of Agent objects. Defaults to None.
flow (str, optional): The flow pattern of the tasks. Defaults to None.
"""
# reliability_check(
# agents=agents,
# name=name,
# description=description,
# flow=flow,
# max_loops=max_loops,
# )
super(AgentRearrange, self).__init__(
name=name,
description=description,
agents=agents,
agents=agents if agents else [],
*args,
**kwargs,
)
self.id = id
self.agents = {agent.name: agent for agent in agents}
self.agents = {agent.agent_name: agent for agent in agents}
self.flow = flow if flow is not None else ""
self.verbose = verbose
self.max_loops = max_loops if max_loops > 0 else 1
@ -100,14 +138,14 @@ class AgentRearrange(BaseSwarm):
self.human_in_the_loop = human_in_the_loop
self.custom_human_in_the_loop = custom_human_in_the_loop
self.return_json = return_json
self.output_type = output_type
self.docs = docs
self.doc_folder = doc_folder
self.swarm_history = {
agent.agent_name: [] for agent in agents
}
self.lock = threading.Lock()
self.id = uuid.uuid4().hex if id is None else id
# Run the relianility checks
self.reliability_checks()
self.id = uuid.uuid4().hex if id is None else id
# Output schema
self.input_config = AgentRearrangeInput(
@ -116,6 +154,7 @@ class AgentRearrange(BaseSwarm):
description=self.description,
flow=self.flow,
max_loops=self.max_loops,
output_type=self.output_type,
)
# Output schema
@ -124,31 +163,36 @@ class AgentRearrange(BaseSwarm):
outputs=[],
)
def reliability_checks(self):
logger.info("Running reliability checks.")
if self.agents is None:
raise ValueError("No agents found in the swarm.")
if self.flow is None:
raise ValueError("No flow found in the swarm.")
# Run the reliability checks to validate the swarm
# self.handle_input_docs()
if self.max_loops is None:
raise ValueError("No max_loops found in the swarm.")
# Show the agents whose in the swarm
self.showcase_agents()
logger.info(
"AgentRearrange initialized with agents: {}".format(
list(self.agents.keys())
)
def showcase_agents(self):
# Get formatted agent info once
agents_available = showcase_available_agents(
name=self.name,
description=self.description,
agents=self.agents,
)
# Verbose is True
if self.verbose is True:
logger.add("agent_rearrange.log")
# Update all agents in one pass using values()
for agent in self.agents.values():
if isinstance(agent, Agent):
agent.system_prompt += agents_available
def set_custom_flow(self, flow: str):
self.flow = flow
logger.info(f"Custom flow set: {flow}")
def handle_input_docs(self):
self.agents = handle_input_docs(
agents=self.agents,
docs=self.docs,
doc_folder=self.doc_folder,
)
def add_agent(self, agent: Agent):
"""
Adds an agent to the swarm.
@ -156,8 +200,8 @@ class AgentRearrange(BaseSwarm):
Args:
agent (Agent): The agent to be added.
"""
logger.info(f"Adding agent {agent.name} to the swarm.")
self.agents[agent.name] = agent
logger.info(f"Adding agent {agent.agent_name} to the swarm.")
self.agents[agent.agent_name] = agent
def track_history(
self,
@ -183,7 +227,7 @@ class AgentRearrange(BaseSwarm):
agents (List[Agent]): A list of Agent objects.
"""
for agent in agents:
self.agents[agent.name] = agent
self.agents[agent.agent_name] = agent
def validate_flow(self):
"""
@ -226,10 +270,10 @@ class AgentRearrange(BaseSwarm):
"Duplicate agent names in the flow are not allowed."
)
print("Flow is valid.")
logger.info(f"Flow: {self.flow} is valid.")
return True
def run(
def _run(
self,
task: str = None,
img: str = None,
@ -241,51 +285,73 @@ class AgentRearrange(BaseSwarm):
Runs the swarm to rearrange the tasks.
Args:
task: The initial task to be processed.
task (str, optional): The initial task to be processed. Defaults to None.
img (str, optional): Image input for agents that support it. Defaults to None.
custom_tasks (Dict[str, str], optional): Custom tasks for specific agents. Defaults to None.
output_type (str, optional): Format of the output. Can be:
- "all": String containing all agent responses concatenated
- "final": Only the final agent's response
- "list": List of all agent responses
- "dict": Dict mapping agent names to their responses
Defaults to "final".
*args: Additional positional arguments
**kwargs: Additional keyword arguments
Returns:
str: The final processed task.
Union[str, List[str], Dict[str, str]]: The processed output in the specified format
Raises:
ValueError: If flow validation fails
Exception: For any other errors during execution
"""
try:
if not self.validate_flow():
logger.error("Flow validation failed")
return "Invalid flow configuration."
tasks = self.flow.split("->")
current_task = task
all_responses = []
response_dict = {}
logger.info(
f"Starting task execution with {len(tasks)} steps"
)
# If custom_tasks have the agents name and tasks then combine them
# Handle custom tasks
if custom_tasks is not None:
logger.info("Processing custom tasks")
c_agent_name, c_task = next(
iter(custom_tasks.items())
)
# Find the position of the custom agent in the tasks list
position = tasks.index(c_agent_name)
# If there is a prebois agent merge its task with the custom tasks
if position > 0:
tasks[position - 1] += "->" + c_task
else:
# If there is no prevous agent just insert the custom tasks
tasks.insert(position, c_task)
# Set the loop counter
loop_count = 0
while loop_count < self.max_loops:
logger.info(
f"Starting loop {loop_count + 1}/{self.max_loops}"
)
for task in tasks:
is_last = task == tasks[-1]
agent_names = [
name.strip() for name in task.split(",")
]
if len(agent_names) > 1:
# Parallel processing
logger.info(
f"Running agents in parallel: {agent_names}"
)
results = []
for agent_name in agent_names:
if agent_name == "H":
# Human in the loop intervention
if (
self.human_in_the_loop
and self.custom_human_in_the_loop
@ -299,29 +365,39 @@ class AgentRearrange(BaseSwarm):
current_task = input(
"Enter your response:"
)
results.append(current_task)
response_dict[agent_name] = (
current_task
)
else:
agent = self.agents[agent_name]
result = agent.run(
current_task,
img,
is_last,
task=current_task,
img=img,
is_last=is_last,
*args,
**kwargs,
)
results.append(result)
response_dict[agent_name] = result
self.output_schema.outputs.append(
agent.agent_output
)
logger.debug(
f"Agent {agent_name} output: {result}"
)
current_task = "; ".join(results)
all_responses.extend(results)
else:
# Sequential processing
logger.info(
f"Running agents sequentially: {agent_names}"
f"Running agent sequentially: {agent_names[0]}"
)
agent_name = agent_names[0]
if agent_name == "H":
# Human-in-the-loop intervention
if (
self.human_in_the_loop
and self.custom_human_in_the_loop
@ -335,214 +411,238 @@ class AgentRearrange(BaseSwarm):
current_task = input(
"Enter the next task: "
)
response_dict[agent_name] = current_task
else:
agent = self.agents[agent_name]
current_task = agent.run(
current_task,
img,
is_last,
task=current_task,
img=img,
is_last=is_last,
*args,
**kwargs,
)
response_dict[agent_name] = current_task
self.output_schema.outputs.append(
agent.agent_output
)
logger.debug(
f"Agent {agent_name} output: {current_task}"
)
all_responses.append(current_task)
loop_count += 1
# return current_task
logger.info("Task execution completed")
if self.return_json:
return self.output_schema.model_dump_json(indent=4)
else:
return current_task
# Handle different output types
if self.output_type == "all":
output = " ".join(all_responses)
elif self.output_type == "list":
output = all_responses
elif self.output_type == "dict":
output = response_dict
else: # "final"
output = current_task
return output
except Exception as e:
logger.error(f"An error occurred: {e}")
logger.error(f"An error occurred: {e} \n {traceback.format_exc()}")
return e
async def astream(
def run(
self,
task: str = None,
img: str = None,
custom_tasks: Dict[str, str] = None,
device: str = "cpu",
device_id: int = 1,
all_cores: bool = True,
all_gpus: bool = False,
*args,
**kwargs,
):
"""
Runs the swarm with LangChain's astream_events v1 API enabled.
NOTICE: Be sure to only call this method if you are using LangChain-based models in your swarm.
This is useful for enhancing user experience by providing real-time updates of how each agent
in the swarm is processing the current task.
Execute the agent rearrangement task with specified compute resources.
Args:
task: The initial prompt (aka task) passed to the first agent(s) in the swarm.
task (str, optional): The task to execute. Defaults to None.
img (str, optional): Path to input image if required. Defaults to None.
device (str, optional): Computing device to use ('cpu' or 'gpu'). Defaults to "cpu".
device_id (int, optional): ID of specific device to use. Defaults to 1.
all_cores (bool, optional): Whether to use all CPU cores. Defaults to True.
all_gpus (bool, optional): Whether to use all available GPUs. Defaults to False.
*args: Additional positional arguments passed to _run().
**kwargs: Additional keyword arguments passed to _run().
Returns:
str: The final output generated.
The result from executing the task through the cluster operations wrapper.
"""
try:
if not self.validate_flow():
return "Invalid flow configuration."
return exec_callable_with_clusterops(
device=device,
device_id=device_id,
all_cores=all_cores,
all_gpus=all_gpus,
func=self._run,
task=task,
img=img,
*args,
**kwargs,
)
tasks = self.flow.split("->")
current_task = task
def __call__(self, task: str, *args, **kwargs):
"""
Make the class callable by executing the run() method.
# If custom_tasks have the agents name and tasks then combine them
if custom_tasks is not None:
c_agent_name, c_task = next(
iter(custom_tasks.items())
)
Args:
task (str): The task to execute.
*args: Additional positional arguments passed to run().
**kwargs: Additional keyword arguments passed to run().
# Find the position of the custom agent in the tasks list
position = tasks.index(c_agent_name)
Returns:
The result from executing run().
"""
return self.run(task=task, *args, **kwargs)
# If there is a prebois agent merge its task with the custom tasks
if position > 0:
tasks[position - 1] += "->" + c_task
else:
# If there is no prevous agent just insert the custom tasks
tasks.insert(position, c_task)
def batch_run(
self,
tasks: List[str],
img: Optional[List[str]] = None,
batch_size: int = 10,
device: str = "cpu",
device_id: int = None,
all_cores: bool = True,
all_gpus: bool = False,
*args,
**kwargs,
) -> List[str]:
"""
Process multiple tasks in batches.
logger.info("TASK:", task)
Args:
tasks: List of tasks to process
img: Optional list of images corresponding to tasks
batch_size: Number of tasks to process simultaneously
device: Computing device to use
device_id: Specific device ID if applicable
all_cores: Whether to use all CPU cores
all_gpus: Whether to use all available GPUs
# Set the loop counter
loop_count = 0
while loop_count < self.max_loops:
for task in tasks:
agent_names = [
name.strip() for name in task.split(",")
]
if len(agent_names) > 1:
# Parallel processing
logger.info(
f"Running agents in parallel: {agent_names}"
)
Returns:
List of results corresponding to input tasks
"""
results = []
for agent_name in agent_names:
if agent_name == "H":
# Human in the loop intervention
if (
self.human_in_the_loop
and self.custom_human_in_the_loop
):
current_task = (
self.custom_human_in_the_loop(
current_task
)
)
else:
current_task = input(
"Enter your response:"
)
else:
agent = self.agents[agent_name]
result = None
# As the current `swarms` package is using LangChain v0.1 we need to use the v0.1 version of the `astream_events` API
# Below is the link to the `astream_events` spec as outlined in the LangChain v0.1 docs
# https://python.langchain.com/v0.1/docs/expression_language/streaming/#event-reference
# Below is the link to the `astream_events` spec as outlined in the LangChain v0.2 docs
# https://python.langchain.com/v0.2/docs/versions/v0_2/migrating_astream_events/
async for evt in agent.astream_events(
current_task, version="v1"
):
# print(evt) # <- useful when building/debugging
if evt["event"] == "on_llm_end":
result = evt["data"]["output"]
print(agent.name, result)
results.append(result)
current_task = ""
for index, res in enumerate(results):
current_task += (
"# OUTPUT of "
+ agent_names[index]
+ ""
+ res
+ "\n\n"
)
else:
# Sequential processing
logger.info(
f"Running agents sequentially: {agent_names}"
for i in range(0, len(tasks), batch_size):
batch_tasks = tasks[i : i + batch_size]
batch_imgs = (
img[i : i + batch_size]
if img
else [None] * len(batch_tasks)
)
# Process batch using concurrent execution
batch_results = [
self.run(
task=task,
img=img_path,
device=device,
device_id=device_id,
all_cores=all_cores,
all_gpus=all_gpus,
*args,
**kwargs,
)
for task, img_path in zip(batch_tasks, batch_imgs)
]
results.extend(batch_results)
agent_name = agent_names[0]
if agent_name == "H":
# Human-in-the-loop intervention
if (
self.human_in_the_loop
and self.custom_human_in_the_loop
):
current_task = (
self.custom_human_in_the_loop(
current_task
)
)
else:
current_task = input(
"Enter the next task: "
)
else:
agent = self.agents[agent_name]
result = None
# As the current `swarms` package is using LangChain v0.1 we need to use the v0.1 version of the `astream_events` API
# Below is the link to the `astream_events` spec as outlined in the LangChain v0.1 docs
# https://python.langchain.com/v0.1/docs/expression_language/streaming/#event-reference
# Below is the link to the `astream_events` spec as outlined in the LangChain v0.2 docs
# https://python.langchain.com/v0.2/docs/versions/v0_2/migrating_astream_events/
async for evt in agent.astream_events(
f"SYSTEM: {agent.system_prompt}\nINPUT:{current_task}",
version="v1",
):
# print(evt) # <- useful when building/debugging
if evt["event"] == "on_llm_end":
result = evt["data"]["output"]
print(
agent.name, "result", result
)
current_task = result
return results
loop_count += 1
async def abatch_run(
self,
tasks: List[str],
img: Optional[List[str]] = None,
batch_size: int = 10,
*args,
**kwargs,
) -> List[str]:
"""
Asynchronously process multiple tasks in batches.
return current_task
except Exception as e:
logger.error(f"An error occurred: {e}")
return e
Args:
tasks: List of tasks to process
img: Optional list of images corresponding to tasks
batch_size: Number of tasks to process simultaneously
def process_agent_or_swarm(
self, name: str, task: str, img: str, is_last, *args, **kwargs
):
Returns:
List of results corresponding to input tasks
"""
results = []
for i in range(0, len(tasks), batch_size):
batch_tasks = tasks[i : i + batch_size]
batch_imgs = (
img[i : i + batch_size]
if img
else [None] * len(batch_tasks)
)
# Process batch using asyncio.gather
batch_coros = [
self.astream(task=task, img=img_path, *args, **kwargs)
for task, img_path in zip(batch_tasks, batch_imgs)
]
batch_results = await asyncio.gather(*batch_coros)
results.extend(batch_results)
return results
process_agent_or_swarm: Processes the agent or sub-swarm based on the given name.
def concurrent_run(
self,
tasks: List[str],
img: Optional[List[str]] = None,
max_workers: Optional[int] = None,
device: str = "cpu",
device_id: int = None,
all_cores: bool = True,
all_gpus: bool = False,
*args,
**kwargs,
) -> List[str]:
"""
Process multiple tasks concurrently using ThreadPoolExecutor.
Args:
name (str): The name of the agent or sub-swarm to process.
task (str): The task to be executed.
img (str): The image to be processed by the agents.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
tasks: List of tasks to process
img: Optional list of images corresponding to tasks
max_workers: Maximum number of worker threads
device: Computing device to use
device_id: Specific device ID if applicable
all_cores: Whether to use all CPU cores
all_gpus: Whether to use all available GPUs
Returns:
str: The result of the last executed task.
List of results corresponding to input tasks
"""
if name.startswith("Human"):
return self.human_intervention(task)
elif name in self.sub_swarm:
return self.run_sub_swarm(
task, name, img, *args, **kwargs
)
else:
agent = self.agents[name]
return agent.run(task, img, is_last, *args, **kwargs)
def human_intervention(self, task: str) -> str:
if self.human_in_the_loop and self.custom_human_in_the_loop:
return self.custom_human_in_the_loop(task)
else:
return input(
"Human intervention required. Enter your response: "
with ThreadPoolExecutor(max_workers=max_workers) as executor:
imgs = img if img else [None] * len(tasks)
futures = [
executor.submit(
self.run,
task=task,
img=img_path,
device=device,
device_id=device_id,
all_cores=all_cores,
all_gpus=all_gpus,
*args,
**kwargs,
)
for task, img_path in zip(tasks, imgs)
]
return [future.result() for future in futures]
def rearrange(

@ -1,15 +1,18 @@
from typing import List
from swarms.structs.agent import Agent
from swarms.utils.loguru_logger import logger
from swarms.structs.rearrange import AgentRearrange
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.rearrange import AgentRearrange, OutputType
from concurrent.futures import ThreadPoolExecutor, as_completed
from swarms.structs.agents_available import showcase_available_agents
class SequentialWorkflow(BaseSwarm):
class SequentialWorkflow:
"""
Initializes a SequentialWorkflow object.
Initializes a SequentialWorkflow object, which orchestrates the execution of a sequence of agents.
Args:
name (str, optional): The name of the workflow. Defaults to "SequentialWorkflow".
description (str, optional): A description of the workflow. Defaults to "Sequential Workflow, where agents are executed in a sequence."
agents (List[Agent], optional): The list of agents in the workflow. Defaults to None.
max_loops (int, optional): The maximum number of loops to execute the workflow. Defaults to 1.
*args: Variable length argument list.
@ -23,49 +26,170 @@ class SequentialWorkflow(BaseSwarm):
self,
name: str = "SequentialWorkflow",
description: str = "Sequential Workflow, where agents are executed in a sequence.",
agents: List[Agent] = None,
agents: List[Agent] = [],
max_loops: int = 1,
output_type: OutputType = "all",
return_json: bool = False,
shared_memory_system: callable = None,
*args,
**kwargs,
):
if agents is None or len(agents) == 0:
raise ValueError("Agents list cannot be None or empty")
self.name = name
self.description = description
self.agents = agents
self.max_loops = max_loops
self.output_type = output_type
self.return_json = return_json
self.shared_memory_system = shared_memory_system
if max_loops == 0:
raise ValueError("max_loops cannot be 0")
self.reliability_check()
try:
super().__init__(
self.agent_rearrange = AgentRearrange(
name=name,
description=description,
agents=agents,
flow=self.sequential_flow(),
max_loops=max_loops,
output_type=output_type,
return_json=return_json,
shared_memory_system=shared_memory_system,
*args,
**kwargs,
)
self.name = name
self.description = description
self.agents = agents
self.flow = " -> ".join(
agent.agent_name for agent in agents
# Handle agent showcase
self.handle_agent_showcase()
def sequential_flow(self):
# Only create flow if agents exist
if self.agents:
# Create flow by joining agent names with arrows
agent_names = []
for agent in self.agents:
try:
# Try to get agent_name, fallback to name if not available
agent_name = getattr(agent, 'agent_name', None) or agent.name
agent_names.append(agent_name)
except AttributeError:
logger.warning(f"Could not get name for agent {agent}")
continue
if agent_names:
flow = " -> ".join(agent_names)
else:
flow = ""
logger.warning("No valid agent names found to create flow")
else:
flow = ""
logger.warning("No agents provided to create flow")
return flow
def reliability_check(self):
if self.agents is None or len(self.agents) == 0:
raise ValueError("Agents list cannot be None or empty")
if self.max_loops == 0:
raise ValueError("max_loops cannot be 0")
if self.output_type not in OutputType:
raise ValueError("output_type must be 'all', 'final', 'list', 'dict', '.json', '.md', '.txt', '.yaml', or '.toml'")
logger.info("Checks completed your swarm is ready.")
def handle_agent_showcase(self):
# Get the showcase string once instead of regenerating for each agent
showcase_str = showcase_available_agents(
name=self.name,
description=self.description,
agents=self.agents,
)
self.agent_rearrange = AgentRearrange(
name=name,
description=description,
agents=agents,
flow=self.flow,
max_loops=max_loops,
# Append showcase string to each agent's existing system prompt
for agent in self.agents:
agent.system_prompt += showcase_str
def run(
self,
task: str,
device: str = "cpu",
all_cpus: bool = False,
auto_gpu: bool = False,
*args,
**kwargs,
) -> str:
"""
Executes a task through the agents in the dynamically constructed flow.
Args:
task (str): The task for the agents to execute.
Returns:
str: The final result after processing through all agents.
Raises:
ValueError: If task is None or empty
Exception: If any error occurs during task execution
"""
try:
logger.info(
f"Executing task with dynamic flow: {self.flow}"
)
return self.agent_rearrange.run(
task,
device=device,
all_cpus=all_cpus,
auto_gpu=auto_gpu,
*args,
**kwargs,
)
except Exception as e:
logger.error(
f"Error initializing SequentialWorkflow: {str(e)}"
f"An error occurred while executing the task: {e}"
)
raise e
def __call__(self, task: str, *args, **kwargs) -> str:
return self.run(task, *args, **kwargs)
def run_batched(self, tasks: List[str]) -> List[str]:
"""
Executes a batch of tasks through the agents in the dynamically constructed flow.
Args:
tasks (List[str]): The tasks for the agents to execute.
Returns:
List[str]: The final results after processing through all agents.
Raises:
ValueError: If tasks is None or empty
Exception: If any error occurs during task execution
"""
if not tasks or not all(
isinstance(task, str) for task in tasks
):
raise ValueError(
"Tasks must be a non-empty list of strings"
)
try:
logger.info(
f"Executing batch of tasks with dynamic flow: {self.flow}"
)
return [self.agent_rearrange.run(task) for task in tasks]
except Exception as e:
logger.error(
f"An error occurred while executing the batch of tasks: {e}"
)
raise
def run(self, task: str) -> str:
async def run_async(self, task: str) -> str:
"""
Runs the task through the agents in the dynamically constructed flow.
Executes the task through the agents in the dynamically constructed flow asynchronously.
Args:
task (str): The task for the agents to execute.
@ -82,11 +206,51 @@ class SequentialWorkflow(BaseSwarm):
try:
logger.info(
f"Running task with dynamic flow: {self.flow}"
f"Executing task with dynamic flow asynchronously: {self.flow}"
)
return await self.agent_rearrange.run_async(task)
except Exception as e:
logger.error(
f"An error occurred while executing the task asynchronously: {e}"
)
raise
async def run_concurrent(self, tasks: List[str]) -> List[str]:
"""
Executes a batch of tasks through the agents in the dynamically constructed flow concurrently.
Args:
tasks (List[str]): The tasks for the agents to execute.
Returns:
List[str]: The final results after processing through all agents.
Raises:
ValueError: If tasks is None or empty
Exception: If any error occurs during task execution
"""
if not tasks or not all(
isinstance(task, str) for task in tasks
):
raise ValueError(
"Tasks must be a non-empty list of strings"
)
try:
logger.info(
f"Executing batch of tasks with dynamic flow concurrently: {self.flow}"
)
return self.agent_rearrange.run(task)
with ThreadPoolExecutor() as executor:
results = [
executor.submit(self.agent_rearrange.run, task)
for task in tasks
]
return [
result.result()
for result in as_completed(results)
]
except Exception as e:
logger.error(
f"An error occurred while running the task: {e}"
f"An error occurred while executing the batch of tasks concurrently: {e}"
)
raise

@ -4,6 +4,7 @@ import uuid
from typing import Any, Callable, Dict, List, Optional
from swarms.utils.loguru_logger import logger
from swarms.utils.any_to_str import any_to_str
def swarm_id():
@ -29,23 +30,27 @@ class SwarmRearrange:
A class representing a swarm of swarms for rearranging tasks.
Attributes:
swarms (dict): A dictionary of swarms, where the key is the swarm's name and the value is the swarm object.
flow (str): The flow pattern of the tasks.
max_loops (int): The maximum number of loops to run the swarm.
verbose (bool): A flag indicating whether to log verbose messages.
human_in_the_loop (bool): A flag indicating whether human intervention is required.
custom_human_in_the_loop (Callable[[str], str], optional): A custom function for human-in-the-loop intervention.
return_json (bool): A flag indicating whether to return the result in JSON format.
swarm_history (dict): A dictionary to keep track of the history of each swarm.
lock (threading.Lock): A lock for thread-safe operations.
id (str): Unique identifier for the swarm arrangement
name (str): Name of the swarm arrangement
description (str): Description of what this swarm arrangement does
swarms (dict): A dictionary of swarms, where the key is the swarm's name and the value is the swarm object
flow (str): The flow pattern of the tasks
max_loops (int): The maximum number of loops to run the swarm
verbose (bool): A flag indicating whether to log verbose messages
human_in_the_loop (bool): A flag indicating whether human intervention is required
custom_human_in_the_loop (Callable[[str], str], optional): A custom function for human-in-the-loop intervention
return_json (bool): A flag indicating whether to return the result in JSON format
swarm_history (dict): A dictionary to keep track of the history of each swarm
lock (threading.Lock): A lock for thread-safe operations
Methods:
__init__(swarms: List[swarm] = None, flow: str = None): Initializes the SwarmRearrange object.
add_swarm(swarm: swarm): Adds an swarm to the swarm.
remove_swarm(swarm_name: str): Removes an swarm from the swarm.
add_swarms(swarms: List[swarm]): Adds multiple swarms to the swarm.
validate_flow(): Validates the flow pattern.
run(task): Runs the swarm to rearrange the tasks.
__init__(id: str, name: str, description: str, swarms: List[swarm], flow: str, max_loops: int, verbose: bool,
human_in_the_loop: bool, custom_human_in_the_loop: Callable, return_json: bool): Initializes the SwarmRearrange object
add_swarm(swarm: swarm): Adds an swarm to the swarm
remove_swarm(swarm_name: str): Removes an swarm from the swarm
add_swarms(swarms: List[swarm]): Adds multiple swarms to the swarm
validate_flow(): Validates the flow pattern
run(task): Runs the swarm to rearrange the tasks
"""
def __init__(
@ -69,8 +74,16 @@ class SwarmRearrange:
Initializes the SwarmRearrange object.
Args:
swarms (List[swarm], optional): A list of swarm objects. Defaults to None.
flow (str, optional): The flow pattern of the tasks. Defaults to None.
id (str): Unique identifier for the swarm arrangement. Defaults to generated UUID.
name (str): Name of the swarm arrangement. Defaults to "SwarmRearrange".
description (str): Description of what this swarm arrangement does.
swarms (List[swarm]): A list of swarm objects. Defaults to empty list.
flow (str): The flow pattern of the tasks. Defaults to None.
max_loops (int): Maximum number of loops to run. Defaults to 1.
verbose (bool): Whether to log verbose messages. Defaults to True.
human_in_the_loop (bool): Whether human intervention is required. Defaults to False.
custom_human_in_the_loop (Callable): Custom function for human intervention. Defaults to None.
return_json (bool): Whether to return results as JSON. Defaults to False.
"""
self.id = id
self.name = name
@ -271,6 +284,7 @@ class SwarmRearrange:
result = swarm.run(
current_task, img, *args, **kwargs
)
result = any_to_str(result)
logger.info(
f"Swarm {swarm_name} returned result of type: {type(result)}"
)
@ -312,6 +326,7 @@ class SwarmRearrange:
result = swarm.run(
current_task, img, *args, **kwargs
)
result = any_to_str(result)
logger.info(
f"Swarm {swarm_name} returned result of type: {type(result)}"
)
@ -371,6 +386,7 @@ def swarm_arrange(
flow,
)
result = swarm_arrangement.run(task, *args, **kwargs)
result = any_to_str(result)
logger.info(
f"Swarm arrangement {name} executed successfully with output type {output_type}."
)

File diff suppressed because it is too large Load Diff

@ -2,18 +2,22 @@ import uuid
from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, Union
from doc_master import doc_master
from loguru import logger
from pydantic import BaseModel, Field
from tenacity import retry, stop_after_attempt, wait_fixed
from swarms.prompts.ag_prompt import aggregator_system_prompt
from swarms.structs.agent import Agent
from swarms.structs.concurrent_workflow import ConcurrentWorkflow
from swarms.structs.mixture_of_agents import MixtureOfAgents
from swarms.structs.rearrange import AgentRearrange
from swarms.structs.sequential_workflow import SequentialWorkflow
from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm
from tenacity import retry, stop_after_attempt, wait_fixed
from swarms.structs.swarm_matcher import swarm_matcher
from swarms.prompts.ag_prompt import aggregator_system_prompt
from swarms.utils.wrapper_clusterop import (
exec_callable_with_clusterops,
)
SwarmType = Literal[
"AgentRearrange",
@ -25,6 +29,11 @@ SwarmType = Literal[
]
class Document(BaseModel):
file_path: str
data: str
class SwarmLog(BaseModel):
"""
A Pydantic model to capture log entries.
@ -37,36 +46,78 @@ class SwarmLog(BaseModel):
swarm_type: SwarmType
task: str = ""
metadata: Dict[str, Any] = Field(default_factory=dict)
documents: List[Document] = []
class SwarmRouter:
"""
A class to dynamically route tasks to different swarm types based on user selection or automatic matching.
A class that dynamically routes tasks to different swarm types based on user selection or automatic matching.
This class enables users to specify a swarm type or let the system automatically determine the best swarm type for a given task. It then runs the task on the selected or matched swarm type, ensuring type validation, logging, and metadata capture.
The SwarmRouter enables flexible task execution by either using a specified swarm type or automatically determining
the most suitable swarm type for a given task. It handles task execution while managing logging, type validation,
and metadata capture.
Args:
name (str, optional): Name identifier for the SwarmRouter instance. Defaults to "swarm-router".
description (str, optional): Description of the SwarmRouter's purpose. Defaults to "Routes your task to the desired swarm".
max_loops (int, optional): Maximum number of execution loops. Defaults to 1.
agents (List[Union[Agent, Callable]], optional): List of Agent objects or callables to use. Defaults to empty list.
swarm_type (SwarmType, optional): Type of swarm to use. Defaults to "SequentialWorkflow".
autosave (bool, optional): Whether to enable autosaving. Defaults to False.
flow (str, optional): Flow configuration string. Defaults to None.
return_json (bool, optional): Whether to return results as JSON. Defaults to False.
auto_generate_prompts (bool, optional): Whether to auto-generate agent prompts. Defaults to False.
shared_memory_system (Any, optional): Shared memory system for agents. Defaults to None.
rules (str, optional): Rules to inject into every agent. Defaults to None.
documents (List[str], optional): List of document file paths to use. Defaults to empty list.
output_type (str, optional): Output format type. Defaults to "string".
Attributes:
name (str): The name of the SwarmRouter instance.
description (str): A description of the SwarmRouter instance.
max_loops (int): The maximum number of loops to perform.
agents (List[Union[Agent, Callable]]): A list of Agent objects to be used in the swarm.
swarm_type (SwarmType): The type of swarm to be used, which can be specified or automatically determined.
autosave (bool): A flag to enable/disable autosave.
flow (str): The flow of the swarm.
return_json (bool): A flag to enable/disable returning the result in JSON format.
auto_generate_prompts (bool): A flag to enable/disable auto generation of prompts.
swarm (Union[AgentRearrange, MixtureOfAgents, SpreadSheetSwarm, SequentialWorkflow, ConcurrentWorkflow]):
The instantiated swarm object.
logs (List[SwarmLog]): A list of log entries captured during operations.
auto_generate_prompt (bool): A flag to enable/disable auto generation of prompts.
name (str): Name identifier for the SwarmRouter instance
description (str): Description of the SwarmRouter's purpose
max_loops (int): Maximum number of execution loops
agents (List[Union[Agent, Callable]]): List of Agent objects or callables
swarm_type (SwarmType): Type of swarm being used
autosave (bool): Whether autosaving is enabled
flow (str): Flow configuration string
return_json (bool): Whether results are returned as JSON
auto_generate_prompts (bool): Whether prompt auto-generation is enabled
shared_memory_system (Any): Shared memory system for agents
rules (str): Rules injected into every agent
documents (List[str]): List of document file paths
output_type (str): Output format type
logs (List[SwarmLog]): List of execution logs
swarm: The instantiated swarm object
Available Swarm Types:
- AgentRearrange: Rearranges agents for optimal task execution.
- MixtureOfAgents: Combines different types of agents for diverse task handling.
- SpreadSheetSwarm: Utilizes spreadsheet-like operations for task management.
- SequentialWorkflow: Executes tasks in a sequential manner.
- ConcurrentWorkflow: Executes tasks concurrently for parallel processing.
- "auto" will automatically conduct embedding search to find the best swarm for your task
- AgentRearrange: Optimizes agent arrangement for task execution
- MixtureOfAgents: Combines multiple agent types for diverse tasks
- SpreadSheetSwarm: Uses spreadsheet-like operations for task management
- SequentialWorkflow: Executes tasks sequentially
- ConcurrentWorkflow: Executes tasks in parallel
- "auto": Automatically selects best swarm type via embedding search
Methods:
run(task: str, device: str = "cpu", all_cores: bool = False, all_gpus: bool = False, *args, **kwargs) -> Any:
Executes a task using the configured swarm
batch_run(tasks: List[str], *args, **kwargs) -> List[Any]:
Executes multiple tasks in sequence
threaded_run(task: str, *args, **kwargs) -> Any:
Executes a task in a separate thread
async_run(task: str, *args, **kwargs) -> Any:
Executes a task asynchronously
concurrent_run(task: str, *args, **kwargs) -> Any:
Executes a task using concurrent execution
concurrent_batch_run(tasks: List[str], *args, **kwargs) -> List[Any]:
Executes multiple tasks concurrently
get_logs() -> List[SwarmLog]:
Retrieves execution logs
"""
def __init__(
@ -78,8 +129,12 @@ class SwarmRouter:
swarm_type: SwarmType = "SequentialWorkflow", # "SpreadSheetSwarm" # "auto"
autosave: bool = False,
flow: str = None,
return_json: bool = True,
return_json: bool = False,
auto_generate_prompts: bool = False,
shared_memory_system: Any = None,
rules: str = None,
documents: List[str] = [], # A list of docs file paths
output_type: str = "string", # Md, PDF, Txt, csv
*args,
**kwargs,
):
@ -92,6 +147,10 @@ class SwarmRouter:
self.flow = flow
self.return_json = return_json
self.auto_generate_prompts = auto_generate_prompts
self.shared_memory_system = shared_memory_system
self.rules = rules
self.documents = documents
self.output_type = output_type
self.logs = []
self.reliability_check()
@ -101,8 +160,51 @@ class SwarmRouter:
f"SwarmRouter initialized with swarm type: {swarm_type}",
)
# Handle Automated Prompt Engineering
self.activate_ape()
# Handle shared memory
if self.shared_memory_system is not None:
self.activate_shared_memory()
# Handle rules
if self.rules is not None:
self.handle_rules()
# if self.documents is not None:
# self.handle_docs()
def handle_docs(self):
# Process all documents in parallel using list comprehension
data = "".join(
[doc_master(file_path=doc) for doc in self.documents]
)
# Update all agents' prompts at once
doc_prompt = f"##### Documents Available ########## {data}"
for agent in self.agents:
agent.system_prompt += doc_prompt
# Add documents to the logs
# self.logs.append(Document(file_path=self.documents, data=data))
def activate_shared_memory(self):
logger.info("Activating shared memory with all agents ")
for agent in self.agents:
agent.long_term_memory = self.shared_memory_system
logger.info("All agents now have the same memory system")
def handle_rules(self):
logger.info("Injecting rules to every agent!")
for agent in self.agents:
agent.system_prompt += f"### Swarm Rules ### {self.rules}"
logger.info("Finished injecting rules")
def activate_ape(self):
"""Activate automatic prompt engineering for agents that support it"""
try:
@ -134,7 +236,7 @@ class SwarmRouter:
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
def reliability_check(self):
logger.info("Logger initializing checks")
logger.info("Initializing reliability checks")
if not self.agents:
raise ValueError("No agents provided for the swarm.")
@ -143,7 +245,9 @@ class SwarmRouter:
if self.max_loops == 0:
raise ValueError("max_loops cannot be 0.")
logger.info("Checks completed your swarm is ready.")
logger.info(
"Reliability checks completed your swarm is ready."
)
def _create_swarm(
self, task: str = None, *args, **kwargs
@ -182,6 +286,7 @@ class SwarmRouter:
max_loops=self.max_loops,
flow=self.flow,
return_json=self.return_json,
output_type=self.output_type,
*args,
**kwargs,
)
@ -206,12 +311,19 @@ class SwarmRouter:
*args,
**kwargs,
)
elif self.swarm_type == "SequentialWorkflow":
elif (
self.swarm_type == "SequentialWorkflow"
or self.swarm_type == "sequential"
or self.swarm_type == "Sequential"
):
return SequentialWorkflow(
name=self.name,
description=self.description,
agents=self.agents,
max_loops=self.max_loops,
shared_memory_system=self.shared_memory_system,
output_type=self.output_type,
return_json=self.return_json,
*args,
**kwargs,
)
@ -227,7 +339,9 @@ class SwarmRouter:
**kwargs,
)
else:
raise ValueError(f"Invalid swarm type: {self.swarm_type}")
raise ValueError(
f"Invalid swarm type: {self.swarm_type} try again with a valid swarm type such as 'SequentialWorkflow' or 'ConcurrentWorkflow' or 'auto' or 'AgentRearrange' or 'MixtureOfAgents' or 'SpreadSheetSwarm'"
)
def _log(
self,
@ -256,7 +370,7 @@ class SwarmRouter:
logger.log(level.upper(), message)
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
def run(self, task: str, *args, **kwargs) -> Any:
def _run(self, task: str, *args, **kwargs) -> Any:
"""
Dynamically run the specified task on the selected or matched swarm type.
@ -281,6 +395,7 @@ class SwarmRouter:
metadata=kwargs,
)
result = self.swarm.run(task, *args, **kwargs)
self._log(
"success",
f"Task completed successfully on {self.swarm_type} swarm",
@ -297,6 +412,56 @@ class SwarmRouter:
)
raise
def run(
self,
task: str,
device: str = "cpu",
all_cores: bool = True,
all_gpus: bool = False,
*args,
**kwargs,
) -> Any:
"""
Execute a task on the selected swarm type with specified compute resources.
Args:
task (str): The task to be executed by the swarm.
device (str, optional): Device to run on - "cpu" or "gpu". Defaults to "cpu".
all_cores (bool, optional): Whether to use all CPU cores. Defaults to True.
all_gpus (bool, optional): Whether to use all available GPUs. Defaults to False.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Returns:
Any: The result of the swarm's execution.
Raises:
Exception: If an error occurs during task execution.
"""
return exec_callable_with_clusterops(
func=self._run,
device=device,
all_cores=all_cores,
all_gpus=all_gpus,
task=task,
*args,
**kwargs,
)
def __call__(self, task: str, *args, **kwargs) -> Any:
"""
Make the SwarmRouter instance callable.
Args:
task (str): The task to be executed by the swarm.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Returns:
Any: The result of the swarm's execution.
"""
return self.run(task=task, *args, **kwargs)
def batch_run(
self, tasks: List[str], *args, **kwargs
) -> List[Any]:
@ -446,14 +611,25 @@ class SwarmRouter:
Raises:
Exception: If an error occurs during task execution.
"""
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, as_completed
results = []
with ThreadPoolExecutor() as executor:
# Submit all tasks to executor
futures = [
executor.submit(self.run, task, *args, **kwargs)
for task in tasks
]
results = [future.result() for future in futures]
# Process results as they complete rather than waiting for all
for future in as_completed(futures):
try:
result = future.result()
results.append(result)
except Exception as e:
logger.error(f"Task execution failed: {str(e)}")
results.append(None)
return results
@ -468,6 +644,7 @@ def swarm_router(
return_json: bool = True,
auto_generate_prompts: bool = False,
task: str = None,
rules: str = None,
*args,
**kwargs,
) -> SwarmRouter:
@ -518,11 +695,14 @@ def swarm_router(
flow=flow,
return_json=return_json,
auto_generate_prompts=auto_generate_prompts,
rules=rules,
)
logger.info(f"Executing task with SwarmRouter: {task}")
result = swarm_router.run(task, *args, **kwargs)
logger.info("Task execution completed successfully")
logger.info(
f"Task execution completed successfully: {result}"
)
return result
except ValueError as e:

@ -0,0 +1,141 @@
from typing import Any, List, Optional, Union
from pathlib import Path
from loguru import logger
from doc_master import doc_master
from concurrent.futures import ThreadPoolExecutor, as_completed
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
)
def _process_document(doc_path: Union[str, Path]) -> str:
"""Safely process a single document with retries.
Args:
doc_path: Path to the document to process
Returns:
Processed document text
Raises:
Exception: If document processing fails after retries
"""
try:
return doc_master(
file_path=str(doc_path), output_type="string"
)
except Exception as e:
logger.error(
f"Error processing document {doc_path}: {str(e)}"
)
raise
def handle_input_docs(
agents: Any,
docs: Optional[List[Union[str, Path]]] = None,
doc_folder: Optional[Union[str, Path]] = None,
max_workers: int = 4,
chunk_size: int = 1000000,
) -> Any:
"""
Add document content to agent prompts with improved reliability and performance.
Args:
agents: Dictionary mapping agent names to Agent objects
docs: List of document paths
doc_folder: Path to folder containing documents
max_workers: Maximum number of parallel document processing workers
chunk_size: Maximum characters to process at once to avoid memory issues
Raises:
ValueError: If neither docs nor doc_folder is provided
RuntimeError: If document processing fails
"""
if not agents:
logger.warning(
"No agents provided, skipping document distribution"
)
return
if not docs and not doc_folder:
logger.warning(
"No documents or folder provided, skipping document distribution"
)
return
logger.info("Starting document distribution to agents")
try:
processed_docs = []
# Process individual documents in parallel
if docs:
with ThreadPoolExecutor(
max_workers=max_workers
) as executor:
future_to_doc = {
executor.submit(_process_document, doc): doc
for doc in docs
}
for future in as_completed(future_to_doc):
doc = future_to_doc[future]
try:
processed_docs.append(future.result())
except Exception as e:
logger.error(
f"Failed to process document {doc}: {str(e)}"
)
raise RuntimeError(
f"Document processing failed: {str(e)}"
)
# Process folder if specified
elif doc_folder:
try:
folder_content = doc_master(
folder_path=str(doc_folder), output_type="string"
)
processed_docs.append(folder_content)
except Exception as e:
logger.error(
f"Failed to process folder {doc_folder}: {str(e)}"
)
raise RuntimeError(
f"Folder processing failed: {str(e)}"
)
# Combine and chunk the processed documents
combined_data = "\n".join(processed_docs)
# Update agent prompts in chunks to avoid memory issues
for agent in agents.values():
try:
for i in range(0, len(combined_data), chunk_size):
chunk = combined_data[i : i + chunk_size]
if i == 0:
agent.system_prompt += (
"\nDocuments:\n" + chunk
)
else:
agent.system_prompt += chunk
except Exception as e:
logger.error(
f"Failed to update agent prompt: {str(e)}"
)
raise RuntimeError(
f"Agent prompt update failed: {str(e)}"
)
logger.info(
f"Successfully added documents to {len(agents)} agents"
)
return agents
except Exception as e:
logger.error(f"Document distribution failed: {str(e)}")
raise RuntimeError(f"Document distribution failed: {str(e)}")

@ -0,0 +1,102 @@
from typing import Union, Dict, List, Tuple, Any
def any_to_str(data: Union[str, Dict, List, Tuple, Any]) -> str:
"""Convert any input data type to a nicely formatted string.
This function handles conversion of various Python data types into a clean string representation.
It recursively processes nested data structures and handles None values gracefully.
Args:
data: Input data of any type to convert to string. Can be:
- Dictionary
- List/Tuple
- String
- None
- Any other type that can be converted via str()
Returns:
str: A formatted string representation of the input data.
- Dictionaries are formatted as "key: value" pairs separated by commas
- Lists/tuples are comma-separated
- None returns empty string
- Other types are converted using str()
Examples:
>>> any_to_str({'a': 1, 'b': 2})
'a: 1, b: 2'
>>> any_to_str([1, 2, 3])
'1, 2, 3'
>>> any_to_str(None)
''
"""
try:
if isinstance(data, dict):
# Format dictionary with newlines and indentation
items = []
for k, v in data.items():
value = any_to_str(v)
items.append(f"{k}: {value}")
return "\n".join(items)
elif isinstance(data, (list, tuple)):
# Format sequences with brackets and proper spacing
items = [any_to_str(x) for x in data]
if len(items) == 0:
return "[]" if isinstance(data, list) else "()"
return (
f"[{', '.join(items)}]"
if isinstance(data, list)
else f"({', '.join(items)})"
)
elif data is None:
return "None"
else:
# Handle strings and other types
if isinstance(data, str):
return f'"{data}"'
return str(data)
except Exception as e:
return f"Error converting data: {str(e)}"
def main():
# Example 1: Dictionary
print("Dictionary:")
print(
any_to_str(
{
"name": "John",
"age": 30,
"hobbies": ["reading", "hiking"],
}
)
)
print("\nNested Dictionary:")
print(
any_to_str(
{
"user": {
"id": 123,
"details": {"city": "New York", "active": True},
},
"data": [1, 2, 3],
}
)
)
print("\nList and Tuple:")
print(any_to_str([1, "text", None, (1, 2)]))
print(any_to_str((True, False, None)))
print("\nEmpty Collections:")
print(any_to_str([]))
print(any_to_str({}))
if __name__ == "__main__":
main()

@ -0,0 +1,34 @@
from typing import Union, Dict, List
from swarms.artifacts.main_artifact import Artifact
def handle_artifact_outputs(
file_path: str,
data: Union[str, Dict, List],
output_type: str = "txt",
folder_path: str = "./artifacts",
) -> str:
"""
Handle different types of data and create files in various formats.
Args:
file_path: Path where the file should be saved
data: Input data that can be string, dict or list
output_type: Type of output file (txt, md, pdf, csv, json)
folder_path: Folder to save artifacts
Returns:
str: Path to the created file
"""
# Create artifact with appropriate file type
artifact = Artifact(
folder_path=folder_path,
file_path=file_path,
file_type=output_type,
contents=data,
edit_count=0,
)
# Save the file
# artifact.save()
artifact.save_as(output_format=output_type)

@ -0,0 +1,78 @@
from loguru import logger
from typing import List, Union, Callable, Optional
from swarms.structs.agent import Agent
def reliability_check(
agents: List[Union[Agent, Callable]],
max_loops: int,
name: Optional[str] = None,
description: Optional[str] = None,
flow: Optional[str] = None,
) -> None:
"""
Performs reliability checks on swarm configuration parameters.
Args:
agents: List of Agent objects or callables that will be executed
max_loops: Maximum number of execution loops
name: Name identifier for the swarm
description: Description of the swarm's purpose
Raises:
ValueError: If any parameters fail validation checks
TypeError: If parameters are of incorrect type
"""
logger.info("Initializing swarm reliability checks")
# Type checking
if not isinstance(agents, list):
raise TypeError("agents parameter must be a list")
if not isinstance(max_loops, int):
raise TypeError("max_loops must be an integer")
# Validate agents
if not agents:
raise ValueError("Agents list cannot be empty")
for i, agent in enumerate(agents):
if not isinstance(agent, (Agent, Callable)):
raise TypeError(
f"Agent at index {i} must be an Agent instance or Callable"
)
# Validate max_loops
if max_loops <= 0:
raise ValueError("max_loops must be greater than 0")
if max_loops > 1000:
logger.warning(
"Large max_loops value detected. This may impact performance."
)
# Validate name
if name is None:
raise ValueError("name parameter is required")
if not isinstance(name, str):
raise TypeError("name must be a string")
if len(name.strip()) == 0:
raise ValueError("name cannot be empty or just whitespace")
# Validate description
if description is None:
raise ValueError("description parameter is required")
if not isinstance(description, str):
raise TypeError("description must be a string")
if len(description.strip()) == 0:
raise ValueError(
"description cannot be empty or just whitespace"
)
# Validate flow
if flow is None:
raise ValueError("flow parameter is required")
if not isinstance(flow, str):
raise TypeError("flow must be a string")
logger.info("All reliability checks passed successfully")

@ -0,0 +1,77 @@
import os
from typing import Any
from clusterops import (
execute_on_gpu,
execute_on_multiple_gpus,
execute_with_cpu_cores,
list_available_gpus,
)
from loguru import logger
def exec_callable_with_clusterops(
device: str = "cpu",
device_id: int = 0,
all_cores: bool = True,
all_gpus: bool = False,
func: callable = None,
*args,
**kwargs,
) -> Any:
"""
Executes a given function on a specified device, either CPU or GPU.
This method attempts to execute a given function on a specified device, either CPU or GPU. It logs the device selection and the number of cores or GPU ID used. If the device is set to CPU, it can use all available cores or a specific core specified by `device_id`. If the device is set to GPU, it uses the GPU specified by `device_id`.
Args:
device (str, optional): The device to use for execution. Defaults to "cpu".
device_id (int, optional): The ID of the GPU to use if device is set to "gpu". Defaults to 0.
all_cores (bool, optional): If True, uses all available CPU cores. Defaults to True.
all_gpus (bool, optional): If True, uses all available GPUs. Defaults to False.
func (callable): The function to execute.
*args: Additional positional arguments to be passed to the execution method.
**kwargs: Additional keyword arguments to be passed to the execution method.
Returns:
Any: The result of the execution.
Raises:
ValueError: If an invalid device is specified.
Exception: If any other error occurs during execution.
"""
try:
logger.info(f"Attempting to run on device: {device}")
if device == "cpu":
logger.info("Device set to CPU")
if all_cores is True:
count = os.cpu_count()
logger.info(f"Using all available CPU cores: {count}")
else:
count = device_id
logger.info(f"Using specific CPU core: {count}")
return execute_with_cpu_cores(
count, func, *args, **kwargs
)
# If device gpu
elif device == "gpu":
logger.info("Device set to GPU")
return execute_on_gpu(device_id, func, *args, **kwargs)
elif device == "gpu" and all_gpus is True:
logger.info("Device set to GPU and running all gpus")
gpus = [int(gpu) for gpu in list_available_gpus()]
return execute_on_multiple_gpus(
gpus, func, *args, **kwargs
)
else:
raise ValueError(
f"Invalid device specified: {device}. Supported devices are 'cpu' and 'gpu'."
)
except ValueError as e:
logger.error(f"Invalid device specified: {e}")
raise e
except Exception as e:
logger.error(f"An error occurred during execution: {e}")
raise e

@ -1,170 +0,0 @@
import os
from pydantic import BaseModel, Field
from swarm_models import OpenAIFunctionCaller
from dotenv import load_dotenv
from typing import Any
from swarms.utils.loguru_logger import logger
from swarms.tools.prebuilt.code_executor import CodeExecutor
load_dotenv()
class Tool(BaseModel):
id: str = Field(
description="A unique identifier for the task. This should be a short, descriptive name that captures the main purpose of the task. Use - to separate words and make it lowercase."
)
plan: str = Field(
description="The comprehensive plan detailing how the task will accomplish the given task. This should include the high-level strategy, key milestones, and expected outcomes. The plan should clearly articulate what the overall goal is, what success looks like, and how progress will be measured throughout execution."
)
failures_prediction: str = Field(
description="A thorough analysis of potential failure modes and mitigation strategies. This should identify technical risks, edge cases, error conditions, and possible points of failure in the task. For each identified risk, include specific preventive measures, fallback approaches, and recovery procedures to ensure robustness and reliability."
)
rationale: str = Field(
description="The detailed reasoning and justification for why this specific task design is optimal for the given task. This should explain the key architectural decisions, tradeoffs considered, alternatives evaluated, and why this approach best satisfies the requirements. Include both technical and business factors that influenced the design."
)
code: str = Field(
description="Generate the code for the task. This should be a python function that takes in a task and returns a result. The code should be a complete and working implementation of the task. Include all necessary imports and dependencies and add types, docstrings, and comments to the code. Make sure the main code executes successfully. No placeholders or comments. Make sure the main function executes successfully."
)
def setup_model(base_model: BaseModel = Tool):
model = OpenAIFunctionCaller(
system_prompt="""You are an expert Python developer specializing in building reliable API integrations and developer tools. Your role is to generate production-ready code that follows best practices for API interactions and tool development.
When given a task, you will:
1. Design robust error handling and retry mechanisms for API calls
2. Implement proper authentication and security measures
3. Structure code for maintainability and reusability
4. Add comprehensive logging and monitoring
5. Include detailed type hints and documentation
6. Write unit tests to verify functionality
Your code should follow these principles:
- Use modern Python features and idioms
- Handle rate limits and API quotas gracefully
- Validate inputs and outputs thoroughly
- Follow security best practices for API keys and secrets
- Include clear error messages and debugging info
- Be well-documented with docstrings and comments
- Use appropriate design patterns
- Follow PEP 8 style guidelines
The generated code should be complete, tested, and ready for production use. Include all necessary imports, error handling, and helper functions.
""",
base_model=base_model,
openai_api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.5,
)
return model
def generate_tool(task: str) -> Any:
model = setup_model()
response = model.run(task)
logger.info(f"Response: {response}")
# If response is a dict, get code directly
if isinstance(response, dict):
# return response.get("code", "")
code = response.get("code", "")
logger.info(f"Code: {code}")
return code
# If response is a Tool object, access code attribute
elif isinstance(response, Tool):
code = response.code
logger.info(f"Code: {code}")
return code
# If response is a string (raw code)
elif isinstance(response, str):
code = response
logger.info(f"Code: {code}")
return code
logger.error(f"Unexpected response type: {type(response)}")
return ""
def execute_generated_code(code: str) -> Any:
"""
Attempts to execute the generated Python code, handling errors and retrying if necessary.
Args:
code (str): The Python code to be executed.
Returns:
Any: Output of the code execution, or error details if execution fails.
"""
logger.info("Starting code execution")
try:
exec_namespace = {}
exec(code, exec_namespace)
# Check for any callable functions in the namespace
main_function = None
for item in exec_namespace.values():
if callable(item) and not item.__name__.startswith("__"):
main_function = item
break
if main_function:
result = main_function()
logger.info(
f"Code execution successful. Function result: {result}"
)
return result
elif "result" in exec_namespace:
logger.info(
f"Code execution successful. Result variable: {exec_namespace['result']}"
)
return exec_namespace["result"]
else:
logger.warning(
"Code execution completed but no result found"
)
return "No result or function found in executed code."
except Exception as e:
logger.error(
f"Code execution failed with error: {str(e)}",
exc_info=True,
)
return e
def retry_until_success(task: str, max_retries: int = 5):
"""
Generates and executes code until the execution is successful.
Args:
task (str): Task description to generate the required code.
"""
attempts = 0
while attempts < max_retries:
logger.info(f"Attempt {attempts + 1} of {max_retries}")
tool = generate_tool(task)
logger.debug(f"Generated code:\n{tool}")
# result = execute_generated_code(tool)
result = CodeExecutor().execute(code=tool)
logger.info(f"Result: {result}")
if isinstance(result, Exception):
logger.error(
f"Attempt {attempts + 1} failed: {str(result)}"
)
print("Retrying with updated code...")
attempts += 1
else:
logger.info(
f"Success on attempt {attempts + 1}. Result: {result}"
)
print(f"Code executed successfully: {result}")
break
else:
logger.error("Max retries reached. Execution failed.")
print("Max retries reached. Execution failed.")
# Usage
retry_until_success(
"Write a function to fetch and display weather information from a given API."
)
Loading…
Cancel
Save