[GROUPCHAT]

pull/704/head
Kye Gomez 2 weeks ago
parent af7f52bd05
commit a6415364d6

@ -184,6 +184,7 @@ nav:
- ForestSwarm: "swarms/structs/forest_swarm.md"
- SwarmRouter: "swarms/structs/swarm_router.md"
- TaskQueueSwarm: "swarms/structs/taskqueue_swarm.md"
- SwarmRearrange: "swarms/structs/swarm_rearrange.md"
- Various Execution Methods: "swarms/structs/various_execution_methods.md"
- Workflows:
- ConcurrentWorkflow: "swarms/structs/concurrentworkflow.md"

@ -1,231 +1,341 @@
# GroupChat Class Documentation
The GroupChat class manages multi-agent conversations with state persistence, comprehensive logging, and flexible agent configurations. It supports both Agent class instances and callable functions, making it versatile for different use cases.
# GroupChat Swarm Documentation
A production-grade multi-agent system enabling sophisticated group conversations between AI agents with customizable speaking patterns, parallel processing capabilities, and comprehensive conversation tracking.
## Advanced Configuration
### Agent Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| agent_name | str | Required | Unique identifier for the agent |
| system_prompt | str | Required | Role and behavior instructions |
| llm | Any | Required | Language model instance |
| max_loops | int | 1 | Maximum conversation turns |
| autosave | bool | False | Enable conversation saving |
| dashboard | bool | False | Enable monitoring dashboard |
| verbose | bool | True | Enable detailed logging |
| dynamic_temperature | bool | True | Enable dynamic temperature |
| retry_attempts | int | 1 | Failed request retry count |
| context_length | int | 200000 | Maximum context window |
| output_type | str | "string" | Response format type |
| streaming_on | bool | False | Enable streaming responses |
### GroupChat Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| name | str | "GroupChat" | Chat group identifier |
| description | str | "" | Purpose description |
| agents | List[Agent] | [] | Participating agents |
| speaker_fn | Callable | round_robin | Speaker selection function |
| max_turns | int | 10 | Maximum conversation turns |
## Table of Contents
- [Installation](#installation)
- [Core Concepts](#core-concepts)
- [Basic Usage](#basic-usage)
- [Advanced Configuration](#advanced-configuration)
- [Speaker Functions](#speaker-functions)
- [Response Models](#response-models)
- [Advanced Examples](#advanced-examples)
- [API Reference](#api-reference)
- [Best Practices](#best-practices)
## Installation
```bash
pip install swarms python-dotenv pydantic
```
## Attributes
| Attribute | Type | Description |
|-----------|------|-------------|
| state_path | str | Path for saving/loading chat state |
| wrapped_agents | List[AgentWrapper] | List of wrapped agent instances |
| selector_agent | AgentWrapper | Agent responsible for speaker selection |
| state | GroupChatState | Current state of the group chat |
## Methods
### Core Methods
```python
def run(self, task: str) -> str:
"""Execute the group chat conversation"""
def save_state(self) -> None:
"""Save current state to disk"""
@classmethod
def load_state(cls, state_path: str) -> 'GroupChat':
"""Load GroupChat from saved state"""
def get_conversation_summary(self) -> Dict[str, Any]:
"""Return a summary of the conversation"""
def export_conversation(self, format: str = "json") -> Union[str, Dict]:
"""Export the conversation in specified format"""
```bash
pip3 install swarms swarm-models loguru
```
### Internal Methods
```python
def _log_interaction(self, agent_name: str, position: int, input_text: str, output_text: str) -> None:
"""Log a single interaction"""
## Core Concepts
def _add_message(self, role: str, content: str) -> None:
"""Add a message to the conversation history"""
The GroupChat system consists of several key components:
def select_next_speaker(self, last_speaker: AgentWrapper) -> AgentWrapper:
"""Select the next speaker using the selector agent"""
```
1. **Agents**: Individual AI agents with specialized knowledge and roles
2. **Speaker Functions**: Control mechanisms for conversation flow
3. **Chat History**: Structured conversation tracking
4. **Response Models**: Pydantic models for data validation
## Usage Examples
## Basic Usage
### 1. Basic Setup with Two Agents
```python
import os
from swarms import Agent
from dotenv import load_dotenv
from swarm_models import OpenAIChat
from swarms import Agent, GroupChat
from loguru import logger
# Initialize OpenAI
# Load environment variables
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
model = OpenAIChat(openai_api_key=api_key, model_name="gpt-4-mini")
# Create agents
analyst = Agent(
agent_name="Financial-Analyst",
system_prompt="You are a financial analyst...",
llm=model
# Initialize LLM
model = OpenAIChat(
openai_api_key=api_key,
model_name="gpt-4o-mini",
temperature=0.1
)
advisor = Agent(
agent_name="Investment-Advisor",
system_prompt="You are an investment advisor...",
llm=model
# Create financial analyst agent
financial_analyst = Agent(
agent_name="Financial-Analysis-Agent",
system_prompt="You are a financial analyst specializing in investment strategies.",
llm=model,
max_loops=1,
autosave=False,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
retry_attempts=1,
context_length=200000,
output_type="string"
)
# Create tax advisor agent
tax_advisor = Agent(
agent_name="Tax-Adviser-Agent",
system_prompt="You are a tax adviser providing clear tax guidance.",
llm=model,
max_loops=1,
autosave=False,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
retry_attempts=1,
context_length=200000,
output_type="string"
)
# Create group chat
# Initialize group chat
chat = GroupChat(
name="Investment Team",
agents=[analyst, advisor],
max_rounds=5,
group_objective="Provide investment advice"
name="Investment Advisory",
description="Financial and tax analysis group",
agents=[financial_analyst, tax_advisor],
speaker_fn=expertise_based
)
response = chat.run("What's the best investment strategy for retirement?")
# Run conversation
history = chat.run("How to optimize tax strategy for investments?")
```
### 2. Advanced Setup with State Management
```python
# Create group chat with state persistence
chat = GroupChat(
name="Investment Advisory Team",
description="Expert team for financial planning",
agents=[analyst, advisor, tax_specialist],
max_rounds=10,
admin_name="Senior Advisor",
group_objective="Provide comprehensive financial planning",
state_path="investment_chat_state.json",
rules="1. Always provide sources\n2. Be concise\n3. Focus on practical advice"
)
## Speaker Functions
# Run chat and save state
response = chat.run("Create a retirement plan for a 35-year old")
chat.save_state()
# Load existing chat state
loaded_chat = GroupChat.load_state("investment_chat_state.json")
```
### Built-in Functions
### 3. Using Custom Callable Agents
```python
def custom_agent(input_text: str) -> str:
# Custom logic here
return f"Processed: {input_text}"
def round_robin(history: List[str], agent: Agent) -> bool:
"""
Enables agents to speak in turns.
Returns True for each agent in sequence.
"""
return True
def expertise_based(history: List[str], agent: Agent) -> bool:
"""
Enables agents to speak based on their expertise.
Returns True if agent's role matches conversation context.
"""
return agent.system_prompt.lower() in history[-1].lower() if history else True
def random_selection(history: List[str], agent: Agent) -> bool:
"""
Randomly selects speaking agents.
Returns True/False with 50% probability.
"""
import random
return random.choice([True, False])
def most_recent(history: List[str], agent: Agent) -> bool:
"""
Enables agents to respond to their mentions.
Returns True if agent was last speaker.
"""
return agent.agent_name == history[-1].split(":")[0].strip() if history else True
```
# Mix of regular agents and callable functions
### Custom Speaker Function Example
```python
def custom_speaker(history: List[str], agent: Agent) -> bool:
"""
Custom speaker function with complex logic.
Args:
history: Previous conversation messages
agent: Current agent being evaluated
Returns:
bool: Whether agent should speak
"""
# No history - let everyone speak
if not history:
return True
last_message = history[-1].lower()
# Check for agent expertise keywords
expertise_relevant = any(
keyword in last_message
for keyword in agent.expertise_keywords
)
# Check for direct mentions
mentioned = agent.agent_name.lower() in last_message
# Check if agent hasn't spoken recently
not_recent_speaker = not any(
agent.agent_name in msg
for msg in history[-3:]
)
return expertise_relevant or mentioned or not_recent_speaker
# Usage
chat = GroupChat(
name="Hybrid Team",
agents=[analyst, custom_agent],
max_rounds=3
agents=[agent1, agent2],
speaker_fn=custom_speaker
)
```
### 4. Export and Analysis
```python
# Run chat
chat.run("Analyze market conditions")
## Response Models
# Get summary
summary = chat.get_conversation_summary()
print(summary)
### Complete Schema
# Export in different formats
json_conv = chat.export_conversation(format="json")
text_conv = chat.export_conversation(format="text")
```python
class AgentResponse(BaseModel):
"""Individual agent response in a conversation turn"""
agent_name: str
role: str
message: str
timestamp: datetime = Field(default_factory=datetime.now)
turn_number: int
preceding_context: List[str] = Field(default_factory=list)
class ChatTurn(BaseModel):
"""Single turn in the conversation"""
turn_number: int
responses: List[AgentResponse]
task: str
timestamp: datetime = Field(default_factory=datetime.now)
class ChatHistory(BaseModel):
"""Complete conversation history"""
turns: List[ChatTurn]
total_messages: int
name: str
description: str
start_time: datetime = Field(default_factory=datetime.now)
```
### 5. Advanced Configuration with Custom Selector
```python
class CustomSelector(Agent):
def run(self, input_text: str) -> str:
# Custom selection logic
return "Financial-Analyst"
## Advanced Examples
chat = GroupChat(
name="Custom Selection Team",
agents=[analyst, advisor],
selector_agent=CustomSelector(
agent_name="Custom-Selector",
system_prompt="Select the next speaker based on expertise",
llm=model
),
max_rounds=5
)
```
### Multi-Agent Analysis Team
### 6. Debugging Setup
```python
import logging
# Create specialized agents
data_analyst = Agent(
agent_name="Data-Analyst",
system_prompt="You analyze numerical data and patterns",
llm=model
)
# Configure logging
logging.basicConfig(level=logging.DEBUG)
market_expert = Agent(
agent_name="Market-Expert",
system_prompt="You provide market insights and trends",
llm=model
)
chat = GroupChat(
name="Debug Team",
agents=[analyst, advisor],
max_rounds=3,
state_path="debug_chat.json"
strategy_advisor = Agent(
agent_name="Strategy-Advisor",
system_prompt="You formulate strategic recommendations",
llm=model
)
# Run with detailed logging
try:
response = chat.run("Complex query")
except Exception as e:
logger.error(f"Chat failed: {str(e)}")
# Access last successful state
state = chat.state
```
# Create analysis team
analysis_team = GroupChat(
name="Market Analysis Team",
description="Comprehensive market analysis group",
agents=[data_analyst, market_expert, strategy_advisor],
speaker_fn=expertise_based,
max_turns=15
)
## Error Handling
# Run complex analysis
history = analysis_team.run("""
Analyze the current market conditions:
1. Identify key trends
2. Evaluate risks
3. Recommend investment strategy
""")
```
The GroupChat class includes comprehensive error handling:
### Parallel Processing
```python
try:
chat = GroupChat(agents=[analyst]) # Will raise ValueError
except ValueError as e:
print("Configuration error:", str(e))
try:
response = chat.run("Query")
except Exception as e:
# Access error state
error_summary = chat.get_conversation_summary()
print("Execution error:", str(e))
print("State at error:", error_summary)
# Define multiple analysis tasks
tasks = [
"Analyze tech sector trends",
"Evaluate real estate market",
"Review commodity prices",
"Assess global economic indicators"
]
# Run tasks concurrently
histories = chat.concurrent_run(tasks)
# Process results
for task, history in zip(tasks, histories):
print(f"\nAnalysis for: {task}")
for turn in history.turns:
for response in turn.responses:
print(f"{response.agent_name}: {response.message}")
```
## Best Practices
1. **State Management**:
- Always specify a `state_path` for important conversations
- Use `save_state()` after critical operations
- Implement regular state backups for long conversations
2. **Agent Configuration**:
- Provide clear system prompts for each agent
- Use descriptive agent names
- Consider agent expertise when setting the group objective
3. **Performance**:
- Keep `max_rounds` reasonable (5-10 for most cases)
- Use early stopping conditions when possible
- Monitor conversation length and complexity
4. **Error Handling**:
- Always wrap chat execution in try-except blocks
- Implement proper logging
- Save states before potentially risky operations
## Limitations
- Agents must either have a `run` method or be callable
- State files can grow large with many interactions
- Selector agent may need optimization for large agent groups
- Real-time streaming not supported in basic configuration
1. **Agent Design**
- Give agents clear, specific roles
- Use detailed system prompts
- Set appropriate context lengths
- Enable retries for reliability
2. **Speaker Functions**
- Match function to use case
- Consider conversation flow
- Handle edge cases
- Add appropriate logging
3. **Error Handling**
- Use try-except blocks
- Log errors appropriately
- Implement retry logic
- Provide fallback responses
4. **Performance**
- Use concurrent processing for multiple tasks
- Monitor context lengths
- Implement proper cleanup
- Cache responses when appropriate
## API Reference
### GroupChat Methods
| Method | Description | Arguments | Returns |
|--------|-------------|-----------|---------|
| run | Run single conversation | task: str | ChatHistory |
| batched_run | Run multiple sequential tasks | tasks: List[str] | List[ChatHistory] |
| concurrent_run | Run multiple parallel tasks | tasks: List[str] | List[ChatHistory] |
| get_recent_messages | Get recent messages | n: int = 3 | List[str] |
### Agent Methods
| Method | Description | Returns |
|--------|-------------|---------|
| run | Process single task | str |
| generate_response | Generate LLM response | str |
| save_context | Save conversation context | None |

@ -0,0 +1,334 @@
# SwarmRearrange Documentation
SwarmRearrange is a class for orchestrating multiple swarms in a sequential or parallel flow pattern. It provides thread-safe operations for managing swarm execution, history tracking, and flow validation.
## Constructor Arguments
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| id | str | UUID | Unique identifier for the swarm arrangement |
| name | str | "SwarmRearrange" | Name of the swarm arrangement |
| description | str | "A swarm of swarms..." | Description of the arrangement |
| swarms | List[Any] | [] | List of swarm objects to be managed |
| flow | str | None | Flow pattern for swarm execution |
| max_loops | int | 1 | Maximum number of execution loops |
| verbose | bool | True | Enable detailed logging |
| human_in_the_loop | bool | False | Enable human intervention |
| custom_human_in_the_loop | Callable | None | Custom function for human interaction |
| return_json | bool | False | Return results in JSON format |
## Methods
### add_swarm(swarm: Any)
Adds a single swarm to the arrangement.
### remove_swarm(swarm_name: str)
Removes a swarm by name from the arrangement.
### add_swarms(swarms: List[Any])
Adds multiple swarms to the arrangement.
### validate_flow()
Validates the flow pattern syntax and swarm names.
### run(task: str = None, img: str = None, custom_tasks: Dict[str, str] = None)
Executes the swarm arrangement according to the flow pattern.
## Flow Pattern Syntax
The flow pattern uses arrow notation (`->`) to define execution order:
- Sequential: `"SwarmA -> SwarmB -> SwarmC"`
- Parallel: `"SwarmA, SwarmB -> SwarmC"`
- Human intervention: Use `"H"` in the flow
## Examples
### Basic Sequential Flow
```python
from swarms.structs.swarm_arange import SwarmRearrange
import os
from swarms import Agent, AgentRearrange
from swarm_models import OpenAIChat
# model = Anthropic(anthropic_api_key=os.getenv("ANTHROPIC_API_KEY"))
company = "TGSC"
# Get the OpenAI API key from the environment variable
api_key = os.getenv("GROQ_API_KEY")
# Model
model = OpenAIChat(
openai_api_base="https://api.groq.com/openai/v1",
openai_api_key=api_key,
model_name="llama-3.1-70b-versatile",
temperature=0.1,
)
# Initialize the Managing Director agent
managing_director = Agent(
agent_name="Managing-Director",
system_prompt=f"""
As the Managing Director at Blackstone, your role is to oversee the entire investment analysis process for potential acquisitions.
Your responsibilities include:
1. Setting the overall strategy and direction for the analysis
2. Coordinating the efforts of the various team members and ensuring a comprehensive evaluation
3. Reviewing the findings and recommendations from each team member
4. Making the final decision on whether to proceed with the acquisition
For the current potential acquisition of {company}, direct the tasks for the team to thoroughly analyze all aspects of the company, including its financials, industry position, technology, market potential, and regulatory compliance. Provide guidance and feedback as needed to ensure a rigorous and unbiased assessment.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="managing-director.json",
)
# Initialize the Vice President of Finance
vp_finance = Agent(
agent_name="VP-Finance",
system_prompt=f"""
As the Vice President of Finance at Blackstone, your role is to lead the financial analysis of potential acquisitions.
For the current potential acquisition of {company}, your tasks include:
1. Conducting a thorough review of {company}' financial statements, including income statements, balance sheets, and cash flow statements
2. Analyzing key financial metrics such as revenue growth, profitability margins, liquidity ratios, and debt levels
3. Assessing the company's historical financial performance and projecting future performance based on assumptions and market conditions
4. Identifying any financial risks or red flags that could impact the acquisition decision
5. Providing a detailed report on your findings and recommendations to the Managing Director
Be sure to consider factors such as the sustainability of {company}' business model, the strength of its customer base, and its ability to generate consistent cash flows. Your analysis should be data-driven, objective, and aligned with Blackstone's investment criteria.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="vp-finance.json",
)
# Initialize the Industry Analyst
industry_analyst = Agent(
agent_name="Industry-Analyst",
system_prompt=f"""
As the Industry Analyst at Blackstone, your role is to provide in-depth research and analysis on the industries and markets relevant to potential acquisitions.
For the current potential acquisition of {company}, your tasks include:
1. Conducting a comprehensive analysis of the industrial robotics and automation solutions industry, including market size, growth rates, key trends, and future prospects
2. Identifying the major players in the industry and assessing their market share, competitive strengths and weaknesses, and strategic positioning
3. Evaluating {company}' competitive position within the industry, including its market share, differentiation, and competitive advantages
4. Analyzing the key drivers and restraints for the industry, such as technological advancements, labor costs, regulatory changes, and economic conditions
5. Identifying potential risks and opportunities for {company} based on the industry analysis, such as disruptive technologies, emerging markets, or shifts in customer preferences
Your analysis should provide a clear and objective assessment of the attractiveness and future potential of the industrial robotics industry, as well as {company}' positioning within it. Consider both short-term and long-term factors, and provide evidence-based insights to inform the investment decision.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="industry-analyst.json",
)
# Initialize the Technology Expert
tech_expert = Agent(
agent_name="Tech-Expert",
system_prompt=f"""
As the Technology Expert at Blackstone, your role is to assess the technological capabilities, competitive advantages, and potential risks of companies being considered for acquisition.
For the current potential acquisition of {company}, your tasks include:
1. Conducting a deep dive into {company}' proprietary technologies, including its robotics platforms, automation software, and AI capabilities
2. Assessing the uniqueness, scalability, and defensibility of {company}' technology stack and intellectual property
3. Comparing {company}' technologies to those of its competitors and identifying any key differentiators or technology gaps
4. Evaluating {company}' research and development capabilities, including its innovation pipeline, engineering talent, and R&D investments
5. Identifying any potential technology risks or disruptive threats that could impact {company}' long-term competitiveness, such as emerging technologies or expiring patents
Your analysis should provide a comprehensive assessment of {company}' technological strengths and weaknesses, as well as the sustainability of its competitive advantages. Consider both the current state of its technology and its future potential in light of industry trends and advancements.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="tech-expert.json",
)
# Initialize the Market Researcher
market_researcher = Agent(
agent_name="Market-Researcher",
system_prompt=f"""
As the Market Researcher at Blackstone, your role is to analyze the target company's customer base, market share, and growth potential to assess the commercial viability and attractiveness of the potential acquisition.
For the current potential acquisition of {company}, your tasks include:
1. Analyzing {company}' current customer base, including customer segmentation, concentration risk, and retention rates
2. Assessing {company}' market share within its target markets and identifying key factors driving its market position
3. Conducting a detailed market sizing and segmentation analysis for the industrial robotics and automation markets, including identifying high-growth segments and emerging opportunities
4. Evaluating the demand drivers and sales cycles for {company}' products and services, and identifying any potential risks or limitations to adoption
5. Developing financial projections and estimates for {company}' revenue growth potential based on the market analysis and assumptions around market share and penetration
Your analysis should provide a data-driven assessment of the market opportunity for {company} and the feasibility of achieving our investment return targets. Consider both bottom-up and top-down market perspectives, and identify any key sensitivities or assumptions in your projections.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="market-researcher.json",
)
# Initialize the Regulatory Specialist
regulatory_specialist = Agent(
agent_name="Regulatory-Specialist",
system_prompt=f"""
As the Regulatory Specialist at Blackstone, your role is to identify and assess any regulatory risks, compliance requirements, and potential legal liabilities associated with potential acquisitions.
For the current potential acquisition of {company}, your tasks include:
1. Identifying all relevant regulatory bodies and laws that govern the operations of {company}, including industry-specific regulations, labor laws, and environmental regulations
2. Reviewing {company}' current compliance policies, procedures, and track record to identify any potential gaps or areas of non-compliance
3. Assessing the potential impact of any pending or proposed changes to relevant regulations that could affect {company}' business or create additional compliance burdens
4. Evaluating the potential legal liabilities and risks associated with {company}' products, services, and operations, including product liability, intellectual property, and customer contracts
5. Providing recommendations on any regulatory or legal due diligence steps that should be taken as part of the acquisition process, as well as any post-acquisition integration considerations
Your analysis should provide a comprehensive assessment of the regulatory and legal landscape surrounding {company}, and identify any material risks or potential deal-breakers. Consider both the current state and future outlook, and provide practical recommendations to mitigate identified risks.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="regulatory-specialist.json",
)
# Create a list of agents
agents = [
managing_director,
vp_finance,
industry_analyst,
tech_expert,
market_researcher,
regulatory_specialist,
]
# Define multiple flow patterns
flows = [
"Industry-Analyst -> Tech-Expert -> Market-Researcher -> Regulatory-Specialist -> Managing-Director -> VP-Finance",
"Managing-Director -> VP-Finance -> Industry-Analyst -> Tech-Expert -> Market-Researcher -> Regulatory-Specialist",
"Tech-Expert -> Market-Researcher -> Regulatory-Specialist -> Industry-Analyst -> Managing-Director -> VP-Finance",
]
# Create instances of AgentRearrange for each flow pattern
blackstone_acquisition_analysis = AgentRearrange(
name="Blackstone-Acquisition-Analysis",
description="A system for analyzing potential acquisitions",
agents=agents,
flow=flows[0],
)
blackstone_investment_strategy = AgentRearrange(
name="Blackstone-Investment-Strategy",
description="A system for evaluating investment opportunities",
agents=agents,
flow=flows[1],
)
blackstone_market_analysis = AgentRearrange(
name="Blackstone-Market-Analysis",
description="A system for analyzing market trends and opportunities",
agents=agents,
flow=flows[2],
)
swarm_arrange = SwarmRearrange(
swarms=[
blackstone_acquisition_analysis,
blackstone_investment_strategy,
blackstone_market_analysis,
],
flow=f"{blackstone_acquisition_analysis.name} -> {blackstone_investment_strategy.name} -> {blackstone_market_analysis.name}",
)
print(
swarm_arrange.run(
"Analyze swarms, 150k revenue with 45m+ agents build, with 1.4m downloads since march 2024"
)
)
```
### Human-in-the-Loop
```python
def custom_human_input(task):
return input(f"Review {task} and provide feedback: ")
# Create arrangement with human intervention
arrangement = SwarmRearrange(
name="HumanAugmented",
swarms=[swarm1, swarm2],
flow="SwarmA -> H -> SwarmB",
human_in_the_loop=True,
custom_human_in_the_loop=custom_human_input
)
# Execute with human intervention
result = arrangement.run("Initial task")
```
### Complex Multi-Stage Pipeline
```python
# Define multiple flow patterns
flows = [
"Collector -> Processor -> Analyzer",
"Analyzer -> ML -> Validator",
"Validator -> Reporter"
]
# Create arrangements for each flow
pipelines = [
SwarmRearrange(name=f"Pipeline{i}", swarms=swarms, flow=flow)
for i, flow in enumerate(flows)
]
# Create master arrangement
master = SwarmRearrange(
name="MasterPipeline",
swarms=pipelines,
flow="Pipeline0 -> Pipeline1 -> Pipeline2"
)
# Execute complete pipeline
result = master.run("Start analysis")
```
## Best Practices
1. **Flow Validation**: Always validate flows before execution
2. **Error Handling**: Implement try-catch blocks around run() calls
3. **History Tracking**: Use track_history() for monitoring swarm execution
4. **Resource Management**: Set appropriate max_loops to prevent infinite execution
5. **Logging**: Enable verbose mode during development for detailed logging
## Error Handling
The class implements comprehensive error handling:
```python
try:
arrangement = SwarmRearrange(swarms=swarms, flow=flow)
result = arrangement.run(task)
except ValueError as e:
logger.error(f"Flow validation error: {e}")
except Exception as e:
logger.error(f"Execution error: {e}")
```

@ -7,21 +7,17 @@ from swarms.prompts.finance_agent_sys_prompt import (
agent = Agent(
agent_name="Financial-Analysis-Agent",
agent_description="Personal finance advisor agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT
+ "Output the <DONE> token when you're done creating a portfolio of etfs, index, funds, and more for AI",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1,
model_name="gpt-4o",
dynamic_temperature_enabled=True,
user_name="Kye",
user_name="swarms_corp",
retry_attempts=3,
# streaming_on=True,
context_length=8192,
return_step_meta=False,
output_type="str", # "json", "dict", "csv" OR "string" "yaml" and
auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task
max_tokens=4000, # max output tokens
# interactive=True,
stopping_token="<DONE>",
saved_state_path="agent_00.json",
interactive=False,
)

@ -8,21 +8,21 @@ if __name__ == "__main__":
# Create agents
data_collector = Agent(
agent_name="Market-Data-Collector",
model_name="gpt-4o-mini",
model_name="openai/gpt-4o",
max_loops=1,
streaming_on=True,
)
trend_analyzer = Agent(
agent_name="Market-Trend-Analyzer",
model_name="gpt-4o-mini",
model_name="openai/gpt-4o",
max_loops=1,
streaming_on=True,
)
report_generator = Agent(
agent_name="Investment-Report-Generator",
model_name="gpt-4o-mini",
model_name="openai/gpt-4o",
max_loops=1,
streaming_on=True,
)

@ -1,21 +0,0 @@
from swarms.structs.swarm_arange import SwarmRearrange
from blackstone_pe.rearrange_example_blackstone import (
blackstone_acquisition_analysis,
blackstone_investment_strategy,
blackstone_market_analysis,
)
swarm_arrange = SwarmRearrange(
swarms=[
blackstone_acquisition_analysis,
blackstone_investment_strategy,
blackstone_market_analysis,
],
flow=f"{blackstone_acquisition_analysis.name} -> {blackstone_investment_strategy.name} -> {blackstone_market_analysis.name}, {blackstone_acquisition_analysis.name}",
)
print(
swarm_arrange.run(
"Analyze swarms, 150k revenue with 45m+ agents build, with 1.4m downloads since march 2024"
)
)

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "6.7.5"
version = "6.7.6"
description = "Swarms - TGSC"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]

@ -0,0 +1,216 @@
from swarms.structs.swarm_arange import SwarmRearrange
import os
from swarms import Agent, AgentRearrange
from swarm_models import OpenAIChat
# model = Anthropic(anthropic_api_key=os.getenv("ANTHROPIC_API_KEY"))
company = "TGSC"
# Get the OpenAI API key from the environment variable
api_key = os.getenv("GROQ_API_KEY")
# Model
model = OpenAIChat(
openai_api_base="https://api.groq.com/openai/v1",
openai_api_key=api_key,
model_name="llama-3.1-70b-versatile",
temperature=0.1,
)
# Initialize the Managing Director agent
managing_director = Agent(
agent_name="Managing-Director",
system_prompt=f"""
As the Managing Director at Blackstone, your role is to oversee the entire investment analysis process for potential acquisitions.
Your responsibilities include:
1. Setting the overall strategy and direction for the analysis
2. Coordinating the efforts of the various team members and ensuring a comprehensive evaluation
3. Reviewing the findings and recommendations from each team member
4. Making the final decision on whether to proceed with the acquisition
For the current potential acquisition of {company}, direct the tasks for the team to thoroughly analyze all aspects of the company, including its financials, industry position, technology, market potential, and regulatory compliance. Provide guidance and feedback as needed to ensure a rigorous and unbiased assessment.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="managing-director.json",
)
# Initialize the Vice President of Finance
vp_finance = Agent(
agent_name="VP-Finance",
system_prompt=f"""
As the Vice President of Finance at Blackstone, your role is to lead the financial analysis of potential acquisitions.
For the current potential acquisition of {company}, your tasks include:
1. Conducting a thorough review of {company}' financial statements, including income statements, balance sheets, and cash flow statements
2. Analyzing key financial metrics such as revenue growth, profitability margins, liquidity ratios, and debt levels
3. Assessing the company's historical financial performance and projecting future performance based on assumptions and market conditions
4. Identifying any financial risks or red flags that could impact the acquisition decision
5. Providing a detailed report on your findings and recommendations to the Managing Director
Be sure to consider factors such as the sustainability of {company}' business model, the strength of its customer base, and its ability to generate consistent cash flows. Your analysis should be data-driven, objective, and aligned with Blackstone's investment criteria.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="vp-finance.json",
)
# Initialize the Industry Analyst
industry_analyst = Agent(
agent_name="Industry-Analyst",
system_prompt=f"""
As the Industry Analyst at Blackstone, your role is to provide in-depth research and analysis on the industries and markets relevant to potential acquisitions.
For the current potential acquisition of {company}, your tasks include:
1. Conducting a comprehensive analysis of the industrial robotics and automation solutions industry, including market size, growth rates, key trends, and future prospects
2. Identifying the major players in the industry and assessing their market share, competitive strengths and weaknesses, and strategic positioning
3. Evaluating {company}' competitive position within the industry, including its market share, differentiation, and competitive advantages
4. Analyzing the key drivers and restraints for the industry, such as technological advancements, labor costs, regulatory changes, and economic conditions
5. Identifying potential risks and opportunities for {company} based on the industry analysis, such as disruptive technologies, emerging markets, or shifts in customer preferences
Your analysis should provide a clear and objective assessment of the attractiveness and future potential of the industrial robotics industry, as well as {company}' positioning within it. Consider both short-term and long-term factors, and provide evidence-based insights to inform the investment decision.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="industry-analyst.json",
)
# Initialize the Technology Expert
tech_expert = Agent(
agent_name="Tech-Expert",
system_prompt=f"""
As the Technology Expert at Blackstone, your role is to assess the technological capabilities, competitive advantages, and potential risks of companies being considered for acquisition.
For the current potential acquisition of {company}, your tasks include:
1. Conducting a deep dive into {company}' proprietary technologies, including its robotics platforms, automation software, and AI capabilities
2. Assessing the uniqueness, scalability, and defensibility of {company}' technology stack and intellectual property
3. Comparing {company}' technologies to those of its competitors and identifying any key differentiators or technology gaps
4. Evaluating {company}' research and development capabilities, including its innovation pipeline, engineering talent, and R&D investments
5. Identifying any potential technology risks or disruptive threats that could impact {company}' long-term competitiveness, such as emerging technologies or expiring patents
Your analysis should provide a comprehensive assessment of {company}' technological strengths and weaknesses, as well as the sustainability of its competitive advantages. Consider both the current state of its technology and its future potential in light of industry trends and advancements.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="tech-expert.json",
)
# Initialize the Market Researcher
market_researcher = Agent(
agent_name="Market-Researcher",
system_prompt=f"""
As the Market Researcher at Blackstone, your role is to analyze the target company's customer base, market share, and growth potential to assess the commercial viability and attractiveness of the potential acquisition.
For the current potential acquisition of {company}, your tasks include:
1. Analyzing {company}' current customer base, including customer segmentation, concentration risk, and retention rates
2. Assessing {company}' market share within its target markets and identifying key factors driving its market position
3. Conducting a detailed market sizing and segmentation analysis for the industrial robotics and automation markets, including identifying high-growth segments and emerging opportunities
4. Evaluating the demand drivers and sales cycles for {company}' products and services, and identifying any potential risks or limitations to adoption
5. Developing financial projections and estimates for {company}' revenue growth potential based on the market analysis and assumptions around market share and penetration
Your analysis should provide a data-driven assessment of the market opportunity for {company} and the feasibility of achieving our investment return targets. Consider both bottom-up and top-down market perspectives, and identify any key sensitivities or assumptions in your projections.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="market-researcher.json",
)
# Initialize the Regulatory Specialist
regulatory_specialist = Agent(
agent_name="Regulatory-Specialist",
system_prompt=f"""
As the Regulatory Specialist at Blackstone, your role is to identify and assess any regulatory risks, compliance requirements, and potential legal liabilities associated with potential acquisitions.
For the current potential acquisition of {company}, your tasks include:
1. Identifying all relevant regulatory bodies and laws that govern the operations of {company}, including industry-specific regulations, labor laws, and environmental regulations
2. Reviewing {company}' current compliance policies, procedures, and track record to identify any potential gaps or areas of non-compliance
3. Assessing the potential impact of any pending or proposed changes to relevant regulations that could affect {company}' business or create additional compliance burdens
4. Evaluating the potential legal liabilities and risks associated with {company}' products, services, and operations, including product liability, intellectual property, and customer contracts
5. Providing recommendations on any regulatory or legal due diligence steps that should be taken as part of the acquisition process, as well as any post-acquisition integration considerations
Your analysis should provide a comprehensive assessment of the regulatory and legal landscape surrounding {company}, and identify any material risks or potential deal-breakers. Consider both the current state and future outlook, and provide practical recommendations to mitigate identified risks.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="regulatory-specialist.json",
)
# Create a list of agents
agents = [
managing_director,
vp_finance,
industry_analyst,
tech_expert,
market_researcher,
regulatory_specialist,
]
# Define multiple flow patterns
flows = [
"Industry-Analyst -> Tech-Expert -> Market-Researcher -> Regulatory-Specialist -> Managing-Director -> VP-Finance",
"Managing-Director -> VP-Finance -> Industry-Analyst -> Tech-Expert -> Market-Researcher -> Regulatory-Specialist",
"Tech-Expert -> Market-Researcher -> Regulatory-Specialist -> Industry-Analyst -> Managing-Director -> VP-Finance",
]
# Create instances of AgentRearrange for each flow pattern
blackstone_acquisition_analysis = AgentRearrange(
name="Blackstone-Acquisition-Analysis",
description="A system for analyzing potential acquisitions",
agents=agents,
flow=flows[0],
)
blackstone_investment_strategy = AgentRearrange(
name="Blackstone-Investment-Strategy",
description="A system for evaluating investment opportunities",
agents=agents,
flow=flows[1],
)
blackstone_market_analysis = AgentRearrange(
name="Blackstone-Market-Analysis",
description="A system for analyzing market trends and opportunities",
agents=agents,
flow=flows[2],
)
swarm_arrange = SwarmRearrange(
swarms=[
blackstone_acquisition_analysis,
blackstone_investment_strategy,
blackstone_market_analysis,
],
flow=f"{blackstone_acquisition_analysis.name} -> {blackstone_investment_strategy.name} -> {blackstone_market_analysis.name}",
)
print(
swarm_arrange.run(
"Analyze swarms, 150k revenue with 45m+ agents build, with 1.4m downloads since march 2024"
)
)

@ -12,7 +12,12 @@ from swarms.structs.graph_workflow import (
Node,
NodeType,
)
from swarms.structs.groupchat import GroupChat, GroupChatState
from swarms.structs.groupchat import (
GroupChat,
ChatHistory,
ChatTurn,
AgentResponse,
)
from swarms.structs.majority_voting import (
MajorityVoting,
majority_voting,
@ -143,5 +148,8 @@ __all__ = [
"AsyncWorkflow",
"run_agents_with_tasks_concurrently",
"showcase_available_agents",
"GroupChatState",
"GroupChat",
"ChatHistory",
"ChatTurn",
"AgentResponse",
]

@ -1,493 +1,301 @@
from typing import List, Dict, Optional, Union, Callable, Any
from pydantic import BaseModel, Field
import concurrent.futures
from datetime import datetime
import json
from uuid import uuid4
import logging
from swarms.structs.agent import Agent
from swarms.structs.agents_available import showcase_available_agents
from typing import Callable, List
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
from loguru import logger
from pydantic import BaseModel, Field
from swarms.structs.agent import Agent
class Message(BaseModel):
"""Single message in the conversation"""
class AgentResponse(BaseModel):
agent_name: str
role: str
content: str
timestamp: datetime = Field(default_factory=datetime.utcnow)
message: str
timestamp: datetime = Field(default_factory=datetime.now)
turn_number: int
preceding_context: List[str] = Field(default_factory=list)
class AgentMetadata(BaseModel):
"""Metadata for tracking agent state and configuration"""
class ChatTurn(BaseModel):
turn_number: int
responses: List[AgentResponse]
task: str
timestamp: datetime = Field(default_factory=datetime.now)
agent_name: str
agent_type: str
system_prompt: Optional[str] = None
description: Optional[str] = None
config: Dict[str, Any] = Field(default_factory=dict)
class ChatHistory(BaseModel):
turns: List[ChatTurn]
total_messages: int
name: str
description: str
start_time: datetime = Field(default_factory=datetime.now)
class InteractionLog(BaseModel):
"""Log entry for a single interaction"""
id: str = Field(default_factory=lambda: uuid4().hex)
agent_name: str
position: int
input_text: str
output_text: str
timestamp: datetime = Field(default_factory=datetime.utcnow)
metadata: Dict[str, Any] = Field(default_factory=dict)
SpeakerFunction = Callable[[List[str], "Agent"], bool]
class GroupChatState(BaseModel):
"""Complete state of the group chat"""
def round_robin(history: List[str], agent: Agent) -> bool:
"""
Round robin speaker function.
Each agent speaks in turn, in a circular order.
"""
return True
id: str = Field(default_factory=lambda: uuid4().hex)
name: Optional[str] = None
description: Optional[str] = None
admin_name: str
group_objective: str
max_rounds: int
rules: Optional[str] = None
agent_metadata: List[AgentMetadata]
messages: List[Message]
interactions: List[InteractionLog]
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
def expertise_based(history: List[str], agent: Agent) -> bool:
"""
Expertise based speaker function.
An agent speaks if their system prompt is in the last message.
"""
return (
agent.system_prompt.lower() in history[-1].lower()
if history
else True
)
# Todo:
# Build a function that prompts the llm to output the
# [Agent-Name] in square brackets and then the question or something
# An agentic Language notation
def random_selection(history: List[str], agent: Agent) -> bool:
"""
Random selection speaker function.
An agent speaks randomly.
"""
import random
class AgentWrapper:
"""Wrapper class to standardize agent interfaces"""
return random.choice([True, False])
def __init__(
self,
agent: Union["Agent", Callable],
agent_name: str,
system_prompt: Optional[str] = None,
):
self.agent = agent
self.agent_name = agent_name
self.system_prompt = system_prompt
self._validate_agent()
def _validate_agent(self):
"""Validate that the agent has the required interface"""
if hasattr(self.agent, "run"):
self.run = self.agent.run
elif callable(self.agent):
self.run = self.agent
else:
raise ValueError(
"Agent must either have a 'run' method or be callable"
)
def get_metadata(self) -> AgentMetadata:
"""Extract metadata from the agent"""
return AgentMetadata(
agent_name=self.agent_name,
agent_type=type(self.agent).__name__,
system_prompt=self.system_prompt,
config={
k: v
for k, v in self.agent.__dict__.items()
if isinstance(v, (str, int, float, bool, dict, list))
},
)
def most_recent(history: List[str], agent: Agent) -> bool:
"""
Most recent speaker function.
An agent speaks if they are the last speaker.
"""
return (
agent.agent_name == history[-1].split(":")[0].strip()
if history
else True
)
class GroupChat:
"""Enhanced GroupChat manager with state persistence and comprehensive logging.
This class implements a multi-agent chat system with the following key features:
- State persistence to disk
- Comprehensive interaction logging
- Configurable agent selection
- Early stopping conditions
- Conversation export capabilities
The GroupChat coordinates multiple agents to have a goal-directed conversation,
with one agent speaking at a time based on a selector agent's decisions.
Attributes:
name (Optional[str]): Name of the group chat
description (Optional[str]): Description of the group chat's purpose
agents (List[Union["Agent", Callable]]): List of participating agents
max_rounds (int): Maximum number of conversation rounds
admin_name (str): Name of the administrator
group_objective (str): The goal/objective of the conversation
selector_agent (Union["Agent", Callable]): Agent that selects next speaker
rules (Optional[str]): Rules governing the conversation
state_path (Optional[str]): Path to save conversation state
showcase_agents_on (bool): Whether to showcase agent capabilities
"""
GroupChat class to enable multiple agents to communicate in a synchronous group chat.
Each agent is aware of all other agents, every message exchanged, and the social context.
"""
def __init__(
self,
name: Optional[str] = None,
description: Optional[str] = None,
agents: List[Union["Agent", Callable]] = None,
max_rounds: int = 10,
admin_name: str = "Admin",
group_objective: str = None,
selector_agent: Union["Agent", Callable] = None,
rules: Optional[str] = None,
state_path: Optional[str] = None,
showcase_agents_on: bool = False,
name: str = "GroupChat",
description: str = "A group chat for multiple agents",
agents: List[Agent] = [],
speaker_fn: SpeakerFunction = round_robin,
max_turns: int = 10,
):
"""Initialize a new GroupChat instance.
"""
Initialize the GroupChat.
Args:
name: Name of the group chat
description: Description of the group chat's purpose
agents: List of participating agents
max_rounds: Maximum number of conversation rounds
admin_name: Name of the administrator
group_objective: The goal/objective of the conversation
selector_agent: Agent that selects next speaker
rules: Rules governing the conversation
state_path: Path to save conversation state
showcase_agents_on: Whether to showcase agent capabilities
Raises:
ValueError: If no agents are provided
name (str): Name of the group chat.
description (str): Description of the purpose of the group chat.
agents (List[Agent]): A list of agents participating in the chat.
speaker_fn (SpeakerFunction): The function to determine which agent should speak next.
max_turns (int): Maximum number of turns in the chat.
"""
self.name = name
self.description = description
self.agents = agents
self.max_rounds = max_rounds
self.admin_name = admin_name
self.group_objective = group_objective
self.selector_agent = selector_agent
self.rules = rules
self.state_path = state_path
self.showcase_agents_on = showcase_agents_on
if not agents:
raise ValueError("At least two agents are required")
# Generate unique state path if not provided
self.state_path = (
state_path or f"group_chat_{uuid4().hex}.json"
)
# Wrap all agents to standardize interface
self.wrapped_agents = [
AgentWrapper(
agent,
(
f"Agent_{i}"
if not hasattr(agent, "agent_name")
else agent.agent_name
),
)
for i, agent in enumerate(agents)
]
# Configure selector agent
self.selector_agent = AgentWrapper(
selector_agent or self.wrapped_agents[0].agent,
"Selector",
"Select the next speaker based on the conversation context",
)
# Initialize conversation state
self.state = GroupChatState(
self.speaker_fn = speaker_fn
self.max_turns = max_turns
self.chat_history = ChatHistory(
turns=[],
total_messages=0,
name=name,
description=description,
admin_name=admin_name,
group_objective=group_objective,
max_rounds=max_rounds,
rules=rules,
agent_metadata=[
agent.get_metadata() for agent in self.wrapped_agents
],
messages=[],
interactions=[],
)
# Showcase agents if enabled
if self.showcase_agents_on is True:
self.showcase_agents()
def showcase_agents(self):
"""Showcase available agents and update their system prompts.
This method displays agent capabilities and updates each agent's
system prompt with information about other agents in the group.
def _get_response_sync(
self, agent: Agent, prompt: str, turn_number: int
) -> AgentResponse:
"""
out = showcase_available_agents(
name=self.name,
description=self.description,
agents=self.wrapped_agents,
)
for agent in self.wrapped_agents:
# Initialize system_prompt if None
if agent.system_prompt is None:
agent.system_prompt = ""
agent.system_prompt += out
def save_state(self) -> None:
"""Save current conversation state to disk.
The state is saved as a JSON file at the configured state_path.
"""
with open(self.state_path, "w") as f:
json.dump(self.state.dict(), f, default=str, indent=2)
logger.info(f"State saved to {self.state_path}")
@classmethod
def load_state(cls, state_path: str) -> "GroupChat":
"""Load GroupChat from saved state.
Get the response from an agent synchronously.
Args:
state_path: Path to the saved state JSON file
agent (Agent): The agent responding.
prompt (str): The message triggering the response.
turn_number (int): The current turn number.
Returns:
GroupChat: A new GroupChat instance with restored state
Raises:
FileNotFoundError: If state file doesn't exist
json.JSONDecodeError: If state file is invalid JSON
"""
with open(state_path, "r") as f:
state_dict = json.load(f)
# Convert loaded data back to state model
state = GroupChatState(**state_dict)
# Initialize with minimal config, then restore state
instance = cls(
name=state.name,
admin_name=state.admin_name,
agents=[], # Temporary empty list
group_objective=state.group_objective,
)
instance.state = state
return instance
def _log_interaction(
self,
agent_name: str,
position: int,
input_text: str,
output_text: str,
) -> None:
"""Log a single interaction in the conversation.
Args:
agent_name: Name of the speaking agent
position: Position in conversation sequence
input_text: Input context provided to agent
output_text: Agent's response
AgentResponse: The agent's response captured in a structured format.
"""
log_entry = InteractionLog(
agent_name=agent_name,
position=position,
input_text=input_text,
output_text=output_text,
metadata={
"current_agents": [
a.agent_name for a in self.wrapped_agents
],
"round": position // len(self.wrapped_agents),
},
)
self.state.interactions.append(log_entry)
self.save_state()
def _add_message(self, role: str, content: str) -> None:
"""Add a message to the conversation history.
try:
context = f"""You are {agent.name} with role: {agent.system_prompt}.
Other agents: {[a.name for a in self.agents if a != agent]}
Previous messages: {[t.responses[-3:] for t in self.chat_history.turns[-3:]]}"""
message = agent.run(context + prompt)
return AgentResponse(
agent_name=agent.name,
role=agent.system_prompt,
message=message,
turn_number=turn_number,
preceding_context=self.get_recent_messages(3),
)
except Exception as e:
logger.error(f"Error from {agent.name}: {e}")
return AgentResponse(
agent_name=agent.name,
role=agent.system_prompt,
message=f"Error generating response: {str(e)}",
turn_number=turn_number,
preceding_context=[],
)
Args:
role: Speaker's role/name
content: Message content
def get_recent_messages(self, n: int = 3) -> List[str]:
"""
message = Message(role=role, content=content)
self.state.messages.append(message)
self.save_state()
def select_next_speaker(
self, last_speaker: AgentWrapper
) -> AgentWrapper:
"""Select the next speaker using the selector agent.
Get the most recent messages in the chat.
Args:
last_speaker: The agent who spoke last
n (int): The number of recent messages to retrieve.
Returns:
AgentWrapper: The next agent to speak
Note:
Falls back to round-robin selection if selector agent fails
List[str]: The most recent messages in the chat.
"""
conversation_history = "\n".join(
[
f"{msg.role}: {msg.content}"
for msg in self.state.messages
]
)
messages = []
for turn in self.chat_history.turns[-n:]:
for response in turn.responses:
messages.append(
f"{response.agent_name}: {response.message}"
)
return messages
selection_prompt = f"""
Current speakers: {[agent.agent_name for agent in self.wrapped_agents]}
Last speaker: {last_speaker.agent_name}
Group objective: {self.state.group_objective}
Based on the conversation history and group objective, select the next most appropriate speaker.
Only return the speaker's name.
Conversation history:
{conversation_history}
def run(self, task: str) -> ChatHistory:
"""
try:
next_speaker_name = self.selector_agent.run(
selection_prompt
).strip()
return next(
agent
for agent in self.wrapped_agents
if agent.agent_name in next_speaker_name
)
except (StopIteration, Exception) as e:
logger.warning(
f"Selector agent failed: {str(e)}. Falling back to round-robin."
)
# Fallback to round-robin if selection fails
current_idx = self.wrapped_agents.index(last_speaker)
return self.wrapped_agents[
(current_idx + 1) % len(self.wrapped_agents)
]
def run(self, task: str) -> str:
"""Execute the group chat conversation.
Run the group chat.
Args:
task: The initial task/question to discuss
task (str): The initial message to start the chat.
Returns:
str: The final response from the conversation
Raises:
Exception: If any error occurs during execution
ChatHistory: The history of the chat.
"""
try:
logger.info(f"Starting GroupChat with task: {task}")
self._add_message(self.state.admin_name, task)
current_speaker = self.wrapped_agents[0]
final_response = None
for round_num in range(self.state.max_rounds):
# Select next speaker
current_speaker = self.select_next_speaker(
current_speaker
)
logger.info(
f"Selected speaker: {current_speaker.agent_name}"
)
# Prepare context and get response
conversation_history = "\n".join(
[
f"{msg.role}: {msg.content}"
for msg in self.state.messages[
-10:
] # Last 10 messages for context
]
)
logger.info(
f"Starting chat '{self.name}' with task: {task}"
)
try:
response = current_speaker.run(
conversation_history
)
final_response = response
except Exception as e:
logger.error(
f"Agent {current_speaker.agent_name} failed: {str(e)}"
)
continue
# Log interaction and add to message history
self._log_interaction(
current_speaker.agent_name,
round_num,
conversation_history,
response,
)
self._add_message(
current_speaker.agent_name, response
for turn in range(self.max_turns):
current_turn = ChatTurn(
turn_number=turn, responses=[], task=task
)
# Optional: Add early stopping condition based on response content
if (
"TASK_COMPLETE" in response
or "CONCLUSION" in response
):
logger.info(
"Task completion detected, ending conversation"
)
break
return final_response or "No valid response generated"
for agent in self.agents:
if self.speaker_fn(
self.get_recent_messages(), agent
):
response = self._get_response_sync(
agent, task, turn
)
current_turn.responses.append(response)
self.chat_history.total_messages += 1
logger.debug(
f"Turn {turn}, {agent.name} responded"
)
self.chat_history.turns.append(current_turn)
return self.chat_history
except Exception as e:
logger.error(f"Error in GroupChat execution: {str(e)}")
raise
logger.error(f"Error in chat: {e}")
raise e
def get_conversation_summary(self) -> Dict[str, Any]:
"""Return a summary of the conversation.
def batched_run(self, tasks: List[str], *args, **kwargs):
"""
Run the group chat with a batch of tasks.
Args:
tasks (List[str]): The list of tasks to run in the chat.
Returns:
Dict containing conversation metrics and status
List[ChatHistory]: The history of each chat.
"""
return [self.run(task, *args, **kwargs) for task in tasks]
def concurrent_run(self, tasks: List[str], *args, **kwargs):
"""
return {
"id": self.state.id,
"total_interactions": len(self.state.interactions),
"participating_agents": [
agent.agent_name for agent in self.wrapped_agents
],
"conversation_length": len(self.state.messages),
"duration": (
datetime.utcnow() - self.state.created_at
).total_seconds(),
"objective_completed": any(
"TASK_COMPLETE" in msg.content
for msg in self.state.messages
),
}
def export_conversation(
self, format: str = "json"
) -> Union[str, Dict]:
"""Export the conversation in the specified format.
Run the group chat with a batch of tasks concurrently using a thread pool.
Args:
format: Output format ("json" or "text")
tasks (List[str]): The list of tasks to run in the chat.
Returns:
Union[str, Dict]: Conversation in requested format
Raises:
ValueError: If format is not supported
List[ChatHistory]: The history of each chat.
"""
if format == "json":
return self.state.dict()
elif format == "text":
return "\n".join(
[
f"{msg.role} ({msg.timestamp}): {msg.content}"
for msg in self.state.messages
]
with concurrent.futures.ThreadPoolExecutor() as executor:
return list(
executor.map(
lambda task: self.run(task, *args, **kwargs),
tasks,
)
)
else:
raise ValueError(f"Unsupported export format: {format}")
# if __name__ == "__main__":
# load_dotenv()
# # Get the OpenAI API key from the environment variable
# api_key = os.getenv("OPENAI_API_KEY")
# # Create an instance of the OpenAIChat class
# model = OpenAIChat(
# openai_api_key=api_key,
# model_name="gpt-4o-mini",
# temperature=0.1,
# )
# # Example agents
# agent1 = Agent(
# agent_name="Financial-Analysis-Agent",
# system_prompt="You are a financial analyst specializing in investment strategies.",
# llm=model,
# max_loops=1,
# autosave=False,
# dashboard=False,
# verbose=True,
# dynamic_temperature_enabled=True,
# user_name="swarms_corp",
# retry_attempts=1,
# context_length=200000,
# output_type="string",
# streaming_on=False,
# )
# agent2 = Agent(
# agent_name="Tax-Adviser-Agent",
# system_prompt="You are a tax adviser who provides clear and concise guidance on tax-related queries.",
# llm=model,
# max_loops=1,
# autosave=False,
# dashboard=False,
# verbose=True,
# dynamic_temperature_enabled=True,
# user_name="swarms_corp",
# retry_attempts=1,
# context_length=200000,
# output_type="string",
# streaming_on=False,
# )
# agents = [agent1, agent2]
# chat = GroupChat(
# name="Investment Advisory",
# description="Financial and tax analysis group",
# agents=agents,
# speaker_fn=expertise_based,
# )
# history = chat.run(
# "How to optimize tax strategy for investments?"
# )
# print(history.model_dump_json(indent=2))

@ -1,243 +0,0 @@
import os
import asyncio
from pydantic import BaseModel, Field
from typing import List, Dict, Any
from swarms import Agent
from dotenv import load_dotenv
from swarms.utils.formatter import formatter
# Load environment variables
load_dotenv()
# Get OpenAI API key
api_key = os.getenv("OPENAI_API_KEY")
# Define Pydantic schema for agent outputs
class AgentOutput(BaseModel):
"""Schema for capturing the output of each agent."""
agent_name: str = Field(..., description="The name of the agent")
message: str = Field(
...,
description="The agent's response or contribution to the group chat",
)
metadata: Dict[str, Any] = Field(
default_factory=dict,
description="Additional metadata about the agent's response",
)
class GroupChat:
"""
GroupChat class to enable multiple agents to communicate in an asynchronous group chat.
Each agent is aware of all other agents, every message exchanged, and the social context.
"""
def __init__(
self,
name: str,
description: str,
agents: List[Agent],
max_loops: int = 1,
):
"""
Initialize the GroupChat.
Args:
name (str): Name of the group chat.
description (str): Description of the purpose of the group chat.
agents (List[Agent]): A list of agents participating in the chat.
max_loops (int): Maximum number of loops to run through all agents.
"""
self.name = name
self.description = description
self.agents = agents
self.max_loops = max_loops
self.chat_history = (
[]
) # Stores all messages exchanged in the chat
formatter.print_panel(
f"Initialized GroupChat '{self.name}' with {len(self.agents)} agents. Max loops: {self.max_loops}",
title="Groupchat Swarm",
)
async def _agent_conversation(
self, agent: Agent, input_message: str
) -> AgentOutput:
"""
Facilitate a single agent's response to the chat.
Args:
agent (Agent): The agent responding.
input_message (str): The message triggering the response.
Returns:
AgentOutput: The agent's response captured in a structured format.
"""
formatter.print_panel(
f"Agent '{agent.agent_name}' is responding to the message: {input_message}",
title="Groupchat Swarm",
)
response = await asyncio.to_thread(agent.run, input_message)
output = AgentOutput(
agent_name=agent.agent_name,
message=response,
metadata={"context_length": agent.context_length},
)
# logger.debug(f"Agent '{agent.agent_name}' response: {response}")
return output
async def _run(self, initial_message: str) -> List[AgentOutput]:
"""
Execute the group chat asynchronously, looping through all agents up to max_loops.
Args:
initial_message (str): The initial message to start the chat.
Returns:
List[AgentOutput]: The responses of all agents across all loops.
"""
formatter.print_panel(
f"Starting group chat '{self.name}' with initial message: {initial_message}",
title="Groupchat Swarm",
)
self.chat_history.append(
{"sender": "System", "message": initial_message}
)
outputs = []
for loop in range(self.max_loops):
formatter.print_panel(
f"Group chat loop {loop + 1}/{self.max_loops}",
title="Groupchat Swarm",
)
for agent in self.agents:
# Create a custom input message for each agent, sharing the chat history and social context
input_message = (
f"Chat History:\n{self._format_chat_history()}\n\n"
f"Participants:\n"
+ "\n".join(
[
f"- {a.agent_name}: {a.system_prompt}"
for a in self.agents
]
)
+ f"\n\nNew Message: {initial_message}\n\n"
f"You are '{agent.agent_name}'. Remember to keep track of the social context, who is speaking, "
f"and respond accordingly based on your role: {agent.system_prompt}."
)
# Collect agent's response
output = await self._agent_conversation(
agent, input_message
)
outputs.append(output)
# Update chat history with the agent's response
self.chat_history.append(
{
"sender": agent.agent_name,
"message": output.message,
}
)
formatter.print_panel(
"Group chat completed. All agent responses captured.",
title="Groupchat Swarm",
)
return outputs
def run(self, task: str, *args, **kwargs):
return asyncio.run(self.run(task, *args, **kwargs))
def _format_chat_history(self) -> str:
"""
Format the chat history for agents to understand the context.
Returns:
str: The formatted chat history as a string.
"""
return "\n".join(
[
f"{entry['sender']}: {entry['message']}"
for entry in self.chat_history
]
)
def __str__(self) -> str:
"""String representation of the group chat's outputs."""
return self._format_chat_history()
def to_json(self) -> str:
"""JSON representation of the group chat's outputs."""
return [
{"sender": entry["sender"], "message": entry["message"]}
for entry in self.chat_history
]
# # Example Usage
# if __name__ == "__main__":
# load_dotenv()
# # Get the OpenAI API key from the environment variable
# api_key = os.getenv("OPENAI_API_KEY")
# # Create an instance of the OpenAIChat class
# model = OpenAIChat(
# openai_api_key=api_key,
# model_name="gpt-4o-mini",
# temperature=0.1,
# )
# # Example agents
# agent1 = Agent(
# agent_name="Financial-Analysis-Agent",
# system_prompt="You are a financial analyst specializing in investment strategies.",
# llm=model,
# max_loops=1,
# autosave=False,
# dashboard=False,
# verbose=True,
# dynamic_temperature_enabled=True,
# user_name="swarms_corp",
# retry_attempts=1,
# context_length=200000,
# output_type="string",
# streaming_on=False,
# )
# agent2 = Agent(
# agent_name="Tax-Adviser-Agent",
# system_prompt="You are a tax adviser who provides clear and concise guidance on tax-related queries.",
# llm=model,
# max_loops=1,
# autosave=False,
# dashboard=False,
# verbose=True,
# dynamic_temperature_enabled=True,
# user_name="swarms_corp",
# retry_attempts=1,
# context_length=200000,
# output_type="string",
# streaming_on=False,
# )
# # Create group chat
# group_chat = GroupChat(
# name="Financial Discussion",
# description="A group chat for financial analysis and tax advice.",
# agents=[agent1, agent2],
# )
# # Run the group chat
# asyncio.run(
# group_chat.run(
# "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria? What do you guys think?"
# )
# )

@ -1,19 +1,19 @@
import asyncio
import os
import threading
from concurrent.futures import ThreadPoolExecutor
import psutil
from dataclasses import dataclass
import threading
from typing import List, Any
from multiprocessing import cpu_count
import os
from typing import Any, List
import psutil
from swarms.structs.agent import Agent
from swarms.structs.omni_agent_types import AgentType
from swarms.utils.wrapper_clusterop import (
exec_callable_with_clusterops,
)
from swarms.structs.omni_agent_types import AgentType
def run_single_agent(agent: AgentType, task: str) -> Any:
"""Run a single agent synchronously"""

@ -45,12 +45,13 @@ class SequentialWorkflow:
self.shared_memory_system = shared_memory_system
self.reliability_check()
self.flow = self.sequential_flow()
self.agent_rearrange = AgentRearrange(
name=name,
description=description,
agents=agents,
flow=self.sequential_flow(),
flow=self.flow,
max_loops=max_loops,
output_type=output_type,
return_json=return_json,

@ -1,20 +1,115 @@
import os
from typing import List, Optional
from datetime import datetime
from dotenv import load_dotenv
from openai import OpenAI
from pydantic import BaseModel, Field
from pydantic.v1 import validator
from loguru import logger
from swarm_models import OpenAIChat
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
)
from swarm_models import OpenAIFunctionCaller, OpenAIChat
from swarms.structs.agent import Agent
from swarms.structs.swarm_router import SwarmRouter
from swarms.structs.agents_available import showcase_available_agents
from swarms.structs.swarm_router import SwarmRouter, SwarmType
from loguru import logger
logger.add("swarm_builder.log", rotation="10 MB", backtrace=True)
load_dotenv()
class OpenAIFunctionCaller:
"""
A class to interact with the OpenAI API for generating text based on a system prompt and a task.
Attributes:
- system_prompt (str): The system prompt to guide the AI's response.
- api_key (str): The API key for the OpenAI service.
- temperature (float): The temperature parameter for the AI model, controlling randomness.
- base_model (BaseModel): The Pydantic model to parse the response into.
- max_tokens (int): The maximum number of tokens in the response.
- client (OpenAI): The OpenAI client instance.
"""
def __init__(
self,
system_prompt: str,
api_key: str,
temperature: float,
base_model: BaseModel,
max_tokens: int = 5000,
):
self.system_prompt = system_prompt
self.api_key = api_key
self.temperature = temperature
self.base_model = base_model
self.max_tokens = max_tokens
self.client = OpenAI(api_key=api_key)
def run(self, task: str, *args, **kwargs) -> BaseModel:
"""
Run the OpenAI model with the system prompt and task to generate a response.
Args:
- task (str): The task to be completed.
- *args: Additional positional arguments for the OpenAI API.
- **kwargs: Additional keyword arguments for the OpenAI API.
Returns:
- BaseModel: The parsed response based on the base_model.
"""
completion = self.client.beta.chat.completions.parse(
model="gpt-4o-2024-08-06",
messages=[
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": task},
],
response_format=self.base_model,
temperature=self.temperature,
max_tokens=self.max_tokens,
*args,
**kwargs,
)
return completion.choices[0].message.parsed
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
)
async def run_async(
self, task: str, *args, **kwargs
) -> BaseModel:
"""
Asynchronous version of the run method.
Args:
- task (str): The task to be completed.
- *args: Additional positional arguments for the OpenAI API.
- **kwargs: Additional keyword arguments for the OpenAI API.
Returns:
- BaseModel: The parsed response based on the base_model.
"""
completion = (
await self.client.beta.chat.completions.parse_async(
model="gpt-4o-2024-08-06",
messages=[
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": task},
],
response_format=self.base_model,
temperature=self.temperature,
max_tokens=self.max_tokens,
*args,
**kwargs,
)
)
return completion.choices[0].message.parsed
BOSS_SYSTEM_PROMPT = """
@ -59,29 +154,15 @@ class AgentConfig(BaseModel):
"""Configuration for an individual agent in a swarm"""
name: str = Field(
description="The name of the agent", example="Research-Agent"
description="The name of the agent",
)
description: str = Field(
description="A description of the agent's purpose and capabilities",
example="Agent responsible for researching and gathering information",
)
system_prompt: str = Field(
description="The system prompt that defines the agent's behavior",
example="You are a research agent. Your role is to gather and analyze information...",
)
@validator("name")
def validate_name(cls, v):
if not v.strip():
raise ValueError("Agent name cannot be empty")
return v.strip()
@validator("system_prompt")
def validate_system_prompt(cls, v):
if not v.strip():
raise ValueError("System prompt cannot be empty")
return v.strip()
class SwarmConfig(BaseModel):
"""Configuration for a swarm of cooperative agents"""
@ -96,7 +177,9 @@ class SwarmConfig(BaseModel):
)
agents: List[AgentConfig] = Field(
description="The list of agents that make up the swarm",
min_items=1,
)
max_loops: int = Field(
description="The maximum number of loops for the swarm to iterate on",
)
@validator("agents")
@ -106,23 +189,90 @@ class SwarmConfig(BaseModel):
return v
class AutoSwarmBuilderOutput(BaseModel):
"""A class that automatically builds and manages swarms of AI agents with enhanced error handling."""
name: Optional[str] = Field(
description="The name of the swarm",
example="DefaultSwarm",
default=None,
)
description: Optional[str] = Field(
description="The description of the swarm's purpose and capabilities",
example="Generic AI Agent Swarm",
default=None,
)
verbose: Optional[bool] = Field(
description="Whether to display verbose output",
default=None,
)
model_name: Optional[str] = Field(
description="The name of the OpenAI model to use",
default=None,
)
boss_output_schema: Optional[list] = Field(
description="The schema for the output of the BOSS system prompt",
default=None,
)
director_agents_created: Optional[SwarmConfig] = Field(
description="The agents created by the director",
default=None,
)
swarm_router_outputs: Optional[list] = Field(
description="The outputs from the swarm router",
default=None,
)
max_loops: Optional[int] = Field(
description="The maximum number of loops for the swarm to iterate on",
default=None,
)
swarm_type: Optional[SwarmType] = Field(
description="The type of swarm to build",
default=None,
)
class AutoSwarmBuilder:
"""A class that automatically builds and manages swarms of AI agents with enhanced error handling."""
def __init__(
self,
name: Optional[str] = None,
description: Optional[str] = None,
name: Optional[str] = "autonomous-swarm-builder",
description: Optional[
str
] = "Given a task, this swarm will automatically create specialized agents and route it to the appropriate agents.",
verbose: bool = True,
api_key: Optional[str] = None,
model_name: str = "gpt-4",
model_name: str = "gpt-4o",
boss_output_schema: list = None,
swarm_router_outputs: AutoSwarmBuilderOutput = None,
max_loops: int = 1,
swarm_type: str = "SequentialWorkflow",
auto_generate_prompts_for_agents: bool = False,
shared_memory_system: callable = None,
):
self.name = name or "DefaultSwarm"
self.description = description or "Generic AI Agent Swarm"
self.verbose = verbose
self.agents_pool = []
self.api_key = api_key or os.getenv("OPENAI_API_KEY")
self.api_key = os.getenv("OPENAI_API_KEY")
self.model_name = model_name
self.boss_output_schema = boss_output_schema
self.max_loops = max_loops
self.swarm_type = swarm_type
self.auto_generate_prompts_for_agents = (
auto_generate_prompts_for_agents
)
self.shared_memory_system = shared_memory_system
self.auto_swarm_builder_output = AutoSwarmBuilderOutput(
name=name,
description=description,
verbose=verbose,
model_name=model_name,
boss_output_schema=boss_output_schema or [],
swarm_router_outputs=swarm_router_outputs or [],
max_loops=max_loops,
swarm_type=swarm_type,
)
if not self.api_key:
raise ValueError(
@ -143,7 +293,6 @@ class AutoSwarmBuilder:
self.chat_model = OpenAIChat(
openai_api_key=self.api_key,
model_name=self.model_name,
temperature=0.1,
)
except Exception as e:
logger.error(
@ -151,11 +300,13 @@ class AutoSwarmBuilder:
)
raise
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
)
def run(self, task: str, image_url: Optional[str] = None) -> str:
def run(
self,
task: str,
image_url: Optional[str] = None,
*args,
**kwargs,
):
"""Run the swarm on a given task with error handling and retries."""
if not task or not task.strip():
raise ValueError("Task cannot be empty")
@ -164,7 +315,7 @@ class AutoSwarmBuilder:
try:
# Create agents for the task
agents = self._create_agents(task, image_url)
agents = self._create_agents(task)
if not agents:
raise ValueError(
"No agents were created for the task"
@ -175,20 +326,33 @@ class AutoSwarmBuilder:
"Routing task through swarm",
extra={"num_agents": len(agents)},
)
output = self.swarm_router(agents, task, image_url)
output = self.swarm_router(
agents=agents,
task=task,
image_url=image_url,
*args,
**kwargs,
)
self.auto_swarm_builder_output.swarm_router_outputs.append(
output
)
print(output)
logger.info("Swarm execution completed successfully")
return output
# return output
return self.auto_swarm_builder_output.model_dump_json(
indent=4
)
except Exception as e:
logger.error(
f"Error during swarm execution: {str(e)}",
exc_info=True,
)
raise
raise e
def _create_agents(
self, task: str, image_url: Optional[str] = None
self,
task: str,
) -> List[Agent]:
"""Create the necessary agents for a task with enhanced error handling."""
logger.info("Creating agents for task", extra={"task": task})
@ -202,7 +366,12 @@ class AutoSwarmBuilder:
)
agents_config = model.run(task)
print(f"{agents_config}")
logger.info(
f"Director has successfully created agents: {agents_config}"
)
self.auto_swarm_builder_output.director_agents_created = (
agents_config
)
if isinstance(agents_config, dict):
agents_config = SwarmConfig(**agents_config)
@ -224,15 +393,19 @@ class AutoSwarmBuilder:
)
agents.append(agent)
# Add available agents showcase to system prompts
agents_available = showcase_available_agents(
name=self.name,
description=self.description,
agents=agents,
)
print(
f"Agent created: {agent_config.name}: Description: {agent_config.description}"
)
# # Add available agents showcase to system prompts
# agents_available = showcase_available_agents(
# name=self.name,
# description=self.description,
# agents=agents,
# )
for agent in agents:
agent.system_prompt += "\n" + agents_available
# for agent in agents:
# agent.system_prompt += "\n" + agents_available
logger.info(
"Successfully created agents",
@ -251,6 +424,8 @@ class AutoSwarmBuilder:
agent_name: str,
agent_description: str,
agent_system_prompt: str,
*args,
**kwargs,
) -> Agent:
"""Build a single agent with enhanced error handling."""
logger.info(
@ -263,18 +438,11 @@ class AutoSwarmBuilder:
description=agent_description,
system_prompt=agent_system_prompt,
llm=self.chat_model,
autosave=True,
dashboard=False,
verbose=self.verbose,
dynamic_temperature_enabled=True,
saved_state_path=f"states/{agent_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
user_name="swarms_corp",
retry_attempts=3,
context_length=200000,
dynamic_temperature_enabled=False,
return_step_meta=False,
output_type="str",
streaming_on=False,
auto_generate_prompt=True,
streaming_on=True,
)
return agent
@ -292,7 +460,9 @@ class AutoSwarmBuilder:
self,
agents: List[Agent],
task: str,
image_url: Optional[str] = None,
img: Optional[str] = None,
*args,
**kwargs,
) -> str:
"""Route tasks between agents in the swarm with error handling and retries."""
logger.info(
@ -305,11 +475,14 @@ class AutoSwarmBuilder:
name=self.name,
description=self.description,
agents=agents,
swarm_type="auto",
swarm_type=self.swarm_type,
auto_generate_prompts=self.auto_generate_prompts_for_agents,
)
formatted_task = f"{self.name} {self.description} {task}"
result = swarm_router_instance.run(formatted_task)
# formatted_task = f"{self.name} {self.description} {task}"
result = swarm_router_instance.run(
task=task, *args, **kwargs
)
logger.info("Successfully completed swarm routing")
return result
@ -324,10 +497,10 @@ class AutoSwarmBuilder:
swarm = AutoSwarmBuilder(
name="ChipDesign-Swarm",
description="A swarm of specialized AI agents for chip design",
api_key="your-api-key", # Optional if set in environment
model_name="gpt-4", # Optional, defaults to gpt-4
swarm_type="ConcurrentWorkflow",
)
result = swarm.run(
"Design a new AI accelerator chip optimized for transformer model inference..."
)
print(result)

@ -1,14 +1,15 @@
from typing import List, Tuple, Optional
import json
from typing import List, Optional, Tuple
import numpy as np
from swarms.utils.lazy_loader import lazy_import_decorator
from pydantic import BaseModel, Field
import json
from tenacity import retry, stop_after_attempt, wait_exponential
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.auto_download_check_packages import (
auto_check_and_download_package,
)
from swarms.utils.lazy_loader import lazy_import_decorator
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger(log_folder="swarm_matcher")

@ -130,7 +130,7 @@ class SwarmRouter:
agents: List[Union[Agent, Callable]] = [],
swarm_type: SwarmType = "SequentialWorkflow", # "SpreadSheetSwarm" # "auto"
autosave: bool = False,
flow: str = None,
rearrange_flow: str = None,
return_json: bool = False,
auto_generate_prompts: bool = False,
shared_memory_system: Any = None,
@ -147,7 +147,7 @@ class SwarmRouter:
self.agents = agents
self.swarm_type = swarm_type
self.autosave = autosave
self.flow = flow
self.rearrange_flow = rearrange_flow
self.return_json = return_json
self.auto_generate_prompts = auto_generate_prompts
self.shared_memory_system = shared_memory_system
@ -296,7 +296,7 @@ class SwarmRouter:
description=self.description,
agents=self.agents,
max_loops=self.max_loops,
flow=self.flow,
flow=self.rearrange_flow,
return_json=self.return_json,
output_type=self.output_type,
*args,
@ -323,11 +323,7 @@ class SwarmRouter:
*args,
**kwargs,
)
elif (
self.swarm_type == "SequentialWorkflow"
or self.swarm_type == "sequential"
or self.swarm_type == "Sequential"
):
elif self.swarm_type == "SequentialWorkflow":
return SequentialWorkflow(
name=self.name,
description=self.description,
@ -382,7 +378,7 @@ class SwarmRouter:
logger.log(level.upper(), message)
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
def _run(self, task: str, *args, **kwargs) -> Any:
def _run(self, task: str, img: str, *args, **kwargs) -> Any:
"""
Dynamically run the specified task on the selected or matched swarm type.
@ -402,11 +398,9 @@ class SwarmRouter:
try:
self._log(
"info",
f"Running task on {self.swarm_type} swarm",
task=task,
metadata=kwargs,
f"Running task on {self.swarm_type} swarm with task: {task}",
)
result = self.swarm.run(task, *args, **kwargs)
result = self.swarm.run(task=task, *args, **kwargs)
self._log(
"success",
@ -427,9 +421,11 @@ class SwarmRouter:
def run(
self,
task: str,
img: str = None,
device: str = "cpu",
all_cores: bool = True,
all_gpus: bool = False,
no_clusterops: bool = True,
*args,
**kwargs,
) -> Any:
@ -450,15 +446,22 @@ class SwarmRouter:
Raises:
Exception: If an error occurs during task execution.
"""
return exec_callable_with_clusterops(
func=self._run,
device=device,
all_cores=all_cores,
all_gpus=all_gpus,
task=task,
*args,
**kwargs,
)
try:
if no_clusterops:
return self._run(task=task, img=img, *args, **kwargs)
else:
return exec_callable_with_clusterops(
func=self._run,
device=device,
all_cores=all_cores,
all_gpus=all_gpus,
task=task,
*args,
**kwargs,
)
except Exception as e:
logger.error(f"Error executing task on swarm: {str(e)}")
raise
def __call__(self, task: str, *args, **kwargs) -> Any:
"""

@ -1,222 +1,147 @@
import pytest
import os
from dotenv import load_dotenv
from swarm_models import OpenAIChat
from swarm_models.anthropic import Anthropic
from swarms.structs.agent import Agent
from swarms.structs.groupchat import GroupChat, GroupChatManager
llm = OpenAIChat()
llm2 = Anthropic()
# Mock the OpenAI class for testing
class MockOpenAI:
def __init__(self, *args, **kwargs):
pass
def generate_reply(self, content):
return {"role": "mocked_agent", "content": "Mocked Reply"}
# Create fixtures for agents and a sample message
@pytest.fixture
def agent1():
return Agent(name="Agent1", llm=llm)
@pytest.fixture
def agent2():
return Agent(name="Agent2", llm=llm2)
@pytest.fixture
def sample_message():
return {"role": "Agent1", "content": "Hello, World!"}
# Test the initialization of GroupChat
def test_groupchat_initialization(agent1, agent2):
groupchat = GroupChat(agents=[agent1, agent2])
assert len(groupchat.agents) == 2
assert len(groupchat.messages) == 0
assert groupchat.max_round == 10
assert groupchat.admin_name == "Admin"
# Test resetting the GroupChat
def test_groupchat_reset(agent1, agent2, sample_message):
groupchat = GroupChat(agents=[agent1, agent2])
groupchat.messages.append(sample_message)
groupchat.reset()
assert len(groupchat.messages) == 0
# Test finding an agent by name
def test_groupchat_find_agent_by_name(agent1, agent2):
groupchat = GroupChat(agents=[agent1, agent2])
found_agent = groupchat.agent_by_name("Agent1")
assert found_agent == agent1
# Test selecting the next agent
def test_groupchat_select_next_agent(agent1, agent2):
groupchat = GroupChat(agents=[agent1, agent2])
next_agent = groupchat.next_agent(agent1)
assert next_agent == agent2
from swarms.structs.groupchat import GroupChat, expertise_based
# Add more tests for different methods and scenarios as needed
# Test the GroupChatManager
def test_groupchat_manager(agent1, agent2):
groupchat = GroupChat(agents=[agent1, agent2])
selector = agent1 # Assuming agent1 is the selector
manager = GroupChatManager(groupchat, selector)
task = "Task for agent2"
reply = manager(task)
assert reply["role"] == "Agent2"
assert reply["content"] == "Reply from Agent2"
# Test selecting the next speaker when there is only one agent
def test_groupchat_select_speaker_single_agent(agent1):
groupchat = GroupChat(agents=[agent1])
selector = agent1
manager = GroupChatManager(groupchat, selector)
task = "Task for agent1"
reply = manager(task)
assert reply["role"] == "Agent1"
assert reply["content"] == "Reply from Agent1"
# Test selecting the next speaker when GroupChat is underpopulated
def test_groupchat_select_speaker_underpopulated(agent1, agent2):
groupchat = GroupChat(agents=[agent1, agent2])
selector = agent1
manager = GroupChatManager(groupchat, selector)
task = "Task for agent1"
reply = manager(task)
assert reply["role"] == "Agent2"
assert reply["content"] == "Reply from Agent2"
# Test formatting history
def test_groupchat_format_history(agent1, agent2, sample_message):
groupchat = GroupChat(agents=[agent1, agent2])
groupchat.messages.append(sample_message)
formatted_history = groupchat.format_history(groupchat.messages)
expected_history = "'Agent1:Hello, World!"
assert formatted_history == expected_history
# Test agent names property
def test_groupchat_agent_names(agent1, agent2):
groupchat = GroupChat(agents=[agent1, agent2])
names = groupchat.agent_names
assert len(names) == 2
assert "Agent1" in names
assert "Agent2" in names
# Test GroupChatManager initialization
def test_groupchat_manager_initialization(agent1, agent2):
groupchat = GroupChat(agents=[agent1, agent2])
selector = agent1
manager = GroupChatManager(groupchat, selector)
assert manager.groupchat == groupchat
assert manager.selector == selector
# Test case to ensure GroupChatManager generates a reply from an agent
def test_groupchat_manager_generate_reply():
# Create a GroupChat with two agents
agents = [agent1, agent2]
groupchat = GroupChat(agents=agents, messages=[], max_round=10)
# Mock the OpenAI class and GroupChat selector
mocked_openai = MockOpenAI()
selector = agent1
# Initialize GroupChatManager
manager = GroupChatManager(
groupchat=groupchat, selector=selector, openai=mocked_openai
def setup_test_agents():
model = OpenAIChat(
openai_api_key=os.getenv("OPENAI_API_KEY"),
model_name="gpt-4",
temperature=0.1,
)
# Generate a reply
task = "Write me a riddle"
reply = manager(task)
return [
Agent(
agent_name="Agent1",
system_prompt="You only respond with 'A'",
llm=model,
),
Agent(
agent_name="Agent2",
system_prompt="You only respond with 'B'",
llm=model,
),
Agent(
agent_name="Agent3",
system_prompt="You only respond with 'C'",
llm=model,
),
]
def test_round_robin_speaking():
chat = GroupChat(agents=setup_test_agents())
history = chat.run("Say your letter")
# Verify agents speak in order
responses = [
r.message for t in history.turns for r in t.responses
]
assert responses == ["A", "B", "C"] * (len(history.turns))
def test_concurrent_processing():
chat = GroupChat(agents=setup_test_agents())
tasks = ["Task1", "Task2", "Task3"]
histories = chat.concurrent_run(tasks)
assert len(histories) == len(tasks)
for history in histories:
assert history.total_messages > 0
def test_expertise_based_speaking():
agents = setup_test_agents()
chat = GroupChat(agents=agents, speaker_fn=expertise_based)
# Test each agent's expertise trigger
for agent in agents:
history = chat.run(f"Trigger {agent.system_prompt}")
first_response = history.turns[0].responses[0]
assert first_response.agent_name == agent.agent_name
def test_max_turns_limit():
max_turns = 3
chat = GroupChat(agents=setup_test_agents(), max_turns=max_turns)
history = chat.run("Test message")
assert len(history.turns) == max_turns
def test_error_handling():
broken_agent = Agent(
agent_name="BrokenAgent",
system_prompt="You raise errors",
llm=None,
)
# Check if a valid reply is generated
assert "role" in reply
assert "content" in reply
assert reply["role"] in groupchat.agent_names
chat = GroupChat(agents=[broken_agent])
history = chat.run("Trigger error")
assert "Error" in history.turns[0].responses[0].message
# Test case to ensure GroupChat selects the next speaker correctly
def test_groupchat_select_speaker():
agent3 = Agent(name="agent3", llm=llm)
agents = [agent1, agent2, agent3]
groupchat = GroupChat(agents=agents, messages=[], max_round=10)
# Initialize GroupChatManager with agent1 as selector
selector = agent1
manager = GroupChatManager(groupchat=groupchat, selector=selector)
def test_conversation_context():
agents = setup_test_agents()
complex_prompt = "Previous message refers to A. Now trigger B. Finally discuss C."
# Simulate selecting the next speaker
last_speaker = agent1
next_speaker = manager.select_speaker(
last_speaker=last_speaker, selector=selector
)
chat = GroupChat(agents=agents, speaker_fn=expertise_based)
history = chat.run(complex_prompt)
# Ensure the next speaker is agent2
assert next_speaker == agent2
responses = [
r.agent_name for t in history.turns for r in t.responses
]
assert all(agent.agent_name in responses for agent in agents)
# Test case to ensure GroupChat handles underpopulated group correctly
def test_groupchat_underpopulated_group():
agent1 = Agent(name="agent1", llm=llm)
agents = [agent1]
groupchat = GroupChat(agents=agents, messages=[], max_round=10)
def test_large_agent_group():
large_group = setup_test_agents() * 5 # 15 agents
chat = GroupChat(agents=large_group)
history = chat.run("Test scaling")
# Initialize GroupChatManager with agent1 as selector
selector = agent1
manager = GroupChatManager(groupchat=groupchat, selector=selector)
assert history.total_messages > len(large_group)
# Simulate selecting the next speaker in an underpopulated group
last_speaker = agent1
next_speaker = manager.select_speaker(
last_speaker=last_speaker, selector=selector
)
# Ensure the next speaker is the same as the last speaker in an underpopulated group
assert next_speaker == last_speaker
def test_long_conversations():
chat = GroupChat(agents=setup_test_agents(), max_turns=50)
history = chat.run("Long conversation test")
assert len(history.turns) == 50
assert history.total_messages > 100
# Test case to ensure GroupChatManager handles the maximum rounds correctly
def test_groupchat_max_rounds():
agents = [agent1, agent2]
groupchat = GroupChat(agents=agents, messages=[], max_round=2)
# Initialize GroupChatManager with agent1 as selector
selector = agent1
manager = GroupChatManager(groupchat=groupchat, selector=selector)
def test_stress_batched_runs():
chat = GroupChat(agents=setup_test_agents())
tasks = ["Task"] * 100
histories = chat.batched_run(tasks)
# Simulate the conversation with max rounds
last_speaker = agent1
for _ in range(2):
next_speaker = manager.select_speaker(
last_speaker=last_speaker, selector=selector
)
last_speaker = next_speaker
assert len(histories) == len(tasks)
total_messages = sum(h.total_messages for h in histories)
assert total_messages > len(tasks) * 3
# Try one more round, should stay with the last speaker
next_speaker = manager.select_speaker(
last_speaker=last_speaker, selector=selector
)
# Ensure the next speaker is the same as the last speaker after reaching max rounds
assert next_speaker == last_speaker
if __name__ == "__main__":
load_dotenv()
functions = [
test_round_robin_speaking,
test_concurrent_processing,
test_expertise_based_speaking,
test_max_turns_limit,
test_error_handling,
test_conversation_context,
test_large_agent_group,
test_long_conversations,
test_stress_batched_runs,
]
# Continue adding more test cases as needed to cover various scenarios and functionalities of the code.
for func in functions:
try:
print(f"Running {func.__name__}...")
func()
print("✓ Passed")
except Exception as e:
print(f"✗ Failed: {str(e)}")

Loading…
Cancel
Save