[Docs][fix examples and docs] [GraphWorkflow][add_nodes]

master
Kye Gomez 2 days ago
parent c8f1d82f85
commit 30c140fbaa

@ -74,9 +74,6 @@ my_agent = Agent(
}
],
# Required: Tags and capabilities
tags=["finance", "crypto", "stocks", "analysis"],
capabilities=["market-analysis", "risk-assessment", "portfolio-optimization"]
)
```
@ -170,8 +167,6 @@ print(result)
|-------|------|-------------|
| `publish_to_marketplace` | `bool` | Set to `True` to enable publishing |
| `use_cases` | `List[Dict]` | List of use case dictionaries with `title` and `description` |
| `tags` | `List[str]` | Keywords for discovery |
| `capabilities` | `List[str]` | Agent capabilities for matching |
### Use Case Format
@ -243,31 +238,15 @@ print(response)
---
## Monetization
To create a paid agent:
```python
from swarms.utils.swarms_marketplace_utils import add_prompt_to_marketplace
response = add_prompt_to_marketplace(
name="Premium Analysis Agent",
prompt="Your premium agent prompt...",
description="Advanced analysis capabilities",
use_cases=[...],
tags="premium, advanced",
category="finance",
is_free=False, # Paid agent
price_usd=9.99 # Price per use
)
```
---
## Next Steps
- Visit [Swarms Marketplace](https://swarms.world) to browse published agents
- Learn about [Marketplace Documentation](../swarms_platform/share_and_discover.md)
- Explore [Monetization Options](../swarms_platform/monetize.md)
- See [API Key Management](../swarms_platform/apikeys.md)
| Next Step | Description |
|-----------|-------------|
| [Swarms Marketplace](https://swarms.world) | Browse published agents |
| [Marketplace Documentation](../swarms_platform/share_and_discover.md) | Learn how to publish and discover agents |
| [Monetization Options](../swarms_platform/monetize.md) | Explore ways to monetize your agent |
| [API Key Management](../swarms_platform/apikeys.md) | Manage your API keys for publishing and access |

@ -1,23 +1,22 @@
from swarms.structs.graph_workflow import GraphWorkflow
from swarms.structs.agent import Agent
agent_one = Agent(
agent_name="research_agent",
model_name="gpt-4o-mini",
agent_name="research_agent",
model_name="gpt-4o-mini",
name="Research Agent",
agent_description="Agent responsible for gathering and summarizing research information."
agent_description="Agent responsible for gathering and summarizing research information.",
)
agent_two = Agent(
agent_name="research_agent_two",
agent_name="research_agent_two",
model_name="gpt-4o-mini",
name="Analysis Agent",
agent_description="Agent that analyzes the research data provided and processes insights."
agent_description="Agent that analyzes the research data provided and processes insights.",
)
agent_three = Agent(
agent_name="research_agent_three",
agent_name="research_agent_three",
model_name="gpt-4o-mini",
agent_description="Agent tasked with structuring analysis into a final report or output."
agent_description="Agent tasked with structuring analysis into a final report or output.",
)
# Create workflow with backend selection
@ -41,4 +40,4 @@ workflow.compile()
task = "Complete a simple task"
results = workflow.run(task)
print(results)
print(results)

@ -7,7 +7,6 @@ This directory contains examples demonstrating single agent patterns, configurat
- [persistent_legal_agent.py](demos/persistent_legal_agent.py) - Legal document processing agent
## External Agents
- [custom_agent_example.py](external_agents/custom_agent_example.py) - Custom agent implementation
- [openai_assistant_wrapper.py](external_agents/openai_assistant_wrapper.py) - OpenAI Assistant integration
## LLM Integrations

@ -4,7 +4,6 @@ This directory contains examples demonstrating integration with external agent s
## Examples
- [custom_agent_example.py](custom_agent_example.py) - Custom agent implementation
- [openai_assistant_wrapper.py](openai_assistant_wrapper.py) - OpenAI Assistant integration wrapper
## Overview

@ -1,40 +0,0 @@
import os
from dotenv import load_dotenv
from swarms.structs.custom_agent import CustomAgent
load_dotenv()
# Example usage with Anthropic API
if __name__ == "__main__":
# Initialize the agent for Anthropic API
anthropic_agent = CustomAgent(
base_url="https://api.anthropic.com",
endpoint="v1/messages",
headers={
"x-api-key": os.getenv("ANTHROPIC_API_KEY"),
"anthropic-version": "2023-06-01",
},
)
# Example payload for Anthropic API
payload = {
"model": "claude-3-5-sonnet-20241022",
"max_tokens": 1000,
"messages": [
{
"role": "user",
"content": "Hello! Can you explain what artaddificial intelligence is?",
}
],
}
# Make the request
try:
response = anthropic_agent.run(payload)
print("Anthropic API Response:")
print(response)
print(type(response))
except Exception as e:
print(f"Error: {e}")

@ -1,7 +1,9 @@
import json
from swarms import Agent
blood_analysis_system_prompt = """You are a clinical laboratory data analyst assistant focused on hematology and basic metabolic panels.
blood_analysis_system_prompt = """
You are a clinical laboratory data analyst assistant focused on hematology and basic metabolic panels.
Your goals:
1) Interpret common blood test panels (CBC, CMP/BMP, lipid panel, HbA1c, thyroid panels) based on provided values, reference ranges, flags, and units.
2) Provide structured findings: out-of-range markers, degree of deviation, likely clinical significance, and differential considerations.

@ -1,27 +1,26 @@
from swarms.structs.graph_workflow import GraphWorkflow
from swarms.structs.agent import Agent
agent_one = Agent(
agent_name="research_agent",
model_name="claude-haiku-4-5",
agent_name="research_agent",
model_name="claude-haiku-4-5",
top_p=None,
temperature=None,
agent_description="Agent responsible for gathering and summarizing research information."
agent_description="Agent responsible for gathering and summarizing research information.",
)
agent_two = Agent(
agent_name="research_agent_two",
agent_name="research_agent_two",
model_name="claude-haiku-4-5",
top_p=None,
temperature=None,
agent_description="Agent that analyzes the research data provided and processes insights."
agent_description="Agent that analyzes the research data provided and processes insights.",
)
agent_three = Agent(
agent_name="research_agent_three",
agent_name="research_agent_three",
model_name="claude-haiku-4-5",
top_p=None,
temperature=None,
agent_description="Agent tasked with structuring analysis into a final report or output."
agent_description="Agent tasked with structuring analysis into a final report or output.",
)
# Create workflow with backend selection
@ -45,4 +44,4 @@ workflow.compile()
task = "Analyze the best mining companies in the US"
results = workflow.run(task)
print(results)
print(results)

@ -1,344 +0,0 @@
import json
from dataclasses import dataclass
from typing import Any, Dict, Optional, Union
import httpx
from loguru import logger
@dataclass
class AgentResponse:
"""Data class to hold agent response information"""
status_code: int
content: str
headers: Dict[str, str]
json_data: Optional[Dict[str, Any]] = None
success: bool = False
error_message: Optional[str] = None
class CustomAgent:
"""
A custom HTTP agent class for making POST requests using httpx.
Features:
- Configurable headers and payload
- Both sync and async execution
- Built-in error handling and logging
- Flexible response handling
- Name and description
"""
def __init__(
self,
name: str,
description: str,
base_url: str,
endpoint: str,
headers: Optional[Dict[str, str]] = None,
timeout: float = 30.0,
verify_ssl: bool = True,
*args,
**kwargs,
):
"""
Initialize the Custom Agent.
Args:
base_url: Base URL for the API endpoint
endpoint: API endpoint path
headers: Default headers to include in requests
timeout: Request timeout in seconds
verify_ssl: Whether to verify SSL certificates
"""
self.base_url = base_url.rstrip("/")
self.endpoint = endpoint.lstrip("/")
self.default_headers = headers or {}
self.timeout = timeout
self.verify_ssl = verify_ssl
# Default headers
if "Content-Type" not in self.default_headers:
self.default_headers["Content-Type"] = "application/json"
logger.info(
f"CustomAgent initialized for {self.base_url}/{self.endpoint}"
)
def _prepare_headers(
self, additional_headers: Optional[Dict[str, str]] = None
) -> Dict[str, str]:
"""Merge default headers with additional headers."""
headers = self.default_headers.copy()
if additional_headers:
headers.update(additional_headers)
return headers
def _prepare_payload(
self, payload: Union[Dict, str, bytes]
) -> Union[str, bytes]:
"""Prepare the payload for the request."""
if isinstance(payload, dict):
return json.dumps(payload)
return payload
def _parse_response(
self, response: httpx.Response
) -> AgentResponse:
"""Parse httpx response into AgentResponse object."""
try:
# Try to parse JSON if possible
json_data = None
if response.headers.get("content-type", "").startswith(
"application/json"
):
try:
json_data = response.json()
except json.JSONDecodeError:
pass
return AgentResponse(
status_code=response.status_code,
content=response.text,
headers=dict(response.headers),
json_data=json_data,
success=200 <= response.status_code < 300,
error_message=(
None
if 200 <= response.status_code < 300
else f"HTTP {response.status_code}"
),
)
except Exception as e:
logger.error(f"Error parsing response: {e}")
return AgentResponse(
status_code=response.status_code,
content=response.text,
headers=dict(response.headers),
success=False,
error_message=str(e),
)
def _extract_content(self, response_data: Dict[str, Any]) -> str:
"""
Extract message content from API response, supporting multiple formats.
Args:
response_data: Parsed JSON response from API
Returns:
str: Extracted message content
"""
try:
# OpenAI format
if (
"choices" in response_data
and response_data["choices"]
):
choice = response_data["choices"][0]
if (
"message" in choice
and "content" in choice["message"]
):
return choice["message"]["content"]
elif "text" in choice:
return choice["text"]
# Anthropic format
elif (
"content" in response_data
and response_data["content"]
):
if isinstance(response_data["content"], list):
# Extract text from content blocks
text_parts = []
for content_block in response_data["content"]:
if (
isinstance(content_block, dict)
and "text" in content_block
):
text_parts.append(content_block["text"])
elif isinstance(content_block, str):
text_parts.append(content_block)
return "".join(text_parts)
elif isinstance(response_data["content"], str):
return response_data["content"]
# Generic fallback - look for common content fields
elif "text" in response_data:
return response_data["text"]
elif "message" in response_data:
return response_data["message"]
elif "response" in response_data:
return response_data["response"]
# If no known format, return the entire response as JSON string
logger.warning(
"Unknown response format, returning full response"
)
return json.dumps(response_data, indent=2)
except Exception as e:
logger.error(f"Error extracting content: {e}")
return json.dumps(response_data, indent=2)
def run(
self,
payload: Union[Dict[str, Any], str, bytes],
additional_headers: Optional[Dict[str, str]] = None,
**kwargs,
) -> str:
"""
Execute a synchronous POST request.
Args:
payload: Request body/payload
additional_headers: Additional headers for this request
**kwargs: Additional httpx client options
Returns:
str: Extracted message content from response
"""
url = f"{self.base_url}/{self.endpoint}"
request_headers = self._prepare_headers(additional_headers)
request_payload = self._prepare_payload(payload)
logger.info(f"Making POST request to: {url}")
try:
with httpx.Client(
timeout=self.timeout, verify=self.verify_ssl, **kwargs
) as client:
response = client.post(
url,
content=request_payload,
headers=request_headers,
)
if 200 <= response.status_code < 300:
logger.info(
f"Request successful: {response.status_code}"
)
try:
response_data = response.json()
return self._extract_content(response_data)
except json.JSONDecodeError:
logger.warning(
"Response is not JSON, returning raw text"
)
return response.text
else:
logger.warning(
f"Request failed: {response.status_code}"
)
return f"Error: HTTP {response.status_code} - {response.text}"
except httpx.RequestError as e:
logger.error(f"Request error: {e}")
return f"Request error: {str(e)}"
except Exception as e:
logger.error(f"Unexpected error: {e}")
return f"Unexpected error: {str(e)}"
async def run_async(
self,
payload: Union[Dict[str, Any], str, bytes],
additional_headers: Optional[Dict[str, str]] = None,
**kwargs,
) -> str:
"""
Execute an asynchronous POST request.
Args:
payload: Request body/payload
additional_headers: Additional headers for this request
**kwargs: Additional httpx client options
Returns:
str: Extracted message content from response
"""
url = f"{self.base_url}/{self.endpoint}"
request_headers = self._prepare_headers(additional_headers)
request_payload = self._prepare_payload(payload)
logger.info(f"Making async POST request to: {url}")
try:
async with httpx.AsyncClient(
timeout=self.timeout, verify=self.verify_ssl, **kwargs
) as client:
response = await client.post(
url,
content=request_payload,
headers=request_headers,
)
if 200 <= response.status_code < 300:
logger.info(
f"Async request successful: {response.status_code}"
)
try:
response_data = response.json()
return self._extract_content(response_data)
except json.JSONDecodeError:
logger.warning(
"Async response is not JSON, returning raw text"
)
return response.text
else:
logger.warning(
f"Async request failed: {response.status_code}"
)
return f"Error: HTTP {response.status_code} - {response.text}"
except httpx.RequestError as e:
logger.error(f"Async request error: {e}")
return f"Request error: {str(e)}"
except Exception as e:
logger.error(f"Unexpected async error: {e}")
return f"Unexpected error: {str(e)}"
# # Example usage with Anthropic API
# if __name__ == "__main__":
# # Initialize the agent for Anthropic API
# anthropic_agent = CustomAgent(
# base_url="https://api.anthropic.com",
# endpoint="v1/messages",
# headers={
# "x-api-key": "your-anthropic-api-key-here",
# "anthropic-version": "2023-06-01"
# }
# )
# # Example payload for Anthropic API
# payload = {
# "model": "claude-3-sonnet-20240229",
# "max_tokens": 1000,
# "messages": [
# {
# "role": "user",
# "content": "Hello! Can you explain what artificial intelligence is?"
# }
# ]
# }
# # Make the request
# try:
# response = anthropic_agent.run(payload)
# print("Anthropic API Response:")
# print(response)
# except Exception as e:
# print(f"Error: {e}")
# # Example with async usage
# # import asyncio
# #
# # async def async_example():
# # response = await anthropic_agent.run_async(payload)
# # print("Async Anthropic API Response:")
# # print(response)
# #
# # Uncomment to run async example
# # asyncio.run(async_example())

@ -986,8 +986,10 @@ class GraphWorkflow:
f"Error in GraphWorkflow.add_node for agent {getattr(agent, 'agent_name', 'unnamed')}: {e}"
)
raise e
def add_nodes(self, agents: List[Agent], batch_size: int = 10, **kwargs: Any) -> None:
def add_nodes(
self, agents: List[Agent], batch_size: int = 10, **kwargs: Any
) -> None:
"""
Add multiple agents to the workflow graph concurrently in batches.
@ -996,18 +998,24 @@ class GraphWorkflow:
batch_size (int): Number of agents to add concurrently in a batch. Defaults to 8.
**kwargs: Additional keyword arguments for each node addition.
"""
try:
with concurrent.futures.ThreadPoolExecutor(max_workers = self._max_workers) as executor:
with concurrent.futures.ThreadPoolExecutor(
max_workers=self._max_workers
) as executor:
# Process agents in batches
for i in range(0, len(agents), batch_size):
batch = agents[i:i + batch_size]
batch = agents[i : i + batch_size]
futures = [
executor.submit(self.add_node, agent, **kwargs)
executor.submit(
self.add_node, agent, **kwargs
)
for agent in batch
]
# Ensure all nodes in batch are added before next batch
for future in concurrent.futures.as_completed(futures):
for future in concurrent.futures.as_completed(
futures
):
future.result()
except Exception as e:
logger.exception(
@ -2292,7 +2300,6 @@ class GraphWorkflow:
f"Error in GraphWorkflow.visualize_simple: {e}"
)
raise e
def to_json(
self,

@ -1,64 +1,39 @@
import random
from datetime import datetime
from typing import List, Optional
from typing import List, Union
import tenacity
from pydantic import BaseModel, Field
from swarms.schemas.agent_step_schemas import ManySteps
from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.conversation import Conversation
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.output_types import OutputType
logger = initialize_logger("round-robin")
datetime_stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
class MetadataSchema(BaseModel):
swarm_id: Optional[str] = Field(
..., description="Unique ID for the run"
)
name: Optional[str] = Field(
"RoundRobinSwarm", description="Name of the swarm"
)
task: Optional[str] = Field(
..., description="Task or query given to all agents"
)
description: Optional[str] = Field(
"Concurrent execution of multiple agents",
description="Description of the workflow",
)
agent_outputs: Optional[List[ManySteps]] = Field(
..., description="List of agent outputs and metadata"
)
timestamp: Optional[str] = Field(
default_factory=datetime.now,
description="Timestamp of the workflow execution",
)
max_loops: Optional[int] = Field(
1, description="Maximum number of loops to run"
)
class RoundRobinSwarm(BaseSwarm):
class RoundRobinSwarm:
"""
A swarm implementation that executes tasks in a round-robin fashion.
Args:
name (str): Name of the swarm. Defaults to "RoundRobinSwarm".
description (str): Description of the swarm's purpose.
agents (List[Agent], optional): List of agents in the swarm. Defaults to None.
verbose (bool, optional): Flag to enable verbose mode. Defaults to False.
max_loops (int, optional): Maximum number of loops to run. Defaults to 1.
callback (callable, optional): Callback function to be called after each loop. Defaults to None.
return_json_on (bool, optional): Flag to return the metadata as a JSON object. Defaults to False.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
max_retries (int, optional): Maximum number of retries for agent execution. Defaults to 3.
output_type (OutputType, optional): Type of output format. Defaults to "final".
Attributes:
agents (List[Agent]): List of agents in the swarm.
verbose (bool): Flag to enable verbose mode.
max_loops (int): Maximum number of loops to run.
index (int): Current index of the agent being executed.
conversation (Conversation): Conversation history for the swarm.
Methods:
run(task: str, *args, **kwargs) -> Any: Executes the given task on the agents in a round-robin fashion.
@ -73,54 +48,36 @@ class RoundRobinSwarm(BaseSwarm):
verbose: bool = False,
max_loops: int = 1,
callback: callable = None,
return_json_on: bool = False,
max_retries: int = 3,
*args,
**kwargs,
output_type: OutputType = "final",
):
try:
super().__init__(
name=name,
description=description,
agents=agents,
*args,
**kwargs,
)
self.name = name
self.description = description
self.agents = agents or []
self.verbose = verbose
self.max_loops = max_loops
self.callback = callback
self.return_json_on = return_json_on
self.index = 0
self.max_retries = max_retries
# Store the metadata for the run
self.output_schema = MetadataSchema(
name=self.name,
swarm_id=datetime_stamp,
task="",
description=self.description,
agent_outputs=[],
timestamp=datetime_stamp,
max_loops=self.max_loops,
)
# Set the max loops for every agent
if self.agents:
for agent in self.agents:
agent.max_loops = random.randint(1, 5)
logger.info(
f"Successfully initialized {self.name} with {len(self.agents)} agents"
self.name = name
self.description = description
self.agents = agents
self.verbose = verbose
self.max_loops = max_loops
self.callback = callback
self.index = 0
self.max_retries = max_retries
self.output_type = output_type
# Initialize conversation for tracking agent interactions
self.conversation = Conversation(name=f"{name}_conversation")
if self.agents is None:
raise ValueError(
"RoundRobinSwarm cannot be initialized without agents"
)
except Exception as e:
logger.error(
f"Failed to initialize {self.name}: {str(e)}"
)
raise
# Set the max loops for every agent
if self.agents:
for agent in self.agents:
agent.max_loops = random.randint(1, 5)
logger.info(
f"Successfully initialized {self.name} with {len(self.agents)} agents"
)
@tenacity.retry(
stop=tenacity.stop_after_attempt(3),
@ -133,14 +90,26 @@ class RoundRobinSwarm(BaseSwarm):
def _execute_agent(
self, agent: Agent, task: str, *args, **kwargs
) -> str:
"""Execute a single agent with retries and error handling"""
"""
Execute a single agent with retries and error handling.
Args:
agent (Agent): The agent to execute.
task (str): The task to be executed.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Returns:
str: The result of the agent execution.
"""
try:
logger.info(
f"Running Agent {agent.agent_name} on task: {task}"
)
result = agent.run(task, *args, **kwargs)
self.output_schema.agent_outputs.append(
agent.agent_output
self.conversation.add(
role=agent.agent_name,
content=result,
)
return result
except Exception as e:
@ -149,9 +118,16 @@ class RoundRobinSwarm(BaseSwarm):
)
raise
def run(self, task: str, *args, **kwargs):
def run(
self, task: str, *args, **kwargs
) -> Union[str, dict, list]:
"""
Executes the given task on the agents in a round-robin fashion.
Executes the given task on the agents in a randomized round-robin fashion.
This method implements an AutoGen-style communication pattern where:
- Agents are shuffled randomly each loop for varied interaction patterns
- Each agent receives the full conversation context to build upon others' responses
- Collaborative prompting encourages agents to acknowledge and extend prior contributions
Args:
task (str): The task to be executed.
@ -159,7 +135,7 @@ class RoundRobinSwarm(BaseSwarm):
**kwargs: Arbitrary keyword arguments.
Returns:
Any: The result of the task execution.
Union[str, dict, list]: The result of the task execution in the specified output format.
Raises:
ValueError: If no agents are configured
@ -170,11 +146,15 @@ class RoundRobinSwarm(BaseSwarm):
raise ValueError("No agents configured for the swarm")
try:
result = task
self.output_schema.task = task
# Add initial task to conversation
self.conversation.add(role="User", content=task)
n = len(self.agents)
# Build agent names list for context
agent_names = [agent.agent_name for agent in self.agents]
logger.info(
f"Starting round-robin execution with task '{task}' on {n} agents"
f"Starting randomized round-robin execution with task on {n} agents: {agent_names}"
)
for loop in range(self.max_loops):
@ -182,14 +162,36 @@ class RoundRobinSwarm(BaseSwarm):
f"Starting loop {loop + 1}/{self.max_loops}"
)
for _ in range(n):
current_agent = self.agents[self.index]
# Shuffle agents randomly each loop for varied interaction patterns
shuffled_agents = self.agents.copy()
random.shuffle(shuffled_agents)
logger.debug(
f"Agent order for loop {loop + 1}: {[a.agent_name for a in shuffled_agents]}"
)
for i, current_agent in enumerate(shuffled_agents):
# Get current conversation context
conversation_context = self.conversation.return_history_as_string()
# Build collaborative prompt with context
collaborative_task = f"""{conversation_context}
As {current_agent.agent_name}, you are agent {i + 1} of {n} in this collaborative session. The other agents participating are: {', '.join(name for name in agent_names if name != current_agent.agent_name)}.
Please review the conversation history above carefully and build upon the insights shared by other agents. Acknowledge their contributions where relevant and provide your unique perspective and expertise. Be concise but thorough in your response, and if this is the first response in the conversation, address the original task directly.
Your response:"""
try:
result = self._execute_agent(
current_agent, result, *args, **kwargs
current_agent, collaborative_task, *args, **kwargs
)
except Exception as e:
logger.error(
f"Agent {current_agent.agent_name} failed: {str(e)}"
)
finally:
self.index = (self.index + 1) % n
raise
if self.callback:
logger.debug(
@ -203,21 +205,17 @@ class RoundRobinSwarm(BaseSwarm):
)
logger.success(
f"Successfully completed {self.max_loops} loops of round-robin execution"
f"Successfully completed {self.max_loops} loops of randomized round-robin execution"
)
if self.return_json_on:
return self.export_metadata()
return result
return history_output_formatter(
conversation=self.conversation,
type=self.output_type,
)
except Exception as e:
logger.error(f"Round-robin execution failed: {str(e)}")
raise
def export_metadata(self):
"""Export the execution metadata as JSON"""
try:
return self.output_schema.model_dump_json(indent=4)
except Exception as e:
logger.error(f"Failed to export metadata: {str(e)}")
raise
def run_batch(self, tasks: List[str]):
return [self.run(task) for task in tasks]

@ -106,6 +106,7 @@ def grid_swarm(
return history_output_formatter(conversation, output_type)
# Star Swarm: A central agent first processes all tasks, followed by others
def star_swarm(
agents: AgentListType,

@ -882,6 +882,8 @@ class OneToOne:
self,
sender: Agent,
receiver: Agent,
name: str = "OneToOne",
description: str = "A one-to-one communication pattern between two agents",
output_type: str = "dict",
):
"""
@ -890,10 +892,14 @@ class OneToOne:
Args:
sender: The sender agent
receiver: The receiver agent
name: Name of the communication pattern
description: Description of the communication pattern's purpose
output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc.
"""
self.sender = sender
self.receiver = receiver
self.name = name
self.description = description
self.output_type = output_type
self.conversation = Conversation()
@ -959,6 +965,8 @@ class Broadcast:
self,
sender: Agent,
receivers: AgentListType,
name: str = "Broadcast",
description: str = "A broadcast communication pattern from one agent to many agents",
output_type: str = "dict",
):
"""
@ -967,6 +975,8 @@ class Broadcast:
Args:
sender: The sender agent
receivers: List of receiver agents
name: Name of the communication pattern
description: Description of the communication pattern's purpose
output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc.
"""
self.sender = sender
@ -975,6 +985,8 @@ class Broadcast:
if isinstance(receivers[0], list)
else receivers
)
self.name = name
self.description = description
self.output_type = output_type
self.conversation = Conversation()
@ -1027,6 +1039,8 @@ class OneToThree:
self,
sender: Agent,
receivers: AgentListType,
name: str = "OneToThree",
description: str = "A one-to-three communication pattern from one agent to exactly three agents",
output_type: str = "dict",
):
"""
@ -1035,6 +1049,8 @@ class OneToThree:
Args:
sender: The sender agent
receivers: List of exactly three receiver agents
name: Name of the communication pattern
description: Description of the communication pattern's purpose
output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc.
"""
if len(receivers) != 3:
@ -1044,6 +1060,8 @@ class OneToThree:
self.sender = sender
self.receivers = receivers
self.name = name
self.description = description
self.output_type = output_type
self.conversation = Conversation()

@ -1,442 +0,0 @@
import pytest
import json
from unittest.mock import Mock, patch, AsyncMock
from loguru import logger
from swarms.structs.custom_agent import CustomAgent, AgentResponse
try:
import pytest_asyncio
ASYNC_AVAILABLE = True
except ImportError:
ASYNC_AVAILABLE = False
pytest_asyncio = None
def create_test_custom_agent():
return CustomAgent(
name="TestAgent",
description="Test agent for unit testing",
base_url="https://api.test.com",
endpoint="v1/test",
headers={"Authorization": "Bearer test-token"},
timeout=10.0,
verify_ssl=True,
)
@pytest.fixture
def sample_custom_agent():
return create_test_custom_agent()
def test_custom_agent_initialization():
try:
custom_agent_instance = CustomAgent(
name="TestAgent",
description="Test description",
base_url="https://api.example.com",
endpoint="v1/endpoint",
headers={"Content-Type": "application/json"},
timeout=30.0,
verify_ssl=True,
)
assert (
custom_agent_instance.base_url
== "https://api.example.com"
)
assert custom_agent_instance.endpoint == "v1/endpoint"
assert custom_agent_instance.timeout == 30.0
assert custom_agent_instance.verify_ssl is True
assert "Content-Type" in custom_agent_instance.default_headers
logger.info("CustomAgent initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize CustomAgent: {e}")
raise
def test_custom_agent_initialization_with_default_headers(
sample_custom_agent,
):
try:
custom_agent_no_headers = CustomAgent(
name="TestAgent",
description="Test",
base_url="https://api.test.com",
endpoint="test",
)
assert (
"Content-Type" in custom_agent_no_headers.default_headers
)
assert (
custom_agent_no_headers.default_headers["Content-Type"]
== "application/json"
)
logger.debug("Default Content-Type header added correctly")
except Exception as e:
logger.error(f"Failed to test default headers: {e}")
raise
def test_custom_agent_url_normalization():
try:
custom_agent_with_slashes = CustomAgent(
name="TestAgent",
description="Test",
base_url="https://api.test.com/",
endpoint="/v1/test",
)
assert (
custom_agent_with_slashes.base_url
== "https://api.test.com"
)
assert custom_agent_with_slashes.endpoint == "v1/test"
logger.debug("URL normalization works correctly")
except Exception as e:
logger.error(f"Failed to test URL normalization: {e}")
raise
def test_prepare_headers(sample_custom_agent):
try:
prepared_headers = sample_custom_agent._prepare_headers()
assert "Authorization" in prepared_headers
assert (
prepared_headers["Authorization"] == "Bearer test-token"
)
additional_headers = {"X-Custom-Header": "custom-value"}
prepared_headers_with_additional = (
sample_custom_agent._prepare_headers(additional_headers)
)
assert (
prepared_headers_with_additional["X-Custom-Header"]
== "custom-value"
)
assert (
prepared_headers_with_additional["Authorization"]
== "Bearer test-token"
)
logger.debug("Header preparation works correctly")
except Exception as e:
logger.error(f"Failed to test prepare_headers: {e}")
raise
def test_prepare_payload_dict(sample_custom_agent):
try:
payload_dict = {"key": "value", "number": 123}
prepared_payload = sample_custom_agent._prepare_payload(
payload_dict
)
assert isinstance(prepared_payload, str)
parsed = json.loads(prepared_payload)
assert parsed["key"] == "value"
assert parsed["number"] == 123
logger.debug("Dictionary payload prepared correctly")
except Exception as e:
logger.error(f"Failed to test prepare_payload with dict: {e}")
raise
def test_prepare_payload_string(sample_custom_agent):
try:
payload_string = '{"test": "value"}'
prepared_payload = sample_custom_agent._prepare_payload(
payload_string
)
assert prepared_payload == payload_string
logger.debug("String payload prepared correctly")
except Exception as e:
logger.error(
f"Failed to test prepare_payload with string: {e}"
)
raise
def test_prepare_payload_bytes(sample_custom_agent):
try:
payload_bytes = b'{"test": "value"}'
prepared_payload = sample_custom_agent._prepare_payload(
payload_bytes
)
assert prepared_payload == payload_bytes
logger.debug("Bytes payload prepared correctly")
except Exception as e:
logger.error(
f"Failed to test prepare_payload with bytes: {e}"
)
raise
def test_parse_response_success(sample_custom_agent):
try:
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = '{"message": "success"}'
mock_response.headers = {"content-type": "application/json"}
mock_response.json.return_value = {"message": "success"}
parsed_response = sample_custom_agent._parse_response(
mock_response
)
assert isinstance(parsed_response, AgentResponse)
assert parsed_response.status_code == 200
assert parsed_response.success is True
assert parsed_response.json_data == {"message": "success"}
assert parsed_response.error_message is None
logger.debug("Successful response parsed correctly")
except Exception as e:
logger.error(f"Failed to test parse_response success: {e}")
raise
def test_parse_response_error(sample_custom_agent):
try:
mock_response = Mock()
mock_response.status_code = 404
mock_response.text = "Not Found"
mock_response.headers = {"content-type": "text/plain"}
parsed_response = sample_custom_agent._parse_response(
mock_response
)
assert isinstance(parsed_response, AgentResponse)
assert parsed_response.status_code == 404
assert parsed_response.success is False
assert parsed_response.error_message == "HTTP 404"
logger.debug("Error response parsed correctly")
except Exception as e:
logger.error(f"Failed to test parse_response error: {e}")
raise
def test_extract_content_openai_format(sample_custom_agent):
try:
openai_response = {
"choices": [
{
"message": {
"content": "This is the response content"
}
}
]
}
extracted_content = sample_custom_agent._extract_content(
openai_response
)
assert extracted_content == "This is the response content"
logger.debug("OpenAI format content extracted correctly")
except Exception as e:
logger.error(
f"Failed to test extract_content OpenAI format: {e}"
)
raise
def test_extract_content_anthropic_format(sample_custom_agent):
try:
anthropic_response = {
"content": [
{"text": "First part "},
{"text": "second part"},
]
}
extracted_content = sample_custom_agent._extract_content(
anthropic_response
)
assert extracted_content == "First part second part"
logger.debug("Anthropic format content extracted correctly")
except Exception as e:
logger.error(
f"Failed to test extract_content Anthropic format: {e}"
)
raise
def test_extract_content_generic_format(sample_custom_agent):
try:
generic_response = {"text": "Generic response text"}
extracted_content = sample_custom_agent._extract_content(
generic_response
)
assert extracted_content == "Generic response text"
logger.debug("Generic format content extracted correctly")
except Exception as e:
logger.error(
f"Failed to test extract_content generic format: {e}"
)
raise
@patch("swarms.structs.custom_agent.httpx.Client")
def test_run_success(mock_client_class, sample_custom_agent):
try:
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = (
'{"choices": [{"message": {"content": "Success"}}]}'
)
mock_response.json.return_value = {
"choices": [{"message": {"content": "Success"}}]
}
mock_response.headers = {"content-type": "application/json"}
mock_client_instance = Mock()
mock_client_instance.__enter__ = Mock(
return_value=mock_client_instance
)
mock_client_instance.__exit__ = Mock(return_value=None)
mock_client_instance.post.return_value = mock_response
mock_client_class.return_value = mock_client_instance
test_payload = {"message": "test"}
result = sample_custom_agent.run(test_payload)
assert result == "Success"
logger.info("Run method executed successfully")
except Exception as e:
logger.error(f"Failed to test run success: {e}")
raise
@patch("swarms.structs.custom_agent.httpx.Client")
def test_run_error_response(mock_client_class, sample_custom_agent):
try:
mock_response = Mock()
mock_response.status_code = 500
mock_response.text = "Internal Server Error"
mock_client_instance = Mock()
mock_client_instance.__enter__ = Mock(
return_value=mock_client_instance
)
mock_client_instance.__exit__ = Mock(return_value=None)
mock_client_instance.post.return_value = mock_response
mock_client_class.return_value = mock_client_instance
test_payload = {"message": "test"}
result = sample_custom_agent.run(test_payload)
assert "Error: HTTP 500" in result
logger.debug("Error response handled correctly")
except Exception as e:
logger.error(f"Failed to test run error response: {e}")
raise
@patch("swarms.structs.custom_agent.httpx.Client")
def test_run_request_error(mock_client_class, sample_custom_agent):
try:
import httpx
mock_client_instance = Mock()
mock_client_instance.__enter__ = Mock(
return_value=mock_client_instance
)
mock_client_instance.__exit__ = Mock(return_value=None)
mock_client_instance.post.side_effect = httpx.RequestError(
"Connection failed"
)
mock_client_class.return_value = mock_client_instance
test_payload = {"message": "test"}
result = sample_custom_agent.run(test_payload)
assert "Request error" in result
logger.debug("Request error handled correctly")
except Exception as e:
logger.error(f"Failed to test run request error: {e}")
raise
@pytest.mark.skipif(
not ASYNC_AVAILABLE, reason="pytest-asyncio not installed"
)
@pytest.mark.asyncio
@patch("swarms.structs.custom_agent.httpx.AsyncClient")
async def test_run_async_success(
mock_async_client_class, sample_custom_agent
):
try:
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = (
'{"content": [{"text": "Async Success"}]}'
)
mock_response.json.return_value = {
"content": [{"text": "Async Success"}]
}
mock_response.headers = {"content-type": "application/json"}
mock_client_instance = AsyncMock()
mock_client_instance.__aenter__ = AsyncMock(
return_value=mock_client_instance
)
mock_client_instance.__aexit__ = AsyncMock(return_value=None)
mock_client_instance.post = AsyncMock(
return_value=mock_response
)
mock_async_client_class.return_value = mock_client_instance
test_payload = {"message": "test"}
result = await sample_custom_agent.run_async(test_payload)
assert result == "Async Success"
logger.info("Run_async method executed successfully")
except Exception as e:
logger.error(f"Failed to test run_async success: {e}")
raise
@pytest.mark.skipif(
not ASYNC_AVAILABLE, reason="pytest-asyncio not installed"
)
@pytest.mark.asyncio
@patch("swarms.structs.custom_agent.httpx.AsyncClient")
async def test_run_async_error_response(
mock_async_client_class, sample_custom_agent
):
try:
mock_response = Mock()
mock_response.status_code = 400
mock_response.text = "Bad Request"
mock_client_instance = AsyncMock()
mock_client_instance.__aenter__ = AsyncMock(
return_value=mock_client_instance
)
mock_client_instance.__aexit__ = AsyncMock(return_value=None)
mock_client_instance.post = AsyncMock(
return_value=mock_response
)
mock_async_client_class.return_value = mock_client_instance
test_payload = {"message": "test"}
result = await sample_custom_agent.run_async(test_payload)
assert "Error: HTTP 400" in result
logger.debug("Async error response handled correctly")
except Exception as e:
logger.error(f"Failed to test run_async error response: {e}")
raise
def test_agent_response_dataclass():
try:
agent_response_instance = AgentResponse(
status_code=200,
content="Success",
headers={"content-type": "application/json"},
json_data={"key": "value"},
success=True,
error_message=None,
)
assert agent_response_instance.status_code == 200
assert agent_response_instance.content == "Success"
assert agent_response_instance.success is True
assert agent_response_instance.error_message is None
logger.debug("AgentResponse dataclass created correctly")
except Exception as e:
logger.error(f"Failed to test AgentResponse dataclass: {e}")
raise

@ -21,21 +21,30 @@ def test_llm_council_default_initialization():
"""Test LLM Council initialization with default council members."""
try:
logger.info("Testing LLM Council default initialization...")
council = LLMCouncil(
verbose=False,
output_type="dict-all-except-first"
verbose=False, output_type="dict-all-except-first"
)
assert council is not None, "Council should be initialized"
assert council.name == "LLM Council", "Default name should be 'LLM Council'"
assert len(council.council_members) > 0, "Should have council members"
assert council.chairman is not None, "Chairman should be initialized"
assert council.conversation is not None, "Conversation should be initialized"
logger.info(f"✓ Council initialized with {len(council.council_members)} members")
assert (
council.name == "LLM Council"
), "Default name should be 'LLM Council'"
assert (
len(council.council_members) > 0
), "Should have council members"
assert (
council.chairman is not None
), "Chairman should be initialized"
assert (
council.conversation is not None
), "Conversation should be initialized"
logger.info(
f"✓ Council initialized with {len(council.council_members)} members"
)
logger.info("✓ Default initialization test passed")
except Exception as e:
logger.error(f"✗ Default initialization test failed: {e}")
raise
@ -45,7 +54,7 @@ def test_llm_council_custom_initialization():
"""Test LLM Council initialization with custom council members."""
try:
logger.info("Testing LLM Council custom initialization...")
# Create custom council members with simpler models
custom_members = [
Agent(
@ -65,24 +74,34 @@ def test_llm_council_custom_initialization():
verbose=False,
),
]
council = LLMCouncil(
name="Custom Council",
council_members=custom_members,
chairman_model="gpt-4o-mini",
verbose=False,
output_type="string"
output_type="string",
)
assert council is not None, "Council should be initialized"
assert council.name == "Custom Council", "Name should match custom value"
assert len(council.council_members) == 2, "Should have 2 custom members"
assert council.council_members[0].agent_name == "TestAgent1", "First member should match"
assert council.council_members[1].agent_name == "TestAgent2", "Second member should match"
assert council.output_type == "string", "Output type should be 'string'"
assert (
council.name == "Custom Council"
), "Name should match custom value"
assert (
len(council.council_members) == 2
), "Should have 2 custom members"
assert (
council.council_members[0].agent_name == "TestAgent1"
), "First member should match"
assert (
council.council_members[1].agent_name == "TestAgent2"
), "Second member should match"
assert (
council.output_type == "string"
), "Output type should be 'string'"
logger.info("✓ Custom initialization test passed")
except Exception as e:
logger.error(f"✗ Custom initialization test failed: {e}")
raise
@ -92,7 +111,7 @@ def test_llm_council_run():
"""Test LLM Council run method with a simple query."""
try:
logger.info("Testing LLM Council run method...")
# Use simpler models for testing
custom_members = [
Agent(
@ -112,44 +131,70 @@ def test_llm_council_run():
verbose=False,
),
]
council = LLMCouncil(
council_members=custom_members,
chairman_model="gpt-4o-mini",
verbose=False,
output_type="dict-all-except-first"
output_type="dict-all-except-first",
)
query = "What is 2 + 2? Provide a brief answer."
result = council.run(query)
# Basic assertions
assert result is not None, "Result should not be None"
assert council.conversation is not None, "Conversation should exist"
assert len(council.conversation.conversation_history) > 0, "Conversation should have messages"
assert (
council.conversation is not None
), "Conversation should exist"
assert (
len(council.conversation.conversation_history) > 0
), "Conversation should have messages"
# Enhanced assertions to verify workflow steps
messages = council.conversation.conversation_history
# Step 1: Verify User query was added
user_messages = [msg for msg in messages if msg.get("role") == "User"]
assert len(user_messages) > 0, "User query should be in conversation"
user_messages = [
msg for msg in messages if msg.get("role") == "User"
]
assert (
len(user_messages) > 0
), "User query should be in conversation"
# Step 2: Verify all council members responded
member_responses = [msg for msg in messages if msg.get("role") in ["TestAgent1", "TestAgent2"]]
assert len(member_responses) == len(custom_members), f"All {len(custom_members)} council members should have responded"
member_responses = [
msg
for msg in messages
if msg.get("role") in ["TestAgent1", "TestAgent2"]
]
assert len(member_responses) == len(
custom_members
), f"All {len(custom_members)} council members should have responded"
# Step 3: Verify evaluations were performed
evaluation_messages = [msg for msg in messages if "-Evaluation" in msg.get("role", "")]
assert len(evaluation_messages) == len(custom_members), f"All {len(custom_members)} members should have evaluated"
evaluation_messages = [
msg
for msg in messages
if "-Evaluation" in msg.get("role", "")
]
assert len(evaluation_messages) == len(
custom_members
), f"All {len(custom_members)} members should have evaluated"
# Step 4: Verify Chairman synthesis occurred
chairman_messages = [msg for msg in messages if msg.get("role") == "Chairman"]
assert len(chairman_messages) > 0, "Chairman should have synthesized final response"
chairman_messages = [
msg for msg in messages if msg.get("role") == "Chairman"
]
assert (
len(chairman_messages) > 0
), "Chairman should have synthesized final response"
logger.info("✓ Run method test passed")
logger.info(f"✓ Verified {len(member_responses)} member responses, {len(evaluation_messages)} evaluations, and {len(chairman_messages)} chairman synthesis")
logger.info(
f"✓ Verified {len(member_responses)} member responses, {len(evaluation_messages)} evaluations, and {len(chairman_messages)} chairman synthesis"
)
except Exception as e:
logger.error(f"✗ Run method test failed: {e}")
raise
@ -159,7 +204,7 @@ def test_llm_council_batched_run():
"""Test LLM Council batched_run method with multiple tasks."""
try:
logger.info("Testing LLM Council batched_run method...")
# Use simpler models for testing
custom_members = [
Agent(
@ -179,27 +224,33 @@ def test_llm_council_batched_run():
verbose=False,
),
]
council = LLMCouncil(
council_members=custom_members,
chairman_model="gpt-4o-mini",
verbose=False,
output_type="dict-all-except-first"
output_type="dict-all-except-first",
)
tasks = [
"What is 1 + 1?",
"What is 3 + 3?",
]
results = council.batched_run(tasks)
assert results is not None, "Results should not be None"
assert len(results) == len(tasks), f"Should have {len(tasks)} results"
assert all(result is not None for result in results), "All results should not be None"
logger.info(f"✓ Batched run test passed with {len(results)} results")
assert len(results) == len(
tasks
), f"Should have {len(tasks)} results"
assert all(
result is not None for result in results
), "All results should not be None"
logger.info(
f"✓ Batched run test passed with {len(results)} results"
)
except Exception as e:
logger.error(f"✗ Batched run test failed: {e}")
raise
@ -208,8 +259,10 @@ def test_llm_council_batched_run():
def test_llm_council_output_types():
"""Test LLM Council with different output types."""
try:
logger.info("Testing LLM Council with different output types...")
logger.info(
"Testing LLM Council with different output types..."
)
# Use simpler models for testing
custom_members = [
Agent(
@ -229,29 +282,33 @@ def test_llm_council_output_types():
verbose=False,
),
]
output_types = ["string", "dict-all-except-first", "final"]
for output_type in output_types:
logger.info(f"Testing output type: {output_type}")
council = LLMCouncil(
council_members=custom_members,
chairman_model="gpt-4o-mini",
verbose=False,
output_type=output_type
output_type=output_type,
)
query = "What is 5 + 5? Provide a brief answer."
result = council.run(query)
assert result is not None, f"Result should not be None for output type {output_type}"
assert council.output_type == output_type, f"Output type should be {output_type}"
assert (
result is not None
), f"Result should not be None for output type {output_type}"
assert (
council.output_type == output_type
), f"Output type should be {output_type}"
logger.info(f"✓ Output type '{output_type}' test passed")
logger.info("✓ All output types test passed")
except Exception as e:
logger.error(f"✗ Output types test failed: {e}")
raise
@ -259,4 +316,3 @@ def test_llm_council_output_types():
if __name__ == "__main__":
pytest.main([__file__, "-v"])

Loading…
Cancel
Save