Update monte_carlo_swarm.py

pull/593/head
kirill670 4 months ago committed by GitHub
parent edc293cb6f
commit 2e3124270e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,5 +1,5 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, List, Optional
from typing import Any, Callable, List, Optional, Dict
from swarms import Agent
from swarms.structs.base_swarm import BaseSwarm
@ -7,38 +7,17 @@ from swarms.utils.loguru_logger import logger
class MonteCarloSwarm(BaseSwarm):
"""
MonteCarloSwarm leverages multiple agents to collaborate in a Monte Carlo fashion.
Each agent's output is passed to the next, refining the result progressively.
Supports parallel execution, dynamic agent selection, and custom result aggregation.
Attributes:
agents (List[Agent]): A list of agents that will participate in the swarm.
parallel (bool): If True, agents will run in parallel.
result_aggregator (Callable[[List[Any]], Any]): A function to aggregate results from agents.
max_workers (Optional[int]): The maximum number of threads for parallel execution.
"""
def __init__(
self,
agents: List[Agent],
parallel: bool = False,
result_aggregator: Optional[
Callable[[List[Any]], Any]
] = None,
iterations: int = 10, # Number of Monte Carlo iterations
result_aggregator: Optional[Callable[[List[Any]], Any]] = None,
agent_selector: Optional[Callable[[List[Agent], int, Dict], Agent]] = None,
max_workers: Optional[int] = None,
*args,
**kwargs,
) -> None:
"""
Initializes the MonteCarloSwarm with a list of agents.
Args:
agents (List[Agent]): A list of agents to include in the swarm.
parallel (bool): If True, agents will run in parallel. Default is False.
result_aggregator (Optional[Callable[[List[Any]], Any]]): A function to aggregate results from agents.
max_workers (Optional[int]): The maximum number of threads for parallel execution.
"""
super().__init__(agents=agents, *args, **kwargs)
if not agents:
@ -46,81 +25,64 @@ class MonteCarloSwarm(BaseSwarm):
self.agents = agents
self.parallel = parallel
self.result_aggregator = (
result_aggregator or self.default_aggregator
)
self.iterations = iterations
self.result_aggregator = result_aggregator or self.default_aggregator
self.agent_selector = agent_selector or self.default_agent_selector
self.max_workers = max_workers or len(agents)
self.agent_performance: Dict[str, List[float]] = {agent.agent_name: [] for agent in agents}
def run(self, task: str) -> Any:
"""
Runs the MonteCarloSwarm with the given input, passing the output of each agent
to the next one in the list or running agents in parallel.
logger.info(f"Starting MonteCarloSwarm with parallel={self.parallel}, iterations={self.iterations}")
results = []
for i in range(self.iterations):
logger.info(f"Starting iteration {i+1}")
if self.parallel:
iteration_results = self._run_parallel(task)
else:
iteration_results = self._run_sequential(task)
Args:
task (str): The initial input to provide to the first agent.
results.append(self.result_aggregator(iteration_results))
Returns:
Any: The final output after all agents have processed the input.
"""
logger.info(
f"Starting MonteCarloSwarm with parallel={self.parallel}"
)
# Update agent performance metrics (example)
for j, agent_result in enumerate(iteration_results):
agent_name = self.agents[j].agent_name
# Example: Store some performance metric (replace with your actual metric)
self.agent_performance[agent_name].append(len(str(agent_result)))
if self.parallel:
results = self._run_parallel(task)
else:
results = self._run_sequential(task)
final_output = self.result_aggregator(results)
logger.info(
f"MonteCarloSwarm completed. Final output: {final_output}"
)
final_output = self.result_aggregator(results) # Aggregate across all iterations
logger.info(f"MonteCarloSwarm completed. Final output: {final_output}")
logger.info(f"Agent performance: {self.agent_performance}")
return final_output
def _run_sequential(self, task: str) -> List[Any]:
"""
Runs the agents sequentially, passing each agent's output to the next.
Args:
task (str): The initial input to provide to the first agent.
Returns:
List[Any]: A list of results from each agent.
"""
results = []
current_input = task
for i, agent in enumerate(self.agents):
logger.info(f"Agent {i + 1} processing sequentially...")
for i in range(len(self.agents)):
agent = self.agent_selector(self.agents, i, self.agent_performance) # Dynamic agent selection
logger.info(f"Agent {agent.agent_name} processing sequentially...")
current_output = agent.run(current_input)
results.append(current_output)
current_input = current_output
current_input = current_output # Pass output to the next agent
return results
def _run_parallel(self, task: str) -> List[Any]:
"""
Runs the agents in parallel, each receiving the same initial input.
results = []
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = []
for i in range(len(self.agents)):
agent = self.agent_selector(self.agents, i, self.agent_performance)
logger.info(f"Submitting task to agent {agent.agent_name} in parallel...")
futures.append(executor.submit(agent.run, task))
Args:
task (str): The initial input to provide to all agents.
Returns:
List[Any]: A list of results from each agent.
"""
results = []
with ThreadPoolExecutor(
max_workers=self.max_workers
) as executor:
future_to_agent = {
executor.submit(agent.run, task): agent
for agent in self.agents
}
for future in as_completed(future_to_agent):
for future in as_completed(futures):
try:
result = future.result()
results.append(result)
logger.info(
f"Agent completed with result: {result}"
)
logger.info(f"Agent completed with result: {result}")
except Exception as e:
logger.error(f"Agent encountered an error: {e}")
results.append(None)
@ -128,250 +90,23 @@ class MonteCarloSwarm(BaseSwarm):
@staticmethod
def default_aggregator(results: List[Any]) -> Any:
"""
Default result aggregator that returns the last result.
Args:
results (List[Any]): A list of results from agents.
Returns:
Any: The final aggregated result.
"""
return results
def average_aggregator(results: List[float]) -> float:
return sum(results) / len(results) if results else 0.0
# # Example usage
# if __name__ == "__main__":
# # Get the OpenAI API key from the environment variable
# api_key = os.getenv("OPENAI_API_KEY")
# # Create an instance of the OpenAIChat class
# model = OpenAIChat(
# api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
# )
# # Initialize the agents
# agents_list = [
# Agent(
# agent_name="Financial-Analysis-Agent-1",
# system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
# llm=model,
# max_loops=1,
# autosave=False,
# dashboard=False,
# verbose=True,
# streaming_on=True,
# dynamic_temperature_enabled=True,
# saved_state_path="finance_agent_1.json",
# retry_attempts=3,
# context_length=200000,
# ),
# Agent(
# agent_name="Financial-Analysis-Agent-2",
# system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
# llm=model,
# max_loops=1,
# autosave=False,
# dashboard=False,
# verbose=True,
# streaming_on=True,
# dynamic_temperature_enabled=True,
# saved_state_path="finance_agent_2.json",
# retry_attempts=3,
# context_length=200000,
# ),
# # Add more agents as needed
# ]
# # Initialize the MonteCarloSwarm with parallel execution enabled
# swarm = MonteCarloSwarm(
# agents=agents_list, parallel=True, max_workers=2
# )
# # Run the swarm with an initial query
# final_output = swarm.run(
# "What are the components of a startup's stock incentive equity plan?"
# )
# print("Final output:", final_output)
# import os
# from swarms import Agent
# from typing import List, Union, Callable
# from collections import Counter
# # Aggregation functions
# def aggregate_most_common_result(results: List[str]) -> str:
# """
# Aggregate results using the most common result.
# Args:
# results (List[str]): List of results from each iteration.
# Returns:
# str: The most common result.
# """
# result_counter = Counter(results)
# most_common_result = result_counter.most_common(1)[0][0]
# return most_common_result
return results[-1] if results else None # Return the last result by default
# def aggregate_weighted_vote(results: List[str], weights: List[int]) -> str:
# """
# Aggregate results using a weighted voting system.
# Args:
# results (List[str]): List of results from each iteration.
# weights (List[int]): List of weights corresponding to each result.
# Returns:
# str: The result with the highest weighted vote.
# """
# weighted_results = Counter()
# for result, weight in zip(results, weights):
# weighted_results[result] += weight
# weighted_result = weighted_results.most_common(1)[0][0]
# return weighted_result
# def aggregate_average_numerical(results: List[Union[str, float]]) -> float:
# """
# Aggregate results by averaging numerical outputs.
# Args:
# results (List[Union[str, float]]): List of numerical results from each iteration.
# Returns:
# float: The average of the numerical results.
# """
# numerical_results = [
# float(result) for result in results if is_numerical(result)
# ]
# if numerical_results:
# return sum(numerical_results) / len(numerical_results)
# else:
# return float("nan") # or handle non-numerical case as needed
# def aggregate_consensus(results: List[str]) -> Union[str, None]:
# """
# Aggregate results by checking if there's a consensus (all results are the same).
# Args:
# results (List[str]): List of results from each iteration.
# Returns:
# Union[str, None]: The consensus result if there is one, otherwise None.
# """
# if all(result == results[0] for result in results):
# return results[0]
# else:
# return None # or handle lack of consensus as needed
# def is_numerical(value: str) -> bool:
# """
# Check if a string can be interpreted as a numerical value.
# Args:
# value (str): The string to check.
# Returns:
# bool: True if the string is numerical, otherwise False.
# """
# try:
# float(value)
# return True
# except ValueError:
# return False
# # MonteCarloSwarm class
# class MonteCarloSwarm:
# def __init__(
# self,
# agents: List[Agent],
# iterations: int = 100,
# aggregator: Callable = aggregate_most_common_result,
# ):
# self.agents = agents
# self.iterations = iterations
# self.aggregator = aggregator
# def run(self, task: str) -> Union[str, float, None]:
# """
# Execute the Monte Carlo swarm, passing the output of each agent to the next.
# The final result is aggregated over multiple iterations using the provided aggregator.
# Args:
# task (str): The task for the swarm to execute.
# Returns:
# Union[str, float, None]: The final aggregated result.
# """
# aggregated_results = []
# for i in range(self.iterations):
# result = task
# for agent in self.agents:
# result = agent.run(result)
# aggregated_results.append(result)
# # Apply the selected aggregation function
# final_result = self.aggregator(aggregated_results)
# return final_result
# # Example usage:
# # Assuming you have the OpenAI API key set up and agents defined
# api_key = os.getenv("OPENAI_API_KEY")
# model = OpenAIChat(
# api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
# )
# agent1 = Agent(
# agent_name="Agent1",
# system_prompt="System prompt for agent 1",
# llm=model,
# max_loops=1,
# verbose=True,
# )
# agent2 = Agent(
# agent_name="Agent2",
# system_prompt="System prompt for agent 2",
# llm=model,
# max_loops=1,
# verbose=True,
# )
@staticmethod
def default_agent_selector(agents: List[Agent], iteration: int, agent_performance: Dict) -> Agent:
return agents[iteration % len(agents)] # Round-robin by default
# # Create a MonteCarloSwarm with the agents and a selected aggregation function
# swarm = MonteCarloSwarm(
# agents=[agent1, agent2],
# iterations=1,
# aggregator=aggregate_weighted_vote,
# )
# Example usage with dynamic agent selection and iterative refinement:
# # Run the swarm on a specific task
# final_output = swarm.run(
# "What are the components of a startup's stock incentive plan?"
# )
# print("Final Output:", final_output)
# # You can easily switch the aggregation function by passing a different one to the constructor:
# # swarm = MonteCarloSwarm(agents=[agent1, agent2], iterations=100, aggregator=aggregate_weighted_vote)
def best_performing_agent_selector(agents: List[Agent], iteration: int, agent_performance: Dict) -> Agent:
"""Selects the best performing agent based on average result length."""
if not all(agent_performance.values()): # Check if any agent has no performance data yet
return agents[iteration % len(agents)] # Default to round robin if no performance data
# # If using weighted voting, you'll need to adjust the aggregator call to provide the weights:
# # weights = list(range(100, 0, -1)) # Example weights for 100 iterations
# # swarm = MonteCarloSwarm(agents=[agent1, agent2], iterations=100, aggregator=lambda results: aggregate_weighted_vote(results, weights))
average_performance = {
agent_name: sum(scores) / len(scores) if scores else 0
for agent_name, scores in agent_performance.items()
}
best_agent_name = max(average_performance, key=average_performance.get)
return next((agent for agent in agents if agent.agent_name == best_agent_name), agents[0])

Loading…
Cancel
Save