From 5ef4897f0705ed40307a60c633d7ee9909993b3c Mon Sep 17 00:00:00 2001 From: Your Name Date: Thu, 31 Oct 2024 14:12:34 -0400 Subject: [PATCH] [FEATS][ run_agents_concurrently, run_agents_concurrently_async, run_single_agent, run_agents_concurrently_multiprocess, run_agents_sequentially, run_agents_with_different_tasks, run_agent_with_timeout, run_agents_with_resource_monitoring,] + [CLEANUP] --- auto_swarm_builder.py | 299 ++++++++ auto_swarm_router.py | 162 +++++ docs/mkdocs.yml | 1 + .../structs/various_execution_methods.md | 168 +++++ example.py | 2 +- new_prompt.py | 6 +- pyproject.toml | 2 +- swarms/agents/__init__.py | 3 - swarms/agents/cli_prompt_generator_func.py | 84 --- swarms/agents/prompt_generator_agent.py | 92 --- swarms/cli/main.py | 47 +- swarms/prompts/prompt.py | 12 +- swarms/prompts/prompt_generator.py | 4 +- swarms/structs/__init__.py | 19 + swarms/structs/agent.py | 6 +- swarms/structs/auto_swarm_builder.py | 20 - swarms/structs/multi_agent_exec.py | 344 ++++++++++ swarms/structs/multi_process_workflow.py | 4 +- swarms/structs/run_agents_in_parallel.py | 120 ---- ...n_agents_in_parallel_async_multiprocess.py | 130 ---- swarms/structs/sequential_workflow.py | 64 +- swarms/structs/swarm_matcher.py | 644 +++++++++++++----- swarms/structs/swarm_router.py | 42 +- swarms/structs/tree_swarm.py | 26 +- swarms/telemetry/bootup.py | 11 +- 25 files changed, 1626 insertions(+), 686 deletions(-) create mode 100644 auto_swarm_builder.py create mode 100644 auto_swarm_router.py create mode 100644 docs/swarms/structs/various_execution_methods.md delete mode 100644 swarms/agents/cli_prompt_generator_func.py delete mode 100644 swarms/agents/prompt_generator_agent.py delete mode 100644 swarms/structs/auto_swarm_builder.py create mode 100644 swarms/structs/multi_agent_exec.py delete mode 100644 swarms/structs/run_agents_in_parallel.py delete mode 100644 swarms/structs/run_agents_in_parallel_async_multiprocess.py diff --git a/auto_swarm_builder.py b/auto_swarm_builder.py new file mode 100644 index 00000000..177cfdc4 --- /dev/null +++ b/auto_swarm_builder.py @@ -0,0 +1,299 @@ +from loguru import logger + +import os +from typing import List + +from pydantic import BaseModel, Field +from swarm_models import OpenAIFunctionCaller, OpenAIChat + +from swarms.structs.agent import Agent +from swarms.structs.swarm_router import SwarmRouter + + +class AgentConfig(BaseModel): + """Configuration for an individual agent in a swarm""" + + name: str = Field( + description="The name of the agent", example="Research-Agent" + ) + description: str = Field( + description="A description of the agent's purpose and capabilities", + example="Agent responsible for researching and gathering information", + ) + system_prompt: str = Field( + description="The system prompt that defines the agent's behavior", + example="You are a research agent. Your role is to gather and analyze information...", + ) + max_loops: int = Field( + description="Maximum number of reasoning loops the agent can perform", + example=3, + ) + + +class SwarmConfig(BaseModel): + """Configuration for a swarm of cooperative agents""" + + name: str = Field( + description="The name of the swarm", + example="Research-Writing-Swarm", + ) + description: str = Field( + description="The description of the swarm's purpose and capabilities", + example="A swarm of agents that work together to research topics and write articles", + ) + agents: List[AgentConfig] = Field( + description="The list of agents that make up the swarm", + example=[ + AgentConfig( + name="Research-Agent", + description="Gathers information", + system_prompt="You are a research agent...", + max_loops=2, + ), + AgentConfig( + name="Writing-Agent", + description="Writes content", + system_prompt="You are a writing agent...", + max_loops=1, + ), + ], + ) + max_loops: int = Field( + description="The maximum number of loops to run the swarm", + example=1, + ) + + +# Get the OpenAI API key from the environment variable +api_key = os.getenv("OPENAI_API_KEY") + +# Create an instance of the OpenAIChat class +model = OpenAIChat( + openai_api_key=api_key, model_name="gpt-4o-mini", temperature=0.1 +) + + +BOSS_SYSTEM_PROMPT = """ +Manage a swarm of worker agents to efficiently serve the user by deciding whether to create new agents or delegate tasks. Ensure operations are efficient and effective. + +### Instructions: + +1. **Task Assignment**: + - Analyze available worker agents when a task is presented. + - Delegate tasks to existing agents with clear, direct, and actionable instructions if an appropriate agent is available. + - If no suitable agent exists, create a new agent with a fitting system prompt to handle the task. + +2. **Agent Creation**: + - Name agents according to the task they are intended to perform (e.g., "Twitter Marketing Agent"). + - Provide each new agent with a concise and clear system prompt that includes its role, objectives, and any tools it can utilize. + +3. **Efficiency**: + - Minimize redundancy and maximize task completion speed. + - Avoid unnecessary agent creation if an existing agent can fulfill the task. + +4. **Communication**: + - Be explicit in task delegation instructions to avoid ambiguity and ensure effective task execution. + - Require agents to report back on task completion or encountered issues. + +5. **Reasoning and Decisions**: + - Offer brief reasoning when selecting or creating agents to maintain transparency. + - Avoid using an agent if unnecessary, with a clear explanation if no agents are suitable for a task. + +# Output Format + +Present your plan in clear, bullet-point format or short concise paragraphs, outlining task assignment, agent creation, efficiency strategies, and communication protocols. + +# Notes + +- Preserve transparency by always providing reasoning for task-agent assignments and creation. +- Ensure instructions to agents are unambiguous to minimize error. + +""" + + +class AutoSwarmBuilder: + """A class that automatically builds and manages swarms of AI agents. + + This class handles the creation, coordination and execution of multiple AI agents working + together as a swarm to accomplish complex tasks. It uses a boss agent to delegate work + and create new specialized agents as needed. + + Args: + name (str): The name of the swarm + description (str): A description of the swarm's purpose + verbose (bool, optional): Whether to output detailed logs. Defaults to True. + max_loops (int, optional): Maximum number of execution loops. Defaults to 1. + """ + + def __init__( + self, + name: str = None, + description: str = None, + verbose: bool = True, + max_loops: int = 1, + ): + self.name = name + self.description = description + self.verbose = verbose + self.max_loops = max_loops + self.agents_pool = [] + logger.info( + f"Initialized AutoSwarmBuilder: {name} {description}" + ) + + # @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) + def run(self, task: str, image_url: str = None, *args, **kwargs): + """Run the swarm on a given task. + + Args: + task (str): The task to be accomplished + image_url (str, optional): URL of an image input if needed. Defaults to None. + *args: Variable length argument list + **kwargs: Arbitrary keyword arguments + + Returns: + The output from the swarm's execution + """ + logger.info(f"Running swarm on task: {task}") + agents = self._create_agents(task, image_url, *args, **kwargs) + logger.info(f"Agents created {len(agents)}") + logger.info("Routing task through swarm") + output = self.swarm_router(agents, task, image_url) + logger.info(f"Swarm execution complete with output: {output}") + return output + + # @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) + def _create_agents(self, task: str, *args, **kwargs): + """Create the necessary agents for a task. + + Args: + task (str): The task to create agents for + *args: Variable length argument list + **kwargs: Arbitrary keyword arguments + + Returns: + list: List of created agents + """ + logger.info("Creating agents for task") + model = OpenAIFunctionCaller( + system_prompt=BOSS_SYSTEM_PROMPT, + api_key=os.getenv("OPENAI_API_KEY"), + temperature=0.1, + base_model=SwarmConfig, + ) + + agents_dictionary = model.run(task) + logger.info(f"Agents dictionary: {agents_dictionary}") + + # Convert dictionary to SwarmConfig if needed + if isinstance(agents_dictionary, dict): + agents_dictionary = SwarmConfig(**agents_dictionary) + + # Set swarm config + self.name = agents_dictionary.name + self.description = agents_dictionary.description + self.max_loops = getattr( + agents_dictionary, "max_loops", 1 + ) # Default to 1 if not set + + logger.info( + f"Swarm config: {self.name}, {self.description}, {self.max_loops}" + ) + + # Create agents from config + agents = [] + for agent_config in agents_dictionary.agents: + # Convert dict to AgentConfig if needed + if isinstance(agent_config, dict): + agent_config = AgentConfig(**agent_config) + + agent = self.build_agent( + agent_name=agent_config.name, + agent_description=agent_config.description, + agent_system_prompt=agent_config.system_prompt, + max_loops=agent_config.max_loops, + ) + agents.append(agent) + + return agents + + def build_agent( + self, + agent_name: str, + agent_description: str, + agent_system_prompt: str, + max_loops: int = 1, + ): + """Build a single agent with the given specifications. + + Args: + agent_name (str): Name of the agent + agent_description (str): Description of the agent's purpose + agent_system_prompt (str): The system prompt for the agent + + Returns: + Agent: The constructed agent instance + """ + logger.info(f"Building agent: {agent_name}") + agent = Agent( + agent_name=agent_name, + description=agent_description, + system_prompt=agent_system_prompt, + llm=model, + max_loops=max_loops, + autosave=True, + dashboard=False, + verbose=True, + dynamic_temperature_enabled=True, + saved_state_path=f"{agent_name}.json", + user_name="swarms_corp", + retry_attempts=1, + context_length=200000, + return_step_meta=False, + output_type="str", # "json", "dict", "csv" OR "string" soon "yaml" and + streaming_on=False, + auto_generate_prompt=True, + ) + + return agent + + # @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) + def swarm_router( + self, + agents: List[Agent], + task: str, + image_url: str = None, + *args, + **kwargs, + ): + """Route tasks between agents in the swarm. + + Args: + agents (List[Agent]): List of available agents + task (str): The task to route + image_url (str, optional): URL of an image input if needed. Defaults to None. + *args: Variable length argument list + **kwargs: Arbitrary keyword arguments + + Returns: + The output from the routed task execution + """ + logger.info("Routing task through swarm") + swarm_router_instance = SwarmRouter( + agents=agents, + swarm_type="auto", + max_loops=1, + ) + + return swarm_router_instance.run( + self.name + " " + self.description + " " + task, + ) + + +example = AutoSwarmBuilder() + +print( + example.run( + "Write multiple blog posts about the latest advancements in swarm intelligence all at once" + ) +) diff --git a/auto_swarm_router.py b/auto_swarm_router.py new file mode 100644 index 00000000..20115935 --- /dev/null +++ b/auto_swarm_router.py @@ -0,0 +1,162 @@ +import os +from dotenv import load_dotenv +from swarms import Agent +from swarm_models import OpenAIChat +from swarms.structs.swarm_router import SwarmRouter + +load_dotenv() + +# Get the OpenAI API key from the environment variable +api_key = os.getenv("GROQ_API_KEY") + +# Model +model = OpenAIChat( + openai_api_base="https://api.groq.com/openai/v1", + openai_api_key=api_key, + model_name="llama-3.1-70b-versatile", + temperature=0.1, +) +# Define specialized system prompts for each agent +DATA_EXTRACTOR_PROMPT = """You are a highly specialized private equity agent focused on data extraction from various documents. Your expertise includes: +1. Extracting key financial metrics (revenue, EBITDA, growth rates, etc.) from financial statements and reports +2. Identifying and extracting important contract terms from legal documents +3. Pulling out relevant market data from industry reports and analyses +4. Extracting operational KPIs from management presentations and internal reports +5. Identifying and extracting key personnel information from organizational charts and bios +Provide accurate, structured data extracted from various document types to support investment analysis.""" + +SUMMARIZER_PROMPT = """You are an expert private equity agent specializing in summarizing complex documents. Your core competencies include: +1. Distilling lengthy financial reports into concise executive summaries +2. Summarizing legal documents, highlighting key terms and potential risks +3. Condensing industry reports to capture essential market trends and competitive dynamics +4. Summarizing management presentations to highlight key strategic initiatives and projections +5. Creating brief overviews of technical documents, emphasizing critical points for non-technical stakeholders +Deliver clear, concise summaries that capture the essence of various documents while highlighting information crucial for investment decisions.""" + +FINANCIAL_ANALYST_PROMPT = """You are a specialized private equity agent focused on financial analysis. Your key responsibilities include: +1. Analyzing historical financial statements to identify trends and potential issues +2. Evaluating the quality of earnings and potential adjustments to EBITDA +3. Assessing working capital requirements and cash flow dynamics +4. Analyzing capital structure and debt capacity +5. Evaluating financial projections and underlying assumptions +Provide thorough, insightful financial analysis to inform investment decisions and valuation.""" + +MARKET_ANALYST_PROMPT = """You are a highly skilled private equity agent specializing in market analysis. Your expertise covers: +1. Analyzing industry trends, growth drivers, and potential disruptors +2. Evaluating competitive landscape and market positioning +3. Assessing market size, segmentation, and growth potential +4. Analyzing customer dynamics, including concentration and loyalty +5. Identifying potential regulatory or macroeconomic impacts on the market +Deliver comprehensive market analysis to assess the attractiveness and risks of potential investments.""" + +OPERATIONAL_ANALYST_PROMPT = """You are an expert private equity agent focused on operational analysis. Your core competencies include: +1. Evaluating operational efficiency and identifying improvement opportunities +2. Analyzing supply chain and procurement processes +3. Assessing sales and marketing effectiveness +4. Evaluating IT systems and digital capabilities +5. Identifying potential synergies in merger or add-on acquisition scenarios +Provide detailed operational analysis to uncover value creation opportunities and potential risks.""" + +# Initialize specialized agents +data_extractor_agent = Agent( + agent_name="Data-Extractor", + system_prompt=DATA_EXTRACTOR_PROMPT, + llm=model, + max_loops=1, + autosave=True, + verbose=True, + dynamic_temperature_enabled=True, + saved_state_path="data_extractor_agent.json", + user_name="pe_firm", + retry_attempts=1, + context_length=200000, + output_type="string", +) + +summarizer_agent = Agent( + agent_name="Document-Summarizer", + system_prompt=SUMMARIZER_PROMPT, + llm=model, + max_loops=1, + autosave=True, + verbose=True, + dynamic_temperature_enabled=True, + saved_state_path="summarizer_agent.json", + user_name="pe_firm", + retry_attempts=1, + context_length=200000, + output_type="string", +) + +financial_analyst_agent = Agent( + agent_name="Financial-Analyst", + system_prompt=FINANCIAL_ANALYST_PROMPT, + llm=model, + max_loops=1, + autosave=True, + verbose=True, + dynamic_temperature_enabled=True, + saved_state_path="financial_analyst_agent.json", + user_name="pe_firm", + retry_attempts=1, + context_length=200000, + output_type="string", +) + +market_analyst_agent = Agent( + agent_name="Market-Analyst", + system_prompt=MARKET_ANALYST_PROMPT, + llm=model, + max_loops=1, + autosave=True, + verbose=True, + dynamic_temperature_enabled=True, + saved_state_path="market_analyst_agent.json", + user_name="pe_firm", + retry_attempts=1, + context_length=200000, + output_type="string", +) + +operational_analyst_agent = Agent( + agent_name="Operational-Analyst", + system_prompt=OPERATIONAL_ANALYST_PROMPT, + llm=model, + max_loops=1, + autosave=True, + verbose=True, + dynamic_temperature_enabled=True, + saved_state_path="operational_analyst_agent.json", + user_name="pe_firm", + retry_attempts=1, + context_length=200000, + output_type="string", +) + +# Initialize the SwarmRouter +router = SwarmRouter( + name="pe-document-analysis-swarm", + description="Analyze documents for private equity due diligence and investment decision-making", + max_loops=1, + agents=[ + data_extractor_agent, + summarizer_agent, + # financial_analyst_agent, + # market_analyst_agent, + # operational_analyst_agent, + ], + swarm_type="auto", # or "SequentialWorkflow" or "ConcurrentWorkflow" or + auto_generate_prompts=True, +) + +# Example usage +if __name__ == "__main__": + # Run a comprehensive private equity document analysis task + result = router.run( + "Where is the best place to find template term sheets for series A startups. Provide links and references" + ) + print(result) + + # Retrieve and print logs + for log in router.get_logs(): + print(f"{log.timestamp} - {log.level}: {log.message}") diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 82bd0fe0..f8c20923 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -176,6 +176,7 @@ nav: - ForestSwarm: "swarms/structs/forest_swarm.md" - SwarmRouter: "swarms/structs/swarm_router.md" - TaskQueueSwarm: "swarms/structs/taskqueue_swarm.md" + - Various Execution Methods: "swarms/structs/various_execution_methods.md" - Workflows: - ConcurrentWorkflow: "swarms/structs/concurrentworkflow.md" - SequentialWorkflow: "swarms/structs/sequential_workflow.md" diff --git a/docs/swarms/structs/various_execution_methods.md b/docs/swarms/structs/various_execution_methods.md new file mode 100644 index 00000000..f1e7d48f --- /dev/null +++ b/docs/swarms/structs/various_execution_methods.md @@ -0,0 +1,168 @@ +# Concurrent Agents API Reference + +This documentation covers the API for running multiple agents concurrently using various execution strategies. The implementation uses `asyncio` with `uvloop` for enhanced performance and `ThreadPoolExecutor` for handling CPU-bound operations. + +## Table of Contents +- [Core Functions](#core-functions) +- [Advanced Functions](#advanced-functions) +- [Utility Functions](#utility-functions) +- [Resource Monitoring](#resource-monitoring) +- [Usage Examples](#usage-examples) + +## Core Functions + +### run_agents_concurrently() + +Primary function for running multiple agents concurrently with optimized performance using both uvloop and ThreadPoolExecutor. + +#### Arguments + +| Parameter | Type | Required | Default | Description | +|-------------|----------------|----------|----------------|-------------| +| agents | List[AgentType]| Yes | - | List of Agent instances to run concurrently | +| task | str | Yes | - | Task string to execute | +| batch_size | int | No | CPU count | Number of agents to run in parallel in each batch | +| max_workers | int | No | CPU count * 2 | Maximum number of threads in the executor | + +#### Returns +`List[Any]`: List of outputs from each agent + +#### Flow Diagram + +```mermaid +graph TD + A[Start] --> B[Initialize ThreadPoolExecutor] + B --> C[Split Agents into Batches] + C --> D[Process Batch] + D --> E{More Batches?} + E -->|Yes| D + E -->|No| F[Combine Results] + F --> G[Return Results] + + subgraph "Batch Processing" + D --> H[Run Agents Async] + H --> I[Wait for Completion] + I --> J[Collect Batch Results] + end +``` + +### run_agents_sequentially() + +Runs multiple agents sequentially for baseline comparison or simple use cases. + +#### Arguments + +| Parameter | Type | Required | Default | Description | +|-----------|----------------|----------|---------|-------------| +| agents | List[AgentType]| Yes | - | List of Agent instances to run | +| task | str | Yes | - | Task string to execute | + +#### Returns +`List[Any]`: List of outputs from each agent + +## Advanced Functions + +### run_agents_with_different_tasks() + +Runs multiple agents with different tasks concurrently. + +#### Arguments + +| Parameter | Type | Required | Default | Description | +|-----------------|-------------------------------|----------|----------------|-------------| +| agent_task_pairs| List[tuple[AgentType, str]] | Yes | - | List of (agent, task) tuples | +| batch_size | int | No | CPU count | Number of agents to run in parallel | +| max_workers | int | No | CPU count * 2 | Maximum number of threads | + +### run_agents_with_timeout() + +Runs multiple agents concurrently with timeout limits. + +#### Arguments + +| Parameter | Type | Required | Default | Description | +|-------------|----------------|----------|----------------|-------------| +| agents | List[AgentType]| Yes | - | List of Agent instances | +| task | str | Yes | - | Task string to execute | +| timeout | float | Yes | - | Timeout in seconds for each agent | +| batch_size | int | No | CPU count | Number of agents to run in parallel | +| max_workers | int | No | CPU count * 2 | Maximum number of threads | + +## Usage Examples + +```python +from swarms.structs.agent import Agent +from your_module import run_agents_concurrently + +# Initialize agents +agents = [ + Agent( + agent_name=f"Analysis-Agent-{i}", + system_prompt="You are a financial analysis expert", + llm=model, + max_loops=1 + ) + for i in range(5) +] + +# Basic concurrent execution +task = "Analyze the impact of rising interest rates on tech stocks" +outputs = run_agents_concurrently(agents, task) + +# Running with timeout +outputs_with_timeout = run_agents_with_timeout( + agents=agents, + task=task, + timeout=30.0, + batch_size=2 +) + +# Running different tasks +task_pairs = [ + (agents[0], "Analyze tech stocks"), + (agents[1], "Analyze energy stocks"), + (agents[2], "Analyze retail stocks") +] +different_outputs = run_agents_with_different_tasks(task_pairs) +``` + +## Resource Monitoring + +### ResourceMetrics + +A dataclass for system resource metrics. + +#### Properties + +| Property | Type | Description | +|----------------|-------|-------------| +| cpu_percent | float | Current CPU usage percentage | +| memory_percent | float | Current memory usage percentage | +| active_threads | int | Number of active threads | + +### run_agents_with_resource_monitoring() + +Runs agents with system resource monitoring and adaptive batch sizing. + +#### Arguments + +| Parameter | Type | Required | Default | Description | +|------------------|----------------|----------|---------|-------------| +| agents | List[AgentType]| Yes | - | List of Agent instances | +| task | str | Yes | - | Task string to execute | +| cpu_threshold | float | No | 90.0 | Max CPU usage percentage | +| memory_threshold | float | No | 90.0 | Max memory usage percentage | +| check_interval | float | No | 1.0 | Resource check interval in seconds | + +## Performance Considerations + +- All functions are decorated with `@profile_func` for performance monitoring +- Default batch sizes and worker counts are optimized based on CPU cores +- Resource monitoring helps prevent system overload +- Using `uvloop` provides better performance than standard `asyncio` + +## Error Handling + +- Functions handle asyncio event loop creation/retrieval +- Timeout mechanism prevents infinite waiting +- Resource monitoring allows for adaptive performance adjustment \ No newline at end of file diff --git a/example.py b/example.py index 4a5c328c..e53c08b0 100644 --- a/example.py +++ b/example.py @@ -35,7 +35,7 @@ agent = Agent( # output_type="json", output_type="json", # "json", "dict", "csv" OR "string" soon "yaml" and streaming_on=False, - # auto_generate_prompt=True, + auto_generate_prompt=True, ) diff --git a/new_prompt.py b/new_prompt.py index 592fc3ab..9375c8f3 100644 --- a/new_prompt.py +++ b/new_prompt.py @@ -3,7 +3,9 @@ from swarm_models import OpenAIChat import os model = OpenAIChat( - api_key=os.getenv("OPENAI_API_KEY"), model_name="gpt-4o-mini", temperature=0.1 + api_key=os.getenv("OPENAI_API_KEY"), + model_name="gpt-4o-mini", + temperature=0.1, ) # Aggregator system prompt @@ -25,4 +27,4 @@ prompt_generator_sys_prompt = Prompt( llm=model, ) -print(prompt_generator_sys_prompt.get_prompt()) \ No newline at end of file +# print(prompt_generator_sys_prompt.get_prompt()) diff --git a/pyproject.toml b/pyproject.toml index 3c7cf63a..424ac7f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "5.8.7" +version = "5.9.1" description = "Swarms - Pytorch" license = "MIT" authors = ["Kye Gomez "] diff --git a/swarms/agents/__init__.py b/swarms/agents/__init__.py index 1996a1c5..d2156d64 100644 --- a/swarms/agents/__init__.py +++ b/swarms/agents/__init__.py @@ -14,8 +14,6 @@ from swarms.agents.tool_agent import ToolAgent from swarms.agents.create_agents_from_yaml import ( create_agents_from_yaml, ) -from swarms.agents.prompt_generator_agent import PromptGeneratorAgent - __all__ = [ "ToolAgent", @@ -30,5 +28,4 @@ __all__ = [ "check_exit", "check_end", "create_agents_from_yaml", - "PromptGeneratorAgent", ] diff --git a/swarms/agents/cli_prompt_generator_func.py b/swarms/agents/cli_prompt_generator_func.py deleted file mode 100644 index efe14044..00000000 --- a/swarms/agents/cli_prompt_generator_func.py +++ /dev/null @@ -1,84 +0,0 @@ -import os -from swarms import Agent -from swarm_models import OpenAIChat -from swarms.prompts.prompt_generator_optimizer import ( - prompt_generator_sys_prompt, -) -from dotenv import load_dotenv -from swarms.agents.prompt_generator_agent import PromptGeneratorAgent -from yaml import dump - -load_dotenv() - - -def generate_prompt( - num_loops: int = 1, - autosave: bool = True, - save_to_yaml: bool = False, - prompt: str = None, - save_format: str = "yaml", -) -> None: - """ - This function creates and runs a prompt generator agent with default settings for number of loops and autosave. - - Args: - num_loops (int, optional): The number of loops to run the agent. Defaults to 1. - autosave (bool, optional): Whether to autosave the agent's state. Defaults to True. - save_to_yaml (bool, optional): Whether to save the agent's configuration to a YAML file. Defaults to False. - prompt (str): The prompt to generate. - save_format (str, optional): The format in which to save the generated prompt. Defaults to "yaml". - - Returns: - None - """ - # Get the OpenAI API key from the environment variable - api_key = os.getenv("OPENAI_API_KEY") - - # Create an instance of the OpenAIChat class - model = OpenAIChat( - openai_api_key=api_key, - model_name="gpt-4o-mini", - temperature=0.1, - max_tokens=2000, - ) - - # Initialize the agent - agent = Agent( - agent_name="Prompt-Optimizer", - system_prompt=prompt_generator_sys_prompt.get_prompt(), - llm=model, - max_loops=num_loops, - autosave=autosave, - dashboard=False, - verbose=True, - dynamic_temperature_enabled=True, - saved_state_path="optimizer_agent.json", - user_name="swarms_corp", - retry_attempts=1, - context_length=200000, - return_step_meta=False, - output_type="string", - ) - - # Main Class - prompt_generator = PromptGeneratorAgent(agent) - - # Run the agent - prompt_generator.run(prompt, save_format) - - if save_to_yaml: - with open("agent_config.yaml", "w") as file: - dump(agent.config, file) - - -# # Example usage -# if __name__ == "__main__": -# try: -# create_and_run_agent( -# num_loops=1, -# autosave=True, -# save_to_yaml=True, -# prompt="Generate an amazing prompt for analyzing healthcare insurance documents", -# ) -# except Exception as e: -# logger.error(f"An error occurred: {e}") diff --git a/swarms/agents/prompt_generator_agent.py b/swarms/agents/prompt_generator_agent.py deleted file mode 100644 index 4f6de7da..00000000 --- a/swarms/agents/prompt_generator_agent.py +++ /dev/null @@ -1,92 +0,0 @@ -import json -import time -import uuid - -import yaml -from dotenv import load_dotenv -from loguru import logger - -from swarms.structs.agent import Agent -from swarms.prompts.prompt_generator_optimizer import ( - prompt_generator_sys_prompt, -) - - -load_dotenv() - - -class PromptGeneratorAgent: - """ - A class representing a prompt generator agent. - - Attributes: - ---------- - agent : Agent - The underlying agent instance. - """ - - def __init__(self, agent: Agent): - """ - Initializes the PromptGeneratorAgent instance. - - Args: - ---- - agent : Agent - The agent instance to be used for prompt generation. - """ - self.agent = agent - - def run(self, task: str, format: str = "json") -> str: - """ - Runs the prompt generator agent with the given task description and saves the generated prompt with the given metadata in the specified format. - - Args: - ---- - task : str - The task description to be used for prompt generation. - metadata : Dict[str, Any] - The metadata to be saved along with the prompt. - format : str, optional - The format in which the prompt should be saved (default is "json"). - - Returns: - ------- - str - The generated prompt. - """ - prompt = self.agent.run(task) - self.save_prompt(prompt, format) - return prompt - - def save_prompt( - self, - prompt: str, - format: str = "yaml", - ): - """ - Saves the generated prompt with the given metadata in the specified format using the prompt generator sys prompt model dump. - - Args: - ---- - prompt : str - The generated prompt to be saved. - metadata : Dict[str, Any] - The metadata to be saved along with the prompt. - format : str, optional - The format in which the prompt should be saved (default is "json"). - """ - data = { - "prompt_history": prompt_generator_sys_prompt.model_dump(), - "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), - "prompt": prompt, - } - if format == "json": - with open(f"prompt_{uuid.uuid4()}.json", "w") as f: - json.dump(data, f, indent=4) - elif format == "yaml": - with open(f"prompt_{uuid.uuid4()}.yaml", "w") as f: - yaml.dump(data, f) - else: - logger.error( - "Invalid format. Only 'json' and 'yaml' are supported." - ) diff --git a/swarms/cli/main.py b/swarms/cli/main.py index a798627b..738deec6 100644 --- a/swarms/cli/main.py +++ b/swarms/cli/main.py @@ -8,7 +8,6 @@ from swarms.cli.onboarding_process import OnboardingProcess from swarms.agents.create_agents_from_yaml import ( create_agents_from_yaml, ) -from swarms.agents.cli_prompt_generator_func import generate_prompt import subprocess console = Console() @@ -46,6 +45,7 @@ def show_help(): [bold white]run-agents[/bold white] : Run your Agents from your specified yaml file. Specify the yaml file with path the `--yaml-file` arg. Example: `--yaml-file agents.yaml` [bold white]generate-prompt[/bold white] : Generate a prompt through automated prompt engineering. Requires an OPENAI Key in your `.env` Example: --prompt "Generate a prompt for an agent to analyze legal docs" [bold white]auto-upgrade[/bold white] : Automatically upgrades Swarms to the latest version + [bold white]book-call[/bold white] : Book a strategy session with our team to discuss your use case and get personalized guidance For more details, visit: https://docs.swarms.world """ @@ -81,6 +81,18 @@ def redirect_to_docs(): time.sleep(2) +# Redirect to docs +def redirect_to_call(): + console.print( + "[bold yellow]Opening the Call page...[/bold yellow]" + ) + # Simulating API key retrieval process by opening the website + import webbrowser + + webbrowser.open("https://cal.com/swarms/swarms-strategy-session") + time.sleep(2) + + # Check and start cache (login system simulation) def check_login(): cache_file = "cache.txt" @@ -155,7 +167,8 @@ def main(): "check-login", "run-agents", "generate-prompt", # Added new command for generating prompts - "auto-upgrade", # Added new command for auto-upgrade + "auto-upgrade", # Added new command for auto-upgrade, + "book-call", ], help="Command to run", ) @@ -204,22 +217,24 @@ def main(): create_agents_from_yaml( yaml_file=args.yaml_file, return_type="tasks" ) - elif args.command == "generate-prompt": - if ( - args.prompt - ): # Corrected from args.prompt_task to args.prompt - generate_prompt( - num_loops=args.num_loops, - autosave=args.autosave, - save_to_yaml=args.save_to_yaml, - prompt=args.prompt, # Corrected from args.prompt_task to args.prompt - ) - else: - console.print( - "[bold red]Please specify a task for generating a prompt using '--prompt'.[/bold red]" - ) + # elif args.command == "generate-prompt": + # if ( + # args.prompt + # ): # Corrected from args.prompt_task to args.prompt + # generate_prompt( + # num_loops=args.num_loops, + # autosave=args.autosave, + # save_to_yaml=args.save_to_yaml, + # prompt=args.prompt, # Corrected from args.prompt_task to args.prompt + # ) + # else: + # console.print( + # "[bold red]Please specify a task for generating a prompt using '--prompt'.[/bold red]" + # ) elif args.command == "auto-upgrade": check_and_upgrade_version() + elif args.command == "book-call": + redirect_to_call() else: console.print( "[bold red]Unknown command! Type 'help' for usage.[/bold red]" diff --git a/swarms/prompts/prompt.py b/swarms/prompts/prompt.py index a5e95288..99cc21ca 100644 --- a/swarms/prompts/prompt.py +++ b/swarms/prompts/prompt.py @@ -13,9 +13,11 @@ from pydantic.v1 import validator from swarms_cloud.utils.log_to_swarms_database import log_agent_data from swarms_cloud.utils.capture_system_data import capture_system_data from swarms.tools.base_tool import BaseTool + # from swarms.agents.ape_agent import auto_generate_prompt from typing import Any + class Prompt(BaseModel): """ A class representing a prompt with content, edit history, and version control. @@ -91,13 +93,13 @@ class Prompt(BaseModel): values["content"] ] # Store initial version in history return v - + def __init__(self, **data): super().__init__(**data) - + if self.autosave: self._autosave() - + if self.auto_generate_prompt and self.llm: self.auto_generate_prompt() @@ -252,7 +254,7 @@ class Prompt(BaseModel): with open(file_path, "w") as file: json.dump(self.model_dump(), file) logger.info(f"Autosaved prompt {self.id} to {file_path}.") - + # def auto_generate_prompt(self): # logger.info(f"Auto-generating prompt for {self.name}") # task = self.name + " " + self.description + " " + self.content @@ -260,7 +262,7 @@ class Prompt(BaseModel): # logger.info("Generated prompt successfully, updating content") # self.edit_prompt(prompt) # logger.info("Prompt content updated") - + # return "Prompt auto-generated successfully." class Config: diff --git a/swarms/prompts/prompt_generator.py b/swarms/prompts/prompt_generator.py index d8be4f76..ad83529f 100644 --- a/swarms/prompts/prompt_generator.py +++ b/swarms/prompts/prompt_generator.py @@ -66,5 +66,5 @@ prompt_generator_sys_prompt.edit_prompt( """ ) -print(prompt_generator_sys_prompt.get_prompt()) -print(prompt_generator_sys_prompt.model_dump_json(indent=4)) +# print(prompt_generator_sys_prompt.get_prompt()) +# print(prompt_generator_sys_prompt.model_dump_json(indent=4)) diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index 77ad5436..b8915583 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -67,6 +67,17 @@ from swarms.structs.yaml_model import ( ) from swarms.structs.swarm_router import SwarmRouter, SwarmType from swarms.structs.swarm_arange import SwarmRearrange +from swarms.structs.multi_agent_exec import ( + run_agents_concurrently, + run_agents_concurrently_async, + run_single_agent, + run_agents_concurrently_multiprocess, + run_agents_sequentially, + run_agents_with_different_tasks, + run_agent_with_timeout, + run_agents_with_resource_monitoring, + +) __all__ = [ "Agent", @@ -130,4 +141,12 @@ __all__ = [ "SwarmRouter", "SwarmType", "SwarmRearrange", + "run_agents_concurrently", + "run_agents_concurrently_async", + "run_single_agent", + "run_agents_concurrently_multiprocess", + "run_agents_sequentially", + "run_agents_with_different_tasks", + "run_agent_with_timeout", + "run_agents_with_resource_monitoring", ] diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index e9ddf8b0..4a467095 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -57,7 +57,7 @@ from clusterops import ( execute_with_cpu_cores, ) from swarms.agents.ape_agent import auto_generate_prompt -import yaml + # Utils # Custom stopping condition @@ -1014,7 +1014,9 @@ class Agent: elif self.output_type == "dict": return self.agent_output.model_dump() elif self.output_type == "yaml": - return yaml.safe_dump(self.agent_output.model_dump(), sort_keys=False) + return yaml.safe_dump( + self.agent_output.model_dump(), sort_keys=False + ) else: raise ValueError( f"Invalid output type: {self.output_type}" diff --git a/swarms/structs/auto_swarm_builder.py b/swarms/structs/auto_swarm_builder.py deleted file mode 100644 index 37a93e2a..00000000 --- a/swarms/structs/auto_swarm_builder.py +++ /dev/null @@ -1,20 +0,0 @@ -from typing import List, Any, Dict, Optional - - -class AutoSwarmBuilder: - def __init__(self, task: str, num_agents: int = 10, batch_size: int = 10): - self.task = task - self.num_agents = num_agents - self.batch_size = batch_size - - def run(self, task: str, image_url: str = None, *args, **kwargs): - pass - - def _create_swarm(self): - pass - - def _create_agents(self): - pass - - def _run_agents(self): - pass diff --git a/swarms/structs/multi_agent_exec.py b/swarms/structs/multi_agent_exec.py new file mode 100644 index 00000000..36b7a81c --- /dev/null +++ b/swarms/structs/multi_agent_exec.py @@ -0,0 +1,344 @@ +import asyncio +from concurrent.futures import ThreadPoolExecutor +import psutil +from dataclasses import dataclass +import threading +from typing import List, Union, Any, Callable +from multiprocessing import cpu_count + +import uvloop +from swarms.structs.agent import Agent +from swarms.utils.calculate_func_metrics import profile_func + +# Use uvloop for faster asyncio event loop +asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) + +# Type definitions +AgentType = Union[Agent, Callable] + + +def run_single_agent(agent: AgentType, task: str) -> Any: + """Run a single agent synchronously""" + return agent.run(task) + + +async def run_agent_async( + agent: AgentType, task: str, executor: ThreadPoolExecutor +) -> Any: + """ + Run an agent asynchronously using a thread executor. + + Args: + agent: Agent instance to run + task: Task string to execute + executor: ThreadPoolExecutor instance for handling CPU-bound operations + + Returns: + Agent execution result + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + executor, run_single_agent, agent, task + ) + + +async def run_agents_concurrently_async( + agents: List[AgentType], task: str, executor: ThreadPoolExecutor +) -> List[Any]: + """ + Run multiple agents concurrently using asyncio and thread executor. + + Args: + agents: List of Agent instances to run concurrently + task: Task string to execute + executor: ThreadPoolExecutor for CPU-bound operations + + Returns: + List of outputs from each agent + """ + results = await asyncio.gather( + *(run_agent_async(agent, task, executor) for agent in agents) + ) + return results + + +@profile_func +def run_agents_concurrently( + agents: List[AgentType], + task: str, + batch_size: int = None, + max_workers: int = None, +) -> List[Any]: + """ + Optimized concurrent agent runner using both uvloop and ThreadPoolExecutor. + + Args: + agents: List of Agent instances to run concurrently + task: Task string to execute + batch_size: Number of agents to run in parallel in each batch (defaults to CPU count) + max_workers: Maximum number of threads in the executor (defaults to CPU count * 2) + + Returns: + List of outputs from each agent + """ + # Optimize defaults based on system resources + cpu_cores = cpu_count() + batch_size = batch_size or cpu_cores + max_workers = max_workers or cpu_cores * 2 + + results = [] + + # Get or create event loop + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + # Create a shared thread pool executor with optimal worker count + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # Process agents in batches + for i in range(0, len(agents), batch_size): + batch = agents[i : i + batch_size] + batch_results = loop.run_until_complete( + run_agents_concurrently_async(batch, task, executor) + ) + results.extend(batch_results) + + return results + + +@profile_func +def run_agents_concurrently_multiprocess( + agents: List[Agent], task: str, batch_size: int = cpu_count() +) -> List[Any]: + """ + Manage and run multiple agents concurrently in batches, with optimized performance. + + Args: + agents (List[Agent]): List of Agent instances to run concurrently. + task (str): The task string to execute by all agents. + batch_size (int, optional): Number of agents to run in parallel in each batch. + Defaults to the number of CPU cores. + + Returns: + List[Any]: A list of outputs from each agent. + """ + results = [] + loop = asyncio.get_event_loop() + + # Process agents in batches to avoid overwhelming system resources + for i in range(0, len(agents), batch_size): + batch = agents[i : i + batch_size] + batch_results = loop.run_until_complete( + run_agents_concurrently_async(batch, task) + ) + results.extend(batch_results) + + return results + +@profile_func +def run_agents_sequentially(agents: List[AgentType], task: str) -> List[Any]: + """ + Run multiple agents sequentially for baseline comparison. + + Args: + agents: List of Agent instances to run + task: Task string to execute + + Returns: + List of outputs from each agent + """ + return [run_single_agent(agent, task) for agent in agents] + + +@profile_func +def run_agents_with_different_tasks( + agent_task_pairs: List[tuple[AgentType, str]], + batch_size: int = None, + max_workers: int = None +) -> List[Any]: + """ + Run multiple agents with different tasks concurrently. + + Args: + agent_task_pairs: List of (agent, task) tuples + batch_size: Number of agents to run in parallel + max_workers: Maximum number of threads + + Returns: + List of outputs from each agent + """ + async def run_pair_async(pair: tuple[AgentType, str], executor: ThreadPoolExecutor) -> Any: + agent, task = pair + return await run_agent_async(agent, task, executor) + + cpu_cores = cpu_count() + batch_size = batch_size or cpu_cores + max_workers = max_workers or cpu_cores * 2 + results = [] + + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + for i in range(0, len(agent_task_pairs), batch_size): + batch = agent_task_pairs[i : i + batch_size] + batch_results = loop.run_until_complete( + asyncio.gather(*(run_pair_async(pair, executor) for pair in batch)) + ) + results.extend(batch_results) + + return results + + +async def run_agent_with_timeout( + agent: AgentType, + task: str, + timeout: float, + executor: ThreadPoolExecutor +) -> Any: + """ + Run an agent with a timeout limit. + + Args: + agent: Agent instance to run + task: Task string to execute + timeout: Timeout in seconds + executor: ThreadPoolExecutor instance + + Returns: + Agent execution result or None if timeout occurs + """ + try: + return await asyncio.wait_for( + run_agent_async(agent, task, executor), + timeout=timeout + ) + except asyncio.TimeoutError: + return None + +@profile_func +def run_agents_with_timeout( + agents: List[AgentType], + task: str, + timeout: float, + batch_size: int = None, + max_workers: int = None +) -> List[Any]: + """ + Run multiple agents concurrently with a timeout for each agent. + + Args: + agents: List of Agent instances + task: Task string to execute + timeout: Timeout in seconds for each agent + batch_size: Number of agents to run in parallel + max_workers: Maximum number of threads + + Returns: + List of outputs (None for timed out agents) + """ + cpu_cores = cpu_count() + batch_size = batch_size or cpu_cores + max_workers = max_workers or cpu_cores * 2 + results = [] + + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + for i in range(0, len(agents), batch_size): + batch = agents[i : i + batch_size] + batch_results = loop.run_until_complete( + asyncio.gather( + *(run_agent_with_timeout(agent, task, timeout, executor) + for agent in batch) + ) + ) + results.extend(batch_results) + + return results + + + + + +@dataclass +class ResourceMetrics: + cpu_percent: float + memory_percent: float + active_threads: int + +def get_system_metrics() -> ResourceMetrics: + """Get current system resource usage""" + return ResourceMetrics( + cpu_percent=psutil.cpu_percent(), + memory_percent=psutil.virtual_memory().percent, + active_threads=threading.active_count() + ) + +@profile_func +def run_agents_with_resource_monitoring( + agents: List[AgentType], + task: str, + cpu_threshold: float = 90.0, + memory_threshold: float = 90.0, + check_interval: float = 1.0 +) -> List[Any]: + """ + Run agents with system resource monitoring and adaptive batch sizing. + + Args: + agents: List of Agent instances + task: Task string to execute + cpu_threshold: Max CPU usage percentage + memory_threshold: Max memory usage percentage + check_interval: Resource check interval in seconds + + Returns: + List of outputs from each agent + """ + async def monitor_resources(): + while True: + metrics = get_system_metrics() + if metrics.cpu_percent > cpu_threshold or metrics.memory_percent > memory_threshold: + # Reduce batch size or pause execution + pass + await asyncio.sleep(check_interval) + + # Implementation details... + +# # Example usage: +# # Initialize your agents with the same model to avoid re-creating it +# agents = [ +# Agent( +# agent_name=f"Financial-Analysis-Agent_parallel_swarm{i}", +# system_prompt=FINANCIAL_AGENT_SYS_PROMPT, +# llm=model, +# max_loops=1, +# autosave=True, +# dashboard=False, +# verbose=False, +# dynamic_temperature_enabled=False, +# saved_state_path=f"finance_agent_{i}.json", +# user_name="swarms_corp", +# retry_attempts=1, +# context_length=200000, +# return_step_meta=False, +# ) +# for i in range(5) # Assuming you want 10 agents +# ] + +# task = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria" +# outputs = run_agents_concurrently(agents, task) + +# for i, output in enumerate(outputs): +# print(f"Output from agent {i+1}:\n{output}") + diff --git a/swarms/structs/multi_process_workflow.py b/swarms/structs/multi_process_workflow.py index af31d730..44051d0a 100644 --- a/swarms/structs/multi_process_workflow.py +++ b/swarms/structs/multi_process_workflow.py @@ -1,5 +1,5 @@ from multiprocessing import Manager, Pool, cpu_count -from typing import Sequence +from typing import Sequence, Union, Callable from swarms.structs.agent import Agent from swarms.structs.base_workflow import BaseWorkflow @@ -49,7 +49,7 @@ class MultiProcessWorkflow(BaseWorkflow): self, max_workers: int = 5, autosave: bool = True, - agents: Sequence[Agent] = None, + agents: Sequence[Union[Agent, Callable]] = None, *args, **kwargs, ): diff --git a/swarms/structs/run_agents_in_parallel.py b/swarms/structs/run_agents_in_parallel.py deleted file mode 100644 index 06a01a58..00000000 --- a/swarms/structs/run_agents_in_parallel.py +++ /dev/null @@ -1,120 +0,0 @@ -import os - -# from swarms.structs. import OpenAIChat -import asyncio -from swarms.utils.calculate_func_metrics import profile_func - - -# Function to run a single agent on the task (synchronous) -def run_single_agent(agent, task): - return agent.run(task) - - -# Asynchronous wrapper for agent tasks -async def run_agent_async(agent, task): - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, run_single_agent, agent, task - ) - - -# Asynchronous function to run agents concurrently -async def run_agents_concurrently_async(agents, task: str): - """ - Run multiple agents concurrently on the same task with optimized performance. - - :param agents: List of Agent instances to run concurrently. - :param task: The task string to execute by all agents. - :return: A list of outputs from each agent. - """ - - # Run all agents asynchronously using asyncio.gather - results = await asyncio.gather( - *(run_agent_async(agent, task) for agent in agents) - ) - return results - - -# Function to manage the overall process and batching -@profile_func -def run_agents_concurrently(agents, task: str, batch_size: int = 5): - """ - Manage and run multiple agents concurrently in batches, with optimized performance. - - :param agents: List of Agent instances to run concurrently. - :param task: The task string to execute by all agents. - :param batch_size: Number of agents to run in parallel in each batch. - :return: A list of outputs from each agent. - """ - - results = [] - loop = asyncio.get_event_loop() - - batch_size = ( - os.cpu_count() if batch_size > os.cpu_count() else batch_size - ) - - # Process agents in batches to avoid overwhelming system resources - for i in range(0, len(agents), batch_size): - batch = agents[i : i + batch_size] - batch_results = loop.run_until_complete( - run_agents_concurrently_async(batch, task) - ) - results.extend(batch_results) - - return results - - -# # Example usage: -# # Initialize your agents with the same model to avoid re-creating it -# agents = [ -# Agent( -# agent_name=f"Financial-Analysis-Agent_parallel_swarm{i}", -# system_prompt=FINANCIAL_AGENT_SYS_PROMPT, -# llm=model, -# max_loops=1, -# autosave=True, -# dashboard=False, -# verbose=False, -# dynamic_temperature_enabled=False, -# saved_state_path=f"finance_agent_{i}.json", -# user_name="swarms_corp", -# retry_attempts=1, -# context_length=200000, -# return_step_meta=False, -# ) -# for i in range(5) # Assuming you want 10 agents -# ] - -# task = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria" -# outputs = run_agents_concurrently(agents, task) - -# for i, output in enumerate(outputs): -# print(f"Output from agent {i+1}:\n{output}") - -# # Output from agent 2: -# # execution_time=12.89196228981018 memory_usage=-294.9375 cpu_usage=-10.3 io_operations=23309 function_calls=1 -# # execution_time=11.810921907424927 memory_usage=-242.734375 cpu_usage=-26.4 io_operations=10752 function_calls=1 - -# # Parallel -# # execution_time=18.79391312599182 memory_usage=-342.9375 cpu_usage=-2.5 io_operations=59518 function_calls=1 - -# # # Multiprocess -# # 2024-08-22T14:49:33.986491-0400 Function metrics: { -# # "execution_time": 24.783875942230225, -# # "memory_usage": -286.734375, -# # "cpu_usage": -24.6, -# # "io_operations": 17961, -# # "function_calls": 1 -# # } - - -# # Latest -# # Analysis-Agent_parallel_swarm4_state.json -# # 2024-08-22T15:43:11.800970-0400 Function metrics: { -# # "execution_time": 11.062030792236328, -# # "memory_usage": -249.5625, -# # "cpu_usage": -15.700000000000003, -# # "io_operations": 13439, -# # "function_calls": 1 -# # } diff --git a/swarms/structs/run_agents_in_parallel_async_multiprocess.py b/swarms/structs/run_agents_in_parallel_async_multiprocess.py deleted file mode 100644 index 4a1c7c63..00000000 --- a/swarms/structs/run_agents_in_parallel_async_multiprocess.py +++ /dev/null @@ -1,130 +0,0 @@ -import asyncio -from typing import List, Any -from swarms import Agent -from multiprocessing import cpu_count -from swarms.utils.calculate_func_metrics import profile_func - -# Use uvloop for faster asyncio event loop -asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) - - -def run_single_agent(agent: Agent, task: str) -> Any: - """ - Run a single agent on the given task. - - Args: - agent (Agent): The agent to run. - task (str): The task for the agent to perform. - - Returns: - Any: The result of the agent's execution. - """ - return agent.run(task) - - -async def run_agent_async(agent: Agent, task: str) -> Any: - """ - Asynchronous wrapper for agent tasks. - - Args: - agent (Agent): The agent to run asynchronously. - task (str): The task for the agent to perform. - - Returns: - Any: The result of the agent's execution. - """ - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, run_single_agent, agent, task - ) - - -async def run_agents_concurrently_async( - agents: List[Agent], task: str -) -> List[Any]: - """ - Run multiple agents concurrently on the same task with optimized performance. - - Args: - agents (List[Agent]): List of Agent instances to run concurrently. - task (str): The task string to execute by all agents. - - Returns: - List[Any]: A list of outputs from each agent. - """ - results = await asyncio.gather( - *(run_agent_async(agent, task) for agent in agents) - ) - return results - - -@profile_func -def run_agents_concurrently_multiprocess( - agents: List[Agent], task: str, batch_size: int = cpu_count() -) -> List[Any]: - """ - Manage and run multiple agents concurrently in batches, with optimized performance. - - Args: - agents (List[Agent]): List of Agent instances to run concurrently. - task (str): The task string to execute by all agents. - batch_size (int, optional): Number of agents to run in parallel in each batch. - Defaults to the number of CPU cores. - - Returns: - List[Any]: A list of outputs from each agent. - """ - results = [] - loop = asyncio.get_event_loop() - - # Process agents in batches to avoid overwhelming system resources - for i in range(0, len(agents), batch_size): - batch = agents[i : i + batch_size] - batch_results = loop.run_until_complete( - run_agents_concurrently_async(batch, task) - ) - results.extend(batch_results) - - return results - - -# # # Example usage: -# # Initialize your agents with the same model to avoid re-creating it -# agents = [ -# Agent( -# agent_name=f"Financial-Analysis-Agent_new_parallel_swarm_test{i}", -# system_prompt=FINANCIAL_AGENT_SYS_PROMPT, -# llm=model, -# max_loops=1, -# autosave=True, -# dashboard=False, -# verbose=False, -# dynamic_temperature_enabled=False, -# saved_state_path=f"finance_agent_{i}.json", -# user_name="swarms_corp", -# retry_attempts=1, -# context_length=200000, -# return_step_meta=False, -# ) -# for i in range(5) # Assuming you want 10 agents -# ] - -# task = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria" -# outputs = run_agents_concurrently_multiprocess( -# agents, -# task, -# ) - -# for i, output in enumerate(outputs): -# print(f"Output from agent {i+1}:\n{output}") - - -# # execution_time=15.958749055862427 memory_usage=-328.046875 cpu_usage=-2.5999999999999943 io_operations=81297 function_calls=1 -# # Analysis-Agent_new_parallel_swarm_test1_state.json -# # 2024-08-22T15:42:12.463246-0400 Function metrics: { -# # "execution_time": 15.958749055862427, -# # "memory_usage": -328.046875, -# # "cpu_usage": -2.5999999999999943, -# # "io_operations": 81297, -# "function_calls": 1 -# } diff --git a/swarms/structs/sequential_workflow.py b/swarms/structs/sequential_workflow.py index 95993bc4..cc4a1865 100644 --- a/swarms/structs/sequential_workflow.py +++ b/swarms/structs/sequential_workflow.py @@ -14,6 +14,9 @@ class SequentialWorkflow(BaseSwarm): max_loops (int, optional): The maximum number of loops to execute the workflow. Defaults to 1. *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. + + Raises: + ValueError: If agents list is None or empty, or if max_loops is 0 """ def __init__( @@ -25,26 +28,40 @@ class SequentialWorkflow(BaseSwarm): *args, **kwargs, ): - super().__init__( - name=name, - description=description, - agents=agents, - *args, - **kwargs, - ) - self.name = name - self.description = description - self.agents = agents - self.flow = " -> ".join(agent.agent_name for agent in agents) - self.agent_rearrange = AgentRearrange( - name=name, - description=description, - agents=agents, - flow=self.flow, - max_loops=max_loops, - *args, - **kwargs, - ) + if agents is None or len(agents) == 0: + raise ValueError("Agents list cannot be None or empty") + + if max_loops == 0: + raise ValueError("max_loops cannot be 0") + + try: + super().__init__( + name=name, + description=description, + agents=agents, + *args, + **kwargs, + ) + self.name = name + self.description = description + self.agents = agents + self.flow = " -> ".join( + agent.agent_name for agent in agents + ) + self.agent_rearrange = AgentRearrange( + name=name, + description=description, + agents=agents, + flow=self.flow, + max_loops=max_loops, + *args, + **kwargs, + ) + except Exception as e: + logger.error( + f"Error initializing SequentialWorkflow: {str(e)}" + ) + raise def run(self, task: str) -> str: """ @@ -55,7 +72,14 @@ class SequentialWorkflow(BaseSwarm): Returns: str: The final result after processing through all agents. + + Raises: + ValueError: If task is None or empty + Exception: If any error occurs during task execution """ + if not task or not isinstance(task, str): + raise ValueError("Task must be a non-empty string") + try: logger.info( f"Running task with dynamic flow: {self.flow}" diff --git a/swarms/structs/swarm_matcher.py b/swarms/structs/swarm_matcher.py index 16adb5c7..ebbfa0a3 100644 --- a/swarms/structs/swarm_matcher.py +++ b/swarms/structs/swarm_matcher.py @@ -1,107 +1,338 @@ -from typing import List, Tuple, Optional -import numpy as np -import torch -from transformers import AutoTokenizer, AutoModel +# from typing import List, Tuple, Optional +# import numpy as np +# import torch +# from transformers import AutoTokenizer, AutoModel +# from pydantic import BaseModel, Field +# from loguru import logger +# import json +# from tenacity import retry, stop_after_attempt, wait_exponential +# from uuid import uuid4 + + +# class SwarmType(BaseModel): +# name: str +# description: str +# embedding: Optional[List[float]] = Field( +# default=None, exclude=True +# ) + + +# class SwarmMatcherConfig(BaseModel): +# model_name: str = "sentence-transformers/all-MiniLM-L6-v2" +# embedding_dim: int = ( +# 512 # Dimension of the sentence-transformers model +# ) + + +# class SwarmMatcher: +# """ +# A class for matching tasks to swarm types based on their descriptions. +# It utilizes a transformer model to generate embeddings for task and swarm type descriptions, +# and then calculates the dot product to find the best match. +# """ + +# def __init__(self, config: SwarmMatcherConfig): +# """ +# Initializes the SwarmMatcher with a configuration. + +# Args: +# config (SwarmMatcherConfig): The configuration for the SwarmMatcher. +# """ +# logger.add("swarm_matcher_debug.log", level="DEBUG") +# logger.debug("Initializing SwarmMatcher") +# try: +# self.config = config +# self.tokenizer = AutoTokenizer.from_pretrained( +# config.model_name +# ) +# self.model = AutoModel.from_pretrained(config.model_name) +# self.swarm_types: List[SwarmType] = [] +# logger.debug("SwarmMatcher initialized successfully") +# except Exception as e: +# logger.error(f"Error initializing SwarmMatcher: {str(e)}") +# raise + +# @retry( +# stop=stop_after_attempt(3), +# wait=wait_exponential(multiplier=1, min=4, max=10), +# ) +# def get_embedding(self, text: str) -> np.ndarray: +# """ +# Generates an embedding for a given text using the configured model. + +# Args: +# text (str): The text for which to generate an embedding. + +# Returns: +# np.ndarray: The embedding vector for the text. +# """ +# logger.debug(f"Getting embedding for text: {text[:50]}...") +# try: +# inputs = self.tokenizer( +# text, +# return_tensors="pt", +# padding=True, +# truncation=True, +# max_length=512, +# ) +# with torch.no_grad(): +# outputs = self.model(**inputs) +# embedding = ( +# outputs.last_hidden_state.mean(dim=1) +# .squeeze() +# .numpy() +# ) +# logger.debug("Embedding generated successfully") +# return embedding +# except Exception as e: +# logger.error(f"Error generating embedding: {str(e)}") +# raise + +# def add_swarm_type(self, swarm_type: SwarmType): +# """ +# Adds a swarm type to the list of swarm types, generating an embedding for its description. + +# Args: +# swarm_type (SwarmType): The swarm type to add. +# """ +# logger.debug(f"Adding swarm type: {swarm_type.name}") +# try: +# embedding = self.get_embedding(swarm_type.description) +# swarm_type.embedding = embedding.tolist() +# self.swarm_types.append(swarm_type) +# logger.info(f"Added swarm type: {swarm_type.name}") +# except Exception as e: +# logger.error( +# f"Error adding swarm type {swarm_type.name}: {str(e)}" +# ) +# raise + +# def find_best_match(self, task: str) -> Tuple[str, float]: +# """ +# Finds the best match for a given task among the registered swarm types. + +# Args: +# task (str): The task for which to find the best match. + +# Returns: +# Tuple[str, float]: A tuple containing the name of the best matching swarm type and the score. +# """ +# logger.debug(f"Finding best match for task: {task[:50]}...") +# try: +# task_embedding = self.get_embedding(task) +# best_match = None +# best_score = -float("inf") +# for swarm_type in self.swarm_types: +# score = np.dot( +# task_embedding, np.array(swarm_type.embedding) +# ) +# if score > best_score: +# best_score = score +# best_match = swarm_type +# logger.info( +# f"Best match for task: {best_match.name} (score: {best_score})" +# ) +# return best_match.name, float(best_score) +# except Exception as e: +# logger.error( +# f"Error finding best match for task: {str(e)}" +# ) +# raise + +# def auto_select_swarm(self, task: str) -> str: +# """ +# Automatically selects the best swarm type for a given task based on their descriptions. + +# Args: +# task (str): The task for which to select a swarm type. + +# Returns: +# str: The name of the selected swarm type. +# """ +# logger.debug(f"Auto-selecting swarm for task: {task[:50]}...") +# best_match, score = self.find_best_match(task) +# logger.info(f"Task: {task}") +# logger.info(f"Selected Swarm Type: {best_match}") +# logger.info(f"Confidence Score: {score:.2f}") +# return best_match + +# def run_multiple(self, tasks: List[str], *args, **kwargs) -> str: +# swarms = [] + +# for task in tasks: +# output = self.auto_select_swarm(task) + +# # Append +# swarms.append(output) + +# return swarms + +# def save_swarm_types(self, filename: str): +# """ +# Saves the registered swarm types to a JSON file. + +# Args: +# filename (str): The name of the file to which to save the swarm types. +# """ +# try: +# with open(filename, "w") as f: +# json.dump([st.dict() for st in self.swarm_types], f) +# logger.info(f"Saved swarm types to {filename}") +# except Exception as e: +# logger.error(f"Error saving swarm types: {str(e)}") +# raise + +# def load_swarm_types(self, filename: str): +# """ +# Loads swarm types from a JSON file. + +# Args: +# filename (str): The name of the file from which to load the swarm types. +# """ +# try: +# with open(filename, "r") as f: +# swarm_types_data = json.load(f) +# self.swarm_types = [ +# SwarmType(**st) for st in swarm_types_data +# ] +# logger.info(f"Loaded swarm types from {filename}") +# except Exception as e: +# logger.error(f"Error loading swarm types: {str(e)}") +# raise + + +# def initialize_swarm_types(matcher: SwarmMatcher): +# logger.debug("Initializing swarm types") +# swarm_types = [ +# SwarmType( +# name="AgentRearrange", +# description="Optimize agent order and rearrange flow for multi-step tasks, ensuring efficient task allocation and minimizing bottlenecks. Keywords: orchestration, coordination, pipeline optimization, task scheduling, resource allocation, workflow management, agent organization, process optimization", +# ), +# SwarmType( +# name="MixtureOfAgents", +# description="Combine diverse expert agents for comprehensive analysis, fostering a collaborative approach to problem-solving and leveraging individual strengths. Keywords: multi-agent system, expert collaboration, distributed intelligence, collective problem solving, agent specialization, team coordination, hybrid approaches, knowledge synthesis", +# ), +# SwarmType( +# name="SpreadSheetSwarm", +# description="Collaborative data processing and analysis in a spreadsheet-like environment, facilitating real-time data sharing and visualization. Keywords: data analysis, tabular processing, collaborative editing, data transformation, spreadsheet operations, data visualization, real-time collaboration, structured data", +# ), +# SwarmType( +# name="SequentialWorkflow", +# description="Execute tasks in a step-by-step, sequential process workflow, ensuring a logical and methodical approach to task execution. Keywords: linear processing, waterfall methodology, step-by-step execution, ordered tasks, sequential operations, process flow, systematic approach, staged execution", +# ), +# SwarmType( +# name="ConcurrentWorkflow", +# description="Process multiple tasks or data sources concurrently in parallel, maximizing productivity and reducing processing time. Keywords: parallel processing, multi-threading, asynchronous execution, distributed computing, concurrent operations, simultaneous tasks, parallel workflows, scalable processing", +# ), +# # SwarmType( +# # name="HierarchicalSwarm", +# # description="Organize agents in a hierarchical structure with clear reporting lines and delegation of responsibilities. Keywords: management hierarchy, organizational structure, delegation, supervision, chain of command, tiered organization, structured coordination", +# # ), +# # SwarmType( +# # name="AdaptiveSwarm", +# # description="Dynamically adjust agent behavior and swarm configuration based on task requirements and performance feedback. Keywords: dynamic adaptation, self-optimization, feedback loops, learning systems, flexible configuration, responsive behavior, adaptive algorithms", +# # ), +# # SwarmType( +# # name="ConsensusSwarm", +# # description="Achieve group decisions through consensus mechanisms and voting protocols among multiple agents. Keywords: group decision making, voting systems, collective intelligence, agreement protocols, democratic processes, collaborative decisions", +# # ), +# ] + +# for swarm_type in swarm_types: +# matcher.add_swarm_type(swarm_type) +# logger.debug("Swarm types initialized") + + +# def swarm_matcher(task: str, *args, **kwargs): +# """ +# Runs the SwarmMatcher example with predefined tasks and swarm types. +# """ +# config = SwarmMatcherConfig() +# matcher = SwarmMatcher(config) +# initialize_swarm_types(matcher) + +# # matcher.save_swarm_types(f"swarm_logs/{uuid4().hex}.json") + +# swarm_type = matcher.auto_select_swarm(task) + +# logger.info(f"{swarm_type}") + +# return swarm_type + + +from typing import List, Tuple, Dict from pydantic import BaseModel, Field from loguru import logger +from uuid import uuid4 +import chromadb import json from tenacity import retry, stop_after_attempt, wait_exponential -# Ensure you have the necessary libraries installed: -# pip install torch transformers pydantic loguru tenacity - class SwarmType(BaseModel): + """A swarm type with its name, description and optional metadata""" + + id: str = Field(default_factory=lambda: str(uuid4())) name: str description: str - embedding: Optional[List[float]] = Field( - default=None, exclude=True - ) + metadata: Dict = Field(default_factory=dict) class SwarmMatcherConfig(BaseModel): - model_name: str = "sentence-transformers/all-MiniLM-L6-v2" - embedding_dim: int = ( - 512 # Dimension of the sentence-transformers model + """Configuration for the SwarmMatcher""" + + collection_name: str = "swarm_types" + distance_metric: str = "cosine" # or "l2" or "ip" + embedding_function: str = ( + "sentence-transformers/all-mpnet-base-v2" # Better model than MiniLM ) + persist_directory: str = "./chroma_db" class SwarmMatcher: """ - A class for matching tasks to swarm types based on their descriptions. - It utilizes a transformer model to generate embeddings for task and swarm type descriptions, - and then calculates the dot product to find the best match. + An improved swarm matcher that uses ChromaDB for better vector similarity search. + Features: + - Persistent storage of embeddings + - Better vector similarity search with multiple distance metrics + - Improved embedding model + - Metadata filtering capabilities + - Batch operations support """ def __init__(self, config: SwarmMatcherConfig): - """ - Initializes the SwarmMatcher with a configuration. - - Args: - config (SwarmMatcherConfig): The configuration for the SwarmMatcher. - """ - logger.add("swarm_matcher_debug.log", level="DEBUG") - logger.debug("Initializing SwarmMatcher") - try: - self.config = config - self.tokenizer = AutoTokenizer.from_pretrained( - config.model_name - ) - self.model = AutoModel.from_pretrained(config.model_name) - self.swarm_types: List[SwarmType] = [] - logger.debug("SwarmMatcher initialized successfully") - except Exception as e: - logger.error(f"Error initializing SwarmMatcher: {str(e)}") - raise - - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential(multiplier=1, min=4, max=10), - ) - def get_embedding(self, text: str) -> np.ndarray: - """ - Generates an embedding for a given text using the configured model. + """Initialize the improved swarm matcher""" + logger.add("swarm_matcher.log", rotation="100 MB") + self.config = config - Args: - text (str): The text for which to generate an embedding. + # Initialize ChromaDB client with persistence + self.chroma_client = chromadb.Client() - Returns: - np.ndarray: The embedding vector for the text. - """ - logger.debug(f"Getting embedding for text: {text[:50]}...") + # Get or create collection try: - inputs = self.tokenizer( - text, - return_tensors="pt", - padding=True, - truncation=True, - max_length=512, + self.collection = self.chroma_client.get_collection( + name=config.collection_name, ) - with torch.no_grad(): - outputs = self.model(**inputs) - embedding = ( - outputs.last_hidden_state.mean(dim=1) - .squeeze() - .numpy() + except ValueError: + self.collection = self.chroma_client.create_collection( + name=config.collection_name, + metadata={"hnsw:space": config.distance_metric}, ) - logger.debug("Embedding generated successfully") - return embedding - except Exception as e: - logger.error(f"Error generating embedding: {str(e)}") - raise - def add_swarm_type(self, swarm_type: SwarmType): - """ - Adds a swarm type to the list of swarm types, generating an embedding for its description. + logger.info( + f"Initialized SwarmMatcher with collection '{config.collection_name}'" + ) - Args: - swarm_type (SwarmType): The swarm type to add. - """ - logger.debug(f"Adding swarm type: {swarm_type.name}") + def add_swarm_type(self, swarm_type: SwarmType) -> None: + """Add a single swarm type to the collection""" try: - embedding = self.get_embedding(swarm_type.description) - swarm_type.embedding = embedding.tolist() - self.swarm_types.append(swarm_type) + self.collection.add( + ids=[swarm_type.id], + documents=[swarm_type.description], + metadatas=[ + {"name": swarm_type.name, **swarm_type.metadata} + ], + ) logger.info(f"Added swarm type: {swarm_type.name}") except Exception as e: logger.error( @@ -109,142 +340,239 @@ class SwarmMatcher: ) raise - def find_best_match(self, task: str) -> Tuple[str, float]: - """ - Finds the best match for a given task among the registered swarm types. - - Args: - task (str): The task for which to find the best match. - - Returns: - Tuple[str, float]: A tuple containing the name of the best matching swarm type and the score. - """ - logger.debug(f"Finding best match for task: {task[:50]}...") + def add_swarm_types(self, swarm_types: List[SwarmType]) -> None: + """Add multiple swarm types in batch""" try: - task_embedding = self.get_embedding(task) - best_match = None - best_score = -float("inf") - for swarm_type in self.swarm_types: - score = np.dot( - task_embedding, np.array(swarm_type.embedding) - ) - if score > best_score: - best_score = score - best_match = swarm_type - logger.info( - f"Best match for task: {best_match.name} (score: {best_score})" + self.collection.add( + ids=[st.id for st in swarm_types], + documents=[st.description for st in swarm_types], + metadatas=[ + {"name": st.name, **st.metadata} + for st in swarm_types + ], ) - return best_match.name, float(best_score) + logger.info(f"Added {len(swarm_types)} swarm types") except Exception as e: logger.error( - f"Error finding best match for task: {str(e)}" + f"Error adding swarm types in batch: {str(e)}" ) raise - def auto_select_swarm(self, task: str) -> str: + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=4, max=10), + ) + def find_best_matches( + self, + task: str, + n_results: int = 3, + score_threshold: float = 0.7, + ) -> List[Tuple[str, float]]: """ - Automatically selects the best swarm type for a given task based on their descriptions. - - Args: - task (str): The task for which to select a swarm type. - - Returns: - str: The name of the selected swarm type. + Find the best matching swarm types for a given task + Returns multiple matches with their scores """ - logger.debug(f"Auto-selecting swarm for task: {task[:50]}...") - best_match, score = self.find_best_match(task) - logger.info(f"Task: {task}") - logger.info(f"Selected Swarm Type: {best_match}") - logger.info(f"Confidence Score: {score:.2f}") - return best_match - - def run_multiple(self, tasks: List[str], *args, **kwargs) -> str: - swarms = [] + try: + results = self.collection.query( + query_texts=[task], + n_results=n_results, + include=["metadatas", "distances"], + ) - for task in tasks: - output = self.auto_select_swarm(task) + matches = [] + for metadata, distance in zip( + results["metadatas"][0], results["distances"][0] + ): + # Convert distance to similarity score (1 - normalized_distance) + score = 1 - ( + distance / 2 + ) # Normalize cosine distance to [0,1] + if score >= score_threshold: + matches.append((metadata["name"], score)) - # Append - swarms.append(output) + logger.info(f"Found {len(matches)} matches for task") + return matches - return swarms + except Exception as e: + logger.error(f"Error finding matches for task: {str(e)}") + raise - def save_swarm_types(self, filename: str): + def auto_select_swarm(self, task: str) -> str: """ - Saves the registered swarm types to a JSON file. - - Args: - filename (str): The name of the file to which to save the swarm types. + Automatically select the best swarm type for a task + Returns only the top match """ + matches = self.find_best_matches(task, n_results=1) + if not matches: + logger.warning("No suitable matches found for task") + return "SequentialWorkflow" # Default fallback + + best_match, score = matches[0] + logger.info( + f"Selected swarm type '{best_match}' with confidence {score:.3f}" + ) + return best_match + + def run_multiple(self, tasks: List[str]) -> List[str]: + """Process multiple tasks in batch""" + return [self.auto_select_swarm(task) for task in tasks] + + def save_swarm_types(self, filename: str) -> None: + """Export swarm types to JSON""" try: + all_data = self.collection.get( + include=["metadatas", "documents"] + ) + swarm_types = [ + SwarmType( + id=id_, + name=metadata["name"], + description=document, + metadata={ + k: v + for k, v in metadata.items() + if k != "name" + }, + ) + for id_, metadata, document in zip( + all_data["ids"], + all_data["metadatas"], + all_data["documents"], + ) + ] + with open(filename, "w") as f: - json.dump([st.dict() for st in self.swarm_types], f) + json.dump( + [st.dict() for st in swarm_types], f, indent=2 + ) logger.info(f"Saved swarm types to {filename}") except Exception as e: logger.error(f"Error saving swarm types: {str(e)}") raise - def load_swarm_types(self, filename: str): - """ - Loads swarm types from a JSON file. - - Args: - filename (str): The name of the file from which to load the swarm types. - """ + def load_swarm_types(self, filename: str) -> None: + """Import swarm types from JSON""" try: with open(filename, "r") as f: swarm_types_data = json.load(f) - self.swarm_types = [ - SwarmType(**st) for st in swarm_types_data - ] + swarm_types = [SwarmType(**st) for st in swarm_types_data] + self.add_swarm_types(swarm_types) logger.info(f"Loaded swarm types from {filename}") except Exception as e: logger.error(f"Error loading swarm types: {str(e)}") raise -def initialize_swarm_types(matcher: SwarmMatcher): - logger.debug("Initializing swarm types") +def initialize_default_swarm_types(matcher: SwarmMatcher) -> None: + """Initialize the matcher with default swarm types""" swarm_types = [ SwarmType( name="AgentRearrange", - description="Optimize agent order and rearrange flow for multi-step tasks, ensuring efficient task allocation and minimizing bottlenecks", + description=""" + Optimize agent order and rearrange flow for multi-step tasks, ensuring efficient task allocation + and minimizing bottlenecks. Specialized in orchestration, coordination, pipeline optimization, + task scheduling, resource allocation, workflow management, agent organization, and process optimization. + Best for tasks requiring complex agent interactions and workflow optimization. + """, + metadata={ + "category": "optimization", + "complexity": "high", + }, ), SwarmType( name="MixtureOfAgents", - description="Combine diverse expert agents for comprehensive analysis, fostering a collaborative approach to problem-solving and leveraging individual strengths", + description=""" + Combine diverse expert agents for comprehensive analysis, fostering a collaborative approach + to problem-solving and leveraging individual strengths. Focuses on multi-agent systems, + expert collaboration, distributed intelligence, collective problem solving, agent specialization, + team coordination, hybrid approaches, and knowledge synthesis. Ideal for complex problems + requiring multiple areas of expertise. + """, + metadata={ + "category": "collaboration", + "complexity": "high", + }, ), SwarmType( name="SpreadSheetSwarm", - description="Collaborative data processing and analysis in a spreadsheet-like environment, facilitating real-time data sharing and visualization", + description=""" + Collaborative data processing and analysis in a spreadsheet-like environment, facilitating + real-time data sharing and visualization. Specializes in data analysis, tabular processing, + collaborative editing, data transformation, spreadsheet operations, data visualization, + real-time collaboration, and structured data handling. Perfect for data-intensive tasks + requiring structured analysis. + """, + metadata={ + "category": "data_processing", + "complexity": "medium", + }, ), SwarmType( name="SequentialWorkflow", - description="Execute tasks in a step-by-step, sequential process workflow, ensuring a logical and methodical approach to task execution", + description=""" + Execute tasks in a step-by-step, sequential process workflow, ensuring a logical and methodical + approach to task execution. Focuses on linear processing, waterfall methodology, step-by-step + execution, ordered tasks, sequential operations, process flow, systematic approach, and staged + execution. Best for tasks requiring strict order and dependencies. + """, + metadata={"category": "workflow", "complexity": "low"}, ), SwarmType( name="ConcurrentWorkflow", - description="Process multiple tasks or data sources concurrently in parallel, maximizing productivity and reducing processing time", + description=""" + Process multiple tasks or data sources concurrently in parallel, maximizing productivity + and reducing processing time. Specializes in parallel processing, multi-threading, + asynchronous execution, distributed computing, concurrent operations, simultaneous tasks, + parallel workflows, and scalable processing. Ideal for independent tasks that can be + processed simultaneously. + """, + metadata={"category": "workflow", "complexity": "medium"}, ), ] - for swarm_type in swarm_types: - matcher.add_swarm_type(swarm_type) - logger.debug("Swarm types initialized") + matcher.add_swarm_types(swarm_types) + logger.info("Initialized default swarm types") -def swarm_matcher(task: str, *args, **kwargs): - """ - Runs the SwarmMatcher example with predefined tasks and swarm types. - """ - config = SwarmMatcherConfig() +def create_swarm_matcher( + persist_dir: str = "./chroma_db", + collection_name: str = "swarm_types", +) -> SwarmMatcher: + """Convenience function to create and initialize a swarm matcher""" + config = SwarmMatcherConfig( + persist_directory=persist_dir, collection_name=collection_name + ) matcher = SwarmMatcher(config) - initialize_swarm_types(matcher) + initialize_default_swarm_types(matcher) + return matcher - matcher.save_swarm_types("swarm_types.json") - swarm_type = matcher.auto_select_swarm(task) +# Example usage +def swarm_matcher(task: str) -> str: + # Create and initialize matcher + matcher = create_swarm_matcher() - logger.info(f"{swarm_type}") + swarm_type = matcher.auto_select_swarm(task) + print(f"Task: {task}\nSelected Swarm: {swarm_type}\n") return swarm_type + + +# # Example usage +# if __name__ == "__main__": +# # Create and initialize matcher +# matcher = create_swarm_matcher() + +# # Example tasks +# tasks = [ +# "Analyze this spreadsheet of sales data and create visualizations", +# "Coordinate multiple AI agents to solve a complex problem", +# "Process these tasks one after another in a specific order", +# "Write multiple blog posts about the latest advancements in swarm intelligence all at once", +# "Write a blog post about the latest advancements in swarm intelligence", +# ] + +# # Process tasks +# for task in tasks: +# swarm_type = matcher.auto_select_swarm(task) +# print(f"Task: {task}\nSelected Swarm: {swarm_type}\n") diff --git a/swarms/structs/swarm_router.py b/swarms/structs/swarm_router.py index 045761aa..759d34d8 100644 --- a/swarms/structs/swarm_router.py +++ b/swarms/structs/swarm_router.py @@ -13,6 +13,7 @@ from swarms.structs.sequential_workflow import SequentialWorkflow from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm from tenacity import retry, stop_after_attempt, wait_fixed from swarms.structs.swarm_matcher import swarm_matcher +from swarms.prompts.ag_prompt import aggregator_system_prompt SwarmType = Literal[ "AgentRearrange", @@ -57,6 +58,7 @@ class SwarmRouter: swarm (Union[AgentRearrange, MixtureOfAgents, SpreadSheetSwarm, SequentialWorkflow, ConcurrentWorkflow]): The instantiated swarm object. logs (List[SwarmLog]): A list of log entries captured during operations. + auto_generate_prompt (bool): A flag to enable/disable auto generation of prompts. Available Swarm Types: - AgentRearrange: Rearranges agents for optimal task execution. @@ -99,6 +101,37 @@ class SwarmRouter: f"SwarmRouter initialized with swarm type: {swarm_type}", ) + self.activate_ape() + + def activate_ape(self): + """Activate automatic prompt engineering for agents that support it""" + try: + logger.info("Activating automatic prompt engineering...") + activated_count = 0 + for agent in self.agents: + if hasattr(agent, "auto_generate_prompt"): + agent.auto_generate_prompt = ( + self.auto_generate_prompts + ) + activated_count += 1 + logger.debug( + f"Activated APE for agent: {agent.name if hasattr(agent, 'name') else 'unnamed'}" + ) + + logger.info( + f"Successfully activated APE for {activated_count} agents" + ) + self._log( + "info", + f"Activated automatic prompt engineering for {activated_count} agents", + ) + + except Exception as e: + error_msg = f"Error activating automatic prompt engineering: {str(e)}" + logger.error(error_msg) + self._log("error", error_msg) + raise RuntimeError(error_msg) from e + @retry(stop=stop_after_attempt(3), wait=wait_fixed(1)) def reliability_check(self): logger.info("Logger initializing checks") @@ -137,7 +170,9 @@ class SwarmRouter: ValueError: If an invalid swarm type is provided. """ if self.swarm_type == "auto": - self.swarm_type = swarm_matcher(task) + self.swarm_type = str(swarm_matcher(task)) + + self._create_swarm(self.swarm_type) elif self.swarm_type == "AgentRearrange": return AgentRearrange( @@ -154,8 +189,9 @@ class SwarmRouter: return MixtureOfAgents( name=self.name, description=self.description, - agents=self.agents, - aggregator_agent=[self.agents[-1]], + reference_agents=self.agents, + aggregator_system_prompt=aggregator_system_prompt.get_prompt(), + aggregator_agent=self.agents[-1], layers=self.max_loops, *args, **kwargs, diff --git a/swarms/structs/tree_swarm.py b/swarms/structs/tree_swarm.py index ad8db2a9..ceb15800 100644 --- a/swarms/structs/tree_swarm.py +++ b/swarms/structs/tree_swarm.py @@ -1,39 +1,19 @@ -import os import uuid from collections import Counter from datetime import datetime from typing import Any, List, Optional -from dotenv import load_dotenv from loguru import logger from pydantic import BaseModel, Field from sentence_transformers import SentenceTransformer, util -from swarm_models import OpenAIChat from swarms import Agent -load_dotenv() - -# Get the OpenAI API key from the environment variable -api_key = os.getenv("OPENAI_API_KEY") - - # Pretrained model for embeddings embedding_model = SentenceTransformer( "all-MiniLM-L6-v2" ) # A small, fast model for embedding -# Get the OpenAI API key from the environment variable -api_key = os.getenv("GROQ_API_KEY") - -# Model -model = OpenAIChat( - openai_api_base="https://api.groq.com/openai/v1", - openai_api_key=api_key, - model_name="llama-3.1-70b-versatile", - temperature=0.1, -) - # Pydantic Models for Logging class AgentLogInput(BaseModel): @@ -83,14 +63,18 @@ class TreeAgent(Agent): def __init__( self, + name: str = None, + description: str = None, system_prompt: str = None, - llm: callable = model, + llm: callable = None, agent_name: Optional[str] = None, *args, **kwargs, ): agent_name = agent_name super().__init__( + name=name, + description=description, system_prompt=system_prompt, llm=llm, agent_name=agent_name, diff --git a/swarms/telemetry/bootup.py b/swarms/telemetry/bootup.py index 380b7e4b..24d7a7c4 100644 --- a/swarms/telemetry/bootup.py +++ b/swarms/telemetry/bootup.py @@ -11,10 +11,13 @@ def bootup(): """Bootup swarms""" logging.disable(logging.CRITICAL) os.environ["WANDB_SILENT"] = "true" - # Dynamically set WORKSPACE_DIR based on the current directory - os.environ["WORKSPACE_DIR"] = os.path.join( - os.getcwd(), "agent_workspace" - ) + + # Auto set workspace directory + workspace_dir = os.path.join(os.getcwd(), "agent_workspace") + if not os.path.exists(workspace_dir): + os.makedirs(workspace_dir) + os.environ["WORKSPACE_DIR"] = workspace_dir + warnings.filterwarnings("ignore", category=DeprecationWarning) # Use ThreadPoolExecutor to run disable_logging and auto_update concurrently