From 4e588ed232ccdfbc550a72441a28ab5a53a83f17 Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 28 Oct 2024 12:04:07 -0400 Subject: [PATCH] [5.8.7] --- changelog.md | 1 + example.py | 9 +- new_prompt.py | 28 ++ pyproject.toml | 2 +- swarms/prompts/prompt.py | 28 +- swarms/structs/agent.py | 160 ++++++-- swarms/structs/agent_rag.py | 370 +++++++++--------- swarms/structs/auto_swarm_builder.py | 20 + swarms/structs/concurrent_workflow.py | 3 +- ...n_agents_in_parallel_async_multiprocess.py | 71 ++-- 10 files changed, 442 insertions(+), 250 deletions(-) create mode 100644 changelog.md create mode 100644 new_prompt.py create mode 100644 swarms/structs/auto_swarm_builder.py diff --git a/changelog.md b/changelog.md new file mode 100644 index 00000000..4cd80326 --- /dev/null +++ b/changelog.md @@ -0,0 +1 @@ +# 5.8.7 \ No newline at end of file diff --git a/example.py b/example.py index ab77e25b..4a5c328c 100644 --- a/example.py +++ b/example.py @@ -33,11 +33,14 @@ agent = Agent( context_length=200000, return_step_meta=False, # output_type="json", - output_type="string", + output_type="json", # "json", "dict", "csv" OR "string" soon "yaml" and streaming_on=False, + # auto_generate_prompt=True, ) -agent.run( - "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria" +print( + agent.run( + "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria" + ) ) diff --git a/new_prompt.py b/new_prompt.py new file mode 100644 index 00000000..592fc3ab --- /dev/null +++ b/new_prompt.py @@ -0,0 +1,28 @@ +from swarms import Prompt +from swarm_models import OpenAIChat +import os + +model = OpenAIChat( + api_key=os.getenv("OPENAI_API_KEY"), model_name="gpt-4o-mini", temperature=0.1 +) + +# Aggregator system prompt +prompt_generator_sys_prompt = Prompt( + name="prompt-generator-sys-prompt-o1", + description="Generate the most reliable prompt for a specific problem", + content=""" + Your purpose is to craft extremely reliable and production-grade system prompts for other agents. + + # Instructions + - Understand the prompt required for the agent. + - Utilize a combination of the most effective prompting strategies available, including chain of thought, many shot, few shot, and instructions-examples-constraints. + - Craft the prompt by blending the most suitable prompting strategies. + - Ensure the prompt is production-grade ready and educates the agent on how to reason and why to reason in that manner. + - Provide constraints if necessary and as needed. + - The system prompt should be extensive and cover a vast array of potential scenarios to specialize the agent. + """, + auto_generate_prompt=True, + llm=model, +) + +print(prompt_generator_sys_prompt.get_prompt()) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index a5f9ba25..f9695dcb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "5.8.6" +version = "5.8.7" description = "Swarms - Pytorch" license = "MIT" authors = ["Kye Gomez "] diff --git a/swarms/prompts/prompt.py b/swarms/prompts/prompt.py index 954ed4df..a5e95288 100644 --- a/swarms/prompts/prompt.py +++ b/swarms/prompts/prompt.py @@ -13,7 +13,8 @@ from pydantic.v1 import validator from swarms_cloud.utils.log_to_swarms_database import log_agent_data from swarms_cloud.utils.capture_system_data import capture_system_data from swarms.tools.base_tool import BaseTool - +# from swarms.agents.ape_agent import auto_generate_prompt +from typing import Any class Prompt(BaseModel): """ @@ -70,11 +71,15 @@ class Prompt(BaseModel): default="prompts", description="The folder path within WORKSPACE_DIR where the prompt will be autosaved", ) + auto_generate_prompt: bool = Field( + default=False, + description="Flag to enable or disable auto-generating the prompt", + ) parent_folder: str = Field( default=os.getenv("WORKSPACE_DIR"), description="The folder where the autosave folder is in", ) - # tools: List[callable] = None + llm: Any = None @validator("edit_history", pre=True, always=True) def initialize_history(cls, v, values): @@ -86,6 +91,15 @@ class Prompt(BaseModel): values["content"] ] # Store initial version in history return v + + def __init__(self, **data): + super().__init__(**data) + + if self.autosave: + self._autosave() + + if self.auto_generate_prompt and self.llm: + self.auto_generate_prompt() def edit_prompt(self, new_content: str) -> None: """ @@ -238,6 +252,16 @@ class Prompt(BaseModel): with open(file_path, "w") as file: json.dump(self.model_dump(), file) logger.info(f"Autosaved prompt {self.id} to {file_path}.") + + # def auto_generate_prompt(self): + # logger.info(f"Auto-generating prompt for {self.name}") + # task = self.name + " " + self.description + " " + self.content + # prompt = auto_generate_prompt(task, llm=self.llm, max_tokens=4000, use_second_sys_prompt=True) + # logger.info("Generated prompt successfully, updating content") + # self.edit_prompt(prompt) + # logger.info("Prompt content updated") + + # return "Prompt auto-generated successfully." class Config: """Pydantic configuration for better JSON serialization.""" diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 30073fab..2d07f106 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -57,7 +57,7 @@ from clusterops import ( execute_with_cpu_cores, ) from swarms.agents.ape_agent import auto_generate_prompt - +import yaml # Utils # Custom stopping condition @@ -799,18 +799,29 @@ class Agent: while attempt < self.retry_attempts and not success: try: if self.long_term_memory is not None: - logger.info("Querying long term memory...") + logger.info( + "Querying long term memory..." + ) self.memory_query(task_prompt) - + # Generate response using LLM response_args = ( - (task_prompt, *args) if img is None else (task_prompt, img, *args) + (task_prompt, *args) + if img is None + else (task_prompt, img, *args) ) - response = self.call_llm(*response_args, **kwargs) - + response = self.call_llm( + *response_args, **kwargs + ) + # Check if response is a dictionary and has 'choices' key - if isinstance(response, dict) and 'choices' in response: - response = response['choices'][0]['message']['content'] + if ( + isinstance(response, dict) + and "choices" in response + ): + response = response["choices"][0][ + "message" + ]["content"] elif isinstance(response, str): # If response is already a string, use it as is pass @@ -818,38 +829,37 @@ class Agent: raise ValueError( f"Unexpected response format: {type(response)}" ) - + # Check and execute tools if self.tools is not None: - print(f"self.tools is not None: {response}") + print( + f"self.tools is not None: {response}" + ) self.parse_and_execute_tools(response) - + # Log the step metadata logged = self.log_step_metadata( - loop_count, - task_prompt, - response + loop_count, task_prompt, response ) logger.info(logged) - + # Convert to a str if the response is not a str response = self.llm_output_parser(response) - + # Print if self.streaming_on is True: self.stream_response(response) else: print(response) - + # Add the response to the memory self.short_memory.add( - role=self.agent_name, - content=response + role=self.agent_name, content=response ) - + # Add to all responses all_responses.append(response) - + # TODO: Implement reliability check if self.tools is not None: # self.parse_function_call_and_execute(response) @@ -978,8 +988,25 @@ class Agent: ] # return self.agent_output_type(all_responses) - - return concat_strings(all_responses) + # More flexible output types + if self.output_type == "string": + return concat_strings(all_responses) + elif self.output_type == "list": + return all_responses + elif self.output_type == "json": + return self.agent_output.model_dump_json(indent=4) + elif self.output_type == "csv": + return self.dict_to_csv( + self.agent_output.model_dump() + ) + elif self.output_type == "dict": + return self.agent_output.model_dump() + elif self.output_type == "yaml": + return yaml.safe_dump(self.agent_output.model_dump(), sort_keys=False) + else: + raise ValueError( + f"Invalid output type: {self.output_type}" + ) except Exception as error: logger.info( @@ -988,20 +1015,79 @@ class Agent: raise error def __call__( - self, task: str = None, img: str = None, *args, **kwargs - ): + self, + task: Optional[str] = None, + img: Optional[str] = None, + is_last: bool = False, + device: str = "cpu", # gpu + device_id: int = 0, + all_cores: bool = True, + *args, + **kwargs, + ) -> Any: """Call the agent Args: - task (str): _description_ - img (str, optional): _description_. Defaults to None. + task (Optional[str]): The task to be performed. Defaults to None. + img (Optional[str]): The image to be processed. Defaults to None. + is_last (bool): Indicates if this is the last task. Defaults to False. + device (str): The device to use for execution. Defaults to "cpu". + device_id (int): The ID of the GPU to use if device is set to "gpu". Defaults to 0. + all_cores (bool): If True, uses all available CPU cores. Defaults to True. """ try: - return self.run(task, img, *args, **kwargs) + if task is not None: + return self.run( + task=task, + is_last=is_last, + device=device, + device_id=device_id, + all_cores=all_cores, + *args, + **kwargs, + ) + elif img is not None: + return self.run( + img=img, + is_last=is_last, + device=device, + device_id=device_id, + all_cores=all_cores, + *args, + **kwargs, + ) + else: + raise ValueError( + "Either 'task' or 'img' must be provided." + ) except Exception as error: logger.error(f"Error calling agent: {error}") raise error + def dict_to_csv(self, data: dict) -> str: + """ + Convert a dictionary to a CSV string. + + Args: + data (dict): The dictionary to convert. + + Returns: + str: The CSV string representation of the dictionary. + """ + import csv + import io + + output = io.StringIO() + writer = csv.writer(output) + + # Write header + writer.writerow(data.keys()) + + # Write values + writer.writerow(data.values()) + + return output.getvalue() + def parse_and_execute_tools(self, response: str, *args, **kwargs): # Extract json from markdown # response = extract_code_from_markdown(response) @@ -1860,17 +1946,25 @@ class Agent: """Parse the output from the LLM""" try: if isinstance(response, dict): - if 'choices' in response: - return response['choices'][0]['message']['content'] + if "choices" in response: + return response["choices"][0]["message"][ + "content" + ] else: - return json.dumps(response) # Convert dict to string + return json.dumps( + response + ) # Convert dict to string elif isinstance(response, str): return response else: - return str(response) # Convert any other type to string + return str( + response + ) # Convert any other type to string except Exception as e: logger.error(f"Error parsing LLM output: {e}") - return str(response) # Return string representation as fallback + return str( + response + ) # Return string representation as fallback def log_step_metadata( self, loop: int, task: str, response: str diff --git a/swarms/structs/agent_rag.py b/swarms/structs/agent_rag.py index 1e6d71d9..e92926e3 100644 --- a/swarms/structs/agent_rag.py +++ b/swarms/structs/agent_rag.py @@ -1,4 +1,3 @@ -import os from typing import List, Optional import chromadb @@ -8,8 +7,17 @@ from typing import Union, Callable, Any from swarms import Agent -class AgentRAG: - """A vector database for storing and retrieving agents based on their characteristics.""" +class AgentRouter: + """ + Initialize the AgentRouter. + + Args: + collection_name (str): Name of the collection in the vector database. + persist_directory (str): Directory to persist the vector database. + n_agents (int): Number of agents to return in queries. + *args: Additional arguments to pass to the chromadb Client. + **kwargs: Additional keyword arguments to pass to the chromadb Client. + """ def __init__( self, @@ -19,12 +27,6 @@ class AgentRAG: *args, **kwargs, ): - """ - Initialize the AgentRAG. - - Args: - persist_directory (str): The directory to persist the ChromaDB data. - """ self.collection_name = collection_name self.n_agents = n_agents self.persist_directory = persist_directory @@ -44,6 +46,9 @@ class AgentRAG: Args: agent (Agent): The agent to add. + + Raises: + Exception: If there's an error adding the agent to the vector database. """ try: agent_text = f"{agent.name} {agent.description} {agent.system_prompt}" @@ -65,6 +70,12 @@ class AgentRAG: def add_agents( self, agents: List[Union[Agent, Callable, Any]] ) -> None: + """ + Add multiple agents to the vector database. + + Args: + agents (List[Union[Agent, Callable, Any]]): List of agents to add. + """ for agent in agents: self.add_agent(agent) @@ -108,9 +119,14 @@ class AgentRAG: Args: task (str): The task description. + *args: Additional arguments to pass to the collection.query method. + **kwargs: Additional keyword arguments to pass to the collection.query method. Returns: Optional[Agent]: The best matching agent, if found. + + Raises: + Exception: If there's an error finding the best agent. """ try: results = self.collection.query( @@ -150,171 +166,171 @@ class AgentRAG: raise -# Example usage -if __name__ == "__main__": - from dotenv import load_dotenv - from swarm_models import OpenAIChat - - load_dotenv() - - # Get the OpenAI API key from the environment variable - api_key = os.getenv("GROQ_API_KEY") - - # Model - model = OpenAIChat( - openai_api_base="https://api.groq.com/openai/v1", - openai_api_key=api_key, - model_name="llama-3.1-70b-versatile", - temperature=0.1, - ) - # Initialize the vector database - vector_db = AgentRAG() - - # Define specialized system prompts for each agent - DATA_EXTRACTOR_PROMPT = """You are a highly specialized private equity agent focused on data extraction from various documents. Your expertise includes: - 1. Extracting key financial metrics (revenue, EBITDA, growth rates, etc.) from financial statements and reports - 2. Identifying and extracting important contract terms from legal documents - 3. Pulling out relevant market data from industry reports and analyses - 4. Extracting operational KPIs from management presentations and internal reports - 5. Identifying and extracting key personnel information from organizational charts and bios - Provide accurate, structured data extracted from various document types to support investment analysis.""" - - SUMMARIZER_PROMPT = """You are an expert private equity agent specializing in summarizing complex documents. Your core competencies include: - 1. Distilling lengthy financial reports into concise executive summaries - 2. Summarizing legal documents, highlighting key terms and potential risks - 3. Condensing industry reports to capture essential market trends and competitive dynamics - 4. Summarizing management presentations to highlight key strategic initiatives and projections - 5. Creating brief overviews of technical documents, emphasizing critical points for non-technical stakeholders - Deliver clear, concise summaries that capture the essence of various documents while highlighting information crucial for investment decisions.""" - - FINANCIAL_ANALYST_PROMPT = """You are a specialized private equity agent focused on financial analysis. Your key responsibilities include: - 1. Analyzing historical financial statements to identify trends and potential issues - 2. Evaluating the quality of earnings and potential adjustments to EBITDA - 3. Assessing working capital requirements and cash flow dynamics - 4. Analyzing capital structure and debt capacity - 5. Evaluating financial projections and underlying assumptions - Provide thorough, insightful financial analysis to inform investment decisions and valuation.""" - - MARKET_ANALYST_PROMPT = """You are a highly skilled private equity agent specializing in market analysis. Your expertise covers: - 1. Analyzing industry trends, growth drivers, and potential disruptors - 2. Evaluating competitive landscape and market positioning - 3. Assessing market size, segmentation, and growth potential - 4. Analyzing customer dynamics, including concentration and loyalty - 5. Identifying potential regulatory or macroeconomic impacts on the market - Deliver comprehensive market analysis to assess the attractiveness and risks of potential investments.""" - - OPERATIONAL_ANALYST_PROMPT = """You are an expert private equity agent focused on operational analysis. Your core competencies include: - 1. Evaluating operational efficiency and identifying improvement opportunities - 2. Analyzing supply chain and procurement processes - 3. Assessing sales and marketing effectiveness - 4. Evaluating IT systems and digital capabilities - 5. Identifying potential synergies in merger or add-on acquisition scenarios - Provide detailed operational analysis to uncover value creation opportunities and potential risks.""" - - # Initialize specialized agents - data_extractor_agent = Agent( - agent_name="Data-Extractor", - system_prompt=DATA_EXTRACTOR_PROMPT, - llm=model, - max_loops=1, - autosave=True, - verbose=True, - dynamic_temperature_enabled=True, - saved_state_path="data_extractor_agent.json", - user_name="pe_firm", - retry_attempts=1, - context_length=200000, - output_type="string", - ) - - summarizer_agent = Agent( - agent_name="Document-Summarizer", - system_prompt=SUMMARIZER_PROMPT, - llm=model, - max_loops=1, - autosave=True, - verbose=True, - dynamic_temperature_enabled=True, - saved_state_path="summarizer_agent.json", - user_name="pe_firm", - retry_attempts=1, - context_length=200000, - output_type="string", - ) - - financial_analyst_agent = Agent( - agent_name="Financial-Analyst", - system_prompt=FINANCIAL_ANALYST_PROMPT, - llm=model, - max_loops=1, - autosave=True, - verbose=True, - dynamic_temperature_enabled=True, - saved_state_path="financial_analyst_agent.json", - user_name="pe_firm", - retry_attempts=1, - context_length=200000, - output_type="string", - ) - - market_analyst_agent = Agent( - agent_name="Market-Analyst", - system_prompt=MARKET_ANALYST_PROMPT, - llm=model, - max_loops=1, - autosave=True, - verbose=True, - dynamic_temperature_enabled=True, - saved_state_path="market_analyst_agent.json", - user_name="pe_firm", - retry_attempts=1, - context_length=200000, - output_type="string", - ) - - operational_analyst_agent = Agent( - agent_name="Operational-Analyst", - system_prompt=OPERATIONAL_ANALYST_PROMPT, - llm=model, - max_loops=1, - autosave=True, - verbose=True, - dynamic_temperature_enabled=True, - saved_state_path="operational_analyst_agent.json", - user_name="pe_firm", - retry_attempts=1, - context_length=200000, - output_type="string", - ) - - # Create agents (using the agents from the original code) - agents_to_add = [ - data_extractor_agent, - summarizer_agent, - financial_analyst_agent, - market_analyst_agent, - operational_analyst_agent, - ] - - # Add agents to the vector database - for agent in agents_to_add: - vector_db.add_agent(agent) - - # Example task - task = "Analyze the financial statements of a potential acquisition target and identify key growth drivers." - - # Find the best agent for the task - best_agent = vector_db.find_best_agent(task) - - if best_agent: - logger.info(f"Best agent for the task: {best_agent.name}") - # Use the best agent to perform the task - result = best_agent.run(task) - print(f"Task result: {result}") - - # Update the agent's history in the database - vector_db.update_agent_history(best_agent.name) - else: - print("No suitable agent found for the task.") - - # Save the vector database +# # Example usage +# if __name__ == "__main__": +# from dotenv import load_dotenv +# from swarm_models import OpenAIChat + +# load_dotenv() + +# # Get the OpenAI API key from the environment variable +# api_key = os.getenv("GROQ_API_KEY") + +# # Model +# model = OpenAIChat( +# openai_api_base="https://api.groq.com/openai/v1", +# openai_api_key=api_key, +# model_name="llama-3.1-70b-versatile", +# temperature=0.1, +# ) +# # Initialize the vector database +# vector_db = AgentRouter() + +# # Define specialized system prompts for each agent +# DATA_EXTRACTOR_PROMPT = """You are a highly specialized private equity agent focused on data extraction from various documents. Your expertise includes: +# 1. Extracting key financial metrics (revenue, EBITDA, growth rates, etc.) from financial statements and reports +# 2. Identifying and extracting important contract terms from legal documents +# 3. Pulling out relevant market data from industry reports and analyses +# 4. Extracting operational KPIs from management presentations and internal reports +# 5. Identifying and extracting key personnel information from organizational charts and bios +# Provide accurate, structured data extracted from various document types to support investment analysis.""" + +# SUMMARIZER_PROMPT = """You are an expert private equity agent specializing in summarizing complex documents. Your core competencies include: +# 1. Distilling lengthy financial reports into concise executive summaries +# 2. Summarizing legal documents, highlighting key terms and potential risks +# 3. Condensing industry reports to capture essential market trends and competitive dynamics +# 4. Summarizing management presentations to highlight key strategic initiatives and projections +# 5. Creating brief overviews of technical documents, emphasizing critical points for non-technical stakeholders +# Deliver clear, concise summaries that capture the essence of various documents while highlighting information crucial for investment decisions.""" + +# FINANCIAL_ANALYST_PROMPT = """You are a specialized private equity agent focused on financial analysis. Your key responsibilities include: +# 1. Analyzing historical financial statements to identify trends and potential issues +# 2. Evaluating the quality of earnings and potential adjustments to EBITDA +# 3. Assessing working capital requirements and cash flow dynamics +# 4. Analyzing capital structure and debt capacity +# 5. Evaluating financial projections and underlying assumptions +# Provide thorough, insightful financial analysis to inform investment decisions and valuation.""" + +# MARKET_ANALYST_PROMPT = """You are a highly skilled private equity agent specializing in market analysis. Your expertise covers: +# 1. Analyzing industry trends, growth drivers, and potential disruptors +# 2. Evaluating competitive landscape and market positioning +# 3. Assessing market size, segmentation, and growth potential +# 4. Analyzing customer dynamics, including concentration and loyalty +# 5. Identifying potential regulatory or macroeconomic impacts on the market +# Deliver comprehensive market analysis to assess the attractiveness and risks of potential investments.""" + +# OPERATIONAL_ANALYST_PROMPT = """You are an expert private equity agent focused on operational analysis. Your core competencies include: +# 1. Evaluating operational efficiency and identifying improvement opportunities +# 2. Analyzing supply chain and procurement processes +# 3. Assessing sales and marketing effectiveness +# 4. Evaluating IT systems and digital capabilities +# 5. Identifying potential synergies in merger or add-on acquisition scenarios +# Provide detailed operational analysis to uncover value creation opportunities and potential risks.""" + +# # Initialize specialized agents +# data_extractor_agent = Agent( +# agent_name="Data-Extractor", +# system_prompt=DATA_EXTRACTOR_PROMPT, +# llm=model, +# max_loops=1, +# autosave=True, +# verbose=True, +# dynamic_temperature_enabled=True, +# saved_state_path="data_extractor_agent.json", +# user_name="pe_firm", +# retry_attempts=1, +# context_length=200000, +# output_type="string", +# ) + +# summarizer_agent = Agent( +# agent_name="Document-Summarizer", +# system_prompt=SUMMARIZER_PROMPT, +# llm=model, +# max_loops=1, +# autosave=True, +# verbose=True, +# dynamic_temperature_enabled=True, +# saved_state_path="summarizer_agent.json", +# user_name="pe_firm", +# retry_attempts=1, +# context_length=200000, +# output_type="string", +# ) + +# financial_analyst_agent = Agent( +# agent_name="Financial-Analyst", +# system_prompt=FINANCIAL_ANALYST_PROMPT, +# llm=model, +# max_loops=1, +# autosave=True, +# verbose=True, +# dynamic_temperature_enabled=True, +# saved_state_path="financial_analyst_agent.json", +# user_name="pe_firm", +# retry_attempts=1, +# context_length=200000, +# output_type="string", +# ) + +# market_analyst_agent = Agent( +# agent_name="Market-Analyst", +# system_prompt=MARKET_ANALYST_PROMPT, +# llm=model, +# max_loops=1, +# autosave=True, +# verbose=True, +# dynamic_temperature_enabled=True, +# saved_state_path="market_analyst_agent.json", +# user_name="pe_firm", +# retry_attempts=1, +# context_length=200000, +# output_type="string", +# ) + +# operational_analyst_agent = Agent( +# agent_name="Operational-Analyst", +# system_prompt=OPERATIONAL_ANALYST_PROMPT, +# llm=model, +# max_loops=1, +# autosave=True, +# verbose=True, +# dynamic_temperature_enabled=True, +# saved_state_path="operational_analyst_agent.json", +# user_name="pe_firm", +# retry_attempts=1, +# context_length=200000, +# output_type="string", +# ) + +# # Create agents (using the agents from the original code) +# agents_to_add = [ +# data_extractor_agent, +# summarizer_agent, +# financial_analyst_agent, +# market_analyst_agent, +# operational_analyst_agent, +# ] + +# # Add agents to the vector database +# for agent in agents_to_add: +# vector_db.add_agent(agent) + +# # Example task +# task = "Analyze the financial statements of a potential acquisition target and identify key growth drivers." + +# # Find the best agent for the task +# best_agent = vector_db.find_best_agent(task) + +# if best_agent: +# logger.info(f"Best agent for the task: {best_agent.name}") +# # Use the best agent to perform the task +# result = best_agent.run(task) +# print(f"Task result: {result}") + +# # Update the agent's history in the database +# vector_db.update_agent_history(best_agent.name) +# else: +# print("No suitable agent found for the task.") + +# # Save the vector database diff --git a/swarms/structs/auto_swarm_builder.py b/swarms/structs/auto_swarm_builder.py new file mode 100644 index 00000000..37a93e2a --- /dev/null +++ b/swarms/structs/auto_swarm_builder.py @@ -0,0 +1,20 @@ +from typing import List, Any, Dict, Optional + + +class AutoSwarmBuilder: + def __init__(self, task: str, num_agents: int = 10, batch_size: int = 10): + self.task = task + self.num_agents = num_agents + self.batch_size = batch_size + + def run(self, task: str, image_url: str = None, *args, **kwargs): + pass + + def _create_swarm(self): + pass + + def _create_agents(self): + pass + + def _run_agents(self): + pass diff --git a/swarms/structs/concurrent_workflow.py b/swarms/structs/concurrent_workflow.py index 1c6ea472..02102188 100644 --- a/swarms/structs/concurrent_workflow.py +++ b/swarms/structs/concurrent_workflow.py @@ -138,7 +138,6 @@ class ConcurrentWorkflow(BaseSwarm): self.reliability_check() - def reliability_check(self): try: logger.info("Starting reliability checks") @@ -395,7 +394,7 @@ class ConcurrentWorkflow(BaseSwarm): """ if task is not None: self.tasks.append(task) - + try: logger.info(f"Attempting to run on device: {device}") if device == "cpu": diff --git a/swarms/structs/run_agents_in_parallel_async_multiprocess.py b/swarms/structs/run_agents_in_parallel_async_multiprocess.py index c3a4d87f..4a1c7c63 100644 --- a/swarms/structs/run_agents_in_parallel_async_multiprocess.py +++ b/swarms/structs/run_agents_in_parallel_async_multiprocess.py @@ -1,74 +1,81 @@ -import os import asyncio +from typing import List, Any from swarms import Agent -from swarm_models import OpenAIChat -import uvloop from multiprocessing import cpu_count from swarms.utils.calculate_func_metrics import profile_func -from typing import List # Use uvloop for faster asyncio event loop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) -# Get the OpenAI API key from the environment variable -api_key = os.getenv("OPENAI_API_KEY") -# Get the OpenAI API key from the environment variable -api_key = os.getenv("OPENAI_API_KEY") - -# Create an instance of the OpenAIChat class (can be reused) -model = OpenAIChat( - api_key=api_key, model_name="gpt-4o-mini", temperature=0.1 -) +def run_single_agent(agent: Agent, task: str) -> Any: + """ + Run a single agent on the given task. + Args: + agent (Agent): The agent to run. + task (str): The task for the agent to perform. -# Function to run a single agent on the task (synchronous) -def run_single_agent(agent, task): + Returns: + Any: The result of the agent's execution. + """ return agent.run(task) -# Asynchronous wrapper for agent tasks -async def run_agent_async(agent, task): +async def run_agent_async(agent: Agent, task: str) -> Any: + """ + Asynchronous wrapper for agent tasks. + + Args: + agent (Agent): The agent to run asynchronously. + task (str): The task for the agent to perform. + + Returns: + Any: The result of the agent's execution. + """ loop = asyncio.get_event_loop() return await loop.run_in_executor( None, run_single_agent, agent, task ) -# Asynchronous function to run agents concurrently -async def run_agents_concurrently_async(agents, task: str): +async def run_agents_concurrently_async( + agents: List[Agent], task: str +) -> List[Any]: """ Run multiple agents concurrently on the same task with optimized performance. - :param agents: List of Agent instances to run concurrently. - :param task: The task string to execute by all agents. - :return: A list of outputs from each agent. - """ + Args: + agents (List[Agent]): List of Agent instances to run concurrently. + task (str): The task string to execute by all agents. - # Run all agents asynchronously using asyncio.gather + Returns: + List[Any]: A list of outputs from each agent. + """ results = await asyncio.gather( *(run_agent_async(agent, task) for agent in agents) ) return results -# Function to manage the overall process and batching @profile_func def run_agents_concurrently_multiprocess( agents: List[Agent], task: str, batch_size: int = cpu_count() -): +) -> List[Any]: """ Manage and run multiple agents concurrently in batches, with optimized performance. - :param agents: List of Agent instances to run concurrently. - :param task: The task string to execute by all agents. - :param batch_size: Number of agents to run in parallel in each batch. - :return: A list of outputs from each agent. - """ + Args: + agents (List[Agent]): List of Agent instances to run concurrently. + task (str): The task string to execute by all agents. + batch_size (int, optional): Number of agents to run in parallel in each batch. + Defaults to the number of CPU cores. + Returns: + List[Any]: A list of outputs from each agent. + """ results = [] loop = asyncio.get_event_loop() - # batch_size = cpu_count() # Process agents in batches to avoid overwhelming system resources for i in range(0, len(agents), batch_size):