pull/617/head
Your Name 2 months ago
parent 9fced20f0c
commit 4e588ed232

@@ -0,0 +1 @@
# 5.8.7

@@ -33,11 +33,14 @@ agent = Agent(
context_length=200000,
return_step_meta=False,
# output_type="json",
output_type="string",
output_type="json", # "json", "dict", "csv" OR "string" soon "yaml" and
streaming_on=False,
# auto_generate_prompt=True,
)
agent.run(
print(
agent.run(
"How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria"
)
)
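A minimal sketch of consuming the new output types, assuming the agent above is configured with output_type="json" (which, per the run() changes further down, returns a JSON string built from the agent's output model); the keys inspected here are illustrative, not a documented schema.

import json

raw = agent.run(
    "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria"
)
payload = json.loads(raw)  # output_type="json" returns a JSON string
print(type(payload), list(payload.keys()))
# output_type="dict" would return the dict directly;
# output_type="string" returns the concatenated response text.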

@@ -0,0 +1,28 @@
from swarms import Prompt
from swarm_models import OpenAIChat
import os
model = OpenAIChat(
api_key=os.getenv("OPENAI_API_KEY"), model_name="gpt-4o-mini", temperature=0.1
)
# Aggregator system prompt
prompt_generator_sys_prompt = Prompt(
name="prompt-generator-sys-prompt-o1",
description="Generate the most reliable prompt for a specific problem",
content="""
Your purpose is to craft extremely reliable and production-grade system prompts for other agents.
# Instructions
- Understand the prompt required for the agent.
- Utilize a combination of the most effective prompting strategies available, including chain-of-thought, many-shot, few-shot, and instructions-examples-constraints.
- Craft the prompt by blending the most suitable prompting strategies.
- Ensure the prompt is production-ready and educates the agent on how to reason and why to reason in that manner.
- Provide constraints if necessary and as needed.
- The system prompt should be extensive and cover a vast array of potential scenarios to specialize the agent.
""",
auto_generate_prompt=True,
llm=model,
)
print(prompt_generator_sys_prompt.get_prompt())
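One plausible follow-up, assuming the Agent constructor used elsewhere in this diff: feed the generated system prompt into a new agent. The agent name and task below are illustrative only.

from swarms import Agent

reasoning_agent = Agent(
    agent_name="Prompt-Tuned-Agent",  # hypothetical name
    system_prompt=prompt_generator_sys_prompt.get_prompt(),
    llm=model,
    max_loops=1,
    output_type="string",
)
print(
    reasoning_agent.run(
        "Draft a production-grade system prompt for a financial analysis agent."
    )
)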

@@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "5.8.6"
version = "5.8.7"
description = "Swarms - Pytorch"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]

@@ -13,7 +13,8 @@ from pydantic.v1 import validator
from swarms_cloud.utils.log_to_swarms_database import log_agent_data
from swarms_cloud.utils.capture_system_data import capture_system_data
from swarms.tools.base_tool import BaseTool
# from swarms.agents.ape_agent import auto_generate_prompt
from typing import Any
class Prompt(BaseModel):
"""
@@ -70,11 +71,15 @@ class Prompt(BaseModel):
default="prompts",
description="The folder path within WORKSPACE_DIR where the prompt will be autosaved",
)
auto_generate_prompt: bool = Field(
default=False,
description="Flag to enable or disable auto-generating the prompt",
)
parent_folder: str = Field(
default=os.getenv("WORKSPACE_DIR"),
description="The folder where the autosave folder is in",
)
# tools: List[callable] = None
llm: Any = None
@validator("edit_history", pre=True, always=True)
def initialize_history(cls, v, values):
@@ -87,6 +92,15 @@ class Prompt(BaseModel):
] # Store initial version in history
return v
def __init__(self, **data):
super().__init__(**data)
if self.autosave:
self._autosave()
if self.auto_generate_prompt and self.llm:
self.auto_generate_prompt()
def edit_prompt(self, new_content: str) -> None:
"""
Edits the prompt content and updates the version control.
@@ -239,6 +253,16 @@ class Prompt(BaseModel):
json.dump(self.model_dump(), file)
logger.info(f"Autosaved prompt {self.id} to {file_path}.")
# def auto_generate_prompt(self):
# logger.info(f"Auto-generating prompt for {self.name}")
# task = self.name + " " + self.description + " " + self.content
# prompt = auto_generate_prompt(task, llm=self.llm, max_tokens=4000, use_second_sys_prompt=True)
# logger.info("Generated prompt successfully, updating content")
# self.edit_prompt(prompt)
# logger.info("Prompt content updated")
# return "Prompt auto-generated successfully."
class Config:
"""Pydantic configuration for better JSON serialization."""

@@ -57,7 +57,7 @@ from clusterops import (
execute_with_cpu_cores,
)
from swarms.agents.ape_agent import auto_generate_prompt
import yaml
# Utils
# Custom stopping condition
@@ -799,18 +799,29 @@ class Agent:
while attempt < self.retry_attempts and not success:
try:
if self.long_term_memory is not None:
logger.info("Querying long term memory...")
logger.info(
"Querying long term memory..."
)
self.memory_query(task_prompt)
# Generate response using LLM
response_args = (
(task_prompt, *args) if img is None else (task_prompt, img, *args)
(task_prompt, *args)
if img is None
else (task_prompt, img, *args)
)
response = self.call_llm(
*response_args, **kwargs
)
response = self.call_llm(*response_args, **kwargs)
# Check if response is a dictionary and has 'choices' key
if isinstance(response, dict) and 'choices' in response:
response = response['choices'][0]['message']['content']
if (
isinstance(response, dict)
and "choices" in response
):
response = response["choices"][0][
"message"
]["content"]
elif isinstance(response, str):
# If response is already a string, use it as is
pass
@@ -821,14 +832,14 @@ class Agent:
# Check and execute tools
if self.tools is not None:
print(f"self.tools is not None: {response}")
print(
f"self.tools is not None: {response}"
)
self.parse_and_execute_tools(response)
# Log the step metadata
logged = self.log_step_metadata(
loop_count,
task_prompt,
response
loop_count, task_prompt, response
)
logger.info(logged)
@@ -843,8 +854,7 @@ class Agent:
# Add the response to the memory
self.short_memory.add(
role=self.agent_name,
content=response
role=self.agent_name, content=response
)
# Add to all responses
@@ -978,8 +988,25 @@ class Agent:
]
# return self.agent_output_type(all_responses)
# More flexible output types
if self.output_type == "string":
return concat_strings(all_responses)
elif self.output_type == "list":
return all_responses
elif self.output_type == "json":
return self.agent_output.model_dump_json(indent=4)
elif self.output_type == "csv":
return self.dict_to_csv(
self.agent_output.model_dump()
)
elif self.output_type == "dict":
return self.agent_output.model_dump()
elif self.output_type == "yaml":
return yaml.safe_dump(self.agent_output.model_dump(), sort_keys=False)
else:
raise ValueError(
f"Invalid output type: {self.output_type}"
)
except Exception as error:
logger.info(
@@ -988,20 +1015,79 @@ class Agent:
raise error
def __call__(
self, task: str = None, img: str = None, *args, **kwargs
):
self,
task: Optional[str] = None,
img: Optional[str] = None,
is_last: bool = False,
device: str = "cpu", # gpu
device_id: int = 0,
all_cores: bool = True,
*args,
**kwargs,
) -> Any:
"""Call the agent
Args:
task (str): _description_
img (str, optional): _description_. Defaults to None.
task (Optional[str]): The task to be performed. Defaults to None.
img (Optional[str]): The image to be processed. Defaults to None.
is_last (bool): Indicates if this is the last task. Defaults to False.
device (str): The device to use for execution. Defaults to "cpu".
device_id (int): The ID of the GPU to use if device is set to "gpu". Defaults to 0.
all_cores (bool): If True, uses all available CPU cores. Defaults to True.
"""
try:
return self.run(task, img, *args, **kwargs)
if task is not None:
return self.run(
task=task,
is_last=is_last,
device=device,
device_id=device_id,
all_cores=all_cores,
*args,
**kwargs,
)
elif img is not None:
return self.run(
img=img,
is_last=is_last,
device=device,
device_id=device_id,
all_cores=all_cores,
*args,
**kwargs,
)
else:
raise ValueError(
"Either 'task' or 'img' must be provided."
)
except Exception as error:
logger.error(f"Error calling agent: {error}")
raise error
def dict_to_csv(self, data: dict) -> str:
"""
Convert a dictionary to a CSV string.
Args:
data (dict): The dictionary to convert.
Returns:
str: The CSV string representation of the dictionary.
"""
import csv
import io
output = io.StringIO()
writer = csv.writer(output)
# Write header
writer.writerow(data.keys())
# Write values
writer.writerow(data.values())
return output.getvalue()
def parse_and_execute_tools(self, response: str, *args, **kwargs):
# Extract json from markdown
# response = extract_code_from_markdown(response)
@@ -1860,17 +1946,25 @@ class Agent:
"""Parse the output from the LLM"""
try:
if isinstance(response, dict):
if 'choices' in response:
return response['choices'][0]['message']['content']
if "choices" in response:
return response["choices"][0]["message"][
"content"
]
else:
return json.dumps(response) # Convert dict to string
return json.dumps(
response
) # Convert dict to string
elif isinstance(response, str):
return response
else:
return str(response) # Convert any other type to string
return str(
response
) # Convert any other type to string
except Exception as e:
logger.error(f"Error parsing LLM output: {e}")
return str(response) # Return string representation as fallback
return str(
response
) # Return string representation as fallback
def log_step_metadata(
self, loop: int, task: str, response: str
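Since output_type="csv" now routes through the new dict_to_csv helper, here is a standalone sketch of what that conversion yields for a flat dictionary; the keys are illustrative, not the actual agent_output schema, and nested values would simply be stringified because only top-level keys and values are written.

import csv
import io

def dict_to_csv(data: dict) -> str:
    # Mirrors Agent.dict_to_csv: one header row of keys, one row of values.
    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(data.keys())
    writer.writerow(data.values())
    return output.getvalue()

print(dict_to_csv({"agent_name": "Data-Extractor", "loops": 1}))
# agent_name,loops
# Data-Extractor,1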

@@ -1,4 +1,3 @@
import os
from typing import List, Optional
import chromadb
@@ -8,8 +7,17 @@ from typing import Union, Callable, Any
from swarms import Agent
class AgentRAG:
"""A vector database for storing and retrieving agents based on their characteristics."""
class AgentRouter:
"""
A vector-database router for storing and retrieving agents based on their characteristics.
Args:
collection_name (str): Name of the collection in the vector database.
persist_directory (str): Directory to persist the vector database.
n_agents (int): Number of agents to return in queries.
*args: Additional arguments to pass to the chromadb Client.
**kwargs: Additional keyword arguments to pass to the chromadb Client.
"""
def __init__(
self,
@@ -19,12 +27,6 @@ class AgentRAG:
*args,
**kwargs,
):
"""
Initialize the AgentRAG.
Args:
persist_directory (str): The directory to persist the ChromaDB data.
"""
self.collection_name = collection_name
self.n_agents = n_agents
self.persist_directory = persist_directory
@@ -44,6 +46,9 @@ class AgentRAG:
Args:
agent (Agent): The agent to add.
Raises:
Exception: If there's an error adding the agent to the vector database.
"""
try:
agent_text = f"{agent.name} {agent.description} {agent.system_prompt}"
@@ -65,6 +70,12 @@ class AgentRAG:
def add_agents(
self, agents: List[Union[Agent, Callable, Any]]
) -> None:
"""
Add multiple agents to the vector database.
Args:
agents (List[Union[Agent, Callable, Any]]): List of agents to add.
"""
for agent in agents:
self.add_agent(agent)
@@ -108,9 +119,14 @@ class AgentRAG:
Args:
task (str): The task description.
*args: Additional arguments to pass to the collection.query method.
**kwargs: Additional keyword arguments to pass to the collection.query method.
Returns:
Optional[Agent]: The best matching agent, if found.
Raises:
Exception: If there's an error finding the best agent.
"""
try:
results = self.collection.query(
@@ -150,171 +166,171 @@ class AgentRAG:
raise
# Example usage
if __name__ == "__main__":
from dotenv import load_dotenv
from swarm_models import OpenAIChat
load_dotenv()
# Get the OpenAI API key from the environment variable
api_key = os.getenv("GROQ_API_KEY")
# Model
model = OpenAIChat(
openai_api_base="https://api.groq.com/openai/v1",
openai_api_key=api_key,
model_name="llama-3.1-70b-versatile",
temperature=0.1,
)
# Initialize the vector database
vector_db = AgentRAG()
# Define specialized system prompts for each agent
DATA_EXTRACTOR_PROMPT = """You are a highly specialized private equity agent focused on data extraction from various documents. Your expertise includes:
1. Extracting key financial metrics (revenue, EBITDA, growth rates, etc.) from financial statements and reports
2. Identifying and extracting important contract terms from legal documents
3. Pulling out relevant market data from industry reports and analyses
4. Extracting operational KPIs from management presentations and internal reports
5. Identifying and extracting key personnel information from organizational charts and bios
Provide accurate, structured data extracted from various document types to support investment analysis."""
SUMMARIZER_PROMPT = """You are an expert private equity agent specializing in summarizing complex documents. Your core competencies include:
1. Distilling lengthy financial reports into concise executive summaries
2. Summarizing legal documents, highlighting key terms and potential risks
3. Condensing industry reports to capture essential market trends and competitive dynamics
4. Summarizing management presentations to highlight key strategic initiatives and projections
5. Creating brief overviews of technical documents, emphasizing critical points for non-technical stakeholders
Deliver clear, concise summaries that capture the essence of various documents while highlighting information crucial for investment decisions."""
FINANCIAL_ANALYST_PROMPT = """You are a specialized private equity agent focused on financial analysis. Your key responsibilities include:
1. Analyzing historical financial statements to identify trends and potential issues
2. Evaluating the quality of earnings and potential adjustments to EBITDA
3. Assessing working capital requirements and cash flow dynamics
4. Analyzing capital structure and debt capacity
5. Evaluating financial projections and underlying assumptions
Provide thorough, insightful financial analysis to inform investment decisions and valuation."""
MARKET_ANALYST_PROMPT = """You are a highly skilled private equity agent specializing in market analysis. Your expertise covers:
1. Analyzing industry trends, growth drivers, and potential disruptors
2. Evaluating competitive landscape and market positioning
3. Assessing market size, segmentation, and growth potential
4. Analyzing customer dynamics, including concentration and loyalty
5. Identifying potential regulatory or macroeconomic impacts on the market
Deliver comprehensive market analysis to assess the attractiveness and risks of potential investments."""
OPERATIONAL_ANALYST_PROMPT = """You are an expert private equity agent focused on operational analysis. Your core competencies include:
1. Evaluating operational efficiency and identifying improvement opportunities
2. Analyzing supply chain and procurement processes
3. Assessing sales and marketing effectiveness
4. Evaluating IT systems and digital capabilities
5. Identifying potential synergies in merger or add-on acquisition scenarios
Provide detailed operational analysis to uncover value creation opportunities and potential risks."""
# Initialize specialized agents
data_extractor_agent = Agent(
agent_name="Data-Extractor",
system_prompt=DATA_EXTRACTOR_PROMPT,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="data_extractor_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
summarizer_agent = Agent(
agent_name="Document-Summarizer",
system_prompt=SUMMARIZER_PROMPT,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="summarizer_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
financial_analyst_agent = Agent(
agent_name="Financial-Analyst",
system_prompt=FINANCIAL_ANALYST_PROMPT,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="financial_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
market_analyst_agent = Agent(
agent_name="Market-Analyst",
system_prompt=MARKET_ANALYST_PROMPT,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="market_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
operational_analyst_agent = Agent(
agent_name="Operational-Analyst",
system_prompt=OPERATIONAL_ANALYST_PROMPT,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="operational_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
# Create agents (using the agents from the original code)
agents_to_add = [
data_extractor_agent,
summarizer_agent,
financial_analyst_agent,
market_analyst_agent,
operational_analyst_agent,
]
# Add agents to the vector database
for agent in agents_to_add:
vector_db.add_agent(agent)
# Example task
task = "Analyze the financial statements of a potential acquisition target and identify key growth drivers."
# Find the best agent for the task
best_agent = vector_db.find_best_agent(task)
if best_agent:
logger.info(f"Best agent for the task: {best_agent.name}")
# Use the best agent to perform the task
result = best_agent.run(task)
print(f"Task result: {result}")
# Update the agent's history in the database
vector_db.update_agent_history(best_agent.name)
else:
print("No suitable agent found for the task.")
# Save the vector database
# # Example usage
# if __name__ == "__main__":
# from dotenv import load_dotenv
# from swarm_models import OpenAIChat
# load_dotenv()
# # Get the OpenAI API key from the environment variable
# api_key = os.getenv("GROQ_API_KEY")
# # Model
# model = OpenAIChat(
# openai_api_base="https://api.groq.com/openai/v1",
# openai_api_key=api_key,
# model_name="llama-3.1-70b-versatile",
# temperature=0.1,
# )
# # Initialize the vector database
# vector_db = AgentRouter()
# # Define specialized system prompts for each agent
# DATA_EXTRACTOR_PROMPT = """You are a highly specialized private equity agent focused on data extraction from various documents. Your expertise includes:
# 1. Extracting key financial metrics (revenue, EBITDA, growth rates, etc.) from financial statements and reports
# 2. Identifying and extracting important contract terms from legal documents
# 3. Pulling out relevant market data from industry reports and analyses
# 4. Extracting operational KPIs from management presentations and internal reports
# 5. Identifying and extracting key personnel information from organizational charts and bios
# Provide accurate, structured data extracted from various document types to support investment analysis."""
# SUMMARIZER_PROMPT = """You are an expert private equity agent specializing in summarizing complex documents. Your core competencies include:
# 1. Distilling lengthy financial reports into concise executive summaries
# 2. Summarizing legal documents, highlighting key terms and potential risks
# 3. Condensing industry reports to capture essential market trends and competitive dynamics
# 4. Summarizing management presentations to highlight key strategic initiatives and projections
# 5. Creating brief overviews of technical documents, emphasizing critical points for non-technical stakeholders
# Deliver clear, concise summaries that capture the essence of various documents while highlighting information crucial for investment decisions."""
# FINANCIAL_ANALYST_PROMPT = """You are a specialized private equity agent focused on financial analysis. Your key responsibilities include:
# 1. Analyzing historical financial statements to identify trends and potential issues
# 2. Evaluating the quality of earnings and potential adjustments to EBITDA
# 3. Assessing working capital requirements and cash flow dynamics
# 4. Analyzing capital structure and debt capacity
# 5. Evaluating financial projections and underlying assumptions
# Provide thorough, insightful financial analysis to inform investment decisions and valuation."""
# MARKET_ANALYST_PROMPT = """You are a highly skilled private equity agent specializing in market analysis. Your expertise covers:
# 1. Analyzing industry trends, growth drivers, and potential disruptors
# 2. Evaluating competitive landscape and market positioning
# 3. Assessing market size, segmentation, and growth potential
# 4. Analyzing customer dynamics, including concentration and loyalty
# 5. Identifying potential regulatory or macroeconomic impacts on the market
# Deliver comprehensive market analysis to assess the attractiveness and risks of potential investments."""
# OPERATIONAL_ANALYST_PROMPT = """You are an expert private equity agent focused on operational analysis. Your core competencies include:
# 1. Evaluating operational efficiency and identifying improvement opportunities
# 2. Analyzing supply chain and procurement processes
# 3. Assessing sales and marketing effectiveness
# 4. Evaluating IT systems and digital capabilities
# 5. Identifying potential synergies in merger or add-on acquisition scenarios
# Provide detailed operational analysis to uncover value creation opportunities and potential risks."""
# # Initialize specialized agents
# data_extractor_agent = Agent(
# agent_name="Data-Extractor",
# system_prompt=DATA_EXTRACTOR_PROMPT,
# llm=model,
# max_loops=1,
# autosave=True,
# verbose=True,
# dynamic_temperature_enabled=True,
# saved_state_path="data_extractor_agent.json",
# user_name="pe_firm",
# retry_attempts=1,
# context_length=200000,
# output_type="string",
# )
# summarizer_agent = Agent(
# agent_name="Document-Summarizer",
# system_prompt=SUMMARIZER_PROMPT,
# llm=model,
# max_loops=1,
# autosave=True,
# verbose=True,
# dynamic_temperature_enabled=True,
# saved_state_path="summarizer_agent.json",
# user_name="pe_firm",
# retry_attempts=1,
# context_length=200000,
# output_type="string",
# )
# financial_analyst_agent = Agent(
# agent_name="Financial-Analyst",
# system_prompt=FINANCIAL_ANALYST_PROMPT,
# llm=model,
# max_loops=1,
# autosave=True,
# verbose=True,
# dynamic_temperature_enabled=True,
# saved_state_path="financial_analyst_agent.json",
# user_name="pe_firm",
# retry_attempts=1,
# context_length=200000,
# output_type="string",
# )
# market_analyst_agent = Agent(
# agent_name="Market-Analyst",
# system_prompt=MARKET_ANALYST_PROMPT,
# llm=model,
# max_loops=1,
# autosave=True,
# verbose=True,
# dynamic_temperature_enabled=True,
# saved_state_path="market_analyst_agent.json",
# user_name="pe_firm",
# retry_attempts=1,
# context_length=200000,
# output_type="string",
# )
# operational_analyst_agent = Agent(
# agent_name="Operational-Analyst",
# system_prompt=OPERATIONAL_ANALYST_PROMPT,
# llm=model,
# max_loops=1,
# autosave=True,
# verbose=True,
# dynamic_temperature_enabled=True,
# saved_state_path="operational_analyst_agent.json",
# user_name="pe_firm",
# retry_attempts=1,
# context_length=200000,
# output_type="string",
# )
# # Create agents (using the agents from the original code)
# agents_to_add = [
# data_extractor_agent,
# summarizer_agent,
# financial_analyst_agent,
# market_analyst_agent,
# operational_analyst_agent,
# ]
# # Add agents to the vector database
# for agent in agents_to_add:
# vector_db.add_agent(agent)
# # Example task
# task = "Analyze the financial statements of a potential acquisition target and identify key growth drivers."
# # Find the best agent for the task
# best_agent = vector_db.find_best_agent(task)
# if best_agent:
# logger.info(f"Best agent for the task: {best_agent.name}")
# # Use the best agent to perform the task
# result = best_agent.run(task)
# print(f"Task result: {result}")
# # Update the agent's history in the database
# vector_db.update_agent_history(best_agent.name)
# else:
# print("No suitable agent found for the task.")
# # Save the vector database
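A compact, uncommented sketch of the example above, assuming AgentRouter keeps the add_agent and find_best_agent interface shown; the model setup and agent definition follow the same pattern as the commented-out block, and the system prompt here is abbreviated.

import os
from dotenv import load_dotenv
from swarm_models import OpenAIChat
from swarms import Agent

load_dotenv()
model = OpenAIChat(
    openai_api_base="https://api.groq.com/openai/v1",
    openai_api_key=os.getenv("GROQ_API_KEY"),
    model_name="llama-3.1-70b-versatile",
    temperature=0.1,
)

router = AgentRouter()
router.add_agent(
    Agent(
        agent_name="Financial-Analyst",
        system_prompt="You are a private equity agent focused on financial analysis.",
        llm=model,
        max_loops=1,
        output_type="string",
    )
)
best_agent = router.find_best_agent(
    "Analyze the financial statements of a potential acquisition target."
)
if best_agent:
    print(best_agent.run("Identify the key growth drivers."))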

@@ -0,0 +1,20 @@
from typing import List, Any, Dict, Optional
class AutoSwarmBuilder:
def __init__(self, task: str, num_agents: int = 10, batch_size: int = 10):
self.task = task
self.num_agents = num_agents
self.batch_size = batch_size
def run(self, task: str, image_url: str = None, *args, **kwargs):
pass
def _create_swarm(self):
pass
def _create_agents(self):
pass
def _run_agents(self):
pass

@@ -138,7 +138,6 @@ class ConcurrentWorkflow(BaseSwarm):
self.reliability_check()
def reliability_check(self):
try:
logger.info("Starting reliability checks")

@@ -1,74 +1,81 @@
import os
import asyncio
from typing import List, Any
from swarms import Agent
from swarm_models import OpenAIChat
import uvloop
from multiprocessing import cpu_count
from swarms.utils.calculate_func_metrics import profile_func
from typing import List
# Use uvloop for faster asyncio event loop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class (can be reused)
model = OpenAIChat(
api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
)
def run_single_agent(agent: Agent, task: str) -> Any:
"""
Run a single agent on the given task.
Args:
agent (Agent): The agent to run.
task (str): The task for the agent to perform.
# Function to run a single agent on the task (synchronous)
def run_single_agent(agent, task):
Returns:
Any: The result of the agent's execution.
"""
return agent.run(task)
# Asynchronous wrapper for agent tasks
async def run_agent_async(agent, task):
async def run_agent_async(agent: Agent, task: str) -> Any:
"""
Asynchronous wrapper for agent tasks.
Args:
agent (Agent): The agent to run asynchronously.
task (str): The task for the agent to perform.
Returns:
Any: The result of the agent's execution.
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None, run_single_agent, agent, task
)
# Asynchronous function to run agents concurrently
async def run_agents_concurrently_async(agents, task: str):
async def run_agents_concurrently_async(
agents: List[Agent], task: str
) -> List[Any]:
"""
Run multiple agents concurrently on the same task with optimized performance.
:param agents: List of Agent instances to run concurrently.
:param task: The task string to execute by all agents.
:return: A list of outputs from each agent.
"""
Args:
agents (List[Agent]): List of Agent instances to run concurrently.
task (str): The task string to execute by all agents.
# Run all agents asynchronously using asyncio.gather
Returns:
List[Any]: A list of outputs from each agent.
"""
results = await asyncio.gather(
*(run_agent_async(agent, task) for agent in agents)
)
return results
# Function to manage the overall process and batching
@profile_func
def run_agents_concurrently_multiprocess(
agents: List[Agent], task: str, batch_size: int = cpu_count()
):
) -> List[Any]:
"""
Manage and run multiple agents concurrently in batches, with optimized performance.
:param agents: List of Agent instances to run concurrently.
:param task: The task string to execute by all agents.
:param batch_size: Number of agents to run in parallel in each batch.
:return: A list of outputs from each agent.
"""
Args:
agents (List[Agent]): List of Agent instances to run concurrently.
task (str): The task string to execute by all agents.
batch_size (int, optional): Number of agents to run in parallel in each batch.
Defaults to the number of CPU cores.
Returns:
List[Any]: A list of outputs from each agent.
"""
results = []
loop = asyncio.get_event_loop()
# batch_size = cpu_count()
# Process agents in batches to avoid overwhelming system resources
for i in range(0, len(agents), batch_size):
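A short usage sketch for run_agents_concurrently_multiprocess, assuming the module-level model defined above and a handful of lightweight agents; the agent names, batch size, and task are illustrative.

agents = [
    Agent(
        agent_name=f"Worker-{i}",
        llm=model,
        max_loops=1,
        output_type="string",
    )
    for i in range(4)
]

outputs = run_agents_concurrently_multiprocess(
    agents,
    task="Summarize the main risks of concentrated customer revenue.",
    batch_size=2,  # defaults to cpu_count() when omitted
)
for out in outputs:
    print(out)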
