mixture of agents improvement and fix and example in docs with image

pull/926/head
Kye Gomez 6 days ago
parent 2770b8c7bf
commit 7277a3ffbb

@ -61,19 +61,19 @@ extra:
provider: google
property: G-MPE9C65596
alternate:
- name: English
link: /
lang: en
- name: 简体中文
link: /zh/
lang: zh
- name: 日本語
link: /ja/
lang: ja
- name: 한국어
link: /ko/
lang: ko
# alternate:
# - name: English
# link: /
# lang: en
# - name: 简体中文
# link: /zh/
# lang: zh
# - name: 日本語
# link: /ja/
# lang: ja
# - name: 한국어
# link: /ko/
# lang: ko
theme:
name: material
@ -348,7 +348,8 @@ nav:
- SwarmRouter Example: "swarms/examples/swarm_router.md"
- MultiAgentRouter Minimal Example: "swarms/examples/multi_agent_router_minimal.md"
- ConcurrentWorkflow Example: "swarms/examples/concurrent_workflow.md"
- MixtureOfAgents Example: "swarms/examples/mixture_of_agents.md"
# - MixtureOfAgents Example: "swarms/examples/mixture_of_agents.md"
- Mixture of Agents Example: "swarms/examples/moa_example.md"
- Unique Swarms: "swarms/examples/unique_swarms.md"
- Agents as Tools: "swarms/examples/agents_as_tools.md"
- Aggregate Multi-Agent Responses: "swarms/examples/aggregate.md"

@ -0,0 +1,132 @@
# Mixture of Agents Example
The Mixture of Agents (MoA) is a sophisticated multi-agent architecture that implements parallel processing with iterative refinement. This approach processes multiple specialized agents simultaneously, concatenates their outputs, and then performs multiple parallel runs to achieve consensus or enhanced results.
## How It Works
1. **Parallel Processing**: Multiple agents work simultaneously on the same input
2. **Output Concatenation**: Results from all agents are combined into a unified response
3. **Iterative Refinement**: The process repeats for `n` layers/iterations to improve quality
4. **Consensus Building**: Multiple runs help achieve more reliable and comprehensive outputs
This architecture is particularly effective for complex tasks that benefit from diverse perspectives and iterative improvement, such as financial analysis, risk assessment, and multi-faceted problem solving.
![Mixture of Agents](https://files.readme.io/ddb138e-moa-3layer.png)
## Installation
Install the swarms package using pip:
```bash
pip install -U swarms
```
## Basic Setup
1. First, set up your environment variables:
```python
WORKSPACE_DIR="agent_workspace"
ANTHROPIC_API_KEY=""
```
## Code
```python
from swarms import Agent, MixtureOfAgents
# Agent 1: Risk Metrics Calculator
risk_metrics_agent = Agent(
agent_name="Risk-Metrics-Calculator",
agent_description="Calculates key risk metrics like VaR, Sharpe ratio, and volatility",
system_prompt="""You are a risk metrics specialist. Calculate and explain:
- Value at Risk (VaR)
- Sharpe ratio
- Volatility
- Maximum drawdown
- Beta coefficient
Provide clear, numerical results with brief explanations.""",
max_loops=1,
# model_name="gpt-4o-mini",
random_model_enabled=True,
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
max_tokens=4096,
)
# Agent 2: Portfolio Risk Analyzer
portfolio_risk_agent = Agent(
agent_name="Portfolio-Risk-Analyzer",
agent_description="Analyzes portfolio diversification and concentration risk",
system_prompt="""You are a portfolio risk analyst. Focus on:
- Portfolio diversification analysis
- Concentration risk assessment
- Correlation analysis
- Sector/asset allocation risk
- Liquidity risk evaluation
Provide actionable insights for risk reduction.""",
max_loops=1,
# model_name="gpt-4o-mini",
random_model_enabled=True,
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
max_tokens=4096,
)
# Agent 3: Market Risk Monitor
market_risk_agent = Agent(
agent_name="Market-Risk-Monitor",
agent_description="Monitors market conditions and identifies risk factors",
system_prompt="""You are a market risk monitor. Identify and assess:
- Market volatility trends
- Economic risk factors
- Geopolitical risks
- Interest rate risks
- Currency risks
Provide current risk alerts and trends.""",
max_loops=1,
# model_name="gpt-4o-mini",
random_model_enabled=True,
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
max_tokens=4096,
)
swarm = MixtureOfAgents(
agents=[
risk_metrics_agent,
portfolio_risk_agent,
market_risk_agent,
],
layers=1,
max_loops=1,
output_type="final",
)
out = swarm.run(
"Calculate VaR and Sharpe ratio for a portfolio with 15% annual return and 20% volatility"
)
print(out)
```
## Support and Community
If you're facing issues or want to learn more, check out the following resources to join our Discord, stay updated on Twitter, and watch tutorials on YouTube!
| Platform | Link | Description |
|----------|------|-------------|
| 📚 Documentation | [docs.swarms.world](https://docs.swarms.world) | Official documentation and guides |
| 📝 Blog | [Medium](https://medium.com/@kyeg) | Latest updates and technical articles |
| 💬 Discord | [Join Discord](https://discord.gg/jM3Z6M9uMq) | Live chat and community support |
| 🐦 Twitter | [@kyegomez](https://twitter.com/kyegomez) | Latest news and announcements |
| 👥 LinkedIn | [The Swarm Corporation](https://www.linkedin.com/company/the-swarm-corporation) | Professional network and updates |
| 📺 YouTube | [Swarms Channel](https://www.youtube.com/channel/UC9yXyitkbU_WSy7bd_41SqQ) | Tutorials and demos |
| 🎫 Events | [Sign up here](https://lu.ma/5p2jnc2v) | Join our community events |

@ -0,0 +1,80 @@
from swarms import Agent, MixtureOfAgents
# Agent 1: Risk Metrics Calculator
risk_metrics_agent = Agent(
agent_name="Risk-Metrics-Calculator",
agent_description="Calculates key risk metrics like VaR, Sharpe ratio, and volatility",
system_prompt="""You are a risk metrics specialist. Calculate and explain:
- Value at Risk (VaR)
- Sharpe ratio
- Volatility
- Maximum drawdown
- Beta coefficient
Provide clear, numerical results with brief explanations.""",
max_loops=1,
# model_name="gpt-4o-mini",
random_model_enabled=True,
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
max_tokens=4096,
)
# Agent 2: Portfolio Risk Analyzer
portfolio_risk_agent = Agent(
agent_name="Portfolio-Risk-Analyzer",
agent_description="Analyzes portfolio diversification and concentration risk",
system_prompt="""You are a portfolio risk analyst. Focus on:
- Portfolio diversification analysis
- Concentration risk assessment
- Correlation analysis
- Sector/asset allocation risk
- Liquidity risk evaluation
Provide actionable insights for risk reduction.""",
max_loops=1,
# model_name="gpt-4o-mini",
random_model_enabled=True,
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
max_tokens=4096,
)
# Agent 3: Market Risk Monitor
market_risk_agent = Agent(
agent_name="Market-Risk-Monitor",
agent_description="Monitors market conditions and identifies risk factors",
system_prompt="""You are a market risk monitor. Identify and assess:
- Market volatility trends
- Economic risk factors
- Geopolitical risks
- Interest rate risks
- Currency risks
Provide current risk alerts and trends.""",
max_loops=1,
# model_name="gpt-4o-mini",
random_model_enabled=True,
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
max_tokens=4096,
)
swarm = MixtureOfAgents(
agents=[
risk_metrics_agent,
portfolio_risk_agent,
market_risk_agent,
],
layers=1,
max_loops=1,
output_type="final",
)
out = swarm.run(
"Calculate VaR and Sharpe ratio for a portfolio with 15% annual return and 20% volatility"
)
print(out)

@ -5,8 +5,9 @@ import random
def list_all_agents(
agents: List[Union[Callable, Any]],
conversation: Optional[Any] = None,
name: str = "",
add_to_conversation: bool = False,
name: Optional[str] = None,
description: Optional[str] = None,
add_to_conversation: Optional[bool] = False,
) -> str:
"""Lists all agents in a swarm and optionally adds them to a conversation.
@ -27,6 +28,7 @@ def list_all_agents(
>>> conversation = Conversation()
>>> agent_info = list_all_agents(agents, conversation, "MySwarm")
>>> print(agent_info)
Swarm: MySwarm
Total Agents: 2
Agent: Agent1
@ -39,8 +41,15 @@ def list_all_agents(
# Compile information about all agents
total_agents = len(agents)
all_agents = f"Total Agents: {total_agents}\n\n" + "\n\n".join(
f"Agent: {agent.agent_name} \n\n Description: {agent.description or (agent.system_prompt[:50] + '...' if len(agent.system_prompt) > 50 else agent.system_prompt)}"
all_agents = f"Team Name: {name}\n" if name else ""
all_agents += (
f"Team Description: {description}\n" if description else ""
)
all_agents += f"Total Agents: {total_agents}\n\n"
all_agents += "| Agent | Description |\n"
all_agents += "|-------|-------------|\n"
all_agents += "\n".join(
f"| {agent.agent_name} | {agent.description or (agent.system_prompt[:50] + '...' if len(agent.system_prompt) > 50 else agent.system_prompt)} |"
for agent in agents
)
@ -48,7 +57,7 @@ def list_all_agents(
# Add the agent information to the conversation
conversation.add(
role="System",
content=f"All Agents Available in the Swarm {name}:\n\n{all_agents}",
content=all_agents,
)
return all_agents

@ -1,17 +1,18 @@
import asyncio
import os
from typing import List, Optional
from swarms.structs.agent import Agent
from swarms.prompts.ag_prompt import aggregator_system_prompt_main
from swarms.structs.ma_utils import list_all_agents
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.utils.loguru_logger import initialize_logger
import concurrent.futures
from swarms.utils.output_types import OutputType
from swarms.structs.conversation import Conversation
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
logger = initialize_logger(log_folder="mixture_of_agents")
@ -25,13 +26,13 @@ class MixtureOfAgents:
self,
name: str = "MixtureOfAgents",
description: str = "A class to run a mixture of agents and aggregate their responses.",
agents: List[Agent] = [],
agents: List[Agent] = None,
aggregator_agent: Agent = None,
aggregator_system_prompt: str = aggregator_system_prompt_main,
layers: int = 3,
max_loops: int = 1,
return_str_on: bool = False,
output_type: OutputType = "dict",
output_type: OutputType = "final",
aggregator_model_name: str = "claude-3-5-sonnet-20240620",
) -> None:
"""
Initialize the Mixture of Agents class with agents and configuration.
@ -48,16 +49,36 @@ class MixtureOfAgents:
self.description = description
self.agents = agents
self.aggregator_agent = aggregator_agent
self.aggregator_system_prompt = aggregator_system_prompt_main
self.aggregator_system_prompt = aggregator_system_prompt
self.layers = layers
self.max_loops = max_loops
self.return_str_on = return_str_on
self.output_type = output_type
self.aggregator_model_name = aggregator_model_name
self.aggregator_agent = self.aggregator_agent_setup()
self.reliability_check()
self.conversation = Conversation()
list_all_agents(
agents=self.agents,
conversation=self.conversation,
description=self.description,
name=self.name,
add_to_conversation=True,
)
def aggregator_agent_setup(self):
return Agent(
agent_name="Aggregator Agent",
description="An agent that aggregates the responses of the other agents.",
system_prompt=aggregator_system_prompt_main,
model_name=self.aggregator_model_name,
temperature=0.5,
max_loops=1,
output_type="str-all-except-first",
)
def reliability_check(self) -> None:
"""
Performs a reliability check on the Mixture of Agents class.
@ -66,8 +87,8 @@ class MixtureOfAgents:
"Checking the reliability of the Mixture of Agents class."
)
if not self.agents:
raise ValueError("No reference agents provided.")
if len(self.agents) == 0:
raise ValueError("No agents provided.")
if not self.aggregator_agent:
raise ValueError("No aggregator agent provided.")
@ -78,129 +99,83 @@ class MixtureOfAgents:
if not self.layers:
raise ValueError("No layers provided.")
if self.layers < 1:
raise ValueError("Layers must be greater than 0.")
logger.info("Reliability check passed.")
logger.info("Mixture of Agents class is ready for use.")
def _get_final_system_prompt(
self, system_prompt: str, results: List[str]
) -> str:
"""
Constructs a system prompt for subsequent layers that includes previous responses.
def save_to_markdown_file(self, file_path: str = "moa.md"):
with open(file_path, "w") as f:
f.write(self.conversation.get_str())
Args:
system_prompt (str): The initial system prompt.
results (List[str]): A list of previous responses.
Returns:
str: The final system prompt including previous responses.
"""
return (
system_prompt
+ "\n"
+ "\n".join(
[
f"{i+1}. {str(element)}"
for i, element in enumerate(results)
]
)
)
async def _run_agent_async(
def step(
self,
agent: Agent,
task: str,
prev_responses: Optional[List[str]] = None,
) -> str:
"""
Asynchronous method to run a single agent.
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
):
# self.conversation.add(role="User", content=task)
Args:
agent (Agent): The agent to be run.
task (str): The task for the agent.
prev_responses (Optional[List[str]], optional): A list of previous responses. Defaults to None.
# Run agents concurrently
with concurrent.futures.ThreadPoolExecutor(
max_workers=os.cpu_count()
) as executor:
# Submit all agent tasks and store with their index
future_to_agent = {
executor.submit(
agent.run, task=task, img=img, imgs=imgs
): agent
for agent in self.agents
}
Returns:
str: The response from the agent.
"""
# If there are previous responses, update the agent's system prompt
if prev_responses:
system_prompt_with_responses = (
self._get_final_system_prompt(
self.aggregator_system_prompt, prev_responses
)
)
agent.system_prompt = system_prompt_with_responses
# Collect results and add to conversation in completion order
for future in concurrent.futures.as_completed(
future_to_agent
):
agent = future_to_agent[future]
output = future.result()
self.conversation.add(role=agent.name, content=output)
# Run the agent asynchronously
response = await asyncio.to_thread(agent.run, task)
return self.conversation.get_str()
self.conversation.add(agent.agent_name, response)
def _run(
self,
task: str,
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
):
# Log the agent's response
print(f"Agent {agent.agent_name} response: {response}")
return response
self.conversation.add(role="User", content=task)
async def _run_async(self, task: str) -> None:
"""
Asynchronous method to run the Mixture of Agents process.
for i in range(self.layers):
out = self.step(
task=self.conversation.get_str(), img=img, imgs=imgs
)
task = out
Args:
task (str): The task for the mixture of agents.
"""
# Gather initial responses from reference agents
results: List[str] = await asyncio.gather(
*[
self._run_agent_async(agent, task)
for agent in self.agents
]
out = self.aggregator_agent.run(
task=self.conversation.get_str()
)
# Process additional layers, if applicable
for _ in range(1, self.layers - 1):
results = await asyncio.gather(
*[
self._run_agent_async(
agent, task, prev_responses=results
)
for agent in self.agents
]
)
# Perform final aggregation using the aggregator agent
final_result = await self._run_agent_async(
self.aggregator_agent, task, prev_responses=results
self.conversation.add(
role=self.aggregator_agent.agent_name, content=out
)
print(f"Final Aggregated Response: {final_result}")
out = history_output_formatter(
conversation=self.conversation, type=self.output_type
)
def run(self, task: str) -> None:
"""
Synchronous wrapper to run the async process.
return out
Args:
task (str): The task for the mixture of agents.
"""
def run(
self,
task: str,
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
):
try:
self.conversation.add("user", task)
for _ in range(self.max_loops):
# Add previous context to task if available
prompt = f"History: {self.conversation.get_str()}\n\nTask: {task}"
# Run async process
asyncio.run(self._run_async(prompt))
return history_output_formatter(
conversation=self.conversation,
type=self.output_type,
)
return self._run(task=task, img=img, imgs=imgs)
except Exception as e:
logger.error(f"Error running mixture of agents: {str(e)}")
raise e
logger.error(f"Error running Mixture of Agents: {e}")
return f"Error: {e}"
def run_batched(self, tasks: List[str]) -> List[str]:
"""

Loading…
Cancel
Save