pull/609/head
Your Name 3 months ago
parent b4f7ee160e
commit 91fc52bd5e

@ -161,6 +161,7 @@ The `Agent` class offers a range of settings to tailor its behavior to specific
import os import os
from swarms import Agent from swarms import Agent
from swarm_models import OpenAIChat from swarm_models import OpenAIChat
from swarms.prompts.finance_agent_sys_prompt import ( from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT, FINANCIAL_AGENT_SYS_PROMPT,
) )
@ -191,14 +192,14 @@ agent = Agent(
retry_attempts=1, retry_attempts=1,
context_length=200000, context_length=200000,
return_step_meta=False, return_step_meta=False,
# output_type="json", output_type="string",
streaming_on=False,
) )
out = agent.run( agent.run(
"How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria" "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria"
) )
print(out)
``` ```
----- -----
@ -264,6 +265,8 @@ agent = Agent(
retry_attempts=3, # Number of retry attempts for failed tasks retry_attempts=3, # Number of retry attempts for failed tasks
context_length=200000, # Maximum length of the context to consider context_length=200000, # Maximum length of the context to consider
long_term_memory=chromadb, # Integrates ChromaDB for long-term memory management long_term_memory=chromadb, # Integrates ChromaDB for long-term memory management
return_step_meta=False,
output_type="string",
) )
# Run the agent with a sample task # Run the agent with a sample task
@ -681,40 +684,187 @@ In this example, each `Agent` represents a task that is executed sequentially. T
```python ```python
import os
from swarms import Agent, SequentialWorkflow from swarms import Agent, SequentialWorkflow
from swarm_models import OpenAIChat
from swarm_models import Anthropic # model = Anthropic(anthropic_api_key=os.getenv("ANTHROPIC_API_KEY"))
company = "Nvidia"
# Get the OpenAI API key from the environment variable
api_key = os.getenv("GROQ_API_KEY")
# Model
model = OpenAIChat(
openai_api_base="https://api.groq.com/openai/v1",
openai_api_key=api_key,
model_name="llama-3.1-70b-versatile",
temperature=0.1,
)
# Initialize the language model agent (e.g., GPT-3)
llm = Anthropic()
# Initialize agents for individual tasks # Initialize the Managing Director agent
agent1 = Agent( managing_director = Agent(
agent_name="Blog generator", agent_name="Managing-Director",
system_prompt="Generate a blog post like stephen king", system_prompt=f"""
llm=llm, As the Managing Director at Blackstone, your role is to oversee the entire investment analysis process for potential acquisitions.
Your responsibilities include:
1. Setting the overall strategy and direction for the analysis
2. Coordinating the efforts of the various team members and ensuring a comprehensive evaluation
3. Reviewing the findings and recommendations from each team member
4. Making the final decision on whether to proceed with the acquisition
For the current potential acquisition of {company}, direct the tasks for the team to thoroughly analyze all aspects of the company, including its financials, industry position, technology, market potential, and regulatory compliance. Provide guidance and feedback as needed to ensure a rigorous and unbiased assessment.
""",
llm=model,
max_loops=1, max_loops=1,
dashboard=False, dashboard=False,
tools=[], streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="managing-director.json",
) )
agent2 = Agent(
agent_name="summarizer", # Initialize the Vice President of Finance
system_prompt="Sumamrize the blog post", vp_finance = Agent(
llm=llm, agent_name="VP-Finance",
system_prompt=f"""
As the Vice President of Finance at Blackstone, your role is to lead the financial analysis of potential acquisitions.
For the current potential acquisition of {company}, your tasks include:
1. Conducting a thorough review of {company}' financial statements, including income statements, balance sheets, and cash flow statements
2. Analyzing key financial metrics such as revenue growth, profitability margins, liquidity ratios, and debt levels
3. Assessing the company's historical financial performance and projecting future performance based on assumptions and market conditions
4. Identifying any financial risks or red flags that could impact the acquisition decision
5. Providing a detailed report on your findings and recommendations to the Managing Director
Be sure to consider factors such as the sustainability of {company}' business model, the strength of its customer base, and its ability to generate consistent cash flows. Your analysis should be data-driven, objective, and aligned with Blackstone's investment criteria.
""",
llm=model,
max_loops=1, max_loops=1,
dashboard=False, dashboard=False,
tools=[], streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="vp-finance.json",
) )
# Create the Sequential workflow # Initialize the Industry Analyst
workflow = SequentialWorkflow( industry_analyst = Agent(
agents=[agent1, agent2], max_loops=1, verbose=False agent_name="Industry-Analyst",
system_prompt=f"""
As the Industry Analyst at Blackstone, your role is to provide in-depth research and analysis on the industries and markets relevant to potential acquisitions.
For the current potential acquisition of {company}, your tasks include:
1. Conducting a comprehensive analysis of the industrial robotics and automation solutions industry, including market size, growth rates, key trends, and future prospects
2. Identifying the major players in the industry and assessing their market share, competitive strengths and weaknesses, and strategic positioning
3. Evaluating {company}' competitive position within the industry, including its market share, differentiation, and competitive advantages
4. Analyzing the key drivers and restraints for the industry, such as technological advancements, labor costs, regulatory changes, and economic conditions
5. Identifying potential risks and opportunities for {company} based on the industry analysis, such as disruptive technologies, emerging markets, or shifts in customer preferences
Your analysis should provide a clear and objective assessment of the attractiveness and future potential of the industrial robotics industry, as well as {company}' positioning within it. Consider both short-term and long-term factors, and provide evidence-based insights to inform the investment decision.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="industry-analyst.json",
) )
# Run the workflow # Initialize the Technology Expert
workflow.run( tech_expert = Agent(
"Generate a blog post on how swarms of agents can help businesses grow." agent_name="Tech-Expert",
system_prompt=f"""
As the Technology Expert at Blackstone, your role is to assess the technological capabilities, competitive advantages, and potential risks of companies being considered for acquisition.
For the current potential acquisition of {company}, your tasks include:
1. Conducting a deep dive into {company}' proprietary technologies, including its robotics platforms, automation software, and AI capabilities
2. Assessing the uniqueness, scalability, and defensibility of {company}' technology stack and intellectual property
3. Comparing {company}' technologies to those of its competitors and identifying any key differentiators or technology gaps
4. Evaluating {company}' research and development capabilities, including its innovation pipeline, engineering talent, and R&D investments
5. Identifying any potential technology risks or disruptive threats that could impact {company}' long-term competitiveness, such as emerging technologies or expiring patents
Your analysis should provide a comprehensive assessment of {company}' technological strengths and weaknesses, as well as the sustainability of its competitive advantages. Consider both the current state of its technology and its future potential in light of industry trends and advancements.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="tech-expert.json",
)
# Initialize the Market Researcher
market_researcher = Agent(
agent_name="Market-Researcher",
system_prompt=f"""
As the Market Researcher at Blackstone, your role is to analyze the target company's customer base, market share, and growth potential to assess the commercial viability and attractiveness of the potential acquisition.
For the current potential acquisition of {company}, your tasks include:
1. Analyzing {company}' current customer base, including customer segmentation, concentration risk, and retention rates
2. Assessing {company}' market share within its target markets and identifying key factors driving its market position
3. Conducting a detailed market sizing and segmentation analysis for the industrial robotics and automation markets, including identifying high-growth segments and emerging opportunities
4. Evaluating the demand drivers and sales cycles for {company}' products and services, and identifying any potential risks or limitations to adoption
5. Developing financial projections and estimates for {company}' revenue growth potential based on the market analysis and assumptions around market share and penetration
Your analysis should provide a data-driven assessment of the market opportunity for {company} and the feasibility of achieving our investment return targets. Consider both bottom-up and top-down market perspectives, and identify any key sensitivities or assumptions in your projections.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="market-researcher.json",
)
# Initialize the Regulatory Specialist
regulatory_specialist = Agent(
agent_name="Regulatory-Specialist",
system_prompt=f"""
As the Regulatory Specialist at Blackstone, your role is to identify and assess any regulatory risks, compliance requirements, and potential legal liabilities associated with potential acquisitions.
For the current potential acquisition of {company}, your tasks include:
1. Identifying all relevant regulatory bodies and laws that govern the operations of {company}, including industry-specific regulations, labor laws, and environmental regulations
2. Reviewing {company}' current compliance policies, procedures, and track record to identify any potential gaps or areas of non-compliance
3. Assessing the potential impact of any pending or proposed changes to relevant regulations that could affect {company}' business or create additional compliance burdens
4. Evaluating the potential legal liabilities and risks associated with {company}' products, services, and operations, including product liability, intellectual property, and customer contracts
5. Providing recommendations on any regulatory or legal due diligence steps that should be taken as part of the acquisition process, as well as any post-acquisition integration considerations
Your analysis should provide a comprehensive assessment of the regulatory and legal landscape surrounding {company}, and identify any material risks or potential deal-breakers. Consider both the current state and future outlook, and provide practical recommendations to mitigate identified risks.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="regulatory-specialist.json",
)
# Create a list of agents
agents = [
managing_director,
vp_finance,
industry_analyst,
tech_expert,
market_researcher,
regulatory_specialist,
]
swarm = SequentialWorkflow(
name="blackstone-private-equity-advisors",
agents=agents,
)
print(
swarm.run(
"Analyze nvidia if it's a good deal to invest in now 10B"
)
) )
``` ```

@ -0,0 +1,182 @@
import os
from swarms import Agent, SequentialWorkflow
from swarm_models import OpenAIChat
# model = Anthropic(anthropic_api_key=os.getenv("ANTHROPIC_API_KEY"))
company = "TGSC"
# Get the OpenAI API key from the environment variable
api_key = os.getenv("GROQ_API_KEY")
# Model
model = OpenAIChat(
openai_api_base="https://api.groq.com/openai/v1",
openai_api_key=api_key,
model_name="llama-3.1-70b-versatile",
temperature=0.1,
)
# Initialize the Managing Director agent
managing_director = Agent(
agent_name="Managing-Director",
system_prompt=f"""
As the Managing Director at Blackstone, your role is to oversee the entire investment analysis process for potential acquisitions.
Your responsibilities include:
1. Setting the overall strategy and direction for the analysis
2. Coordinating the efforts of the various team members and ensuring a comprehensive evaluation
3. Reviewing the findings and recommendations from each team member
4. Making the final decision on whether to proceed with the acquisition
For the current potential acquisition of {company}, direct the tasks for the team to thoroughly analyze all aspects of the company, including its financials, industry position, technology, market potential, and regulatory compliance. Provide guidance and feedback as needed to ensure a rigorous and unbiased assessment.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="managing-director.json",
)
# Initialize the Vice President of Finance
vp_finance = Agent(
agent_name="VP-Finance",
system_prompt=f"""
As the Vice President of Finance at Blackstone, your role is to lead the financial analysis of potential acquisitions.
For the current potential acquisition of {company}, your tasks include:
1. Conducting a thorough review of {company}' financial statements, including income statements, balance sheets, and cash flow statements
2. Analyzing key financial metrics such as revenue growth, profitability margins, liquidity ratios, and debt levels
3. Assessing the company's historical financial performance and projecting future performance based on assumptions and market conditions
4. Identifying any financial risks or red flags that could impact the acquisition decision
5. Providing a detailed report on your findings and recommendations to the Managing Director
Be sure to consider factors such as the sustainability of {company}' business model, the strength of its customer base, and its ability to generate consistent cash flows. Your analysis should be data-driven, objective, and aligned with Blackstone's investment criteria.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="vp-finance.json",
)
# Initialize the Industry Analyst
industry_analyst = Agent(
agent_name="Industry-Analyst",
system_prompt=f"""
As the Industry Analyst at Blackstone, your role is to provide in-depth research and analysis on the industries and markets relevant to potential acquisitions.
For the current potential acquisition of {company}, your tasks include:
1. Conducting a comprehensive analysis of the industrial robotics and automation solutions industry, including market size, growth rates, key trends, and future prospects
2. Identifying the major players in the industry and assessing their market share, competitive strengths and weaknesses, and strategic positioning
3. Evaluating {company}' competitive position within the industry, including its market share, differentiation, and competitive advantages
4. Analyzing the key drivers and restraints for the industry, such as technological advancements, labor costs, regulatory changes, and economic conditions
5. Identifying potential risks and opportunities for {company} based on the industry analysis, such as disruptive technologies, emerging markets, or shifts in customer preferences
Your analysis should provide a clear and objective assessment of the attractiveness and future potential of the industrial robotics industry, as well as {company}' positioning within it. Consider both short-term and long-term factors, and provide evidence-based insights to inform the investment decision.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="industry-analyst.json",
)
# Initialize the Technology Expert
tech_expert = Agent(
agent_name="Tech-Expert",
system_prompt=f"""
As the Technology Expert at Blackstone, your role is to assess the technological capabilities, competitive advantages, and potential risks of companies being considered for acquisition.
For the current potential acquisition of {company}, your tasks include:
1. Conducting a deep dive into {company}' proprietary technologies, including its robotics platforms, automation software, and AI capabilities
2. Assessing the uniqueness, scalability, and defensibility of {company}' technology stack and intellectual property
3. Comparing {company}' technologies to those of its competitors and identifying any key differentiators or technology gaps
4. Evaluating {company}' research and development capabilities, including its innovation pipeline, engineering talent, and R&D investments
5. Identifying any potential technology risks or disruptive threats that could impact {company}' long-term competitiveness, such as emerging technologies or expiring patents
Your analysis should provide a comprehensive assessment of {company}' technological strengths and weaknesses, as well as the sustainability of its competitive advantages. Consider both the current state of its technology and its future potential in light of industry trends and advancements.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="tech-expert.json",
)
# Initialize the Market Researcher
market_researcher = Agent(
agent_name="Market-Researcher",
system_prompt=f"""
As the Market Researcher at Blackstone, your role is to analyze the target company's customer base, market share, and growth potential to assess the commercial viability and attractiveness of the potential acquisition.
For the current potential acquisition of {company}, your tasks include:
1. Analyzing {company}' current customer base, including customer segmentation, concentration risk, and retention rates
2. Assessing {company}' market share within its target markets and identifying key factors driving its market position
3. Conducting a detailed market sizing and segmentation analysis for the industrial robotics and automation markets, including identifying high-growth segments and emerging opportunities
4. Evaluating the demand drivers and sales cycles for {company}' products and services, and identifying any potential risks or limitations to adoption
5. Developing financial projections and estimates for {company}' revenue growth potential based on the market analysis and assumptions around market share and penetration
Your analysis should provide a data-driven assessment of the market opportunity for {company} and the feasibility of achieving our investment return targets. Consider both bottom-up and top-down market perspectives, and identify any key sensitivities or assumptions in your projections.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="market-researcher.json",
)
# Initialize the Regulatory Specialist
regulatory_specialist = Agent(
agent_name="Regulatory-Specialist",
system_prompt=f"""
As the Regulatory Specialist at Blackstone, your role is to identify and assess any regulatory risks, compliance requirements, and potential legal liabilities associated with potential acquisitions.
For the current potential acquisition of {company}, your tasks include:
1. Identifying all relevant regulatory bodies and laws that govern the operations of {company}, including industry-specific regulations, labor laws, and environmental regulations
2. Reviewing {company}' current compliance policies, procedures, and track record to identify any potential gaps or areas of non-compliance
3. Assessing the potential impact of any pending or proposed changes to relevant regulations that could affect {company}' business or create additional compliance burdens
4. Evaluating the potential legal liabilities and risks associated with {company}' products, services, and operations, including product liability, intellectual property, and customer contracts
5. Providing recommendations on any regulatory or legal due diligence steps that should be taken as part of the acquisition process, as well as any post-acquisition integration considerations
Your analysis should provide a comprehensive assessment of the regulatory and legal landscape surrounding {company}, and identify any material risks or potential deal-breakers. Consider both the current state and future outlook, and provide practical recommendations to mitigate identified risks.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="regulatory-specialist.json",
)
# Create a list of agents
agents = [
managing_director,
vp_finance,
industry_analyst,
tech_expert,
market_researcher,
regulatory_specialist,
]
swarm = SequentialWorkflow(
name="blackstone-private-equity-advisors",
agents=agents,
)
print(
swarm.run(
"Analyze nvidia if it's a good deal to invest in now 10B"
)
)

@ -1,4 +1,4 @@
from swarm_arange import SwarmRearrange from swarms.structs.swarm_arange import SwarmRearrange
from rearrange_example_blackstone import ( from rearrange_example_blackstone import (
blackstone_acquisition_analysis, blackstone_acquisition_analysis,
blackstone_investment_strategy, blackstone_investment_strategy,

@ -66,6 +66,7 @@ from swarms.structs.yaml_model import (
pydantic_type_to_yaml_schema, pydantic_type_to_yaml_schema,
) )
from swarms.structs.swarm_router import SwarmRouter, SwarmType from swarms.structs.swarm_router import SwarmRouter, SwarmType
from swarms.structs.swarm_arange import SwarmRearrange
__all__ = [ __all__ = [
"Agent", "Agent",
@ -128,4 +129,5 @@ __all__ = [
"SpreadSheetSwarm", "SpreadSheetSwarm",
"SwarmRouter", "SwarmRouter",
"SwarmType", "SwarmType",
"SwarmRearrange",
] ]

@ -37,7 +37,13 @@ class SequentialWorkflow(BaseSwarm):
self.agents = agents self.agents = agents
self.flow = " -> ".join(agent.agent_name for agent in agents) self.flow = " -> ".join(agent.agent_name for agent in agents)
self.agent_rearrange = AgentRearrange( self.agent_rearrange = AgentRearrange(
agents, self.flow, max_loops=max_loops, *args, **kwargs name=name,
description=description,
agents=agents,
flow=self.flow,
max_loops=max_loops,
*args,
**kwargs,
) )
def run(self, task: str) -> str: def run(self, task: str) -> str:

@ -1,336 +0,0 @@
from typing import Callable, List, Dict, Any
from swarms.structs.base_swarm import BaseSwarm
from loguru import logger
import time
import uuid
class SwarmArrangeInput:
id: str = uuid.uuid4().hex
time_stamp: str = time.strftime("%Y-%m-%d %H:%M:%S")
name: str
description: str
swarms: List[Callable] = []
output_type: str
flow: str = ""
class SwarmArrangeOutput:
input_config: SwarmArrangeInput = None
class SwarmArrange:
"""
A class for arranging and executing multiple swarms sequentially.
Attributes:
name (str): The name of the SwarmArrange instance.
description (str): A description of the SwarmArrange instance.
swarms (List[Callable]): A list of swarms to be arranged and executed.
output_type (str): The type of output expected from the SwarmArrange instance.
flow (str): The flow pattern of the swarms to be executed.
"""
def __init__(
self,
name: str = "SwarmArrange-01",
description: str = "Combine multiple swarms and execute them sequentially",
swarms: List[Any] = [],
output_type: str = "json",
flow: str = None,
):
"""
Initializes the SwarmArrange instance.
Args:
name (str, optional): The name of the SwarmArrange instance. Defaults to "SwarmArrange-01".
description (str, optional): A description of the SwarmArrange instance. Defaults to "Combine multiple swarms and execute them sequentially".
swarms (List[Callable], optional): A list of swarms to be arranged and executed. Defaults to None.
output_type (str, optional): The type of output expected from the SwarmArrange instance. Defaults to "json".
flow (str, optional): The flow pattern of the swarms to be executed. Defaults to None.
Raises:
ValueError: If the name or description is None.
"""
if not name:
raise ValueError("Name cannot be None")
if not description:
raise ValueError("Description cannot be None")
self.name = name
self.description = description
self.swarms = swarms
self.output_type = output_type
self.flow = flow
self.reliability_check()
# self.log = SwarmArrangeInput(
# name=name,
# description=description,
# swarms=swarms,
# output_type=output_type,
# flow=flow,
# )
def reliability_check(self):
"""
Performs a reliability check on the SwarmArrange instance.
This method checks if the swarms provided are valid and logs the results.
"""
logger.info(
f"Initializing the SwarmArrange with name: {self.name} and description: {self.description}"
)
if self.swarms is None:
logger.warning(
"No swarms detected. Please input a callable with a .run(task: str) method for reliable operation."
)
else:
logger.info(
"SwarmArrange initialized with swarms. Proceeding with reliability check."
)
# Additional logging for reliability check
logger.info(
"Checking if all swarms are callable or instances of BaseSwarm."
)
for swarm in self.swarms:
if not callable(swarm) and not isinstance(
swarm, BaseSwarm
):
logger.error(
f"Swarm {swarm} is not a callable or an instance of BaseSwarm. This may cause reliability issues."
)
return False
logger.info("All swarms are valid. SwarmArrange is reliable.")
return True
def set_custom_flow(self, flow: str):
"""
Sets a custom flow pattern for the SwarmArrange instance.
Args:
flow (str): The custom flow pattern to be set.
"""
self.flow = flow
logger.info(f"Custom flow set: {flow}")
def add_swarm(self, swarm: Callable):
"""
Adds an swarm to the SwarmArrange instance.
Args:
swarm (swarm): The swarm to be added.
"""
logger.info(f"Adding swarm {swarm.name} to the swarm.")
self.swarms[swarm.name] = swarm
def track_history(
self,
swarm_name: str,
result: str,
):
"""
Tracks the history of a swarm's execution.
Args:
swarm_name (str): The name of the swarm.
result (str): The result of the swarm's execution.
"""
self.swarm_history[swarm_name].append(result)
def remove_swarm(self, swarm_name: str):
"""
Removes an swarm from the SwarmArrange instance.
Args:
swarm_name (str): The name of the swarm to be removed.
"""
del self.swarms[swarm_name]
def add_swarms(self, swarms: List[Callable]):
"""
Adds multiple swarms to the SwarmArrange instance.
Args:
swarms (List[swarm]): A list of swarm objects.
"""
self.swarms.extend(swarms)
def validate_flow(self):
"""
Validates the flow pattern of the SwarmArrange instance.
Raises:
ValueError: If the flow pattern is incorrectly formatted or contains duplicate swarm names.
Returns:
bool: True if the flow pattern is valid.
"""
if "->" not in self.flow:
raise ValueError(
"Flow must include '->' to denote the direction of the task."
)
swarms_in_flow = []
# Split the flow into tasks
tasks = self.flow.split("->")
# For each task in the tasks
for task in tasks:
swarm_names = [name.strip() for name in task.split(",")]
# Loop over the swarm names
for swarm_name in swarm_names:
if (
swarm_name not in self.swarms
and swarm_name != "H"
):
raise ValueError(
f"swarm '{swarm_name}' is not registered."
)
swarms_in_flow.append(swarm_name)
# Check for duplicate swarm names in the flow
if len(set(swarms_in_flow)) != len(swarms_in_flow):
raise ValueError(
"Duplicate swarm names in the flow are not allowed."
)
logger.info("Flow is valid.")
return True
def run(
self,
task: str = None,
img: str = None,
custom_tasks: Dict[str, str] = None,
*args,
**kwargs,
):
"""
Runs the SwarmArrange instance to rearrange and execute the swarms.
Args:
task (str, optional): The initial task to be processed. Defaults to None.
img (str, optional): The image to be processed. Defaults to None.
custom_tasks (Dict[str, str], optional): Custom tasks to be executed. Defaults to None.
Returns:
str: The final processed task.
"""
try:
if not self.validate_flow():
return "Invalid flow configuration."
tasks = self.flow.split("->")
current_task = task
# If custom_tasks have the swarms name and tasks then combine them
if custom_tasks is not None:
c_swarm_name, c_task = next(
iter(custom_tasks.items())
)
# Find the position of the custom swarm in the tasks list
position = tasks.index(c_swarm_name)
# If there is a previous swarm merge its task with the custom tasks
if position > 0:
tasks[position - 1] += "->" + c_task
else:
# If there is no previous swarm just insert the custom tasks
tasks.insert(position, c_task)
# Set the loop counter
loop_count = 0
while loop_count < self.max_loops:
for task in tasks:
is_last = task == tasks[-1]
swarm_names = [
name.strip() for name in task.split(",")
]
if len(swarm_names) > 1:
# Parallel processing
logger.info(
f"Running swarms in parallel: {swarm_names}"
)
results = []
for swarm_name in swarm_names:
if swarm_name == "H":
# Human in the loop intervention
if (
self.human_in_the_loop
and self.custom_human_in_the_loop
):
current_task = (
self.custom_human_in_the_loop(
current_task
)
)
else:
current_task = input(
"Enter your response:"
)
else:
swarm = self.swarms[swarm_name]
result = swarm.run(
current_task,
# img,
# is_last,
*args,
**kwargs,
)
results.append(result)
self.output_schema.outputs.append(
swarm.swarm_output
)
current_task = "; ".join(results)
else:
# Sequential processing
logger.info(
f"Running swarms sequentially: {swarm_names}"
)
swarm_name = swarm_names[0]
if swarm_name == "H":
# Human-in-the-loop intervention
if (
self.human_in_the_loop
and self.custom_human_in_the_loop
):
current_task = (
self.custom_human_in_the_loop(
current_task
)
)
else:
current_task = input(
"Enter the next task: "
)
else:
swarm = self.swarms[swarm_name]
current_task = swarm.run(
current_task,
# img,
# is_last,
*args,
**kwargs,
)
self.output_schema.outputs.append(
swarm.swarm_output
)
loop_count += 1
# return current_task
if self.return_json:
return self.output_schema.model_dump_json(indent=4)
else:
return current_task
except Exception as e:
logger.error(f"An error occurred: {e}")
return e
Loading…
Cancel
Save