From 93b6ee71f496f315ce5c5de26b5e969d497a42fc Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 16 Sep 2024 22:56:04 -0400 Subject: [PATCH] [EXPERIMENTAL] --- .../a_star_swarm_example.py | 2 +- .../swarms/experimental}/a_star_swarm.py | 0 .../swarms/experimental}/agent_matrix.py | 0 .../experimental/company_swarm_example.py | 0 .../swarms/experimental}/dfs_search_swarm.py | 0 .../swarms/experimental}/federated_swarm.py | 0 .../swarms/experimental}/pulsar_swarm.py | 0 .../swarms/rearrange}/rearrange/example.py | 0 .../swarms/rearrange/sample_rearrange.py | 0 .../swarms/spreadsheet_swarm/dfs_example.py | 2 +- swarms/structs/agent_matrix 2.py | 214 ------ swarms/structs/multi_agent_collab_new 2.py | 629 ------------------ swarms/structs/pulsar_swarm 2.py | 242 ------- 13 files changed, 2 insertions(+), 1087 deletions(-) rename {swarms/structs => examples/structs/swarms/experimental}/a_star_swarm.py (100%) rename {swarms/structs => examples/structs/swarms/experimental}/agent_matrix.py (100%) rename company_swarm_example.py => examples/structs/swarms/experimental/company_swarm_example.py (100%) rename {swarms/structs => examples/structs/swarms/experimental}/dfs_search_swarm.py (100%) rename {swarms/structs => examples/structs/swarms/experimental}/federated_swarm.py (100%) rename {swarms/structs => examples/structs/swarms/experimental}/pulsar_swarm.py (100%) rename examples/{swarms => structs/swarms/rearrange}/rearrange/example.py (100%) rename sample_rearrange.py => examples/structs/swarms/rearrange/sample_rearrange.py (100%) delete mode 100644 swarms/structs/agent_matrix 2.py delete mode 100644 swarms/structs/multi_agent_collab_new 2.py delete mode 100644 swarms/structs/pulsar_swarm 2.py diff --git a/examples/structs/swarms/different_architectures/a_star_swarm_example.py b/examples/structs/swarms/different_architectures/a_star_swarm_example.py index 01fa59a8..4fbb1e69 100644 --- a/examples/structs/swarms/different_architectures/a_star_swarm_example.py +++ b/examples/structs/swarms/different_architectures/a_star_swarm_example.py @@ -5,7 +5,7 @@ from swarm_models import OpenAIChat from swarms.prompts.finance_agent_sys_prompt import ( FINANCIAL_AGENT_SYS_PROMPT, ) -from swarms.structs.a_star_swarm import AStarSwarm +from examples.structs.swarms.experimental.a_star_swarm import AStarSwarm # Set up the model as provided api_key = os.getenv("OPENAI_API_KEY") diff --git a/swarms/structs/a_star_swarm.py b/examples/structs/swarms/experimental/a_star_swarm.py similarity index 100% rename from swarms/structs/a_star_swarm.py rename to examples/structs/swarms/experimental/a_star_swarm.py diff --git a/swarms/structs/agent_matrix.py b/examples/structs/swarms/experimental/agent_matrix.py similarity index 100% rename from swarms/structs/agent_matrix.py rename to examples/structs/swarms/experimental/agent_matrix.py diff --git a/company_swarm_example.py b/examples/structs/swarms/experimental/company_swarm_example.py similarity index 100% rename from company_swarm_example.py rename to examples/structs/swarms/experimental/company_swarm_example.py diff --git a/swarms/structs/dfs_search_swarm.py b/examples/structs/swarms/experimental/dfs_search_swarm.py similarity index 100% rename from swarms/structs/dfs_search_swarm.py rename to examples/structs/swarms/experimental/dfs_search_swarm.py diff --git a/swarms/structs/federated_swarm.py b/examples/structs/swarms/experimental/federated_swarm.py similarity index 100% rename from swarms/structs/federated_swarm.py rename to examples/structs/swarms/experimental/federated_swarm.py diff --git a/swarms/structs/pulsar_swarm.py b/examples/structs/swarms/experimental/pulsar_swarm.py similarity index 100% rename from swarms/structs/pulsar_swarm.py rename to examples/structs/swarms/experimental/pulsar_swarm.py diff --git a/examples/swarms/rearrange/example.py b/examples/structs/swarms/rearrange/rearrange/example.py similarity index 100% rename from examples/swarms/rearrange/example.py rename to examples/structs/swarms/rearrange/rearrange/example.py diff --git a/sample_rearrange.py b/examples/structs/swarms/rearrange/sample_rearrange.py similarity index 100% rename from sample_rearrange.py rename to examples/structs/swarms/rearrange/sample_rearrange.py diff --git a/examples/structs/swarms/spreadsheet_swarm/dfs_example.py b/examples/structs/swarms/spreadsheet_swarm/dfs_example.py index 049bbbb2..9f020ab2 100644 --- a/examples/structs/swarms/spreadsheet_swarm/dfs_example.py +++ b/examples/structs/swarms/spreadsheet_swarm/dfs_example.py @@ -2,7 +2,7 @@ import os from swarms import Agent from swarm_models import OpenAIChat -from swarms.structs.dfs_search_swarm import DFSSwarm +from examples.structs.swarms.experimental.dfs_search_swarm import DFSSwarm # Get the OpenAI API key from the environment variable api_key = os.getenv("OPENAI_API_KEY") diff --git a/swarms/structs/agent_matrix 2.py b/swarms/structs/agent_matrix 2.py deleted file mode 100644 index 35e0bfcb..00000000 --- a/swarms/structs/agent_matrix 2.py +++ /dev/null @@ -1,214 +0,0 @@ -import concurrent.futures -from typing import List, Union -from loguru import logger -from pydantic import BaseModel -from swarms.structs.agent import Agent -from swarms.schemas.agent_step_schemas import ManySteps - - -class AgentRowMetadata(BaseModel): - row_index: int - agent_runs: List[ManySteps] - - -class AgentMatrixMetadata(BaseModel): - matrix_runs: List[AgentRowMetadata] - - -class AgentMatrix: - def __init__( - self, agents: Union[List["Agent"], List[List["Agent"]]] - ): - """ - Initializes the matrix with the provided list of agents or list of lists of agents. - - Args: - agents (List[Agent] or List[List[Agent]]): A list of agents or a list of lists of agents (matrix). - """ - if isinstance(agents[0], list): - self.agents_matrix: List[List["Agent"]] = ( - agents # List of lists (matrix) - ) - self.rows: int = len(agents) - self.cols: int = len(agents[0]) if agents else 0 - else: - self.agents_matrix: List[List["Agent"]] = [ - agents - ] # Single row of agents (1D list) - self.rows: int = 1 - self.cols: int = len(agents) - - # Store metadata for all runs - self.matrix_metadata = AgentMatrixMetadata(matrix_runs=[]) - logger.info( - f"AgentMatrix initialized with {self.rows} rows and {self.cols} columns of agents." - ) - - def execute_in_order(self, query: str) -> None: - """Executes the agents in row-major order.""" - logger.info( - f"Executing all agents in row-major order with query: {query}" - ) - for i, row in enumerate(self.agents_matrix): - row_metadata = AgentRowMetadata( - row_index=i, agent_runs=[] - ) - for j, agent in enumerate(row): - logger.info(f"Executing Agent [{i}][{j}]") - out = agent.run(query) - logger.info(f"Output from Agent [{i}][{j}]: {out}") - - agent_metadata = agent.agent_output - row_metadata.agent_runs.append(agent_metadata) - self.matrix_metadata.matrix_runs.append(row_metadata) - - def execute_by_row( - self, row_index: int, query: str, sequential: bool = True - ) -> None: - """ - Executes all agents in a specific row, either sequentially or concurrently. - - Args: - row_index (int): The index of the row to execute. - query (str): The query to run. - sequential (bool): Whether to execute agents sequentially (True) or concurrently (False). - """ - if not (0 <= row_index < self.rows): - logger.error(f"Invalid row index: {row_index}") - return - - logger.info( - f"Executing row {row_index} with query: {query}. Sequential: {sequential}" - ) - row_metadata = AgentRowMetadata( - row_index=row_index, agent_runs=[] - ) - - if sequential: - self._execute_row_sequentially( - row_index, query, row_metadata - ) - else: - self._execute_row_concurrently( - row_index, query, row_metadata - ) - - self.matrix_metadata.matrix_runs.append(row_metadata) - - def _execute_row_sequentially( - self, - row_index: int, - query: str, - row_metadata: AgentRowMetadata, - ) -> None: - """Executes agents in a row sequentially, passing output from one agent to the next.""" - logger.info( - f"Executing agents in row {row_index} sequentially." - ) - current_input = query - for j, agent in enumerate(self.agents_matrix[row_index]): - logger.info( - f"Executing Agent [{row_index}][{j}] sequentially with input: {current_input}" - ) - current_output = agent.run(current_input) - agent_metadata = agent.agent_output - logger.info( - f"Output from Agent [{row_index}][{j}]: {current_output}" - ) - row_metadata.agent_runs.append(agent_metadata) - current_input = current_output - - def _execute_row_concurrently( - self, - row_index: int, - query: str, - row_metadata: AgentRowMetadata, - ) -> None: - """Executes agents in a row concurrently.""" - logger.info( - f"Executing agents in row {row_index} concurrently." - ) - - def agent_task(agent, query): - return agent.run(query) - - with concurrent.futures.ThreadPoolExecutor() as executor: - future_to_agent = { - executor.submit(agent_task, agent, query): agent - for agent in self.agents_matrix[row_index] - } - for future in concurrent.futures.as_completed( - future_to_agent - ): - agent = future_to_agent[future] - try: - output = future.result() - logger.info( - f"Output from concurrent agent: {output}" - ) - - # Capture metadata - agent_metadata = agent.agent_output - row_metadata.agent_runs.append(agent_metadata) - - except Exception as exc: - logger.error( - f"Agent generated an exception: {exc}" - ) - - def execute_by_column(self, col_index: int, query: str) -> None: - """Executes all agents in a specific column.""" - if not (0 <= col_index < self.cols): - logger.error(f"Invalid column index: {col_index}") - return - - logger.info( - f"Executing column {col_index} with query: {query}" - ) - for i in range(self.rows): - logger.info(f"Executing Agent [{i}][{col_index}]") - out = self.agents_matrix[i][col_index].run(query) - logger.info( - f"Output from Agent [{i}][{col_index}]: {out}" - ) - - # Capture metadata for the column run - row_metadata = AgentRowMetadata( - row_index=i, agent_runs=[] - ) - agent_metadata = self.agents_matrix[i][ - col_index - ].agent_output - row_metadata.agent_runs.append(agent_metadata) - self.matrix_metadata.matrix_runs.append(row_metadata) - - def export_metadata(self) -> str: - """Exports the metadata to a JSON format.""" - logger.info("Exporting metadata to JSON.") - return self.matrix_metadata.json(indent=4) - - -# Example usage with pre-created agents - -# # Assuming you have pre-created agents, here's an example: -# # agent_1, agent_2, ..., agent_n are instances of the `Agent` class - -# agents_row_1 = [agent_1, agent_2, agent_3] -# agents_row_2 = [agent_4, agent_5, agent_6] -# agents_row_3 = [agent_7, agent_8, agent_9] - -# # Matrix of agents (list of lists) -# agents_matrix = [agents_row_1, agents_row_2, agents_row_3] - -# # Initialize the AgentMatrix with the list of lists -# agent_matrix = AgentMatrix(agents_matrix) - -# # Execute all agents in row 1 sequentially (output of one agent passed to the next) -# agent_matrix.execute_by_row(1, "What is the process for getting a ROTH IRA started?", sequential=True) - -# # Execute all agents in row 1 concurrently (all agents run independently) -# agent_matrix.execute_by_row(1, "What is the process for getting a ROTH IRA started?", sequential=False) - -# # Export and print the run metadata in JSON format -# metadata_json = agent_matrix.export_metadata() -# print(metadata_json) diff --git a/swarms/structs/multi_agent_collab_new 2.py b/swarms/structs/multi_agent_collab_new 2.py deleted file mode 100644 index 69c3e79a..00000000 --- a/swarms/structs/multi_agent_collab_new 2.py +++ /dev/null @@ -1,629 +0,0 @@ -import os -import random -import time -from concurrent.futures import ThreadPoolExecutor -from datetime import datetime, timedelta -from threading import Lock -from typing import Any, Callable, Dict, List, Optional - -from loguru import logger -from pydantic import BaseModel, Field - -from swarms import Agent, create_file_in_folder -from swarms.schemas.agent_step_schemas import ManySteps - - -class MultiAgentCollaborationSchema(BaseModel): - name: str = Field(..., title="Name of the collaboration") - description: str = Field( - ..., title="Description of the collaboration" - ) - agent_outputs: List[ManySteps] = Field( - ..., title="List of agent outputs" - ) - timestamp: str = Field( - default_factory=lambda: time.strftime("%Y-%m-%d %H:%M:%S"), - title="Timestamp of the collaboration", - ) - number_of_agents: int = Field( - ..., title="Number of agents in the collaboration" - ) - - -class Cache: - def __init__(self, expiration_time: Optional[timedelta] = None): - """ - Initializes the cache. - - :param expiration_time: Time after which a cache entry should expire. - """ - self.cache: Dict[str, Dict[str, Any]] = {} - self.expiration_time = expiration_time - self.lock = Lock() - - def set(self, key: str, value: Any): - """ - Stores a value in the cache with an optional expiration time. - - :param key: Cache key. - :param value: Value to store in the cache. - """ - with self.lock: - expiry = ( - datetime.utcnow() + self.expiration_time - if self.expiration_time - else None - ) - self.cache[key] = {"value": value, "expiry": expiry} - logger.debug( - f"Cache set for key '{key}' with expiry {expiry}" - ) - - def get(self, key: str) -> Optional[Any]: - """ - Retrieves a value from the cache. - - :param key: Cache key. - :return: Cached value if available and not expired, else None. - """ - with self.lock: - if key in self.cache: - entry = self.cache[key] - if ( - entry["expiry"] - and entry["expiry"] < datetime.utcnow() - ): - logger.debug(f"Cache expired for key '{key}'") - del self.cache[key] - return None - logger.debug(f"Cache hit for key '{key}'") - return entry["value"] - logger.debug(f"Cache miss for key '{key}'") - return None - - -def random_selector(agents: List[Agent], iteration: int) -> Agent: - """ - Selects a random agent. - - :param agents: List of agents to select from. - :param iteration: The current iteration number (unused). - :return: A randomly selected agent. - """ - return random.choice(agents) - - -def first_agent_selector( - agents: List[Agent], iteration: int -) -> Agent: - """ - Always selects the first agent in the list. - - :param agents: List of agents to select from. - :param iteration: The current iteration number (unused). - :return: The first agent in the list. - """ - return agents[0] - - -def last_agent_selector(agents: List[Agent], iteration: int) -> Agent: - """ - Always selects the last agent in the list. - - :param agents: List of agents to select from. - :param iteration: The current iteration number (unused). - :return: The last agent in the list. - """ - return agents[-1] - - -def reverse_round_robin_selector( - agents: List[Agent], iteration: int -) -> Agent: - """ - Selects agents in reverse round-robin order. - - :param agents: List of agents to select from. - :param iteration: The current iteration number. - :return: The agent selected in reverse round-robin order. - """ - index = -((iteration % len(agents)) + 1) - return agents[index] - - -def even_iteration_selector( - agents: List[Agent], iteration: int -) -> Agent: - """ - Selects the agent based on even iteration; defaults to the first agent if odd. - - :param agents: List of agents to select from. - :param iteration: The current iteration number. - :return: The selected agent based on even iteration. - """ - return ( - agents[iteration % len(agents)] - if iteration % 2 == 0 - else agents[0] - ) - - -def odd_iteration_selector( - agents: List[Agent], iteration: int -) -> Agent: - """ - Selects the agent based on odd iteration; defaults to the last agent if even. - - :param agents: List of agents to select from. - :param iteration: The current iteration number. - :return: The selected agent based on odd iteration. - """ - return ( - agents[iteration % len(agents)] - if iteration % 2 != 0 - else agents[-1] - ) - - -def weighted_random_selector( - agents: List[Agent], iteration: int -) -> Agent: - """ - Selects an agent based on weighted random choice, with the first agent having a higher weight. - - :param agents: List of agents to select from. - :param iteration: The current iteration number (unused). - :return: A randomly selected agent with weighted preference. - """ - weights = [1] * len(agents) - weights[0] = 2 # Give the first agent higher weight - return random.choices(agents, weights=weights, k=1)[0] - - -def increasing_weight_selector( - agents: List[Agent], iteration: int -) -> Agent: - """ - Selects an agent based on increasing weight with iteration (favoring later agents). - - :param agents: List of agents to select from. - :param iteration: The current iteration number (unused). - :return: A randomly selected agent with increasing weight. - """ - weights = [i + 1 for i in range(len(agents))] - return random.choices(agents, weights=weights, k=1)[0] - - -def decreasing_weight_selector( - agents: List[Agent], iteration: int -) -> Agent: - """ - Selects an agent based on decreasing weight with iteration (favoring earlier agents). - - :param agents: List of agents to select from. - :param iteration: The current iteration number (unused). - :return: A randomly selected agent with decreasing weight. - """ - weights = [len(agents) - i for i in range(len(agents))] - return random.choices(agents, weights=weights, k=1)[0] - - -def round_robin_with_skip_selector( - agents: List[Agent], iteration: int -) -> Agent: - """ - Selects agents in a round-robin fashion but skips every third agent. - - :param agents: List of agents to select from. - :param iteration: The current iteration number. - :return: The selected agent with a skipping pattern. - """ - index = (iteration * 2) % len(agents) - return agents[index] - - -def priority_selector( - agents: List[Agent], iteration: int, priority_index: int = 0 -) -> Agent: - """ - Selects an agent based on a priority index, always selecting the agent at the given index. - - :param agents: List of agents to select from. - :param iteration: The current iteration number (unused). - :param priority_index: The index of the agent with priority. - :return: The agent at the priority index. - """ - return agents[priority_index] - - -def dynamic_priority_selector( - agents: List[Agent], iteration: int, priorities: List[int] = None -) -> Agent: - """ - Selects an agent based on dynamic priorities, which can change over iterations. - - :param agents: List of agents to select from. - :param iteration: The current iteration number. - :param priorities: A list of priorities for each agent, determining their selection likelihood. - :return: The selected agent based on dynamic priorities. - """ - if priorities is None: - priorities = [1] * len(agents) - index = random.choices( - range(len(agents)), weights=priorities, k=1 - )[0] - return agents[index] - - -def alternating_selector( - agents: List[Agent], iteration: int -) -> Agent: - """ - Alternates between the first and last agent. - - :param agents: List of agents to select from. - :param iteration: The current iteration number. - :return: The first agent if the iteration is even, the last if odd. - """ - return agents[0] if iteration % 2 == 0 else agents[-1] - - -def middle_agent_selector( - agents: List[Agent], iteration: int -) -> Agent: - """ - Always selects the middle agent. - - :param agents: List of agents to select from. - :param iteration: The current iteration number (unused). - :return: The middle agent in the list. - """ - index = len(agents) // 2 - return agents[index] - - -def weighted_round_robin_selector( - agents: List[Agent], iteration: int, weights: List[int] = None -) -> Agent: - """ - Selects agents in a weighted round-robin fashion. - - :param agents: List of agents to select from. - :param iteration: The current iteration number. - :param weights: A list of weights to determine the likelihood of selection. - :return: The selected agent based on weighted round-robin. - """ - if weights is None: - weights = [1] * len(agents) - index = random.choices(range(len(agents)), weights=weights, k=1)[ - 0 - ] - return agents[index] - - -def even_odd_priority_selector( - agents: List[Agent], iteration: int -) -> Agent: - """ - Gives priority to even-indexed agents on even iterations and odd-indexed agents on odd iterations. - - :param agents: List of agents to select from. - :param iteration: The current iteration number. - :return: The selected agent based on even/odd priority. - """ - even_agents = agents[::2] - odd_agents = agents[1::2] - return ( - random.choice(even_agents) - if iteration % 2 == 0 - else random.choice(odd_agents) - ) - - -def reverse_selector(agents: List[Agent], iteration: int) -> Agent: - """ - Selects agents in reverse order starting from the last agent. - - :param agents: List of agents to select from. - :param iteration: The current iteration number. - :return: The agent selected in reverse order. - """ - return agents[-(iteration % len(agents)) - 1] - - -def frequent_first_selector( - agents: List[Agent], iteration: int, frequency: int = 3 -) -> Agent: - """ - Frequently selects the first agent every 'n' iterations, otherwise selects in round-robin. - - :param agents: List of agents to select from. - :param iteration: The current iteration number. - :param frequency: The frequency of selecting the first agent. - :return: The selected agent with frequent first preference. - """ - if iteration % frequency == 0: - return agents[0] - return agents[iteration % len(agents)] - - -def frequent_last_selector( - agents: List[Agent], iteration: int, frequency: int = 3 -) -> Agent: - """ - Frequently selects the last agent every 'n' iterations, otherwise selects in round-robin. - - :param agents: List of agents to select from. - :param iteration: The current iteration number. - :param frequency: The frequency of selecting the last agent. - :return: The selected agent with frequent last preference. - """ - if iteration % frequency == 0: - return agents[-1] - return agents[iteration % len(agents)] - - -def random_skip_selector( - agents: List[Agent], iteration: int, skip_probability: float = 0.5 -) -> Agent: - """ - Randomly skips agents with a given probability, selecting the next in line. - - :param agents: List of agents to select from. - :param iteration: The current iteration number. - :param skip_probability: The probability of skipping an agent. - :return: The selected agent with random skips. - """ - while random.random() < skip_probability: - iteration += 1 - return agents[iteration % len(agents)] - - -def adaptive_selector( - agents: List[Agent], - iteration: int, - performance_metric: Callable[[Agent], float] = None, -) -> Agent: - """ - Selects the agent based on a performance metric, favoring better-performing agents. - - :param agents: List of agents to select from. - :param iteration: The current iteration number. - :param performance_metric: A function to determine the performance of each agent. - :return: The selected agent based on adaptive performance. - """ - if performance_metric is None: - - def performance_metric(agent): - return ( - random.random() - ) # Default random performance metric - - performance_scores = [ - performance_metric(agent) for agent in agents - ] - best_agent_index = performance_scores.index( - max(performance_scores) - ) - return agents[best_agent_index] - - -class MultiAgentCollaboration: - """ - Initializes the MultiAgentCollaboration. - - :param agents: List of Agent instances. - :param speaker_fn: Function to select the agent for each loop. - :param max_loops: Maximum number of iterations. - :param use_cache: Boolean to enable or disable caching. - :param autosave_on: Boolean to enable or disable autosaving the output. - """ - - def __init__( - self, - name: str = "MultiAgentCollaboration", - description: str = "A collaboration of multiple agents", - agents: List[Agent] = [], - speaker_fn: Callable[[List[Agent], int], Agent] = [], - max_loops: int = 1, - use_cache: bool = True, - autosave_on: bool = True, - ): - self.name = name - self.description = description - self.agents = agents - self.speaker_fn = speaker_fn - self.max_loops = max_loops - self.autosave_on = autosave_on - self.lock = Lock() - self.max_workers = os.cpu_count() - self.use_cache = use_cache - logger.info( - f"Initialized MultiAgentCollaboration with {len(agents)} agents and max_loops={max_loops}" - ) - - # Cache - self.cache = Cache(expiration_time=timedelta(minutes=5)) - - # Output schema - self.output_schema = MultiAgentCollaborationSchema( - name=name, - description=description, - agent_outputs=[], - number_of_agents=len(agents), - ) - - def _execute_agent(self, agent: Agent, task: str, loop: int): - """ - Executes an agent's run method and records the output. - - :param agent: The Agent instance to execute. - :param task: The input prompt for the agent. - :param loop: Current loop iteration. - """ - logger.debug( - f"Executing agent '{agent.agent_name}' on loop {loop}" - ) - - output = agent.run(task) - - agent_output = agent.agent_output - - self.output_schema.agent_outputs.append(agent_output) - - return output - - def run(self, task: str, *args, **kwargs): - """ - Runs the agents in sequence, passing the output of one as the input to the next. - - :param task: The input prompt to pass to each agent. - :return: The final output of the last agent. - """ - logger.info("Starting MultiAgentCollaboration run.") - current_task = task - with ThreadPoolExecutor( - max_workers=self.max_workers - ) as executor: - for i in range(self.max_loops): - selected_agent = self.speaker_fn(self.agents, i) - logger.debug( - f"Loop {i}: Selected agent '{selected_agent.agent_name}'" - ) - future = executor.submit( - self._execute_agent, - selected_agent, - current_task, - i, - *args, - **kwargs, - ) - try: - current_task = ( - future.result() - ) # The output of this agent becomes the input for the next - except Exception as exc: - logger.error( - f"Loop {i} generated an exception: {exc}" - ) - break - - logger.info("Completed MultiAgentCollaboration run.") - - if self.autosave_on is True: - self.save_file() - - return self.return_output_schema_json() - - def save_file(self): - time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") - create_file_in_folder( - "multi_agent_collab_folder", - f"{self.name}_time_{time}_output.json", - self.return_output_schema_json(), - ) - - def get_outputs_dict(self): - """ - Retrieves all recorded agent outputs as a list of dictionaries. - - :return: List of dictionaries representing AgentOutput instances. - """ - return self.output_schema.model_dump() - - def return_output_schema_json(self): - return self.output_schema.model_dump_json(indent=4) - - -def round_robin_speaker(agents: List[Agent], iteration: int) -> Agent: - """ - Selects an agent from the given list of agents using round-robin scheduling. - - Args: - agents (List[Agent]): The list of agents to select from. - iteration (int): The current iteration number. - - Returns: - Agent: The selected agent. - - """ - selected = agents[iteration % len(agents)] - logger.debug( - f"Round-robin selected agent '{selected.agent_name}' for iteration {iteration}" - ) - return selected - - -# # Example usage -# if __name__ == "__main__": -# from swarms import OpenAIChat -# from swarms.prompts.finance_agent_sys_prompt import ( -# FINANCIAL_AGENT_SYS_PROMPT, -# ) - -# # Get the OpenAI API key from the environment variable -# api_key = os.getenv("OPENAI_API_KEY") -# if not api_key: -# logger.error("OpenAI API key not found in environment variables.") -# exit(1) - -# # Create instances of the OpenAIChat class -# model = OpenAIChat( -# api_key=api_key, model_name="gpt-4o-mini", temperature=0.1 -# ) - -# # Initialize agents -# agent1 = Agent( -# agent_name="Financial-Analysis-Agent_1", -# system_prompt=FINANCIAL_AGENT_SYS_PROMPT, -# llm=model, -# max_loops=1, -# dynamic_temperature_enabled=True, -# saved_state_path="finance_agent_1.json", -# user_name="swarms_corp", -# retry_attempts=1, -# context_length=200000, -# return_step_meta=False, -# ) - -# agent2 = Agent( -# agent_name="Financial-Analysis-Agent_2", -# system_prompt=FINANCIAL_AGENT_SYS_PROMPT, -# llm=model, -# max_loops=1, -# dynamic_temperature_enabled=True, -# saved_state_path="finance_agent_2.json", -# user_name="swarms_corp", -# retry_attempts=1, -# context_length=200000, -# return_step_meta=False, -# ) - -# agent2 = Agent( -# agent_name="Financial-Analysis-Agent_3", -# system_prompt=FINANCIAL_AGENT_SYS_PROMPT, -# llm=model, -# max_loops=1, -# dynamic_temperature_enabled=True, -# saved_state_path="finance_agent_2.json", -# user_name="swarms_corp", -# retry_attempts=1, -# context_length=200000, -# return_step_meta=False, -# ) - -# # Initialize the MultiAgentCollaboration with the round-robin speaker function -# multi_agent_framework = MultiAgentCollaboration( -# agents=[agent1, agent2], -# speaker_fn=round_robin_speaker, -# max_loops=3, -# use_cache=True, # Enable caching -# autosave_on=True, -# ) - -# # Run the framework with an input prompt -# task = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria" -# out = multi_agent_framework.run(task) -# print(out) - -# print(multi_agent_framework.return_output_schema_json()) diff --git a/swarms/structs/pulsar_swarm 2.py b/swarms/structs/pulsar_swarm 2.py deleted file mode 100644 index 199bdefe..00000000 --- a/swarms/structs/pulsar_swarm 2.py +++ /dev/null @@ -1,242 +0,0 @@ -import os -import sys -import datetime -from typing import List, Dict, Any, Optional - -from swarms import Agent -from swarm_models import OpenAIChat -from swarms.prompts.finance_agent_sys_prompt import ( - FINANCIAL_AGENT_SYS_PROMPT, -) - -from pulsar import Client, Producer -from pydantic import BaseModel, Field -from loguru import logger - -# Configure Loguru logger -logger.remove() -logger.add(sys.stderr, level="INFO") -logger.add("swarm_logs.log", rotation="10 MB", level="DEBUG") - -# Apache Pulsar configuration -PULSAR_SERVICE_URL = os.getenv( - "PULSAR_SERVICE_URL", "pulsar://localhost:6650" -) - - -# Define Pydantic schemas for structured output -class AgentOutputMetadata(BaseModel): - agent_name: str - task: str - timestamp: datetime.datetime - status: str - - -class AgentOutputData(BaseModel): - output: str - additional_info: Optional[Dict[str, Any]] = None - - -class AgentOutputSchema(BaseModel): - metadata: AgentOutputMetadata - data: AgentOutputData - - -class SwarmOutputSchema(BaseModel): - results: List[AgentOutputSchema] = Field(default_factory=list) - - -# SwarmManager class to manage agents and tasks -class SwarmManager: - def __init__( - self, - agents: List[Agent], - pulsar_service_url: str = PULSAR_SERVICE_URL, - ): - """ - Initializes the SwarmManager with a list of agents and Pulsar service URL. - - :param agents: List of Agent instances. - :param pulsar_service_url: URL of the Apache Pulsar service. - """ - self.agents = agents - self.pulsar_service_url = pulsar_service_url - self.client: Optional[Client] = None - self.producers: Dict[str, Producer] = {} - self.swarm_results = SwarmOutputSchema() - - def connect_pulsar(self) -> None: - """ - Establishes connection to the Apache Pulsar service. - """ - try: - self.client = Client( - self.pulsar_service_url, operation_timeout_seconds=30 - ) - logger.info( - f"Connected to Pulsar service at {self.pulsar_service_url}" - ) - except Exception as e: - logger.error(f"Failed to connect to Pulsar service: {e}") - raise - - def initialize_producers(self) -> None: - """ - Initializes Pulsar producers for each agent. - """ - if not self.client: - logger.error("Pulsar client is not connected.") - raise ConnectionError("Pulsar client is not connected.") - - for agent in self.agents: - try: - topic = f"{agent.agent_name}_topic" - producer = self.client.create_producer(topic) - self.producers[agent.agent_name] = producer - logger.debug( - f"Initialized producer for agent {agent.agent_name} on topic {topic}" - ) - except Exception as e: - logger.error( - f"Failed to create producer for agent {agent.agent_name}: {e}" - ) - raise - - def run_task(self, agent: Agent, task: str) -> AgentOutputSchema: - """ - Executes a task using the specified agent and returns the structured output. - - :param agent: The Agent instance to execute the task. - :param task: The task string to be executed. - :return: AgentOutputSchema containing the result and metadata. - """ - logger.info( - f"Agent {agent.agent_name} is starting task: {task}" - ) - timestamp = datetime.datetime.utcnow() - - try: - output = agent.run(task) - status = "Success" - logger.info( - f"Agent {agent.agent_name} completed task successfully." - ) - except Exception as e: - output = str(e) - status = "Failed" - logger.error( - f"Agent {agent.agent_name} failed to complete task: {e}" - ) - - metadata = AgentOutputMetadata( - agent_name=agent.agent_name, - task=task, - timestamp=timestamp, - status=status, - ) - - data = AgentOutputData(output=output) - - agent_output = AgentOutputSchema(metadata=metadata, data=data) - - # Publish result to Pulsar topic - try: - producer = self.producers.get(agent.agent_name) - if producer: - producer.send(agent_output.json().encode("utf-8")) - logger.debug( - f"Published output for agent {agent.agent_name} to Pulsar topic." - ) - else: - logger.warning( - f"No producer found for agent {agent.agent_name}. Skipping publish step." - ) - except Exception as e: - logger.error( - f"Failed to publish output for agent {agent.agent_name}: {e}" - ) - - return agent_output - - def run(self, task: str) -> SwarmOutputSchema: - """ - Runs the swarm by executing the task across all agents sequentially and returns aggregated results. - - :param task: The task string to be executed by the swarm. - :return: SwarmOutputSchema containing results from all agents. - """ - try: - self.connect_pulsar() - self.initialize_producers() - - for agent in self.agents: - result = self.run_task(agent, task) - self.swarm_results.results.append(result) - - logger.info("Swarm run completed successfully.") - return self.swarm_results - - except Exception as e: - logger.error(f"Swarm run encountered an error: {e}") - raise - - finally: - if self.client: - self.client.close() - logger.info("Pulsar client connection closed.") - - -# Example usage -if __name__ == "__main__": - # Initialize OpenAIChat model - api_key = os.getenv("OPENAI_API_KEY") - if not api_key: - logger.error( - "OPENAI_API_KEY environment variable is not set." - ) - sys.exit(1) - - model = OpenAIChat( - api_key=api_key, model_name="gpt-4", temperature=0.1 - ) - - # Define agents - agent1 = Agent( - agent_name="Financial-Analysis-Agent", - system_prompt=FINANCIAL_AGENT_SYS_PROMPT, - llm=model, - max_loops=1, - autosave=True, - dashboard=False, - verbose=True, - dynamic_temperature_enabled=True, - saved_state_path="finance_agent.json", - user_name="swarms_corp", - retry_attempts=1, - context_length=2000, - return_step_meta=False, - ) - - agent2 = Agent( - agent_name="Market-Analysis-Agent", - system_prompt=FINANCIAL_AGENT_SYS_PROMPT, - llm=model, - max_loops=1, - autosave=True, - dashboard=False, - verbose=True, - dynamic_temperature_enabled=True, - saved_state_path="market_agent.json", - user_name="swarms_corp", - retry_attempts=1, - context_length=2000, - return_step_meta=False, - ) - - # Initialize and run swarm - swarm = SwarmManager(agents=[agent1, agent2]) - task_description = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria?" - results = swarm.run(task_description) - - # Output results - print(results.json(indent=4))