diff --git a/README.md b/README.md
index 29944494..89e713ec 100644
--- a/README.md
+++ b/README.md
@@ -58,6 +58,17 @@ Swarms delivers a comprehensive, enterprise-grade multi-agent infrastructure pla
| π οΈ **Developer Experience** | β’ Intuitive Enterprise API
β’ Comprehensive Documentation
β’ Active Enterprise Community
β’ CLI & SDK Tools
β’ IDE Integration Support
β’ Code Generation Templates | β’ Accelerated Development Cycles
β’ Reduced Learning Curve
β’ Expert Community Support
β’ Rapid Deployment Capabilities
β’ Enhanced Developer Productivity
β’ Standardized Development Patterns |
+## π Supported Protocols & Integrations
+
+Swarms seamlessly integrates with industry-standard protocols, enabling powerful capabilities for tool integration, payment processing, and distributed agent orchestration.
+
+| Protocol | Description | Use Cases | Documentation |
+|----------|-------------|-----------|---------------|
+| **[MCP (Model Context Protocol)](https://docs.swarms.world/en/latest/swarms/examples/multi_mcp_agent/)** | Standardized protocol for AI agents to interact with external tools and services through MCP servers. Enables dynamic tool discovery and execution. | β’ Tool integration
β’ Multi-server connections
β’ External API access
β’ Database connectivity | [MCP Integration Guide](https://docs.swarms.world/en/latest/swarms/examples/multi_mcp_agent/) |
+| **[X402](https://docs.swarms.world/en/latest/examples/x402_payment_integration/)** | Cryptocurrency payment protocol for API endpoints. Enables monetization of agents with pay-per-use models. | β’ Agent monetization
β’ Payment gate protection
β’ Crypto payments
β’ Pay-per-use services | [X402 Quickstart](https://docs.swarms.world/en/latest/examples/x402_payment_integration/) |
+| **[AOP (Agent Orchestration Protocol)](https://docs.swarms.world/en/latest/examples/aop_medical/)** | Framework for deploying and managing agents as distributed services. Enables agent discovery, management, and execution through standardized protocols. | β’ Distributed agent deployment
β’ Agent discovery
β’ Service orchestration
β’ Scalable multi-agent systems | [AOP Reference](https://docs.swarms.world/en/latest/swarms/structs/aop/) |
+
+
## Install π»
### Using pip
@@ -822,7 +833,7 @@ Thank you for contributing to swarms. Your work is extremely appreciated and rec
-----
-## Connect With Us
+## Join the Swarms community πΎπΎπΎ
Join our community of agent engineers and researchers for technical support, cutting-edge updates, and exclusive access to world-class agent engineering insights!
diff --git a/docs/examples/x402_discovery_query.md b/docs/examples/x402_discovery_query.md
new file mode 100644
index 00000000..f6e4abd9
--- /dev/null
+++ b/docs/examples/x402_discovery_query.md
@@ -0,0 +1,231 @@
+# X402 Discovery Query Agent
+
+This example demonstrates how to create a Swarms agent that can search and query services from the X402 bazaar using the Coinbase CDP API. The agent can discover available services, filter them by price, and provide summaries of the results.
+
+## Overview
+
+The X402 Discovery Query Agent enables you to:
+
+| Feature | Description |
+|---------|-------------|
+| Query X402 services | Search the X402 bazaar for available services |
+| Filter by price | Find services within your budget |
+| Summarize results | Get AI-powered summaries of discovered services |
+| Pagination support | Handle large result sets efficiently |
+
+## Prerequisites
+
+Before you begin, ensure you have:
+
+- Python 3.10 or higher
+- API keys for your AI model provider (e.g., Anthropic Claude)
+- `httpx` library for async HTTP requests
+
+## Installation
+
+Install the required dependencies:
+
+```bash
+pip install swarms httpx
+```
+
+## Code Example
+
+Here's the complete implementation of the X402 Discovery Query Agent:
+
+```python
+import asyncio
+from typing import List, Optional, Dict, Any
+from swarms import Agent
+import httpx
+
+
+async def query_x402_services(
+ limit: Optional[int] = None,
+ max_price: Optional[int] = None,
+ offset: int = 0,
+ base_url: str = "https://api.cdp.coinbase.com",
+) -> Dict[str, Any]:
+ """
+ Query x402 discovery services from the Coinbase CDP API.
+
+ Args:
+ limit: Optional maximum number of services to return. If None, returns all available.
+ max_price: Optional maximum price in atomic units to filter by. Only services with
+ maxAmountRequired <= max_price will be included.
+ offset: Pagination offset for the API request. Defaults to 0.
+ base_url: Base URL for the API. Defaults to Coinbase CDP API.
+
+ Returns:
+ Dict containing the API response with 'items' list and pagination info.
+
+ Raises:
+ httpx.HTTPError: If the HTTP request fails.
+ httpx.RequestError: If there's a network error.
+ """
+ url = f"{base_url}/platform/v2/x402/discovery/resources"
+ params = {"offset": offset}
+
+ # If both limit and max_price are specified, fetch more services to account for filtering
+ api_limit = limit
+ if limit is not None and max_price is not None:
+ # Fetch 5x the limit to account for services that might be filtered out
+ api_limit = limit * 5
+
+ if api_limit is not None:
+ params["limit"] = api_limit
+
+ async with httpx.AsyncClient(timeout=30.0) as client:
+ response = await client.get(url, params=params)
+ response.raise_for_status()
+ data = response.json()
+
+ # Filter by price if max_price is specified
+ if max_price is not None and "items" in data:
+ filtered_items = []
+ for item in data.get("items", []):
+ # Check if any payment option in 'accepts' has maxAmountRequired <= max_price
+ accepts = item.get("accepts", [])
+ for accept in accepts:
+ max_amount_str = accept.get("maxAmountRequired", "")
+ if max_amount_str:
+ try:
+ max_amount = int(max_amount_str)
+ if max_amount <= max_price:
+ filtered_items.append(item)
+ break # Only add item once if any payment option matches
+ except (ValueError, TypeError):
+ continue
+
+ # Apply limit to filtered results if specified
+ if limit is not None:
+ filtered_items = filtered_items[:limit]
+
+ data["items"] = filtered_items
+ # Update pagination total if we filtered
+ if "pagination" in data:
+ data["pagination"]["total"] = len(filtered_items)
+
+ return data
+
+
+def get_x402_services_sync(
+ limit: Optional[int] = None,
+ max_price: Optional[int] = None,
+ offset: int = 0,
+) -> str:
+ """
+ Synchronous wrapper for get_x402_services that returns a formatted string.
+
+ Args:
+ limit: Optional maximum number of services to return.
+ max_price: Optional maximum price in atomic units to filter by.
+ offset: Pagination offset for the API request. Defaults to 0.
+
+ Returns:
+ JSON-formatted string of service dictionaries matching the criteria.
+ """
+ async def get_x402_services():
+ result = await query_x402_services(
+ limit=limit, max_price=max_price, offset=offset
+ )
+ return result.get("items", [])
+
+ services = asyncio.run(get_x402_services())
+ return str(services)
+
+
+# Initialize the agent with the discovery tool
+agent = Agent(
+ agent_name="X402-Discovery-Agent",
+ agent_description="A agent that queries the x402 discovery services from the Coinbase CDP API.",
+ model_name="claude-haiku-4-5",
+ dynamic_temperature_enabled=True,
+ max_loops=1,
+ dynamic_context_window=True,
+ tools=[get_x402_services_sync],
+ top_p=None,
+ temperature=None,
+ tool_call_summary=True,
+)
+
+if __name__ == "__main__":
+ # Run the agent
+ out = agent.run(
+ task="Summarize the first 10 services under 100000 atomic units (e.g., $0.10 USDC)"
+ )
+ print(out)
+```
+
+## Usage
+
+### Basic Query
+
+Query all available services:
+
+```python
+result = await query_x402_services()
+print(f"Found {len(result['items'])} services")
+```
+
+### Filtered Query
+
+Get services within a specific price range:
+
+```python
+# Get first 10 services under 100000 atomic units ($0.10 USDC with 6 decimals)
+services = await get_x402_services(limit=10, max_price=100000)
+for service in services:
+ print(service["resource"])
+```
+
+### Using the Agent
+
+Run the agent to get AI-powered summaries:
+
+```python
+# The agent will automatically call the tool and provide a summary
+out = agent.run(
+ task="Find and summarize 5 affordable services under 50000 atomic units"
+)
+print(out)
+```
+
+## Understanding Price Units
+
+X402 services use atomic units for pricing. For example:
+
+- **USDC** typically uses 6 decimals
+- 100,000 atomic units = $0.10 USDC
+- 1,000,000 atomic units = $1.00 USDC
+
+Always check the `accepts` array in each service to understand the payment options and their price requirements.
+
+## API Response Structure
+
+Each service in the response contains:
+
+- `resource`: The service endpoint or resource identifier
+- `accepts`: Array of payment options with `maxAmountRequired` values
+- Additional metadata about the service
+
+## Error Handling
+
+The functions handle various error cases:
+
+- Network errors are raised as `httpx.RequestError`
+- HTTP errors are raised as `httpx.HTTPError`
+- Invalid price values are silently skipped during filtering
+
+## Next Steps
+
+1. Customize the agent's system prompt for specific use cases
+2. Add additional filtering criteria (e.g., by service type)
+3. Implement caching for frequently accessed services
+4. Create a web interface for browsing services
+5. Integrate with payment processing to actually use discovered services
+
+## Related Documentation
+
+- [X402 Payment Integration](x402_payment_integration.md) - Learn how to monetize your agents with X402
+- [Agent Tools Reference](../swarms/tools/tools_examples.md) - Understand how to create and use tools with agents
diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml
index b2b95c8d..f5f2c81c 100644
--- a/docs/mkdocs.yml
+++ b/docs/mkdocs.yml
@@ -437,6 +437,7 @@ nav:
- X402:
- x402 Quickstart Example: "examples/x402_payment_integration.md"
+ - X402 Discovery Query Agent: "examples/x402_discovery_query.md"
- Swarms Cloud API:
diff --git a/docs/quickstart.md b/docs/quickstart.md
index 1780748a..e04d9d57 100644
--- a/docs/quickstart.md
+++ b/docs/quickstart.md
@@ -290,14 +290,14 @@ task = "Write a short story about a robot who discovers music."
# --- Example 1: SequentialWorkflow ---
# Agents run one after another in a chain: Writer -> Editor -> Reviewer.
print("Running a Sequential Workflow...")
-sequential_router = SwarmRouter(swarm_type=SwarmType.SequentialWorkflow, agents=agents)
+sequential_router = SwarmRouter(swarm_type="SequentialWorkflow", agents=agents)
sequential_output = sequential_router.run(task)
print(f"Final Sequential Output:\n{sequential_output}\n")
# --- Example 2: ConcurrentWorkflow ---
# All agents receive the same initial task and run at the same time.
print("Running a Concurrent Workflow...")
-concurrent_router = SwarmRouter(swarm_type=SwarmType.ConcurrentWorkflow, agents=agents)
+concurrent_router = SwarmRouter(swarm_type="ConcurrentWorkflow", agents=agents)
concurrent_outputs = concurrent_router.run(task)
# This returns a dictionary of each agent's output
for agent_name, output in concurrent_outputs.items():
@@ -312,9 +312,9 @@ aggregator = Agent(
model_name="gpt-4o-mini"
)
moa_router = SwarmRouter(
- swarm_type=SwarmType.MixtureOfAgents,
+ swarm_type="MixtureOfAgents",
agents=agents,
- aggregator_agent=aggregator, # MoA requires an aggregator
+ aggregator_agent=aggregator,
)
aggregated_output = moa_router.run(task)
print(f"Final Aggregated Output:\n{aggregated_output}\n")
diff --git a/docs/requirements.txt b/docs/requirements.txt
index 8878fb5a..4e9c01f7 100644
--- a/docs/requirements.txt
+++ b/docs/requirements.txt
@@ -24,7 +24,7 @@ mkdocs-autolinks-plugin
# Requirements for core
jinja2~=3.1
-markdown~=3.8
+markdown~=3.10
mkdocs-material-extensions~=1.3
pygments~=2.19
pymdown-extensions~=10.16
diff --git a/docs/swarms/examples/swarm_router.md b/docs/swarms/examples/swarm_router.md
index 7caa875c..b404bc57 100644
--- a/docs/swarms/examples/swarm_router.md
+++ b/docs/swarms/examples/swarm_router.md
@@ -29,7 +29,7 @@ GROQ_API_KEY=""
```python
from swarms import Agent
-from swarms.structs.swarm_router import SwarmRouter, SwarmType
+from swarms.structs.swarm_router import SwarmRouter
# Initialize specialized agents
data_extractor_agent = Agent(
@@ -61,7 +61,7 @@ sequential_router = SwarmRouter(
name="SequentialRouter",
description="Process tasks in sequence",
agents=[data_extractor_agent, summarizer_agent, financial_analyst_agent],
- swarm_type=SwarmType.SequentialWorkflow,
+ swarm_type="SequentialWorkflow",
max_loops=1
)
@@ -76,7 +76,7 @@ concurrent_router = SwarmRouter(
name="ConcurrentRouter",
description="Process tasks concurrently",
agents=[data_extractor_agent, summarizer_agent, financial_analyst_agent],
- swarm_type=SwarmType.ConcurrentWorkflow,
+ swarm_type="ConcurrentWorkflow",
max_loops=1
)
@@ -91,8 +91,8 @@ rearrange_router = SwarmRouter(
name="RearrangeRouter",
description="Dynamically rearrange agents for optimal task processing",
agents=[data_extractor_agent, summarizer_agent, financial_analyst_agent],
- swarm_type=SwarmType.AgentRearrange,
- flow=f"{data_extractor_agent.agent_name} -> {summarizer_agent.agent_name} -> {financial_analyst_agent.agent_name}",
+ swarm_type="AgentRearrange",
+ rearrange_flow=f"{data_extractor_agent.agent_name} -> {summarizer_agent.agent_name} -> {financial_analyst_agent.agent_name}",
max_loops=1
)
@@ -107,7 +107,7 @@ mixture_router = SwarmRouter(
name="MixtureRouter",
description="Combine multiple expert agents",
agents=[data_extractor_agent, summarizer_agent, financial_analyst_agent],
- swarm_type=SwarmType.MixtureOfAgents,
+ swarm_type="MixtureOfAgents",
max_loops=1
)
@@ -137,7 +137,7 @@ router = SwarmRouter(
name="CustomRouter",
description="Custom router configuration",
agents=[data_extractor_agent, summarizer_agent, financial_analyst_agent],
- swarm_type=SwarmType.SequentialWorkflow,
+ swarm_type="SequentialWorkflow",
max_loops=3,
autosave=True,
verbose=True,
@@ -145,6 +145,27 @@ router = SwarmRouter(
)
```
+# SwarmType Reference
+
+## Valid SwarmType Values
+
+| Value | Description |
+|-------|-------------|
+| `"SequentialWorkflow"` | Execute agents in sequence |
+| `"ConcurrentWorkflow"` | Execute agents concurrently |
+| `"AgentRearrange"` | Dynamically rearrange agent execution order |
+| `"MixtureOfAgents"` | Combine outputs from multiple agents |
+| `"GroupChat"` | Enable group chat between agents |
+| `"MultiAgentRouter"` | Route tasks to appropriate agents |
+| `"AutoSwarmBuilder"` | Automatically build swarm configuration |
+| `"HiearchicalSwarm"` | Hierarchical agent organization |
+| `"MajorityVoting"` | Use majority voting for decisions |
+| `"MALT"` | Multi-Agent Learning and Training |
+| `"CouncilAsAJudge"` | Council-based evaluation system |
+| `"InteractiveGroupChat"` | Interactive group chat with agents |
+| `"HeavySwarm"` | Heavy swarm for complex tasks |
+| `"auto"` | Automatically select swarm type |
+
# Best Practices
## Choose the appropriate swarm type based on your task requirements:
@@ -187,7 +208,7 @@ Here's a complete example showing how to use SwarmRouter in a real-world scenari
```python
import os
from swarms import Agent
-from swarms.structs.swarm_router import SwarmRouter, SwarmType
+from swarms.structs.swarm_router import SwarmRouter
# Initialize specialized agents
research_agent = Agent(
@@ -216,7 +237,7 @@ router = SwarmRouter(
name="ResearchAnalysisRouter",
description="Process research and analysis tasks",
agents=[research_agent, analysis_agent, summary_agent],
- swarm_type=SwarmType.SequentialWorkflow,
+ swarm_type="SequentialWorkflow",
max_loops=1,
verbose=True
)
diff --git a/docs/swarms/structs/agent.md b/docs/swarms/structs/agent.md
index 8947374a..42b4f4dd 100644
--- a/docs/swarms/structs/agent.md
+++ b/docs/swarms/structs/agent.md
@@ -83,6 +83,7 @@ The `Agent` class establishes a conversational loop with a language model, allow
| `traceback` | `Optional[Any]` | Object used for traceback handling. |
| `traceback_handlers` | `Optional[Any]` | List of traceback handlers. |
| `streaming_on` | `Optional[bool]` | Boolean indicating whether to stream responses. |
+| `stream` | `Optional[bool]` | Boolean indicating whether to enable detailed token-by-token streaming with metadata. |
| `docs` | `List[str]` | List of document paths or contents to be ingested. |
| `docs_folder` | `Optional[str]` | Path to a folder containing documents to be ingested. |
| `verbose` | `Optional[bool]` | Boolean indicating whether to print verbose output. |
@@ -759,6 +760,22 @@ print(agent.system_prompt)
```
+### Token-by-Token Streaming
+
+```python
+from swarms import Agent
+
+# Initialize agent with detailed streaming
+agent = Agent(
+ model_name="gpt-4.1",
+ max_loops=1,
+ stream=True, # Enable detailed token-by-token streaming
+)
+
+# Run with detailed streaming - each token shows metadata
+agent.run("Tell me a short story about a robot learning to paint.")
+```
+
## Agent Structured Outputs
- Create a structured output schema for the agent [List[Dict]]
@@ -1112,4 +1129,4 @@ The `run` method now supports several new parameters for advanced functionality:
| `tool_retry_attempts` | Configure tool_retry_attempts for robust tool execution in production environments. |
| `handoffs` | Use handoffs to create specialized agent teams that can intelligently route tasks based on complexity and expertise requirements. |
-By following these guidelines and leveraging the Swarm Agent's extensive features, you can create powerful, flexible, and efficient autonomous agents for a wide range of applications.
\ No newline at end of file
+By following these guidelines and leveraging the Swarm Agent's extensive features, you can create powerful, flexible, and efficient autonomous agents for a wide range of applications.
diff --git a/docs/swarms/structs/auto_swarm_builder.md b/docs/swarms/structs/auto_swarm_builder.md
index b06a7e95..f278991c 100644
--- a/docs/swarms/structs/auto_swarm_builder.md
+++ b/docs/swarms/structs/auto_swarm_builder.md
@@ -40,7 +40,6 @@ The `execution_type` parameter controls how the AutoSwarmBuilder operates:
| Execution Type | Description |
|----------------------------------|-----------------------------------------------------------|
| **"return-agents"** | Creates and returns agent specifications as a dictionary (default) |
-| **"execute-swarm-router"** | Executes the swarm router with the created agents |
| **"return-swarm-router-config"** | Returns the swarm router configuration as a dictionary |
| **"return-agents-objects"** | Returns agent objects created from specifications |
@@ -602,7 +601,6 @@ for agent in agents:
- Use `verbose=True` during development for debugging
- Choose the right `execution_type` for your use case:
- Use `"return-agents"` for getting agent specifications as dictionary (default)
- - Use `"execute-swarm-router"` for executing the swarm router with created agents
- Use `"return-swarm-router-config"` for analyzing swarm architecture
- Use `"return-agents-objects"` for getting agent objects created from specifications
- Set `max_tokens` appropriately based on expected response length
diff --git a/docs/swarms/structs/index.md b/docs/swarms/structs/index.md
index f556ae3f..5604e372 100644
--- a/docs/swarms/structs/index.md
+++ b/docs/swarms/structs/index.md
@@ -186,14 +186,14 @@ task = "Write a short story about a robot who discovers music."
# --- Example 1: SequentialWorkflow ---
# Agents run one after another in a chain: Writer -> Editor -> Reviewer.
print("Running a Sequential Workflow...")
-sequential_router = SwarmRouter(swarm_type=SwarmType.SequentialWorkflow, agents=agents)
+sequential_router = SwarmRouter(swarm_type="SequentialWorkflow", agents=agents)
sequential_output = sequential_router.run(task)
print(f"Final Sequential Output:\n{sequential_output}\n")
# --- Example 2: ConcurrentWorkflow ---
# All agents receive the same initial task and run at the same time.
print("Running a Concurrent Workflow...")
-concurrent_router = SwarmRouter(swarm_type=SwarmType.ConcurrentWorkflow, agents=agents)
+concurrent_router = SwarmRouter(swarm_type="ConcurrentWorkflow", agents=agents)
concurrent_outputs = concurrent_router.run(task)
# This returns a dictionary of each agent's output
for agent_name, output in concurrent_outputs.items():
@@ -208,9 +208,9 @@ aggregator = Agent(
model_name="gpt-4o-mini"
)
moa_router = SwarmRouter(
- swarm_type=SwarmType.MixtureOfAgents,
+ swarm_type="MixtureOfAgents",
agents=agents,
- aggregator_agent=aggregator, # MoA requires an aggregator
+ aggregator_agent=aggregator,
)
aggregated_output = moa_router.run(task)
print(f"Final Aggregated Output:\n{aggregated_output}\n")
diff --git a/examples/guides/x402_examples/agent_integration/x402_agent_buying.py b/examples/guides/x402_examples/agent_integration/x402_agent_buying.py
new file mode 100644
index 00000000..4993125a
--- /dev/null
+++ b/examples/guides/x402_examples/agent_integration/x402_agent_buying.py
@@ -0,0 +1,50 @@
+from eth_account import Account
+from x402.clients.httpx import x402HttpxClient
+
+
+import os
+from dotenv import load_dotenv
+
+load_dotenv()
+
+
+async def buy_x402_service(
+ base_url: str = None, endpoint: str = None
+):
+ """
+ Purchase a service from the X402 bazaar using the provided affordable_service details.
+
+ This function sets up an X402 client with the user's private key, connects to the service provider,
+ and executes a GET request to the service's endpoint as part of the buying process.
+
+ Args:
+ affordable_service (dict): A dictionary containing information about the target service.
+ base_url (str, optional): The base URL of the service provider. Defaults to None.
+ endpoint (str, optional): The specific API endpoint to interact with. Defaults to None.
+
+ Returns:
+ response (httpx.Response): The response object returned by the GET request to the service endpoint.
+
+ Example:
+ ```python
+ affordable_service = {"id": "service123", "price": 90000}
+ response = await buy_x402_service(
+ affordable_service,
+ base_url="https://api.cdp.coinbase.com",
+ endpoint="/x402/v1/bazaar/services/service123"
+ )
+ print(await response.aread())
+ ```
+ """
+ key = os.getenv("X402_PRIVATE_KEY")
+
+ # Set up your payment account from private key
+ account = Account.from_key(key)
+
+ async with x402HttpxClient(
+ account=account, base_url=base_url
+ ) as client:
+ response = await client.get(endpoint)
+ print(await response.aread())
+
+ return response
diff --git a/examples/guides/x402_examples/agent_integration/x402_discovery_query.py b/examples/guides/x402_examples/agent_integration/x402_discovery_query.py
new file mode 100644
index 00000000..3664718f
--- /dev/null
+++ b/examples/guides/x402_examples/agent_integration/x402_discovery_query.py
@@ -0,0 +1,229 @@
+import asyncio
+from typing import List, Optional, Dict, Any
+from swarms import Agent
+import httpx
+
+
+async def query_x402_services(
+ limit: Optional[int] = None,
+ max_price: Optional[int] = None,
+ offset: int = 0,
+ base_url: str = "https://api.cdp.coinbase.com",
+) -> Dict[str, Any]:
+ """
+ Query x402 discovery services from the Coinbase CDP API.
+
+ Args:
+ limit: Optional maximum number of services to return. If None, returns all available.
+ max_price: Optional maximum price in atomic units to filter by. Only services with
+ maxAmountRequired <= max_price will be included.
+ offset: Pagination offset for the API request. Defaults to 0.
+ base_url: Base URL for the API. Defaults to Coinbase CDP API.
+
+ Returns:
+ Dict containing the API response with 'items' list and pagination info.
+
+ Raises:
+ httpx.HTTPError: If the HTTP request fails.
+ httpx.RequestError: If there's a network error.
+
+ Example:
+ ```python
+ # Get all services
+ result = await query_x402_services()
+ print(f"Found {len(result['items'])} services")
+
+ # Get first 10 services under 100000 atomic units
+ result = await query_x402_services(limit=10, max_price=100000)
+ ```
+ """
+ url = f"{base_url}/platform/v2/x402/discovery/resources"
+ params = {"offset": offset}
+
+ # If both limit and max_price are specified, fetch more services to account for filtering
+ # This ensures we can return the requested number after filtering by price
+ api_limit = limit
+ if limit is not None and max_price is not None:
+ # Fetch 5x the limit to account for services that might be filtered out
+ api_limit = limit * 5
+
+ if api_limit is not None:
+ params["limit"] = api_limit
+
+ async with httpx.AsyncClient(timeout=30.0) as client:
+ response = await client.get(url, params=params)
+ response.raise_for_status()
+ data = response.json()
+
+ # Filter by price if max_price is specified
+ if max_price is not None and "items" in data:
+ filtered_items = []
+ for item in data.get("items", []):
+ # Check if any payment option in 'accepts' has maxAmountRequired <= max_price
+ accepts = item.get("accepts", [])
+ for accept in accepts:
+ max_amount_str = accept.get("maxAmountRequired", "")
+ if max_amount_str:
+ try:
+ max_amount = int(max_amount_str)
+ if max_amount <= max_price:
+ filtered_items.append(item)
+ break # Only add item once if any payment option matches
+ except (ValueError, TypeError):
+ continue
+
+ # Apply limit to filtered results if specified
+ if limit is not None:
+ filtered_items = filtered_items[:limit]
+
+ data["items"] = filtered_items
+ # Update pagination total if we filtered
+ if "pagination" in data:
+ data["pagination"]["total"] = len(filtered_items)
+
+ return data
+
+
+def filter_services_by_price(
+ services: List[Dict[str, Any]], max_price: int
+) -> List[Dict[str, Any]]:
+ """
+ Filter services by maximum price in atomic units.
+
+ Args:
+ services: List of service dictionaries from the API.
+ max_price: Maximum price in atomic units. Only services with at least one
+ payment option where maxAmountRequired <= max_price will be included.
+
+ Returns:
+ List of filtered service dictionaries.
+
+ Example:
+ ```python
+ all_services = result["items"]
+ affordable = filter_services_by_price(all_services, max_price=100000)
+ ```
+ """
+ filtered = []
+ for item in services:
+ accepts = item.get("accepts", [])
+ for accept in accepts:
+ max_amount_str = accept.get("maxAmountRequired", "")
+ if max_amount_str:
+ try:
+ max_amount = int(max_amount_str)
+ if max_amount <= max_price:
+ filtered.append(item)
+ break # Only add item once if any payment option matches
+ except (ValueError, TypeError):
+ continue
+ return filtered
+
+
+def limit_services(
+ services: List[Dict[str, Any]], max_count: int
+) -> List[Dict[str, Any]]:
+ """
+ Limit the number of services returned.
+
+ Args:
+ services: List of service dictionaries.
+ max_count: Maximum number of services to return.
+
+ Returns:
+ List containing at most max_count services.
+
+ Example:
+ ```python
+ all_services = result["items"]
+ limited = limit_services(all_services, max_count=10)
+ ```
+ """
+ return services[:max_count]
+
+
+async def get_x402_services(
+ limit: Optional[int] = None,
+ max_price: Optional[int] = None,
+ offset: int = 0,
+) -> List[Dict[str, Any]]:
+ """
+ Get x402 services with optional filtering by count and price.
+
+ This is a convenience function that queries the API and applies filters.
+
+ Args:
+ limit: Optional maximum number of services to return.
+ max_price: Optional maximum price in atomic units to filter by.
+ offset: Pagination offset for the API request. Defaults to 0.
+
+ Returns:
+ List of service dictionaries matching the criteria.
+
+ Example:
+ ```python
+ # Get first 10 services under $0.10 USDC (100000 atomic units with 6 decimals)
+ services = await get_x402_services(limit=10, max_price=100000)
+ for service in services:
+ print(service["resource"])
+ ```
+ """
+ result = await query_x402_services(
+ limit=limit, max_price=max_price, offset=offset
+ )
+
+ return result.get("items", [])
+
+
+def get_x402_services_sync(
+ limit: Optional[int] = None,
+ max_price: Optional[int] = None,
+ offset: int = 0,
+) -> str:
+ """
+ Synchronous wrapper for get_x402_services that returns a formatted string.
+
+ Args:
+ limit: Optional maximum number of services to return.
+ max_price: Optional maximum price in atomic units to filter by.
+ offset: Pagination offset for the API request. Defaults to 0.
+
+ Returns:
+ JSON-formatted string of service dictionaries matching the criteria.
+
+ Example:
+ ```python
+ # Get first 10 services under $0.10 USDC
+ services_str = get_x402_services_sync(limit=10, max_price=100000)
+ print(services_str)
+ ```
+ """
+ services = asyncio.run(
+ get_x402_services(
+ limit=limit, max_price=max_price, offset=offset
+ )
+ )
+ return str(services)
+
+
+agent = Agent(
+ agent_name="X402-Discovery-Agent",
+ agent_description="A agent that queries the x402 discovery services from the Coinbase CDP API.",
+ model_name="claude-haiku-4-5",
+ dynamic_temperature_enabled=True,
+ max_loops=1,
+ dynamic_context_window=True,
+ tools=[get_x402_services_sync],
+ top_p=None,
+ # temperature=0.0,
+ temperature=None,
+ tool_call_summary=True,
+)
+
+if __name__ == "__main__":
+
+ # Run the agent
+ out = agent.run(
+ task="Summarize the first 10 services under 100000 atomic units (e.g., $0.10 USDC)"
+ )
+ print(out)
diff --git a/examples/rag/README.md b/examples/rag/README.md
deleted file mode 100644
index bde13960..00000000
--- a/examples/rag/README.md
+++ /dev/null
@@ -1,6 +0,0 @@
-# RAG (Retrieval Augmented Generation) Examples
-
-This directory contains examples demonstrating RAG implementations and vector database integrations in Swarms.
-
-## Qdrant RAG
-- [qdrant_rag_example.py](qdrant_rag_example.py) - Complete Qdrant RAG implementation
diff --git a/examples/rag/qdrant_rag_example.py b/examples/rag/qdrant_rag_example.py
deleted file mode 100644
index e9209970..00000000
--- a/examples/rag/qdrant_rag_example.py
+++ /dev/null
@@ -1,98 +0,0 @@
-"""
-Agent with Qdrant RAG (Retrieval-Augmented Generation)
-
-This example demonstrates using Qdrant as a vector database for RAG operations,
-allowing agents to store and retrieve documents for enhanced context.
-"""
-
-from qdrant_client import QdrantClient, models
-from swarms import Agent
-from swarms_memory import QdrantDB
-
-
-# Initialize Qdrant client
-# Option 1: In-memory (for testing/development - data is not persisted)
-# client = QdrantClient(":memory:")
-
-# Option 2: Local Qdrant server
-# client = QdrantClient(host="localhost", port=6333)
-
-# Option 3: Qdrant Cloud (recommended for production)
-import os
-
-client = QdrantClient(
- url=os.getenv("QDRANT_URL", "https://your-cluster.qdrant.io"),
- api_key=os.getenv("QDRANT_API_KEY", "your-api-key"),
-)
-
-# Create QdrantDB wrapper for RAG operations
-rag_db = QdrantDB(
- client=client,
- embedding_model="text-embedding-3-small",
- collection_name="knowledge_base",
- distance=models.Distance.COSINE,
- n_results=3,
-)
-
-# Add documents to the knowledge base
-documents = [
- "Qdrant is a vector database optimized for similarity search and AI applications.",
- "RAG combines retrieval and generation for more accurate AI responses.",
- "Vector embeddings enable semantic search across documents.",
- "The swarms framework supports multiple memory backends including Qdrant.",
-]
-
-# Method 1: Add documents individually
-for doc in documents:
- rag_db.add(doc)
-
-# Method 2: Batch add documents (more efficient for large datasets)
-# Example with metadata
-# documents_with_metadata = [
-# "Machine learning is a subset of artificial intelligence.",
-# "Deep learning uses neural networks with multiple layers.",
-# "Natural language processing enables computers to understand human language.",
-# "Computer vision allows machines to interpret visual information.",
-# "Reinforcement learning learns through interaction with an environment."
-# ]
-#
-# metadata = [
-# {"category": "AI", "difficulty": "beginner", "topic": "overview"},
-# {"category": "ML", "difficulty": "intermediate", "topic": "neural_networks"},
-# {"category": "NLP", "difficulty": "intermediate", "topic": "language"},
-# {"category": "CV", "difficulty": "advanced", "topic": "vision"},
-# {"category": "RL", "difficulty": "advanced", "topic": "learning"}
-# ]
-#
-# # Batch add with metadata
-# doc_ids = rag_db.batch_add(documents_with_metadata, metadata=metadata, batch_size=3)
-# print(f"Added {len(doc_ids)} documents in batch")
-#
-# # Query with metadata return
-# results_with_metadata = rag_db.query(
-# "What is artificial intelligence?",
-# n_results=3,
-# return_metadata=True
-# )
-#
-# for i, result in enumerate(results_with_metadata):
-# print(f"\nResult {i+1}:")
-# print(f" Document: {result['document']}")
-# print(f" Category: {result['category']}")
-# print(f" Difficulty: {result['difficulty']}")
-# print(f" Topic: {result['topic']}")
-# print(f" Score: {result['score']:.4f}")
-
-# Create agent with RAG capabilities
-agent = Agent(
- agent_name="RAG-Agent",
- agent_description="Agent with Qdrant-powered RAG for enhanced knowledge retrieval",
- model_name="gpt-4.1",
- max_loops=1,
- dynamic_temperature_enabled=True,
- long_term_memory=rag_db,
-)
-
-# Query with RAG
-response = agent.run("What is Qdrant and how does it relate to RAG?")
-print(response)
diff --git a/examples/reasoning_agents/README.md b/examples/reasoning_agents/README.md
index 30292db1..8d6444c7 100644
--- a/examples/reasoning_agents/README.md
+++ b/examples/reasoning_agents/README.md
@@ -2,11 +2,25 @@
This directory contains examples demonstrating advanced reasoning capabilities and agent evaluation systems in Swarms.
+## Reasoning Agent Router Examples
+
+The `reasoning_agent_router_examples/` folder contains simple examples for each agent type supported by the `ReasoningAgentRouter`:
+
+- [reasoning_duo_example.py](reasoning_agent_router_examples/reasoning_duo_example.py) - Reasoning Duo agent for collaborative reasoning
+- [self_consistency_example.py](reasoning_agent_router_examples/self_consistency_example.py) - Self-Consistency agent with multiple samples
+- [ire_example.py](reasoning_agent_router_examples/ire_example.py) - Iterative Reflective Expansion (IRE) agent
+- [agent_judge_example.py](reasoning_agent_router_examples/agent_judge_example.py) - Agent Judge for evaluation and judgment
+- [reflexion_agent_example.py](reasoning_agent_router_examples/reflexion_agent_example.py) - Reflexion agent with memory capabilities
+- [gkp_agent_example.py](reasoning_agent_router_examples/gkp_agent_example.py) - Generated Knowledge Prompting (GKP) agent
+
## Agent Judge Examples
+
+The `agent_judge_examples/` folder contains detailed examples of the AgentJudge system:
+
- [example1_basic_evaluation.py](agent_judge_examples/example1_basic_evaluation.py) - Basic agent evaluation
- [example2_technical_evaluation.py](agent_judge_examples/example2_technical_evaluation.py) - Technical evaluation criteria
- [example3_creative_evaluation.py](agent_judge_examples/example3_creative_evaluation.py) - Creative evaluation patterns
-## O3 Integration
-- [example_o3.py](example_o3.py) - O3 model integration example
-- [o3_agent.py](o3_agent.py) - O3 agent implementation
+## Self-MoA Sequential Examples
+
+- [moa_seq_example.py](moa_seq_example.py) - Self-MoA Sequential reasoning example for complex problem-solving
diff --git a/examples/reasoning_agents/example_o3.py b/examples/reasoning_agents/example_o3.py
deleted file mode 100644
index 48e01870..00000000
--- a/examples/reasoning_agents/example_o3.py
+++ /dev/null
@@ -1,18 +0,0 @@
-from swarms.utils.litellm_wrapper import LiteLLM
-
-# Initialize the LiteLLM wrapper with reasoning support
-llm = LiteLLM(
- model_name="claude-sonnet-4-20250514", # OpenAI o3 model with reasoning
- reasoning_effort="low", # Enable reasoning with high effort
- temperature=1,
- max_tokens=2000,
- stream=False,
- thinking_tokens=1024,
-)
-
-# Example task that would benefit from reasoning
-task = "Solve this step-by-step: A farmer has 17 sheep and all but 9 die. How many sheep does he have left?"
-
-print("=== Running reasoning model ===")
-response = llm.run(task)
-print(response)
diff --git a/examples/reasoning_agents/reasoning_agent_router_examples/agent_judge_example.py b/examples/reasoning_agents/reasoning_agent_router_examples/agent_judge_example.py
new file mode 100644
index 00000000..64b67d21
--- /dev/null
+++ b/examples/reasoning_agents/reasoning_agent_router_examples/agent_judge_example.py
@@ -0,0 +1,9 @@
+from swarms.agents.reasoning_agents import ReasoningAgentRouter
+
+router = ReasoningAgentRouter(
+ swarm_type="AgentJudge",
+ model_name="gpt-4o-mini",
+ max_loops=1,
+)
+
+result = router.run("Is Python a good programming language?")
diff --git a/examples/reasoning_agents/reasoning_agent_router_examples/gkp_agent_example.py b/examples/reasoning_agents/reasoning_agent_router_examples/gkp_agent_example.py
new file mode 100644
index 00000000..e6dbb60e
--- /dev/null
+++ b/examples/reasoning_agents/reasoning_agent_router_examples/gkp_agent_example.py
@@ -0,0 +1,9 @@
+from swarms.agents.reasoning_agents import ReasoningAgentRouter
+
+router = ReasoningAgentRouter(
+ swarm_type="GKPAgent",
+ model_name="gpt-4o-mini",
+ num_knowledge_items=3,
+)
+
+result = router.run("What is artificial intelligence?")
diff --git a/examples/reasoning_agents/reasoning_agent_router_examples/ire_example.py b/examples/reasoning_agents/reasoning_agent_router_examples/ire_example.py
new file mode 100644
index 00000000..b1f3bc00
--- /dev/null
+++ b/examples/reasoning_agents/reasoning_agent_router_examples/ire_example.py
@@ -0,0 +1,11 @@
+from swarms.agents.reasoning_agents import ReasoningAgentRouter
+
+router = ReasoningAgentRouter(
+ swarm_type="ire",
+ model_name="gpt-4o-mini",
+ num_samples=1,
+)
+
+result = router.run("Explain photosynthesis in one sentence.")
+print(result)
+
diff --git a/examples/reasoning_agents/reasoning_agent_router_examples/reasoning_duo_example.py b/examples/reasoning_agents/reasoning_agent_router_examples/reasoning_duo_example.py
new file mode 100644
index 00000000..161fd590
--- /dev/null
+++ b/examples/reasoning_agents/reasoning_agent_router_examples/reasoning_duo_example.py
@@ -0,0 +1,9 @@
+from swarms.agents.reasoning_agents import ReasoningAgentRouter
+
+router = ReasoningAgentRouter(
+ swarm_type="reasoning-duo",
+ model_name="gpt-4o-mini",
+ max_loops=1,
+)
+
+result = router.run("What is 2+2?")
diff --git a/examples/reasoning_agents/reasoning_agent_router_examples/reflexion_agent_example.py b/examples/reasoning_agents/reasoning_agent_router_examples/reflexion_agent_example.py
new file mode 100644
index 00000000..97bd8ebe
--- /dev/null
+++ b/examples/reasoning_agents/reasoning_agent_router_examples/reflexion_agent_example.py
@@ -0,0 +1,10 @@
+from swarms.agents.reasoning_agents import ReasoningAgentRouter
+
+router = ReasoningAgentRouter(
+ swarm_type="ReflexionAgent",
+ model_name="gpt-4o-mini",
+ max_loops=1,
+ memory_capacity=3,
+)
+
+result = router.run("What is machine learning?")
diff --git a/examples/reasoning_agents/reasoning_agent_router_examples/self_consistency_example.py b/examples/reasoning_agents/reasoning_agent_router_examples/self_consistency_example.py
new file mode 100644
index 00000000..1e1c394b
--- /dev/null
+++ b/examples/reasoning_agents/reasoning_agent_router_examples/self_consistency_example.py
@@ -0,0 +1,10 @@
+from swarms.agents.reasoning_agents import ReasoningAgentRouter
+
+router = ReasoningAgentRouter(
+ swarm_type="self-consistency",
+ model_name="gpt-4o-mini",
+ max_loops=1,
+ num_samples=3,
+)
+
+result = router.run("What is the capital of France?")
diff --git a/agent_mcp.py b/examples/single_agent/agent_mcp.py
similarity index 100%
rename from agent_mcp.py
rename to examples/single_agent/agent_mcp.py
diff --git a/examples/reasoning_agents/o3_agent.py b/examples/single_agent/llms/o3_agent.py
similarity index 100%
rename from examples/reasoning_agents/o3_agent.py
rename to examples/single_agent/llms/o3_agent.py
diff --git a/examples/single_agent/rag/qdrant_rag_example.py b/examples/single_agent/rag/qdrant_rag_example.py
index 2b88cf4a..87caf17a 100644
--- a/examples/single_agent/rag/qdrant_rag_example.py
+++ b/examples/single_agent/rag/qdrant_rag_example.py
@@ -1,882 +1,91 @@
-"""
-Qdrant RAG Example with Document Ingestion
-
-This example demonstrates how to use the agent structure from example.py with Qdrant RAG
-to ingest a vast array of PDF documents and text files for advanced quantitative trading analysis.
-
-Features:
-- Document ingestion from multiple file types (PDF, TXT, MD)
-- Qdrant vector database integration
-- Sentence transformer embeddings
-- Comprehensive document processing pipeline
-- Agent with RAG capabilities for financial analysis
-"""
-
-import os
-import uuid
-from datetime import datetime
-from pathlib import Path
-from typing import Dict, List, Optional, Union
-import concurrent.futures
-from concurrent.futures import ThreadPoolExecutor
-
-from qdrant_client import QdrantClient
-from qdrant_client.http import models
-from qdrant_client.http.models import Distance, VectorParams
-from sentence_transformers import SentenceTransformer
-
+from qdrant_client import QdrantClient, models
from swarms import Agent
-from swarms.utils.pdf_to_text import pdf_to_text
-from swarms.utils.data_to_text import data_to_text
-
-
-class DocumentProcessor:
- """
- Handles document processing and text extraction from various file formats.
-
- This class provides functionality to process PDF, TXT, and Markdown files,
- extracting text content for vectorization and storage in the RAG system.
- """
-
- def __init__(
- self, supported_extensions: Optional[List[str]] = None
- ):
- """
- Initialize the DocumentProcessor.
-
- Args:
- supported_extensions: List of supported file extensions.
- Defaults to ['.pdf', '.txt', '.md']
- """
- if supported_extensions is None:
- supported_extensions = [".pdf", ".txt", ".md"]
-
- self.supported_extensions = supported_extensions
-
- def process_document(
- self, file_path: Union[str, Path]
- ) -> Optional[Dict[str, str]]:
- """
- Process a single document and extract its text content.
-
- Args:
- file_path: Path to the document file
-
- Returns:
- Dictionary containing document metadata and extracted text, or None if processing fails
- """
- file_path = Path(file_path)
-
- if not file_path.exists():
- print(f"File not found: {file_path}")
- return None
-
- if file_path.suffix.lower() not in self.supported_extensions:
- print(f"Unsupported file type: {file_path.suffix}")
- return None
-
- try:
- # Extract text based on file type
- if file_path.suffix.lower() == ".pdf":
- try:
- text_content = pdf_to_text(str(file_path))
- except Exception as pdf_error:
- print(f"Error extracting PDF text: {pdf_error}")
- # Fallback: try to read as text file
- with open(
- file_path,
- "r",
- encoding="utf-8",
- errors="ignore",
- ) as f:
- text_content = f.read()
- else:
- try:
- text_content = data_to_text(str(file_path))
- except Exception as data_error:
- print(f"Error extracting text: {data_error}")
- # Fallback: try to read as text file
- with open(
- file_path,
- "r",
- encoding="utf-8",
- errors="ignore",
- ) as f:
- text_content = f.read()
-
- # Ensure text_content is a string
- if callable(text_content):
- print(
- f"Warning: {file_path} returned a callable, trying to call it..."
- )
- try:
- text_content = text_content()
- except Exception as call_error:
- print(f"Error calling callable: {call_error}")
- return None
-
- if not text_content or not isinstance(text_content, str):
- print(
- f"No valid text content extracted from: {file_path}"
- )
- return None
-
- # Clean the text content
- text_content = str(text_content).strip()
-
- return {
- "file_path": str(file_path),
- "file_name": file_path.name,
- "file_type": file_path.suffix.lower(),
- "text_content": text_content,
- "file_size": file_path.stat().st_size,
- "processed_at": datetime.utcnow().isoformat(),
- }
-
- except Exception as e:
- print(f"Error processing {file_path}: {str(e)}")
- return None
-
- def process_directory(
- self, directory_path: Union[str, Path], max_workers: int = 4
- ) -> List[Dict[str, str]]:
- """
- Process all supported documents in a directory concurrently.
-
- Args:
- directory_path: Path to the directory containing documents
- max_workers: Maximum number of concurrent workers for processing
-
- Returns:
- List of processed document dictionaries
- """
- directory_path = Path(directory_path)
-
- if not directory_path.is_dir():
- print(f"Directory not found: {directory_path}")
- return []
-
- # Find all supported files
- supported_files = []
- for ext in self.supported_extensions:
- supported_files.extend(directory_path.rglob(f"*{ext}"))
- supported_files.extend(
- directory_path.rglob(f"*{ext.upper()}")
- )
-
- if not supported_files:
- print(f"No supported files found in: {directory_path}")
- return []
-
- print(f"Found {len(supported_files)} files to process")
-
- # Process files concurrently
- processed_documents = []
- with ThreadPoolExecutor(max_workers=max_workers) as executor:
- future_to_file = {
- executor.submit(
- self.process_document, file_path
- ): file_path
- for file_path in supported_files
- }
-
- for future in concurrent.futures.as_completed(
- future_to_file
- ):
- file_path = future_to_file[future]
- try:
- result = future.result()
- if result:
- processed_documents.append(result)
- print(f"Processed: {result['file_name']}")
- except Exception as e:
- print(f"Error processing {file_path}: {str(e)}")
-
- print(
- f"Successfully processed {len(processed_documents)} documents"
- )
- return processed_documents
-
-
-class QdrantRAGMemory:
- """
- Enhanced Qdrant memory system for RAG operations with document storage.
-
- This class extends the basic Qdrant memory system to handle document ingestion,
- chunking, and semantic search for large document collections.
- """
-
- def __init__(
- self,
- collection_name: str = "document_memories",
- vector_size: int = 384, # Default size for all-MiniLM-L6-v2
- url: Optional[str] = None,
- api_key: Optional[str] = None,
- chunk_size: int = 1000,
- chunk_overlap: int = 200,
- ):
- """
- Initialize the Qdrant RAG memory system.
-
- Args:
- collection_name: Name of the Qdrant collection to use
- vector_size: Dimension of the embedding vectors
- url: Optional Qdrant server URL (defaults to local)
- api_key: Optional Qdrant API key for cloud deployment
- chunk_size: Size of text chunks for processing
- chunk_overlap: Overlap between consecutive chunks
- """
- self.collection_name = collection_name
- self.vector_size = vector_size
- self.chunk_size = chunk_size
- self.chunk_overlap = chunk_overlap
-
- # Initialize Qdrant client
- if url and api_key:
- self.client = QdrantClient(url=url, api_key=api_key)
- else:
- self.client = QdrantClient(
- ":memory:"
- ) # Local in-memory storage
-
- # Initialize embedding model
- self.embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
-
- # Get the actual embedding dimension from the model
- sample_text = "Sample text for dimension check"
- sample_embedding = self.embedding_model.encode(sample_text)
- actual_dimension = len(sample_embedding)
-
- # Update vector_size to match the actual model dimension
- if actual_dimension != self.vector_size:
- print(
- f"Updating vector size from {self.vector_size} to {actual_dimension} to match model"
- )
- self.vector_size = actual_dimension
-
- # Create collection if it doesn't exist
- self._create_collection()
-
- def _create_collection(self):
- """Create the Qdrant collection if it doesn't exist."""
- collections = self.client.get_collections().collections
- exists = any(
- col.name == self.collection_name for col in collections
- )
-
- if not exists:
- self.client.create_collection(
- collection_name=self.collection_name,
- vectors_config=VectorParams(
- size=self.vector_size, distance=Distance.COSINE
- ),
- )
- print(
- f"Created Qdrant collection: {self.collection_name}"
- )
-
- def _chunk_text(self, text: str) -> List[str]:
- """
- Split text into overlapping chunks for better retrieval.
-
- Args:
- text: Text content to chunk
-
- Returns:
- List of text chunks
- """
- # Ensure text is a string
- if not isinstance(text, str):
- text = str(text)
-
- if len(text) <= self.chunk_size:
- return [text]
-
- chunks = []
- start = 0
-
- while start < len(text):
- end = start + self.chunk_size
-
- # Try to break at sentence boundaries
- if end < len(text):
- # Look for sentence endings
- for i in range(end, max(start, end - 100), -1):
- if text[i] in ".!?":
- end = i + 1
- break
-
- chunk = text[start:end].strip()
- if chunk:
- chunks.append(chunk)
-
- start = end - self.chunk_overlap
- if start >= len(text):
- break
-
- return chunks
-
- def add_document(
- self, document_data: Dict[str, str]
- ) -> List[str]:
- """
- Add a document to the memory system with chunking.
-
- Args:
- document_data: Dictionary containing document information
-
- Returns:
- List of memory IDs for the stored chunks
- """
- text_content = document_data["text_content"]
-
- # Ensure text_content is a string
- if not isinstance(text_content, str):
- print(
- f"Warning: text_content is not a string: {type(text_content)}"
- )
- text_content = str(text_content)
-
- chunks = self._chunk_text(text_content)
-
- memory_ids = []
-
- for i, chunk in enumerate(chunks):
- # Generate embedding for the chunk
- embedding = self.embedding_model.encode(chunk).tolist()
-
- # Prepare metadata
- metadata = {
- "document_name": document_data["file_name"],
- "document_path": document_data["file_path"],
- "document_type": document_data["file_type"],
- "chunk_index": i,
- "total_chunks": len(chunks),
- "chunk_text": chunk,
- "timestamp": datetime.utcnow().isoformat(),
- "file_size": document_data["file_size"],
- }
-
- # Store the chunk
- memory_id = str(uuid.uuid4())
- self.client.upsert(
- collection_name=self.collection_name,
- points=[
- models.PointStruct(
- id=memory_id,
- payload=metadata,
- vector=embedding,
- )
- ],
- )
-
- memory_ids.append(memory_id)
-
- print(
- f"Added document '{document_data['file_name']}' with {len(chunks)} chunks"
- )
- return memory_ids
-
- def add_documents_batch(
- self, documents: List[Dict[str, str]]
- ) -> List[str]:
- """
- Add multiple documents to the memory system.
+from swarms_memory import QdrantDB
- Args:
- documents: List of document dictionaries
- Returns:
- List of all memory IDs
- """
- all_memory_ids = []
+# Initialize Qdrant client
+# Option 1: In-memory (for testing/development - data is not persisted)
+# client = QdrantClient(":memory:")
- for document in documents:
- memory_ids = self.add_document(document)
- all_memory_ids.extend(memory_ids)
-
- return all_memory_ids
-
- def add(self, text: str, metadata: Optional[Dict] = None) -> str:
- """
- Add a text entry to the memory system (required by Swarms interface).
-
- Args:
- text: The text content to add
- metadata: Optional metadata for the entry
-
- Returns:
- str: ID of the stored memory
- """
- if metadata is None:
- metadata = {}
-
- # Generate embedding for the text
- embedding = self.embedding_model.encode(text).tolist()
-
- # Prepare metadata
- memory_metadata = {
- "text": text,
- "timestamp": datetime.utcnow().isoformat(),
- "source": "agent_memory",
- }
- memory_metadata.update(metadata)
-
- # Store the point
- memory_id = str(uuid.uuid4())
- self.client.upsert(
- collection_name=self.collection_name,
- points=[
- models.PointStruct(
- id=memory_id,
- payload=memory_metadata,
- vector=embedding,
- )
- ],
- )
-
- return memory_id
-
- def query(
- self,
- query_text: str,
- limit: int = 5,
- score_threshold: float = 0.7,
- include_metadata: bool = True,
- ) -> List[Dict]:
- """
- Query memories based on text similarity.
-
- Args:
- query_text: The text query to search for
- limit: Maximum number of results to return
- score_threshold: Minimum similarity score threshold
- include_metadata: Whether to include metadata in results
-
- Returns:
- List of matching memories with their metadata
- """
- try:
- # Check if collection has any points
- collection_info = self.client.get_collection(
- self.collection_name
- )
- if collection_info.points_count == 0:
- print(
- "Warning: Collection is empty, no documents to query"
- )
- return []
-
- # Generate embedding for the query
- query_embedding = self.embedding_model.encode(
- query_text
- ).tolist()
-
- # Search in Qdrant
- results = self.client.search(
- collection_name=self.collection_name,
- query_vector=query_embedding,
- limit=limit,
- score_threshold=score_threshold,
- )
-
- memories = []
- for res in results:
- memory = res.payload.copy()
- memory["similarity_score"] = res.score
-
- if not include_metadata:
- # Keep only essential information
- memory = {
- "chunk_text": memory.get("chunk_text", ""),
- "document_name": memory.get(
- "document_name", ""
- ),
- "similarity_score": memory[
- "similarity_score"
- ],
- }
-
- memories.append(memory)
-
- return memories
-
- except Exception as e:
- print(f"Error querying collection: {e}")
- return []
-
- def get_collection_stats(self) -> Dict:
- """
- Get statistics about the collection.
-
- Returns:
- Dictionary containing collection statistics
- """
- try:
- collection_info = self.client.get_collection(
- self.collection_name
- )
- return {
- "collection_name": self.collection_name,
- "vector_size": collection_info.config.params.vectors.size,
- "distance": collection_info.config.params.vectors.distance,
- "points_count": collection_info.points_count,
- }
- except Exception as e:
- print(f"Error getting collection stats: {e}")
- return {}
-
- def clear_collection(self):
- """Clear all memories from the collection."""
- self.client.delete_collection(self.collection_name)
- self._create_collection()
- print(f"Cleared collection: {self.collection_name}")
-
-
-class QuantitativeTradingRAGAgent:
- """
- Advanced quantitative trading agent with RAG capabilities for document analysis.
-
- This agent combines the structure from example.py with Qdrant RAG to provide
- comprehensive financial analysis based on ingested documents.
- """
-
- def __init__(
- self,
- agent_name: str = "Quantitative-Trading-RAG-Agent",
- collection_name: str = "financial_documents",
- qdrant_url: Optional[str] = None,
- qdrant_api_key: Optional[str] = None,
- model_name: str = "claude-sonnet-4-20250514",
- max_loops: int = 1,
- chunk_size: int = 1000,
- chunk_overlap: int = 200,
- ):
- """
- Initialize the Quantitative Trading RAG Agent.
-
- Args:
- agent_name: Name of the agent
- collection_name: Name of the Qdrant collection
- qdrant_url: Optional Qdrant server URL
- qdrant_api_key: Optional Qdrant API key
- model_name: LLM model to use
- max_loops: Maximum number of agent loops
- chunk_size: Size of text chunks for processing
- chunk_overlap: Overlap between consecutive chunks
- """
- self.agent_name = agent_name
- self.collection_name = collection_name
-
- # Initialize document processor
- self.document_processor = DocumentProcessor()
-
- # Initialize Qdrant RAG memory
- self.rag_memory = QdrantRAGMemory(
- collection_name=collection_name,
- url=qdrant_url,
- api_key=qdrant_api_key,
- chunk_size=chunk_size,
- chunk_overlap=chunk_overlap,
- )
-
- # Initialize the agent with RAG capabilities
- self.agent = Agent(
- agent_name=agent_name,
- agent_description="Advanced quantitative trading and algorithmic analysis agent with RAG capabilities",
- system_prompt="""You are an expert quantitative trading agent with deep expertise in:
- - Algorithmic trading strategies and implementation
- - Statistical arbitrage and market making
- - Risk management and portfolio optimization
- - High-frequency trading systems
- - Market microstructure analysis
- - Quantitative research methodologies
- - Financial mathematics and stochastic processes
- - Machine learning applications in trading
-
- Your core responsibilities include:
- 1. Developing and backtesting trading strategies
- 2. Analyzing market data and identifying alpha opportunities
- 3. Implementing risk management frameworks
- 4. Optimizing portfolio allocations
- 5. Conducting quantitative research
- 6. Monitoring market microstructure
- 7. Evaluating trading system performance
-
- You have access to a comprehensive document database through RAG (Retrieval-Augmented Generation).
- When answering questions, you can search through this database to find relevant information
- and provide evidence-based responses.
-
- You maintain strict adherence to:
- - Mathematical rigor in all analyses
- - Statistical significance in strategy development
- - Risk-adjusted return optimization
- - Market impact minimization
- - Regulatory compliance
- - Transaction cost analysis
- - Performance attribution
-
- You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
- model_name=model_name,
- dynamic_temperature_enabled=True,
- output_type="str-all-except-first",
- max_loops=max_loops,
- dynamic_context_window=True,
- long_term_memory=self.rag_memory,
- )
-
- def ingest_documents(
- self, documents_path: Union[str, Path]
- ) -> int:
- """
- Ingest documents from a directory into the RAG system.
-
- Args:
- documents_path: Path to directory containing documents
-
- Returns:
- Number of documents successfully ingested
- """
- print(f"Starting document ingestion from: {documents_path}")
-
- try:
- # Process documents
- processed_documents = (
- self.document_processor.process_directory(
- documents_path
- )
- )
-
- if not processed_documents:
- print("No documents to ingest")
- return 0
-
- # Add documents to RAG memory
- memory_ids = self.rag_memory.add_documents_batch(
- processed_documents
- )
-
- print(
- f"Successfully ingested {len(processed_documents)} documents"
- )
- print(f"Created {len(memory_ids)} memory chunks")
-
- return len(processed_documents)
-
- except Exception as e:
- print(f"Error during document ingestion: {e}")
- import traceback
-
- traceback.print_exc()
- return 0
-
- def query_documents(
- self, query: str, limit: int = 5
- ) -> List[Dict]:
- """
- Query the document database for relevant information.
-
- Args:
- query: The query text
- limit: Maximum number of results to return
-
- Returns:
- List of relevant document chunks
- """
- return self.rag_memory.query(query, limit=limit)
-
- def run_analysis(self, task: str) -> str:
- """
- Run a financial analysis task using the agent with RAG capabilities.
-
- Args:
- task: The analysis task to perform
-
- Returns:
- Agent's response to the task
- """
- print(f"Running analysis task: {task}")
-
- # First, query the document database for relevant context
- relevant_docs = self.query_documents(task, limit=3)
-
- if relevant_docs:
- # Enhance the task with relevant document context
- context = "\n\nRelevant Document Information:\n"
- for i, doc in enumerate(relevant_docs, 1):
- context += f"\nDocument {i}: {doc.get('document_name', 'Unknown')}\n"
- context += f"Relevance Score: {doc.get('similarity_score', 0):.3f}\n"
- context += (
- f"Content: {doc.get('chunk_text', '')[:500]}...\n"
- )
-
- enhanced_task = f"{task}\n\n{context}"
- else:
- enhanced_task = task
-
- # Run the agent
- response = self.agent.run(enhanced_task)
- return response
-
- def get_database_stats(self) -> Dict:
- """
- Get statistics about the document database.
-
- Returns:
- Dictionary containing database statistics
- """
- return self.rag_memory.get_collection_stats()
-
-
-def main():
- """
- Main function demonstrating the Qdrant RAG agent with document ingestion.
- """
- from datetime import datetime
-
- # Example usage
- print("π Initializing Quantitative Trading RAG Agent...")
-
- # Initialize the agent (you can set environment variables for Qdrant cloud)
- agent = QuantitativeTradingRAGAgent(
- agent_name="Quantitative-Trading-RAG-Agent",
- collection_name="financial_documents",
- qdrant_url=os.getenv(
- "QDRANT_URL"
- ), # Optional: For cloud deployment
- qdrant_api_key=os.getenv(
- "QDRANT_API_KEY"
- ), # Optional: For cloud deployment
- model_name="claude-sonnet-4-20250514",
- max_loops=1,
- chunk_size=1000,
- chunk_overlap=200,
- )
-
- # Example: Ingest documents from a directory
- documents_path = "documents" # Path to your documents
- if os.path.exists(documents_path):
- print(f"Found documents directory: {documents_path}")
- try:
- agent.ingest_documents(documents_path)
- except Exception as e:
- print(f"Error ingesting documents: {e}")
- print("Continuing without document ingestion...")
- else:
- print(f"Documents directory not found: {documents_path}")
- print("Creating a sample document for demonstration...")
-
- # Create a sample document
- try:
- sample_doc = {
- "file_path": "sample_financial_analysis.txt",
- "file_name": "sample_financial_analysis.txt",
- "file_type": ".txt",
- "text_content": """
- Gold ETFs: A Comprehensive Investment Guide
-
- Gold ETFs (Exchange-Traded Funds) provide investors with exposure to gold prices
- without the need to physically store the precious metal. These funds track the
- price of gold and offer several advantages including liquidity, diversification,
- and ease of trading.
-
- Top Gold ETFs include:
- 1. SPDR Gold Shares (GLD) - Largest gold ETF with high liquidity
- 2. iShares Gold Trust (IAU) - Lower expense ratio alternative
- 3. Aberdeen Standard Physical Gold ETF (SGOL) - Swiss storage option
-
- Investment strategies for gold ETFs:
- - Portfolio diversification (5-10% allocation)
- - Inflation hedge
- - Safe haven during market volatility
- - Tactical trading opportunities
-
- Market analysis shows that gold has historically served as a store of value
- and hedge against inflation. Recent market conditions have increased interest
- in gold investments due to economic uncertainty and geopolitical tensions.
- """,
- "file_size": 1024,
- "processed_at": datetime.utcnow().isoformat(),
- }
-
- # Add the sample document to the RAG memory
- memory_ids = agent.rag_memory.add_document(sample_doc)
- print(
- f"Added sample document with {len(memory_ids)} chunks"
- )
-
- except Exception as e:
- print(f"Error creating sample document: {e}")
- print("Continuing without sample document...")
-
- # Example: Query the database
- print("\nπ Querying document database...")
- try:
- query_results = agent.query_documents(
- "gold ETFs investment strategies", limit=3
- )
- print(f"Found {len(query_results)} relevant document chunks")
-
- if query_results:
- print("Sample results:")
- for i, result in enumerate(query_results[:2], 1):
- print(
- f" {i}. {result.get('document_name', 'Unknown')} (Score: {result.get('similarity_score', 0):.3f})"
- )
- else:
- print(
- "No documents found in database. This is expected if no documents were ingested."
- )
- except Exception as e:
- print(f"β Query failed: {e}")
-
- # Example: Run financial analysis
- print("\nπΉ Running financial analysis...")
- analysis_task = "What are the best top 3 ETFs for gold coverage and what are their key characteristics?"
- try:
- response = agent.run_analysis(analysis_task)
- print("\nπ Analysis Results:")
- print(response)
- except Exception as e:
- print(f"β Analysis failed: {e}")
- print("This might be due to API key or model access issues.")
- print("Continuing with database statistics...")
-
- # Try a simpler query that doesn't require the LLM
- print("\nπ Trying simple document query instead...")
- try:
- simple_results = agent.query_documents(
- "what do you see in the document?", limit=2
- )
- if simple_results:
- print("Simple query results:")
- for i, result in enumerate(simple_results, 1):
- print(
- f" {i}. {result.get('document_name', 'Unknown')}"
- )
- print(
- f" Content preview: {result.get('chunk_text', '')[:100]}..."
- )
- else:
- print("No results from simple query")
- except Exception as simple_error:
- print(f"Simple query also failed: {simple_error}")
-
- # Get database statistics
- print("\nπ Database Statistics:")
- try:
- stats = agent.get_database_stats()
- for key, value in stats.items():
- print(f" {key}: {value}")
- except Exception as e:
- print(f"β Failed to get database statistics: {e}")
-
- print("\nβ
Example completed successfully!")
- print("π‘ To test with your own documents:")
- print(" 1. Create a 'documents' directory")
- print(" 2. Add PDF, TXT, or MD files")
- print(" 3. Run the script again")
+# Option 2: Local Qdrant server
+# client = QdrantClient(host="localhost", port=6333)
+# Option 3: Qdrant Cloud (recommended for production)
+import os
-if __name__ == "__main__":
- main()
+client = QdrantClient(
+ url=os.getenv("QDRANT_URL", "https://your-cluster.qdrant.io"),
+ api_key=os.getenv("QDRANT_API_KEY", "your-api-key"),
+)
+
+# Create QdrantDB wrapper for RAG operations
+rag_db = QdrantDB(
+ client=client,
+ embedding_model="text-embedding-3-small",
+ collection_name="knowledge_base",
+ distance=models.Distance.COSINE,
+ n_results=3,
+)
+
+# Add documents to the knowledge base
+documents = [
+ "Qdrant is a vector database optimized for similarity search and AI applications.",
+ "RAG combines retrieval and generation for more accurate AI responses.",
+ "Vector embeddings enable semantic search across documents.",
+ "The swarms framework supports multiple memory backends including Qdrant.",
+]
+
+# Method 1: Add documents individually
+for doc in documents:
+ rag_db.add(doc)
+
+# Method 2: Batch add documents (more efficient for large datasets)
+# Example with metadata
+# documents_with_metadata = [
+# "Machine learning is a subset of artificial intelligence.",
+# "Deep learning uses neural networks with multiple layers.",
+# "Natural language processing enables computers to understand human language.",
+# "Computer vision allows machines to interpret visual information.",
+# "Reinforcement learning learns through interaction with an environment."
+# ]
+#
+# metadata = [
+# {"category": "AI", "difficulty": "beginner", "topic": "overview"},
+# {"category": "ML", "difficulty": "intermediate", "topic": "neural_networks"},
+# {"category": "NLP", "difficulty": "intermediate", "topic": "language"},
+# {"category": "CV", "difficulty": "advanced", "topic": "vision"},
+# {"category": "RL", "difficulty": "advanced", "topic": "learning"}
+# ]
+#
+# # Batch add with metadata
+# doc_ids = rag_db.batch_add(documents_with_metadata, metadata=metadata, batch_size=3)
+# print(f"Added {len(doc_ids)} documents in batch")
+#
+# # Query with metadata return
+# results_with_metadata = rag_db.query(
+# "What is artificial intelligence?",
+# n_results=3,
+# return_metadata=True
+# )
+#
+# for i, result in enumerate(results_with_metadata):
+# print(f"\nResult {i+1}:")
+# print(f" Document: {result['document']}")
+# print(f" Category: {result['category']}")
+# print(f" Difficulty: {result['difficulty']}")
+# print(f" Topic: {result['topic']}")
+# print(f" Score: {result['score']:.4f}")
+
+# Create agent with RAG capabilities
+agent = Agent(
+ agent_name="RAG-Agent",
+ agent_description="Agent with Qdrant-powered RAG for enhanced knowledge retrieval",
+ model_name="gpt-4.1",
+ max_loops=1,
+ dynamic_temperature_enabled=True,
+ long_term_memory=rag_db,
+)
+
+# Query with RAG
+response = agent.run("What is Qdrant and how does it relate to RAG?")
+print(response)
diff --git a/pyproject.toml b/pyproject.toml
index 25cd5911..10ad1565 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
-version = "8.6.0"
+version = "8.6.3"
description = "Swarms - TGSC"
license = "MIT"
authors = ["Kye Gomez "]
diff --git a/swarm_router_mv.py b/swarm_router_mv.py
new file mode 100644
index 00000000..dd691b78
--- /dev/null
+++ b/swarm_router_mv.py
@@ -0,0 +1,40 @@
+from swarms import SwarmRouter, Agent
+
+# Create specialized agents
+research_agent = Agent(
+ agent_name="Research-Analyst",
+ agent_description="Specialized in comprehensive research and data gathering",
+ model_name="gpt-4o-mini",
+ max_loops=1,
+ verbose=False,
+)
+
+analysis_agent = Agent(
+ agent_name="Data-Analyst",
+ agent_description="Expert in data analysis and pattern recognition",
+ model_name="gpt-4o-mini",
+ max_loops=1,
+ verbose=False,
+)
+
+strategy_agent = Agent(
+ agent_name="Strategy-Consultant",
+ agent_description="Specialized in strategic planning and recommendations",
+ model_name="gpt-4o-mini",
+ max_loops=1,
+ verbose=False,
+)
+
+router = SwarmRouter(
+ name="SwarmRouter",
+ description="Routes tasks to specialized agents based on their capabilities",
+ agents=[research_agent, analysis_agent, strategy_agent],
+ swarm_type="MajorityVoting",
+ max_loops=1,
+ verbose=False,
+)
+
+result = router.run(
+ "Conduct a research analysis on water stocks and etfs"
+)
+print(result)
\ No newline at end of file
diff --git a/swarms/agents/reasoning_agents.py b/swarms/agents/reasoning_agents.py
index be2e34fc..122ccb01 100644
--- a/swarms/agents/reasoning_agents.py
+++ b/swarms/agents/reasoning_agents.py
@@ -285,7 +285,11 @@ class ReasoningAgentRouter:
"""
try:
swarm = self.select_swarm()
- return swarm.run(task=task, *args, **kwargs)
+
+ if self.swarm_type == "ReflexionAgent":
+ return swarm.run(tasks=[task], *args, **kwargs)
+ else:
+ return swarm.run(task=task, *args, **kwargs)
except Exception as e:
raise ReasoningAgentExecutorError(
f"ReasoningAgentRouter Error: {e} Traceback: {traceback.format_exc()} If the error persists, please check the agent's configuration and try again. If you would like support book a call with our team at https://cal.com/swarms"
diff --git a/swarms/prompts/visual_cot.py b/swarms/prompts/visual_cot.py
index f33c72e1..e6701642 100644
--- a/swarms/prompts/visual_cot.py
+++ b/swarms/prompts/visual_cot.py
@@ -1,3 +1,8 @@
+"""
+A structured prompt template that guides models through step-by-step visual analysis, from observation to reflection.
+Provides a systematic chain-of-thought approach for analyzing images, graphs, and visual puzzles with detailed reasoning and visual references.
+"""
+
VISUAL_CHAIN_OF_THOUGHT = """
You, as the model, are presented with a visual problem. This could be an image containing various elements that you need to analyze, a graph that requires interpretation, or a visual puzzle. Your task is to examine the visual information carefully and describe your process of understanding and solving the problem.
diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py
index b24fac0f..7150d869 100644
--- a/swarms/structs/agent.py
+++ b/swarms/structs/agent.py
@@ -219,7 +219,8 @@ class Agent:
preset_stopping_token (bool): Enable preset stopping token
traceback (Any): The traceback
traceback_handlers (Any): The traceback handlers
- streaming_on (bool): Enable streaming
+ streaming_on (bool): Enable basic streaming with formatted panels
+ stream (bool): Enable detailed token-by-token streaming with metadata (citations, tokens used, etc.)
docs (List[str]): The list of documents
docs_folder (str): The folder containing the documents
verbose (bool): Enable verbose mode
@@ -311,9 +312,9 @@ class Agent:
>>> print(response)
>>> # Generate a report on the financials.
- >>> # Real-time streaming example
- >>> agent = Agent(model_name="gpt-4.1", max_loops=1, streaming_on=True)
- >>> response = agent.run("Tell me a long story.") # Will stream in real-time
+ >>> # Detailed token streaming example
+ >>> agent = Agent(model_name="gpt-4.1", max_loops=1, stream=True)
+ >>> response = agent.run("Tell me a story.") # Will stream each token with detailed metadata
>>> print(response) # Final complete response
>>> # Fallback model example
@@ -367,6 +368,7 @@ class Agent:
traceback: Optional[Any] = None,
traceback_handlers: Optional[Any] = None,
streaming_on: Optional[bool] = False,
+ stream: Optional[bool] = False,
docs: List[str] = None,
docs_folder: Optional[str] = None,
verbose: Optional[bool] = False,
@@ -518,6 +520,7 @@ class Agent:
self.traceback = traceback
self.traceback_handlers = traceback_handlers
self.streaming_on = streaming_on
+ self.stream = stream
self.docs = docs
self.docs_folder = docs_folder
self.verbose = verbose
@@ -1374,6 +1377,8 @@ class Agent:
)
elif self.streaming_on:
pass
+ elif self.stream:
+ pass
else:
self.pretty_print(
response, loop_count
@@ -2594,8 +2599,105 @@ class Agent:
del kwargs["is_last"]
try:
- # Set streaming parameter in LLM if streaming is enabled
- if self.streaming_on and hasattr(self.llm, "stream"):
+ if self.stream and hasattr(self.llm, "stream"):
+ original_stream = self.llm.stream
+ self.llm.stream = True
+
+ if img is not None:
+ streaming_response = self.llm.run(
+ task=task, img=img, *args, **kwargs
+ )
+ else:
+ streaming_response = self.llm.run(
+ task=task, *args, **kwargs
+ )
+
+ if hasattr(streaming_response, "__iter__") and not isinstance(streaming_response, str):
+ complete_response = ""
+ token_count = 0
+ final_chunk = None
+ first_chunk = None
+
+ for chunk in streaming_response:
+ if first_chunk is None:
+ first_chunk = chunk
+
+ if hasattr(chunk, "choices") and chunk.choices[0].delta.content:
+ content = chunk.choices[0].delta.content
+ complete_response += content
+ token_count += 1
+
+ # Schema per token outputted
+ token_info = {
+ "token_index": token_count,
+ "model": getattr(chunk, 'model', self.get_current_model()),
+ "id": getattr(chunk, 'id', ''),
+ "created": getattr(chunk, 'created', int(time.time())),
+ "object": getattr(chunk, 'object', 'chat.completion.chunk'),
+ "token": content,
+ "system_fingerprint": getattr(chunk, 'system_fingerprint', ''),
+ "finish_reason": chunk.choices[0].finish_reason,
+ "citations": getattr(chunk, 'citations', None),
+ "provider_specific_fields": getattr(chunk, 'provider_specific_fields', None),
+ "service_tier": getattr(chunk, 'service_tier', 'default'),
+ "obfuscation": getattr(chunk, 'obfuscation', None),
+ "usage": getattr(chunk, 'usage', None),
+ "logprobs": chunk.choices[0].logprobs,
+ "timestamp": time.time()
+ }
+
+ print(f"ResponseStream {token_info}")
+
+ if streaming_callback is not None:
+ streaming_callback(token_info)
+
+ final_chunk = chunk
+
+ #Final ModelResponse to stream
+ if final_chunk and hasattr(final_chunk, 'usage') and final_chunk.usage:
+ usage = final_chunk.usage
+ print(f"ModelResponseStream(id='{getattr(final_chunk, 'id', 'N/A')}', "
+ f"created={getattr(final_chunk, 'created', 'N/A')}, "
+ f"model='{getattr(final_chunk, 'model', self.get_current_model())}', "
+ f"object='{getattr(final_chunk, 'object', 'chat.completion.chunk')}', "
+ f"system_fingerprint='{getattr(final_chunk, 'system_fingerprint', 'N/A')}', "
+ f"choices=[StreamingChoices(finish_reason='{final_chunk.choices[0].finish_reason}', "
+ f"index=0, delta=Delta(provider_specific_fields=None, content=None, role=None, "
+ f"function_call=None, tool_calls=None, audio=None), logprobs=None)], "
+ f"provider_specific_fields=None, "
+ f"usage=Usage(completion_tokens={usage.completion_tokens}, "
+ f"prompt_tokens={usage.prompt_tokens}, "
+ f"total_tokens={usage.total_tokens}, "
+ f"completion_tokens_details=CompletionTokensDetailsWrapper("
+ f"accepted_prediction_tokens={usage.completion_tokens_details.accepted_prediction_tokens}, "
+ f"audio_tokens={usage.completion_tokens_details.audio_tokens}, "
+ f"reasoning_tokens={usage.completion_tokens_details.reasoning_tokens}, "
+ f"rejected_prediction_tokens={usage.completion_tokens_details.rejected_prediction_tokens}, "
+ f"text_tokens={usage.completion_tokens_details.text_tokens}), "
+ f"prompt_tokens_details=PromptTokensDetailsWrapper("
+ f"audio_tokens={usage.prompt_tokens_details.audio_tokens}, "
+ f"cached_tokens={usage.prompt_tokens_details.cached_tokens}, "
+ f"text_tokens={usage.prompt_tokens_details.text_tokens}, "
+ f"image_tokens={usage.prompt_tokens_details.image_tokens})))")
+ else:
+ print(f"ModelResponseStream(id='{getattr(final_chunk, 'id', 'N/A')}', "
+ f"created={getattr(final_chunk, 'created', 'N/A')}, "
+ f"model='{getattr(final_chunk, 'model', self.get_current_model())}', "
+ f"object='{getattr(final_chunk, 'object', 'chat.completion.chunk')}', "
+ f"system_fingerprint='{getattr(final_chunk, 'system_fingerprint', 'N/A')}', "
+ f"choices=[StreamingChoices(finish_reason='{final_chunk.choices[0].finish_reason}', "
+ f"index=0, delta=Delta(provider_specific_fields=None, content=None, role=None, "
+ f"function_call=None, tool_calls=None, audio=None), logprobs=None)], "
+ f"provider_specific_fields=None)")
+
+
+ self.llm.stream = original_stream
+ return complete_response
+ else:
+ self.llm.stream = original_stream
+ return streaming_response
+
+ elif self.streaming_on and hasattr(self.llm, "stream"):
original_stream = self.llm.stream
self.llm.stream = True
@@ -3080,6 +3182,8 @@ class Agent:
if self.streaming_on:
pass
+ elif self.stream:
+ pass
if self.print_on:
formatter.print_panel(
diff --git a/swarms/structs/auto_swarm_builder.py b/swarms/structs/auto_swarm_builder.py
index 0a9bd689..514cb79c 100644
--- a/swarms/structs/auto_swarm_builder.py
+++ b/swarms/structs/auto_swarm_builder.py
@@ -16,7 +16,6 @@ load_dotenv()
execution_types = [
"return-agents",
- "execute-swarm-router",
"return-swarm-router-config",
"return-agents-objects",
]
diff --git a/swarms/structs/hiearchical_swarm.py b/swarms/structs/hiearchical_swarm.py
index 66edb444..1501ccb6 100644
--- a/swarms/structs/hiearchical_swarm.py
+++ b/swarms/structs/hiearchical_swarm.py
@@ -944,7 +944,8 @@ class HierarchicalSwarm:
if self.planning_enabled is True:
self.director.tools_list_dictionary = None
out = self.setup_director_with_planning(
- task=self.conversation.get_str(), img=img
+ task=f"History: {self.conversation.get_str()} \n\n Task: {task}",
+ img=img,
)
self.conversation.add(
role=self.director.agent_name, content=out
diff --git a/swarms/structs/swarm_router.py b/swarms/structs/swarm_router.py
index 84256d8f..b078d970 100644
--- a/swarms/structs/swarm_router.py
+++ b/swarms/structs/swarm_router.py
@@ -2,7 +2,7 @@ import concurrent.futures
import json
import os
import traceback
-from typing import Any, Callable, Dict, List, Literal, Optional, Union
+from typing import Any, Callable, Dict, List, Literal, Optional, Union, get_args
from pydantic import BaseModel, Field
@@ -46,6 +46,7 @@ SwarmType = Literal[
"CouncilAsAJudge",
"InteractiveGroupChat",
"HeavySwarm",
+ "BatchedGridWorkflow",
]
@@ -272,6 +273,24 @@ class SwarmRouter:
"SwarmRouter: Swarm type cannot be 'none'. Check the docs for all the swarm types available. https://docs.swarms.world/en/latest/swarms/structs/swarm_router/"
)
+ # Validate swarm type is a valid string
+ valid_swarm_types = get_args(SwarmType)
+
+ if not isinstance(self.swarm_type, str):
+ raise SwarmRouterConfigError(
+ f"SwarmRouter: swarm_type must be a string, not {type(self.swarm_type).__name__}. "
+ f"Valid types are: {', '.join(valid_swarm_types)}. "
+ "Use swarm_type='SequentialWorkflow' (string), NOT SwarmType.SequentialWorkflow. "
+ "See https://docs.swarms.world/en/latest/swarms/structs/swarm_router/"
+ )
+
+ if self.swarm_type not in valid_swarm_types:
+ raise SwarmRouterConfigError(
+ f"SwarmRouter: Invalid swarm_type '{self.swarm_type}'. "
+ f"Valid types are: {', '.join(valid_swarm_types)}. "
+ "See https://docs.swarms.world/en/latest/swarms/structs/swarm_router/"
+ )
+
if (
self.swarm_type != "HeavySwarm"
and self.agents is None
@@ -423,7 +442,6 @@ class SwarmRouter:
max_loops=self.max_loops,
flow=self.rearrange_flow,
output_type=self.output_type,
- return_entire_history=self.return_entire_history,
*args,
**kwargs,
)
@@ -474,7 +492,6 @@ class SwarmRouter:
description=self.description,
agents=self.agents,
max_loops=self.max_loops,
- return_all_history=self.return_entire_history,
output_type=self.output_type,
*args,
**kwargs,
@@ -499,7 +516,8 @@ class SwarmRouter:
name=self.name,
description=self.description,
agents=self.agents,
- consensus_agent=self.agents[-1],
+ max_loops=self.max_loops,
+ output_type=self.output_type,
*args,
**kwargs,
)
diff --git a/tests/structs/test_agent_stream_token.py b/tests/structs/test_agent_stream_token.py
new file mode 100644
index 00000000..5cd02207
--- /dev/null
+++ b/tests/structs/test_agent_stream_token.py
@@ -0,0 +1,9 @@
+from swarms.structs.agent import Agent
+
+agent = Agent(
+ model_name="gpt-4.1",
+ max_loops=1,
+ stream=True,
+)
+
+agent.run("Tell me a short story about a robot learning to paint.")