diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index a63a2968..99ea5a2a 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -61,19 +61,19 @@ extra: provider: google property: G-MPE9C65596 - alternate: - - name: English - link: / - lang: en - - name: 简体中文 - link: /zh/ - lang: zh - - name: 日本語 - link: /ja/ - lang: ja - - name: 한국어 - link: /ko/ - lang: ko + # alternate: + # - name: English + # link: / + # lang: en + # - name: 简体中文 + # link: /zh/ + # lang: zh + # - name: 日本語 + # link: /ja/ + # lang: ja + # - name: 한국어 + # link: /ko/ + # lang: ko theme: name: material @@ -348,7 +348,8 @@ nav: - SwarmRouter Example: "swarms/examples/swarm_router.md" - MultiAgentRouter Minimal Example: "swarms/examples/multi_agent_router_minimal.md" - ConcurrentWorkflow Example: "swarms/examples/concurrent_workflow.md" - - MixtureOfAgents Example: "swarms/examples/mixture_of_agents.md" + # - MixtureOfAgents Example: "swarms/examples/mixture_of_agents.md" + - Mixture of Agents Example: "swarms/examples/moa_example.md" - Unique Swarms: "swarms/examples/unique_swarms.md" - Agents as Tools: "swarms/examples/agents_as_tools.md" - Aggregate Multi-Agent Responses: "swarms/examples/aggregate.md" diff --git a/docs/swarms/examples/moa_example.md b/docs/swarms/examples/moa_example.md new file mode 100644 index 00000000..3ce7d24c --- /dev/null +++ b/docs/swarms/examples/moa_example.md @@ -0,0 +1,132 @@ +# Mixture of Agents Example + +The Mixture of Agents (MoA) is a sophisticated multi-agent architecture that implements parallel processing with iterative refinement. This approach processes multiple specialized agents simultaneously, concatenates their outputs, and then performs multiple parallel runs to achieve consensus or enhanced results. + +## How It Works + +1. **Parallel Processing**: Multiple agents work simultaneously on the same input +2. **Output Concatenation**: Results from all agents are combined into a unified response +3. **Iterative Refinement**: The process repeats for `n` layers/iterations to improve quality +4. **Consensus Building**: Multiple runs help achieve more reliable and comprehensive outputs + +This architecture is particularly effective for complex tasks that benefit from diverse perspectives and iterative improvement, such as financial analysis, risk assessment, and multi-faceted problem solving. + +![Mixture of Agents](https://files.readme.io/ddb138e-moa-3layer.png) + + +## Installation + +Install the swarms package using pip: + +```bash +pip install -U swarms +``` + +## Basic Setup + +1. First, set up your environment variables: + +```python +WORKSPACE_DIR="agent_workspace" +ANTHROPIC_API_KEY="" +``` + +## Code + +```python +from swarms import Agent, MixtureOfAgents + +# Agent 1: Risk Metrics Calculator +risk_metrics_agent = Agent( + agent_name="Risk-Metrics-Calculator", + agent_description="Calculates key risk metrics like VaR, Sharpe ratio, and volatility", + system_prompt="""You are a risk metrics specialist. Calculate and explain: + - Value at Risk (VaR) + - Sharpe ratio + - Volatility + - Maximum drawdown + - Beta coefficient + + Provide clear, numerical results with brief explanations.""", + max_loops=1, + # model_name="gpt-4o-mini", + random_model_enabled=True, + dynamic_temperature_enabled=True, + output_type="str-all-except-first", + max_tokens=4096, +) + +# Agent 2: Portfolio Risk Analyzer +portfolio_risk_agent = Agent( + agent_name="Portfolio-Risk-Analyzer", + agent_description="Analyzes portfolio diversification and concentration risk", + system_prompt="""You are a portfolio risk analyst. Focus on: + - Portfolio diversification analysis + - Concentration risk assessment + - Correlation analysis + - Sector/asset allocation risk + - Liquidity risk evaluation + + Provide actionable insights for risk reduction.""", + max_loops=1, + # model_name="gpt-4o-mini", + random_model_enabled=True, + dynamic_temperature_enabled=True, + output_type="str-all-except-first", + max_tokens=4096, +) + +# Agent 3: Market Risk Monitor +market_risk_agent = Agent( + agent_name="Market-Risk-Monitor", + agent_description="Monitors market conditions and identifies risk factors", + system_prompt="""You are a market risk monitor. Identify and assess: + - Market volatility trends + - Economic risk factors + - Geopolitical risks + - Interest rate risks + - Currency risks + + Provide current risk alerts and trends.""", + max_loops=1, + # model_name="gpt-4o-mini", + random_model_enabled=True, + dynamic_temperature_enabled=True, + output_type="str-all-except-first", + max_tokens=4096, +) + + +swarm = MixtureOfAgents( + agents=[ + risk_metrics_agent, + portfolio_risk_agent, + market_risk_agent, + ], + layers=1, + max_loops=1, + output_type="final", +) + + +out = swarm.run( + "Calculate VaR and Sharpe ratio for a portfolio with 15% annual return and 20% volatility" +) + +print(out) +``` + +## Support and Community + +If you're facing issues or want to learn more, check out the following resources to join our Discord, stay updated on Twitter, and watch tutorials on YouTube! + +| Platform | Link | Description | +|----------|------|-------------| +| 📚 Documentation | [docs.swarms.world](https://docs.swarms.world) | Official documentation and guides | +| 📝 Blog | [Medium](https://medium.com/@kyeg) | Latest updates and technical articles | +| 💬 Discord | [Join Discord](https://discord.gg/jM3Z6M9uMq) | Live chat and community support | +| 🐦 Twitter | [@kyegomez](https://twitter.com/kyegomez) | Latest news and announcements | +| 👥 LinkedIn | [The Swarm Corporation](https://www.linkedin.com/company/the-swarm-corporation) | Professional network and updates | +| 📺 YouTube | [Swarms Channel](https://www.youtube.com/channel/UC9yXyitkbU_WSy7bd_41SqQ) | Tutorials and demos | +| 🎫 Events | [Sign up here](https://lu.ma/5p2jnc2v) | Join our community events | + diff --git a/mixture_of_agents_example.py b/mixture_of_agents_example.py new file mode 100644 index 00000000..12bbf837 --- /dev/null +++ b/mixture_of_agents_example.py @@ -0,0 +1,80 @@ +from swarms import Agent, MixtureOfAgents + +# Agent 1: Risk Metrics Calculator +risk_metrics_agent = Agent( + agent_name="Risk-Metrics-Calculator", + agent_description="Calculates key risk metrics like VaR, Sharpe ratio, and volatility", + system_prompt="""You are a risk metrics specialist. Calculate and explain: + - Value at Risk (VaR) + - Sharpe ratio + - Volatility + - Maximum drawdown + - Beta coefficient + + Provide clear, numerical results with brief explanations.""", + max_loops=1, + # model_name="gpt-4o-mini", + random_model_enabled=True, + dynamic_temperature_enabled=True, + output_type="str-all-except-first", + max_tokens=4096, +) + +# Agent 2: Portfolio Risk Analyzer +portfolio_risk_agent = Agent( + agent_name="Portfolio-Risk-Analyzer", + agent_description="Analyzes portfolio diversification and concentration risk", + system_prompt="""You are a portfolio risk analyst. Focus on: + - Portfolio diversification analysis + - Concentration risk assessment + - Correlation analysis + - Sector/asset allocation risk + - Liquidity risk evaluation + + Provide actionable insights for risk reduction.""", + max_loops=1, + # model_name="gpt-4o-mini", + random_model_enabled=True, + dynamic_temperature_enabled=True, + output_type="str-all-except-first", + max_tokens=4096, +) + +# Agent 3: Market Risk Monitor +market_risk_agent = Agent( + agent_name="Market-Risk-Monitor", + agent_description="Monitors market conditions and identifies risk factors", + system_prompt="""You are a market risk monitor. Identify and assess: + - Market volatility trends + - Economic risk factors + - Geopolitical risks + - Interest rate risks + - Currency risks + + Provide current risk alerts and trends.""", + max_loops=1, + # model_name="gpt-4o-mini", + random_model_enabled=True, + dynamic_temperature_enabled=True, + output_type="str-all-except-first", + max_tokens=4096, +) + + +swarm = MixtureOfAgents( + agents=[ + risk_metrics_agent, + portfolio_risk_agent, + market_risk_agent, + ], + layers=1, + max_loops=1, + output_type="final", +) + + +out = swarm.run( + "Calculate VaR and Sharpe ratio for a portfolio with 15% annual return and 20% volatility" +) + +print(out) diff --git a/swarms/structs/ma_utils.py b/swarms/structs/ma_utils.py index 8d28b76e..a3a9eeb8 100644 --- a/swarms/structs/ma_utils.py +++ b/swarms/structs/ma_utils.py @@ -5,8 +5,9 @@ import random def list_all_agents( agents: List[Union[Callable, Any]], conversation: Optional[Any] = None, - name: str = "", - add_to_conversation: bool = False, + name: Optional[str] = None, + description: Optional[str] = None, + add_to_conversation: Optional[bool] = False, ) -> str: """Lists all agents in a swarm and optionally adds them to a conversation. @@ -27,6 +28,7 @@ def list_all_agents( >>> conversation = Conversation() >>> agent_info = list_all_agents(agents, conversation, "MySwarm") >>> print(agent_info) + Swarm: MySwarm Total Agents: 2 Agent: Agent1 @@ -39,8 +41,15 @@ def list_all_agents( # Compile information about all agents total_agents = len(agents) - all_agents = f"Total Agents: {total_agents}\n\n" + "\n\n".join( - f"Agent: {agent.agent_name} \n\n Description: {agent.description or (agent.system_prompt[:50] + '...' if len(agent.system_prompt) > 50 else agent.system_prompt)}" + all_agents = f"Team Name: {name}\n" if name else "" + all_agents += ( + f"Team Description: {description}\n" if description else "" + ) + all_agents += f"Total Agents: {total_agents}\n\n" + all_agents += "| Agent | Description |\n" + all_agents += "|-------|-------------|\n" + all_agents += "\n".join( + f"| {agent.agent_name} | {agent.description or (agent.system_prompt[:50] + '...' if len(agent.system_prompt) > 50 else agent.system_prompt)} |" for agent in agents ) @@ -48,7 +57,7 @@ def list_all_agents( # Add the agent information to the conversation conversation.add( role="System", - content=f"All Agents Available in the Swarm {name}:\n\n{all_agents}", + content=all_agents, ) return all_agents diff --git a/swarms/structs/mixture_of_agents.py b/swarms/structs/mixture_of_agents.py index 9c6b8756..3bab8211 100644 --- a/swarms/structs/mixture_of_agents.py +++ b/swarms/structs/mixture_of_agents.py @@ -1,17 +1,18 @@ -import asyncio import os from typing import List, Optional from swarms.structs.agent import Agent from swarms.prompts.ag_prompt import aggregator_system_prompt_main +from swarms.structs.ma_utils import list_all_agents +from swarms.utils.history_output_formatter import ( + history_output_formatter, +) from swarms.utils.loguru_logger import initialize_logger import concurrent.futures from swarms.utils.output_types import OutputType from swarms.structs.conversation import Conversation -from swarms.utils.history_output_formatter import ( - history_output_formatter, -) + logger = initialize_logger(log_folder="mixture_of_agents") @@ -25,13 +26,13 @@ class MixtureOfAgents: self, name: str = "MixtureOfAgents", description: str = "A class to run a mixture of agents and aggregate their responses.", - agents: List[Agent] = [], + agents: List[Agent] = None, aggregator_agent: Agent = None, aggregator_system_prompt: str = aggregator_system_prompt_main, layers: int = 3, max_loops: int = 1, - return_str_on: bool = False, - output_type: OutputType = "dict", + output_type: OutputType = "final", + aggregator_model_name: str = "claude-3-5-sonnet-20240620", ) -> None: """ Initialize the Mixture of Agents class with agents and configuration. @@ -48,16 +49,36 @@ class MixtureOfAgents: self.description = description self.agents = agents self.aggregator_agent = aggregator_agent - self.aggregator_system_prompt = aggregator_system_prompt_main + self.aggregator_system_prompt = aggregator_system_prompt self.layers = layers self.max_loops = max_loops - self.return_str_on = return_str_on self.output_type = output_type + self.aggregator_model_name = aggregator_model_name + self.aggregator_agent = self.aggregator_agent_setup() self.reliability_check() self.conversation = Conversation() + list_all_agents( + agents=self.agents, + conversation=self.conversation, + description=self.description, + name=self.name, + add_to_conversation=True, + ) + + def aggregator_agent_setup(self): + return Agent( + agent_name="Aggregator Agent", + description="An agent that aggregates the responses of the other agents.", + system_prompt=aggregator_system_prompt_main, + model_name=self.aggregator_model_name, + temperature=0.5, + max_loops=1, + output_type="str-all-except-first", + ) + def reliability_check(self) -> None: """ Performs a reliability check on the Mixture of Agents class. @@ -66,8 +87,8 @@ class MixtureOfAgents: "Checking the reliability of the Mixture of Agents class." ) - if not self.agents: - raise ValueError("No reference agents provided.") + if len(self.agents) == 0: + raise ValueError("No agents provided.") if not self.aggregator_agent: raise ValueError("No aggregator agent provided.") @@ -78,129 +99,83 @@ class MixtureOfAgents: if not self.layers: raise ValueError("No layers provided.") - if self.layers < 1: - raise ValueError("Layers must be greater than 0.") - logger.info("Reliability check passed.") logger.info("Mixture of Agents class is ready for use.") - def _get_final_system_prompt( - self, system_prompt: str, results: List[str] - ) -> str: - """ - Constructs a system prompt for subsequent layers that includes previous responses. + def save_to_markdown_file(self, file_path: str = "moa.md"): + with open(file_path, "w") as f: + f.write(self.conversation.get_str()) - Args: - system_prompt (str): The initial system prompt. - results (List[str]): A list of previous responses. - - Returns: - str: The final system prompt including previous responses. - """ - return ( - system_prompt - + "\n" - + "\n".join( - [ - f"{i+1}. {str(element)}" - for i, element in enumerate(results) - ] - ) - ) - - async def _run_agent_async( + def step( self, - agent: Agent, task: str, - prev_responses: Optional[List[str]] = None, - ) -> str: - """ - Asynchronous method to run a single agent. + img: Optional[str] = None, + imgs: Optional[List[str]] = None, + ): + # self.conversation.add(role="User", content=task) - Args: - agent (Agent): The agent to be run. - task (str): The task for the agent. - prev_responses (Optional[List[str]], optional): A list of previous responses. Defaults to None. + # Run agents concurrently + with concurrent.futures.ThreadPoolExecutor( + max_workers=os.cpu_count() + ) as executor: + # Submit all agent tasks and store with their index + future_to_agent = { + executor.submit( + agent.run, task=task, img=img, imgs=imgs + ): agent + for agent in self.agents + } - Returns: - str: The response from the agent. - """ - # If there are previous responses, update the agent's system prompt - if prev_responses: - system_prompt_with_responses = ( - self._get_final_system_prompt( - self.aggregator_system_prompt, prev_responses - ) - ) - agent.system_prompt = system_prompt_with_responses + # Collect results and add to conversation in completion order + for future in concurrent.futures.as_completed( + future_to_agent + ): + agent = future_to_agent[future] + output = future.result() + self.conversation.add(role=agent.name, content=output) - # Run the agent asynchronously - response = await asyncio.to_thread(agent.run, task) + return self.conversation.get_str() - self.conversation.add(agent.agent_name, response) + def _run( + self, + task: str, + img: Optional[str] = None, + imgs: Optional[List[str]] = None, + ): - # Log the agent's response - print(f"Agent {agent.agent_name} response: {response}") - return response + self.conversation.add(role="User", content=task) - async def _run_async(self, task: str) -> None: - """ - Asynchronous method to run the Mixture of Agents process. + for i in range(self.layers): + out = self.step( + task=self.conversation.get_str(), img=img, imgs=imgs + ) + task = out - Args: - task (str): The task for the mixture of agents. - """ - # Gather initial responses from reference agents - results: List[str] = await asyncio.gather( - *[ - self._run_agent_async(agent, task) - for agent in self.agents - ] + out = self.aggregator_agent.run( + task=self.conversation.get_str() ) - # Process additional layers, if applicable - for _ in range(1, self.layers - 1): - results = await asyncio.gather( - *[ - self._run_agent_async( - agent, task, prev_responses=results - ) - for agent in self.agents - ] - ) - - # Perform final aggregation using the aggregator agent - final_result = await self._run_agent_async( - self.aggregator_agent, task, prev_responses=results + self.conversation.add( + role=self.aggregator_agent.agent_name, content=out ) - print(f"Final Aggregated Response: {final_result}") + out = history_output_formatter( + conversation=self.conversation, type=self.output_type + ) - def run(self, task: str) -> None: - """ - Synchronous wrapper to run the async process. + return out - Args: - task (str): The task for the mixture of agents. - """ + def run( + self, + task: str, + img: Optional[str] = None, + imgs: Optional[List[str]] = None, + ): try: - self.conversation.add("user", task) - - for _ in range(self.max_loops): - # Add previous context to task if available - prompt = f"History: {self.conversation.get_str()}\n\nTask: {task}" - - # Run async process - asyncio.run(self._run_async(prompt)) - - return history_output_formatter( - conversation=self.conversation, - type=self.output_type, - ) - + return self._run(task=task, img=img, imgs=imgs) except Exception as e: - logger.error(f"Error running mixture of agents: {str(e)}") - raise e + logger.error(f"Error running Mixture of Agents: {e}") + return f"Error: {e}" def run_batched(self, tasks: List[str]) -> List[str]: """