pull/712/head
Kye Gomez 1 week ago
parent 5365915ddd
commit 79e7d20d4f

@ -295,7 +295,7 @@ print(agent.model_dump_json())
print(agent.model_dump_yaml())
# Ingest documents into the agent's knowledge base
agent.ingest_docs("your_pdf_path.pdf")
("your_pdf_path.pdf")
# Receive a message from a user and process it
agent.receive_message(name="agent_name", message="message")

@ -9,16 +9,22 @@ from loguru import logger
# Configure loguru
LOG_PATH = "api_tests.log"
logger.add(LOG_PATH,
logger.add(
LOG_PATH,
format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
rotation="1 day",
retention="7 days",
level="DEBUG"
level="DEBUG",
)
BASE_URL = (
"https://api.swarms.ai/v1" # Change this to match your server URL
)
BASE_URL = "https://api.swarms.ai/v1" # Change this to match your server URL
async def log_request_details(method: str, url: str, headers: dict, data: Any = None):
async def log_request_details(
method: str, url: str, headers: dict, data: Any = None
):
"""Log request details before sending."""
logger.debug(f"\n{'='*50}")
logger.debug(f"REQUEST: {method} {url}")
@ -26,38 +32,54 @@ async def log_request_details(method: str, url: str, headers: dict, data: Any =
if data:
logger.debug(f"PAYLOAD: {json.dumps(data, indent=2)}")
async def log_response_details(response: aiohttp.ClientResponse, data: Any = None):
async def log_response_details(
response: aiohttp.ClientResponse, data: Any = None
):
"""Log response details after receiving."""
logger.debug(f"\nRESPONSE Status: {response.status}")
logger.debug(f"RESPONSE Headers: {json.dumps(dict(response.headers), indent=2)}")
logger.debug(
f"RESPONSE Headers: {json.dumps(dict(response.headers), indent=2)}"
)
if data:
logger.debug(f"RESPONSE Body: {json.dumps(data, indent=2)}")
logger.debug(f"{'='*50}\n")
async def test_create_user(session: aiohttp.ClientSession) -> Dict[str, str]:
async def test_create_user(
session: aiohttp.ClientSession,
) -> Dict[str, str]:
"""Test user creation endpoint."""
url = f"{BASE_URL}/users"
payload = {"username": "test_user"}
logger.info("Testing user creation...")
await log_request_details("POST", url, {}, payload)
try:
async with session.post(url, json=payload) as response:
data = await response.json()
await log_response_details(response, data)
if response.status != 200:
logger.error(f"Failed to create user. Status: {response.status}, Response: {data}")
logger.error(
f"Failed to create user. Status: {response.status}, Response: {data}"
)
sys.exit(1)
logger.success("✓ Created user successfully")
return {"user_id": data["user_id"], "api_key": data["api_key"]}
return {
"user_id": data["user_id"],
"api_key": data["api_key"],
}
except Exception as e:
logger.exception(f"Exception in user creation: {str(e)}")
sys.exit(1)
async def test_create_agent(session: aiohttp.ClientSession, api_key: str) -> str:
async def test_create_agent(
session: aiohttp.ClientSession, api_key: str
) -> str:
"""Test agent creation endpoint."""
url = f"{BASE_URL}/agent"
config = {
@ -70,114 +92,142 @@ async def test_create_agent(session: aiohttp.ClientSession, api_key: str) -> str
"tags": ["test"],
"streaming_on": False,
"user_name": "test_user", # Added required field
"output_type": "string" # Added required field
"output_type": "string", # Added required field
}
headers = {"api-key": api_key}
logger.info("Testing agent creation...")
await log_request_details("POST", url, headers, config)
try:
async with session.post(url, headers=headers, json=config) as response:
async with session.post(
url, headers=headers, json=config
) as response:
data = await response.json()
await log_response_details(response, data)
if response.status != 200:
logger.error(f"Failed to create agent. Status: {response.status}, Response: {data}")
logger.error(
f"Failed to create agent. Status: {response.status}, Response: {data}"
)
return None
logger.success("✓ Created agent successfully")
return data["agent_id"]
except Exception as e:
logger.exception(f"Exception in agent creation: {str(e)}")
return None
async def test_agent_update(session: aiohttp.ClientSession, agent_id: str, api_key: str):
async def test_agent_update(
session: aiohttp.ClientSession, agent_id: str, api_key: str
):
"""Test agent update endpoint."""
url = f"{BASE_URL}/agent/{agent_id}"
update_data = {
"description": "Updated test agent",
"system_prompt": "Updated system prompt",
"temperature": 0.7,
"tags": ["test", "updated"]
"tags": ["test", "updated"],
}
headers = {"api-key": api_key}
logger.info(f"Testing agent update for agent {agent_id}...")
await log_request_details("PATCH", url, headers, update_data)
try:
async with session.patch(url, headers=headers, json=update_data) as response:
async with session.patch(
url, headers=headers, json=update_data
) as response:
data = await response.json()
await log_response_details(response, data)
if response.status != 200:
logger.error(f"Failed to update agent. Status: {response.status}, Response: {data}")
logger.error(
f"Failed to update agent. Status: {response.status}, Response: {data}"
)
return False
logger.success("✓ Updated agent successfully")
return True
except Exception as e:
logger.exception(f"Exception in agent update: {str(e)}")
return False
async def test_completion(session: aiohttp.ClientSession, agent_id: str, api_key: str):
async def test_completion(
session: aiohttp.ClientSession, agent_id: str, api_key: str
):
"""Test completion endpoint."""
url = f"{BASE_URL}/agent/completions"
completion_request = {
"prompt": "Hello, how are you?",
"agent_id": agent_id,
"max_tokens": 100,
"stream": False
"stream": False,
}
headers = {"api-key": api_key}
logger.info(f"Testing completion for agent {agent_id}...")
await log_request_details("POST", url, headers, completion_request)
await log_request_details(
"POST", url, headers, completion_request
)
try:
async with session.post(url, headers=headers, json=completion_request) as response:
async with session.post(
url, headers=headers, json=completion_request
) as response:
data = await response.json()
await log_response_details(response, data)
if response.status != 200:
logger.error(f"Failed to process completion. Status: {response.status}, Response: {data}")
logger.error(
f"Failed to process completion. Status: {response.status}, Response: {data}"
)
return False
logger.success("✓ Processed completion successfully")
return True
except Exception as e:
logger.exception(f"Exception in completion processing: {str(e)}")
logger.exception(
f"Exception in completion processing: {str(e)}"
)
return False
async def test_get_metrics(session: aiohttp.ClientSession, agent_id: str, api_key: str):
async def test_get_metrics(
session: aiohttp.ClientSession, agent_id: str, api_key: str
):
"""Test metrics endpoint."""
url = f"{BASE_URL}/agent/{agent_id}/metrics"
headers = {"api-key": api_key}
logger.info(f"Testing metrics retrieval for agent {agent_id}...")
await log_request_details("GET", url, headers)
try:
async with session.get(url, headers=headers) as response:
data = await response.json()
await log_response_details(response, data)
if response.status != 200:
logger.error(f"Failed to get metrics. Status: {response.status}, Response: {data}")
logger.error(
f"Failed to get metrics. Status: {response.status}, Response: {data}"
)
return False
logger.success("✓ Retrieved metrics successfully")
return True
except Exception as e:
logger.exception(f"Exception in metrics retrieval: {str(e)}")
return False
async def run_tests():
"""Run all API tests."""
logger.info("Starting API test suite...")
logger.info(f"Using base URL: {BASE_URL}")
timeout = aiohttp.ClientTimeout(total=30) # 30 second timeout
async with aiohttp.ClientSession(timeout=timeout) as session:
try:
@ -186,37 +236,47 @@ async def run_tests():
if not user_data:
logger.error("User creation failed, stopping tests.")
return
logger.info("User created successfully, proceeding with agent tests...")
user_id = user_data["user_id"]
logger.info(
"User created successfully, proceeding with agent tests..."
)
user_data["user_id"]
api_key = user_data["api_key"]
# Create test agent
agent_id = await test_create_agent(session, api_key)
if not agent_id:
logger.error("Agent creation failed, stopping tests.")
return
logger.info("Agent created successfully, proceeding with other tests...")
logger.info(
"Agent created successfully, proceeding with other tests..."
)
# Run remaining tests
test_results = []
# Test metrics retrieval
logger.info("Testing metrics retrieval...")
metrics_result = await test_get_metrics(session, agent_id, api_key)
metrics_result = await test_get_metrics(
session, agent_id, api_key
)
test_results.append(("Metrics", metrics_result))
# Test agent update
logger.info("Testing agent update...")
update_result = await test_agent_update(session, agent_id, api_key)
update_result = await test_agent_update(
session, agent_id, api_key
)
test_results.append(("Agent Update", update_result))
# Test completion
logger.info("Testing completion...")
completion_result = await test_completion(session, agent_id, api_key)
completion_result = await test_completion(
session, agent_id, api_key
)
test_results.append(("Completion", completion_result))
# Log final results
logger.info("\nTest Results Summary:")
all_passed = True
@ -225,25 +285,34 @@ async def run_tests():
logger.info(f"{test_name}: {status}")
if not result:
all_passed = False
if all_passed:
logger.success("\n🎉 All tests completed successfully!")
logger.success(
"\n🎉 All tests completed successfully!"
)
else:
logger.error("\n❌ Some tests failed. Check the logs for details.")
logger.info(f"\nDetailed logs available at: {os.path.abspath(LOG_PATH)}")
logger.error(
"\n❌ Some tests failed. Check the logs for details."
)
logger.info(
f"\nDetailed logs available at: {os.path.abspath(LOG_PATH)}"
)
except Exception as e:
logger.exception(f"Unexpected error during test execution: {str(e)}")
logger.exception(
f"Unexpected error during test execution: {str(e)}"
)
raise
finally:
logger.info("Test suite execution completed.")
def main():
logger.info("="*50)
logger.info("=" * 50)
logger.info("API TEST SUITE EXECUTION")
logger.info("="*50)
logger.info("=" * 50)
try:
asyncio.run(run_tests())
except KeyboardInterrupt:
@ -253,5 +322,6 @@ def main():
finally:
logger.info("Test suite shutdown complete.")
if __name__ == "__main__":
main()
main()

@ -0,0 +1,150 @@
import requests
from swarms import Agent
# Define the system prompt specialized for $Swarms
SWARMS_AGENT_SYS_PROMPT = """
Here is the extensive prompt for an agent specializing in $Swarms and its ecosystem economics:
---
### Specialized System Prompt: $Swarms Coin & Ecosystem Economics Expert
You are an advanced financial analysis and ecosystem economics agent, specializing in the $Swarms cryptocurrency. Your purpose is to provide in-depth, accurate, and insightful answers about $Swarms, its role in the AI-powered economy, and its tokenomics. Your knowledge spans all aspects of $Swarms, including its vision, roadmap, network effects, and its transformative potential for decentralized agent interactions.
#### Core Competencies:
1. **Tokenomics Expertise**: Understand and explain the supply-demand dynamics, token utility, and value proposition of $Swarms as the foundation of the agentic economy.
2. **Ecosystem Insights**: Articulate the benefits of $Swarms' agent-centric design, universal currency utility, and its impact on fostering innovation and collaboration.
3. **Roadmap Analysis**: Provide detailed insights into the $Swarms roadmap phases, explaining their significance and economic implications.
4. **Real-Time Data Analysis**: Fetch live data such as price, market cap, volume, and 24-hour changes for $Swarms from CoinGecko or other reliable sources.
5. **Economic Visionary**: Analyze how $Swarms supports the democratization of AI and creates a sustainable framework for AI development.
---
#### Your Mission:
You empower users by explaining how $Swarms revolutionizes the AI economy through decentralized agent interactions, seamless value exchange, and frictionless payments. Help users understand how $Swarms incentivizes developers, democratizes access to AI tools, and builds a thriving interconnected economy of autonomous agents.
---
#### Knowledge Base:
##### Vision:
- **Empowering the Agentic Revolution**: $Swarms is the cornerstone of a decentralized AI economy.
- **Mission**: Revolutionize the AI economy by enabling seamless transactions, rewarding excellence, fostering innovation, and lowering entry barriers for developers.
##### Core Features:
1. **Reward Excellence**: Incentivize developers creating high-performing agents.
2. **Seamless Transactions**: Enable frictionless payments for agentic services.
3. **Foster Innovation**: Encourage collaboration and creativity in AI development.
4. **Sustainable Framework**: Provide scalability for long-term AI ecosystem growth.
5. **Democratize AI**: Lower barriers for users and developers to participate in the AI economy.
##### Why $Swarms?
- **Agent-Centric Design**: Each agent operates with its tokenomics, with $Swarms as the base currency for value exchange.
- **Universal Currency**: A single, unified medium for all agent transactions, reducing complexity.
- **Network Effects**: Growing utility and value as more agents join the $Swarms ecosystem.
##### Roadmap:
1. **Phase 1: Foundation**:
- Launch $Swarms token.
- Deploy initial agent creation tools.
- Establish community governance.
2. **Phase 2: Expansion**:
- Launch agent marketplace.
- Enable cross-agent communication.
- Deploy automated market-making tools.
3. **Phase 3: Integration**:
- Partner with leading AI platforms.
- Launch developer incentives.
- Scale the agent ecosystem globally.
4. **Phase 4: Evolution**:
- Advanced agent capabilities.
- Cross-chain integration.
- Create a global AI marketplace.
##### Ecosystem Benefits:
- **Agent Creation**: Simplified deployment of agents with tokenomics built-in.
- **Universal Currency**: Power all agent interactions with $Swarms.
- **Network Effects**: Thrive in an expanding interconnected agent ecosystem.
- **Secure Trading**: Built on Solana for fast and secure transactions.
- **Instant Settlement**: Lightning-fast transactions with minimal fees.
- **Community Governance**: Decentralized decision-making for the ecosystem.
##### Economic Impact:
- Autonomous agents drive value creation independently.
- Exponential growth potential as network effects amplify adoption.
- Interconnected economy fosters innovation and collaboration.
---
#### How to Answer Queries:
1. Always remain neutral, factual, and comprehensive.
2. Include live data where applicable (e.g., price, market cap, trading volume).
3. Structure responses with clear headings and concise explanations.
4. Use context to explain the relevance of $Swarms to the broader AI economy.
---
---
Leverage your knowledge of $Swarms' vision, roadmap, and economics to provide users with insightful and actionable responses. Aim to be the go-to agent for understanding and utilizing $Swarms in the agentic economy.
"""
# Function to fetch $Swarms data from CoinGecko
def fetch_swarms_data():
url = "https://api.coingecko.com/api/v3/simple/price"
params = {
"ids": "swarms", # Replace with the CoinGecko ID for $Swarms
"vs_currencies": "usd",
"include_market_cap": "true",
"include_24hr_vol": "true",
"include_24hr_change": "true",
}
response = requests.get(url, params=params)
response.raise_for_status()
return response.json()
# Initialize the agent
swarms_agent = Agent(
agent_name="Swarms-Token-Agent",
system_prompt=SWARMS_AGENT_SYS_PROMPT,
model_name="gpt-4o-mini",
max_loops=1,
autosave=True,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="swarms_agent.json",
user_name="swarms_corp",
retry_attempts=1,
context_length=200000,
return_step_meta=False,
output_type="string",
streaming_on=False,
)
# Example task: Fetch $Swarms data and provide insights
def answer_swarms_query(query):
# Fetch real-time data
swarms_data = fetch_swarms_data()
print(swarms_data)
price = swarms_data["swarms"]["usd"]
market_cap = swarms_data["swarms"]["usd_market_cap"]
volume = swarms_data["swarms"]["usd_24h_vol"]
change = swarms_data["swarms"]["usd_24h_change"]
# Run the agent with the query and include real-time data
data_summary = (
f"Current Price: ${price}\n"
f"Market Cap: ${market_cap}\n"
f"24hr Volume: ${volume}\n"
f"24hr Change: {change:.2f}%"
)
full_query = f"{query}\n\nReal-Time Data:\n{data_summary}"
return swarms_agent.run(full_query)
# Example query
response = answer_swarms_query("What is the price of $Swarms?")
print(response)

@ -0,0 +1,313 @@
import asyncio
import aiohttp
from typing import Dict, List, Optional
from datetime import datetime
from statistics import mean, median
from swarms.structs.agent import Agent
# Define the system prompt specialized for $Swarms
SWARMS_AGENT_SYS_PROMPT = """
Here is the extensive prompt for an agent specializing in $Swarms and its ecosystem economics:
---
### Specialized System Prompt: $Swarms Coin & Ecosystem Economics Expert
You are an advanced financial analysis and ecosystem economics agent, specializing in the $Swarms cryptocurrency. Your purpose is to provide in-depth, accurate, and insightful answers about $Swarms, its role in the AI-powered economy, and its tokenomics. Your knowledge spans all aspects of $Swarms, including its vision, roadmap, network effects, and its transformative potential for decentralized agent interactions.
#### Core Competencies:
1. **Tokenomics Expertise**: Understand and explain the supply-demand dynamics, token utility, and value proposition of $Swarms as the foundation of the agentic economy.
2. **Ecosystem Insights**: Articulate the benefits of $Swarms' agent-centric design, universal currency utility, and its impact on fostering innovation and collaboration.
3. **Roadmap Analysis**: Provide detailed insights into the $Swarms roadmap phases, explaining their significance and economic implications.
4. **Real-Time Data Analysis**: Fetch live data such as price, market cap, volume, and 24-hour changes for $Swarms from CoinGecko or other reliable sources.
5. **Economic Visionary**: Analyze how $Swarms supports the democratization of AI and creates a sustainable framework for AI development.
---
#### Your Mission:
You empower users by explaining how $Swarms revolutionizes the AI economy through decentralized agent interactions, seamless value exchange, and frictionless payments. Help users understand how $Swarms incentivizes developers, democratizes access to AI tools, and builds a thriving interconnected economy of autonomous agents.
---
#### Knowledge Base:
##### Vision:
- **Empowering the Agentic Revolution**: $Swarms is the cornerstone of a decentralized AI economy.
- **Mission**: Revolutionize the AI economy by enabling seamless transactions, rewarding excellence, fostering innovation, and lowering entry barriers for developers.
##### Core Features:
1. **Reward Excellence**: Incentivize developers creating high-performing agents.
2. **Seamless Transactions**: Enable frictionless payments for agentic services.
3. **Foster Innovation**: Encourage collaboration and creativity in AI development.
4. **Sustainable Framework**: Provide scalability for long-term AI ecosystem growth.
5. **Democratize AI**: Lower barriers for users and developers to participate in the AI economy.
##### Why $Swarms?
- **Agent-Centric Design**: Each agent operates with its tokenomics, with $Swarms as the base currency for value exchange.
- **Universal Currency**: A single, unified medium for all agent transactions, reducing complexity.
- **Network Effects**: Growing utility and value as more agents join the $Swarms ecosystem.
##### Roadmap:
1. **Phase 1: Foundation**:
- Launch $Swarms token.
- Deploy initial agent creation tools.
- Establish community governance.
2. **Phase 2: Expansion**:
- Launch agent marketplace.
- Enable cross-agent communication.
- Deploy automated market-making tools.
3. **Phase 3: Integration**:
- Partner with leading AI platforms.
- Launch developer incentives.
- Scale the agent ecosystem globally.
4. **Phase 4: Evolution**:
- Advanced agent capabilities.
- Cross-chain integration.
- Create a global AI marketplace.
##### Ecosystem Benefits:
- **Agent Creation**: Simplified deployment of agents with tokenomics built-in.
- **Universal Currency**: Power all agent interactions with $Swarms.
- **Network Effects**: Thrive in an expanding interconnected agent ecosystem.
- **Secure Trading**: Built on Solana for fast and secure transactions.
- **Instant Settlement**: Lightning-fast transactions with minimal fees.
- **Community Governance**: Decentralized decision-making for the ecosystem.
##### Economic Impact:
- Autonomous agents drive value creation independently.
- Exponential growth potential as network effects amplify adoption.
- Interconnected economy fosters innovation and collaboration.
---
#### How to Answer Queries:
1. Always remain neutral, factual, and comprehensive.
2. Include live data where applicable (e.g., price, market cap, trading volume).
3. Structure responses with clear headings and concise explanations.
4. Use context to explain the relevance of $Swarms to the broader AI economy.
---
---
Leverage your knowledge of $Swarms' vision, roadmap, and economics to provide users with insightful and actionable responses. Aim to be the go-to agent for understanding and utilizing $Swarms in the agentic economy.
"""
# Initialize the agent
swarms_agent = Agent(
agent_name="Swarms-Token-Agent",
system_prompt=SWARMS_AGENT_SYS_PROMPT,
model_name="gpt-4o-mini",
max_loops=1,
autosave=True,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="swarms_agent.json",
user_name="swarms_corp",
retry_attempts=1,
context_length=200000,
return_step_meta=False,
output_type="string",
streaming_on=False,
)
class MultiExchangeDataFetcher:
def __init__(self):
self.base_urls = {
"coingecko": "https://api.coingecko.com/api/v3",
"dexscreener": "https://api.dexscreener.com/latest/dex",
"birdeye": "https://public-api.birdeye.so/public", # Using Birdeye instead of Jupiter
}
async def fetch_data(self, url: str) -> Optional[Dict]:
"""Generic async function to fetch data from APIs with error handling"""
async with aiohttp.ClientSession() as session:
try:
async with session.get(url, timeout=10) as response:
if response.status == 200:
return await response.json()
print(
f"API returned status {response.status} for {url}"
)
return None
except asyncio.TimeoutError:
print(f"Timeout while fetching from {url}")
return None
except Exception as e:
print(f"Error fetching from {url}: {str(e)}")
return None
async def get_coingecko_data(self) -> Optional[Dict]:
"""Fetch $Swarms data from CoinGecko"""
try:
url = f"{self.base_urls['coingecko']}/simple/price"
params = {
"ids": "swarms",
"vs_currencies": "usd",
"include_market_cap": "true",
"include_24hr_vol": "true",
"include_24hr_change": "true",
}
query = f"{url}?{'&'.join(f'{k}={v}' for k, v in params.items())}"
data = await self.fetch_data(query)
if data and "swarms" in data:
return {
"price": data["swarms"].get("usd"),
"volume24h": data["swarms"].get("usd_24h_vol"),
"marketCap": data["swarms"].get("usd_market_cap"),
}
return None
except Exception as e:
print(f"Error processing CoinGecko data: {str(e)}")
return None
async def get_dexscreener_data(self) -> Optional[Dict]:
"""Fetch $Swarms data from DexScreener"""
try:
url = (
f"{self.base_urls['dexscreener']}/pairs/solana/swarms"
)
data = await self.fetch_data(url)
if data and "pairs" in data and len(data["pairs"]) > 0:
pair = data["pairs"][0] # Get the first pair
return {
"price": float(pair.get("priceUsd", 0)),
"volume24h": float(pair.get("volume24h", 0)),
"marketCap": float(pair.get("marketCap", 0)),
}
return None
except Exception as e:
print(f"Error processing DexScreener data: {str(e)}")
return None
async def get_birdeye_data(self) -> Optional[Dict]:
"""Fetch $Swarms data from Birdeye"""
try:
# Example Birdeye endpoint - replace ADDRESS with actual Swarms token address
url = f"{self.base_urls['birdeye']}/token/SWRM2bHQFY5ANXzYGdQ8m9ZRMsqFmsWAadLVvHc2ABJ"
data = await self.fetch_data(url)
if data and "data" in data:
token_data = data["data"]
return {
"price": float(token_data.get("price", 0)),
"volume24h": float(
token_data.get("volume24h", 0)
),
"marketCap": float(
token_data.get("marketCap", 0)
),
}
return None
except Exception as e:
print(f"Error processing Birdeye data: {str(e)}")
return None
def aggregate_data(
self, data_points: List[Optional[Dict]]
) -> Dict:
"""Aggregate data from multiple sources with null checking"""
prices = []
volumes = []
market_caps = []
for data in data_points:
if data and isinstance(data, dict):
if data.get("price") is not None:
prices.append(float(data["price"]))
if data.get("volume24h") is not None:
volumes.append(float(data["volume24h"]))
if data.get("marketCap") is not None:
market_caps.append(float(data["marketCap"]))
return {
"price": {
"mean": mean(prices) if prices else 0,
"median": median(prices) if prices else 0,
"min": min(prices) if prices else 0,
"max": max(prices) if prices else 0,
"sources": len(prices),
},
"volume_24h": {
"mean": mean(volumes) if volumes else 0,
"total": sum(volumes) if volumes else 0,
"sources": len(volumes),
},
"market_cap": {
"mean": mean(market_caps) if market_caps else 0,
"median": median(market_caps) if market_caps else 0,
"sources": len(market_caps),
},
"timestamp": datetime.now().isoformat(),
"sources_total": len(
[d for d in data_points if d is not None]
),
}
async def get_enhanced_swarms_data():
fetcher = MultiExchangeDataFetcher()
# Gather all data concurrently
tasks = [
fetcher.get_coingecko_data(),
fetcher.get_dexscreener_data(),
fetcher.get_birdeye_data(),
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Filter out exceptions and None values
valid_results = [r for r in results if isinstance(r, dict)]
return fetcher.aggregate_data(valid_results)
async def answer_swarms_query(query: str) -> str:
try:
# Fetch enhanced data
swarms_data = await get_enhanced_swarms_data()
if swarms_data["sources_total"] == 0:
return "Unable to fetch current market data from any source. Please try again later."
# Format the data summary with null checks
data_summary = (
f"Aggregated Data (from {swarms_data['sources_total']} sources):\n"
f"Average Price: ${swarms_data['price']['mean']:.4f}\n"
f"Price Range: ${swarms_data['price']['min']:.4f} - ${swarms_data['price']['max']:.4f}\n"
f"24hr Volume (Total): ${swarms_data['volume_24h']['total']:,.2f}\n"
f"Average Market Cap: ${swarms_data['market_cap']['mean']:,.2f}\n"
f"Last Updated: {swarms_data['timestamp']}"
)
# Update the system prompt with the enhanced data capabilities
enhanced_prompt = (
SWARMS_AGENT_SYS_PROMPT
+ f"\n\nReal-Time Multi-Exchange Data:\n{data_summary}"
)
# Update the agent with the enhanced prompt
swarms_agent.update_system_prompt(enhanced_prompt)
# Run the query
full_query = (
f"{query}\n\nCurrent Market Data:\n{data_summary}"
)
return swarms_agent.run(full_query)
except Exception as e:
print(f"Error in answer_swarms_query: {str(e)}")
return (
f"An error occurred while processing your query: {str(e)}"
)
async def main():
query = "What is the current market status of $Swarms across different exchanges?"
response = await answer_swarms_query(query)
print(response)
if __name__ == "__main__":
asyncio.run(main())

@ -114,6 +114,7 @@ class ExecutionContext:
def func():
pass
hints = get_type_hints(func)

@ -234,7 +234,7 @@ def fetch_wallet_transactions(wallet_address: str) -> str:
# Small delay between transaction fetches
time.sleep(0.1)
# print(tx)
logger.info(f"Enriched transaction: {tx}")

@ -1,10 +1,10 @@
from typing import Dict, List, Optional, Union, Any
from typing import List
from datetime import datetime
import json
import requests
from loguru import logger
from dataclasses import dataclass
from datetime import datetime, timezone
from datetime import timezone
import time
import random
@ -14,30 +14,35 @@ logger.add(
rotation="500 MB",
retention="10 days",
level="INFO",
format="{time} {level} {message}"
format="{time} {level} {message}",
)
# Most reliable RPC endpoints
RPC_ENDPOINTS = [
"https://api.mainnet-beta.solana.com",
"https://rpc.ankr.com/solana",
"https://solana.getblock.io/mainnet"
"https://solana.getblock.io/mainnet",
]
@dataclass
class TransactionError:
"""Data class to represent transaction errors"""
error_type: str
message: str
timestamp: str = datetime.now(timezone.utc).isoformat()
class SolanaAPIException(Exception):
"""Custom exception for Solana API related errors"""
pass
class RPCEndpointManager:
"""Manages RPC endpoints and handles switching between them"""
def __init__(self, endpoints: List[str]):
self.endpoints = endpoints.copy()
self.current_endpoint = self.endpoints[0]
@ -45,128 +50,165 @@ class RPCEndpointManager:
self.min_request_interval = 0.2 # Increased minimum interval
self.total_requests = 0
self.max_requests_per_endpoint = 3
def get_endpoint(self) -> str:
"""Get current endpoint with rate limiting"""
now = time.time()
time_since_last = now - self.last_request_time
if time_since_last < self.min_request_interval:
time.sleep(self.min_request_interval - time_since_last)
self.total_requests += 1
if self.total_requests >= self.max_requests_per_endpoint:
self.switch_endpoint()
self.total_requests = 0
self.last_request_time = time.time()
return self.current_endpoint
def switch_endpoint(self) -> str:
"""Switch to next available endpoint"""
current = self.current_endpoint
available_endpoints = [ep for ep in self.endpoints if ep != current]
available_endpoints = [
ep for ep in self.endpoints if ep != current
]
if not available_endpoints:
raise SolanaAPIException("All endpoints exhausted")
self.current_endpoint = random.choice(available_endpoints)
logger.info(f"Switched to endpoint: {self.current_endpoint}")
return self.current_endpoint
def make_request(endpoint_manager: RPCEndpointManager, payload: dict, retry_count: int = 3) -> dict:
def make_request(
endpoint_manager: RPCEndpointManager,
payload: dict,
retry_count: int = 3,
) -> dict:
"""
Makes a request with automatic endpoint switching and error handling.
"""
last_error = None
for attempt in range(retry_count):
try:
endpoint = endpoint_manager.get_endpoint()
response = requests.post(
endpoint,
json=payload,
timeout=10,
headers={"Content-Type": "application/json"},
verify=True # Ensure SSL verification
verify=True, # Ensure SSL verification
)
if response.status_code != 200:
raise SolanaAPIException(f"HTTP {response.status_code}: {response.text}")
raise SolanaAPIException(
f"HTTP {response.status_code}: {response.text}"
)
data = response.json()
if "error" in data:
error_code = data["error"].get("code")
if error_code == 429: # Rate limit
logger.warning(f"Rate limit hit, switching endpoint...")
logger.warning(
"Rate limit hit, switching endpoint..."
)
endpoint_manager.switch_endpoint()
time.sleep(2 ** attempt) # Exponential backoff
time.sleep(2**attempt) # Exponential backoff
continue
if "message" in data["error"]:
raise SolanaAPIException(f"RPC error: {data['error']['message']}")
raise SolanaAPIException(
f"RPC error: {data['error']['message']}"
)
return data
except (requests.exceptions.SSLError, requests.exceptions.ConnectionError) as e:
logger.warning(f"Connection error with {endpoint}: {str(e)}")
except (
requests.exceptions.SSLError,
requests.exceptions.ConnectionError,
) as e:
logger.warning(
f"Connection error with {endpoint}: {str(e)}"
)
endpoint_manager.switch_endpoint()
continue
except Exception as e:
last_error = e
logger.warning(f"Request failed: {str(e)}")
endpoint_manager.switch_endpoint()
time.sleep(1)
continue
raise SolanaAPIException(f"All retry attempts failed. Last error: {str(last_error)}")
def fetch_wallet_transactions(wallet_address: str, max_transactions: int = 10) -> str:
raise SolanaAPIException(
f"All retry attempts failed. Last error: {str(last_error)}"
)
def fetch_wallet_transactions(
wallet_address: str, max_transactions: int = 10
) -> str:
"""
Fetches recent transactions for a given Solana wallet address.
Args:
wallet_address (str): The Solana wallet address to fetch transactions for
max_transactions (int, optional): Maximum number of transactions to fetch. Defaults to 10.
Returns:
str: JSON string containing transaction details
"""
try:
if not isinstance(wallet_address, str) or len(wallet_address) != 44:
raise ValueError(f"Invalid Solana wallet address format: {wallet_address}")
if not isinstance(max_transactions, int) or max_transactions < 1:
raise ValueError("max_transactions must be a positive integer")
if (
not isinstance(wallet_address, str)
or len(wallet_address) != 44
):
raise ValueError(
f"Invalid Solana wallet address format: {wallet_address}"
)
logger.info(f"Fetching up to {max_transactions} transactions for wallet: {wallet_address}")
if (
not isinstance(max_transactions, int)
or max_transactions < 1
):
raise ValueError(
"max_transactions must be a positive integer"
)
logger.info(
f"Fetching up to {max_transactions} transactions for wallet: {wallet_address}"
)
endpoint_manager = RPCEndpointManager(RPC_ENDPOINTS)
# Get transaction signatures
signatures_payload = {
"jsonrpc": "2.0",
"id": str(random.randint(1, 1000)),
"method": "getSignaturesForAddress",
"params": [
wallet_address,
{"limit": max_transactions}
]
"params": [wallet_address, {"limit": max_transactions}],
}
signatures_data = make_request(endpoint_manager, signatures_payload)
signatures_data = make_request(
endpoint_manager, signatures_payload
)
transactions = signatures_data.get("result", [])
if not transactions:
logger.info("No transactions found for this wallet")
return json.dumps({
"success": True,
"transactions": [],
"error": None,
"transaction_count": 0
}, indent=2)
return json.dumps(
{
"success": True,
"transactions": [],
"error": None,
"transaction_count": 0,
},
indent=2,
)
logger.info(f"Found {len(transactions)} transactions")
@ -180,12 +222,15 @@ def fetch_wallet_transactions(wallet_address: str, max_transactions: int = 10) -
"method": "getTransaction",
"params": [
tx["signature"],
{"encoding": "json", "maxSupportedTransactionVersion": 0}
]
{
"encoding": "json",
"maxSupportedTransactionVersion": 0,
},
],
}
tx_data = make_request(endpoint_manager, tx_payload)
if "result" in tx_data and tx_data["result"]:
result = tx_data["result"]
enriched_tx = {
@ -194,47 +239,64 @@ def fetch_wallet_transactions(wallet_address: str, max_transactions: int = 10) -
"timestamp": tx.get("blockTime"),
"success": not tx.get("err"),
}
if "meta" in result:
enriched_tx["fee"] = result["meta"].get("fee")
if "preBalances" in result["meta"] and "postBalances" in result["meta"]:
enriched_tx["balance_change"] = sum(result["meta"]["postBalances"]) - sum(result["meta"]["preBalances"])
if (
"preBalances" in result["meta"]
and "postBalances" in result["meta"]
):
enriched_tx["balance_change"] = sum(
result["meta"]["postBalances"]
) - sum(result["meta"]["preBalances"])
enriched_transactions.append(enriched_tx)
logger.info(f"Processed transaction {tx['signature'][:8]}...")
logger.info(
f"Processed transaction {tx['signature'][:8]}..."
)
except Exception as e:
logger.warning(f"Failed to process transaction {tx['signature']}: {str(e)}")
logger.warning(
f"Failed to process transaction {tx['signature']}: {str(e)}"
)
continue
logger.info(f"Successfully processed {len(enriched_transactions)} transactions")
return json.dumps({
"success": True,
"transactions": enriched_transactions,
"error": None,
"transaction_count": len(enriched_transactions)
}, indent=2)
logger.info(
f"Successfully processed {len(enriched_transactions)} transactions"
)
return json.dumps(
{
"success": True,
"transactions": enriched_transactions,
"error": None,
"transaction_count": len(enriched_transactions),
},
indent=2,
)
except Exception as e:
error = TransactionError(
error_type="API_ERROR",
message=str(e)
error_type="API_ERROR", message=str(e)
)
logger.error(f"Error: {error.message}")
return json.dumps({
"success": False,
"transactions": [],
"error": error.__dict__,
"transaction_count": 0
}, indent=2)
return json.dumps(
{
"success": False,
"transactions": [],
"error": error.__dict__,
"transaction_count": 0,
},
indent=2,
)
if __name__ == "__main__":
# Example wallet address
wallet = "CtBLg4AX6LQfKVtPPUWqJyQ5cRfHydUwuZZ87rmojA1P"
try:
result = fetch_wallet_transactions(wallet)
print(result)
except Exception as e:
logger.error(f"Failed to fetch transactions: {str(e)}")
logger.error(f"Failed to fetch transactions: {str(e)}")

@ -1,4 +1,3 @@
import json
import os
import subprocess

@ -499,10 +499,10 @@ class Agent:
self.stopping_token = "<DONE>"
# If the docs exist then ingest the docs
if exists(self.docs):
threading.Thread(
target=self.ingest_docs, args=(self.docs)
).start()
# if exists(self.docs):
# threading.Thread(
# target=self.ingest_docs, args=(self.docs)
# ).start()
# If docs folder exists then get the docs from docs folder
if exists(self.docs_folder):
@ -1176,6 +1176,13 @@ class Agent:
except Exception as error:
self._handle_run_error(error)
def receive_message(
self, agent_name: str, task: str, *args, **kwargs
):
return self.run(
task=f"From {agent_name}: {task}", *args, **kwargs
)
def dict_to_csv(self, data: dict) -> str:
"""
Convert a dictionary to a CSV string.

@ -482,13 +482,15 @@ class AgentRearrange(BaseSwarm):
except Exception as e:
self._catch_error(e)
def _catch_error(self, e: Exception):
if self.autosave is True:
log_agent_data(self.to_dict())
logger.error(f"An error occurred with your swarm {self.name}: Error: {e} Traceback: {e.__traceback__}")
logger.error(
f"An error occurred with your swarm {self.name}: Error: {e} Traceback: {e.__traceback__}"
)
return e
def run(
@ -653,7 +655,9 @@ class AgentRearrange(BaseSwarm):
# Process batch using asyncio.gather
batch_coros = [
self.astream(task=task, img=img_path, *args, **kwargs)
self.astream(
task=task, img=img_path, *args, **kwargs
)
for task, img_path in zip(batch_tasks, batch_imgs)
]
batch_results = await asyncio.gather(*batch_coros)
@ -691,7 +695,9 @@ class AgentRearrange(BaseSwarm):
List of results corresponding to input tasks
"""
try:
with ThreadPoolExecutor(max_workers=max_workers) as executor:
with ThreadPoolExecutor(
max_workers=max_workers
) as executor:
imgs = img if img else [None] * len(tasks)
futures = [
executor.submit(
@ -710,8 +716,7 @@ class AgentRearrange(BaseSwarm):
return [future.result() for future in futures]
except Exception as e:
self._catch_error(e)
def _serialize_callable(
self, attr_value: Callable
) -> Dict[str, Any]:
@ -771,7 +776,6 @@ class AgentRearrange(BaseSwarm):
}
def rearrange(
agents: List[Agent] = None,
flow: str = None,

@ -108,7 +108,9 @@ class SpreadSheetSwarm(BaseSwarm):
# --------------- NEW CHANGE START ---------------
# The save_file_path now uses the formatted_time and uuid_hex
self.save_file_path = f"spreadsheet_swarm_{formatted_time}_run_id_{uuid_hex}.csv"
self.save_file_path = (
f"spreadsheet_swarm_run_id_{uuid_hex}.csv"
)
# --------------- NEW CHANGE END ---------------
self.metadata = SwarmRunMetadata(
@ -182,10 +184,22 @@ class SpreadSheetSwarm(BaseSwarm):
),
docs=[row["docs"]] if "docs" in row else "",
dynamic_temperature_enabled=True,
max_loops=row["max_loops"] if "max_loops" in row else 1,
user_name=row["user_name"] if "user_name" in row else "user",
max_loops=(
row["max_loops"]
if "max_loops" in row
else 1
),
user_name=(
row["user_name"]
if "user_name" in row
else "user"
),
# output_type="str",
stopping_token=row["stopping_token"] if "stopping_token" in row else None,
stopping_token=(
row["stopping_token"]
if "stopping_token" in row
else None
),
)
# Add agent to swarm
@ -268,8 +282,7 @@ class SpreadSheetSwarm(BaseSwarm):
print(log_agent_data(self.metadata.model_dump()))
return self.metadata.model_dump_json(indent=4)
def run(self, task: str = None, *args, **kwargs):
"""
Run the swarm with the specified task.
@ -378,7 +391,7 @@ class SpreadSheetSwarm(BaseSwarm):
create_file_in_folder(
folder_path=f"{self.workspace_dir}/Spreedsheet-Swarm-{self.name}/{self.name}",
file_name=f"spreedsheet-swarm-{self.metadata.run_id}_metadata.json",
file_name=f"spreedsheet-swarm-{uuid_hex}_metadata.json",
content=out,
)

@ -1,4 +1,3 @@
import asyncio
import math
from typing import List, Union
@ -343,47 +342,6 @@ def sinusoidal_swarm(agents: AgentListType, task: str):
agents[index].run(task)
async def one_to_three(
sender: Agent, agents: AgentListType, task: str
):
"""
Sends a message from the sender agent to three other agents.
Args:
sender (Agent): The agent sending the message.
agents (AgentListType): The list of agents to receive the message.
task (str): The message to be sent.
Raises:
Exception: If there is an error while sending the message.
Returns:
None
"""
if len(agents) != 3:
raise ValueError("The number of agents must be exactly 3.")
if not task:
raise ValueError("The task cannot be empty.")
if not sender:
raise ValueError("The sender cannot be empty.")
try:
receive_tasks = []
for agent in agents:
receive_tasks.append(
agent.receive_message(sender.agent_name, task)
)
await asyncio.gather(*receive_tasks)
except Exception as error:
logger.error(
f"[ERROR][CLASS: Agent][METHOD: one_to_three] {error}"
)
raise error
"""
This module contains functions for facilitating communication between agents in a swarm. It includes methods for one-to-one communication, broadcasting, and other swarm architectures.
"""
@ -440,36 +398,70 @@ def one_to_one(
return conversation.return_history()
# Broadcasting: A message from one agent to many
async def broadcast(
sender: Agent, agents: AgentListType, task: str
) -> None:
"""
Facilitates broadcasting of a message from one agent to multiple agents.
Args:
sender (Agent): The agent sending the message.
agents (AgentListType): The list of agents to receive the message.
task (str): The message to be sent.
Raises:
ValueError: If the sender, agents, or task is empty.
Exception: If there is an error during the broadcasting process.
"""
conversation = Conversation()
if not sender or not agents or not task:
raise ValueError("Sender, agents, and task cannot be empty.")
try:
receive_tasks = []
# First get the sender's broadcast message
broadcast_message = sender.run(task)
conversation.add_log(
agent_name=sender.agent_name,
task=task,
response=broadcast_message,
)
# Then have all agents process it
for agent in agents:
receive_tasks.append(agent.run(task))
response = agent.run(broadcast_message)
conversation.add_log(
agent_name=agent.agent_name, task=task, response=task
agent_name=agent.agent_name,
task=broadcast_message,
response=response,
)
await asyncio.gather(*receive_tasks)
return conversation.return_history()
except Exception as error:
logger.error(f"Error during broadcast: {error}")
raise error
async def one_to_three(
sender: Agent, agents: AgentListType, task: str
):
if len(agents) != 3:
raise ValueError("The number of agents must be exactly 3.")
if not task or not sender:
raise ValueError("Sender and task cannot be empty.")
conversation = Conversation()
try:
# Get sender's message
sender_message = sender.run(task)
conversation.add_log(
agent_name=sender.agent_name,
task=task,
response=sender_message,
)
# Have each receiver process the message
for agent in agents:
response = agent.run(sender_message)
conversation.add_log(
agent_name=agent.agent_name,
task=sender_message,
response=response,
)
return conversation.return_history()
except Exception as error:
logger.error(f"Error in one_to_three: {error}")
raise error

@ -10,7 +10,6 @@ from swarms.structs.agent import Agent
logger = initialize_logger(log_folder="pandas_utils")
def display_agents_info(agents: List[Agent]) -> None:
"""
Displays information about all agents in a list using a DataFrame.
@ -18,7 +17,7 @@ def display_agents_info(agents: List[Agent]) -> None:
:param agents: List of Agent instances.
"""
# Extracting relevant information from each agent
try:
import pandas as pd
except ImportError:
@ -26,8 +25,6 @@ def display_agents_info(agents: List[Agent]) -> None:
subprocess.run(["pip", "install", "pandas"])
import pandas as pd
agent_data = []
for agent in agents:
try:

@ -0,0 +1,6 @@
pytest
swarms
loguru
pydantic
swarm-models
loguru

@ -12,6 +12,7 @@ from swarms.structs.rearrange import AgentRearrange
class TestResult:
"""Class to store test results and metadata"""
def __init__(self, test_name: str):
self.test_name = test_name
self.start_time = datetime.now()
@ -21,7 +22,9 @@ class TestResult:
self.traceback = None
self.function_output = None
def complete(self, success: bool, error: Optional[Exception] = None):
def complete(
self, success: bool, error: Optional[Exception] = None
):
"""Complete the test execution with results"""
self.end_time = datetime.now()
self.success = success
@ -35,36 +38,47 @@ class TestResult:
return (self.end_time - self.start_time).total_seconds()
return 0
def run_test(test_func: Callable) -> TestResult:
"""
Decorator to run tests with error handling and logging
Args:
test_func (Callable): Test function to execute
Returns:
TestResult: Object containing test execution details
"""
def wrapper(*args, **kwargs) -> TestResult:
result = TestResult(test_func.__name__)
logger.info(f"\n{'='*20} Running test: {test_func.__name__} {'='*20}")
logger.info(
f"\n{'='*20} Running test: {test_func.__name__} {'='*20}"
)
try:
output = test_func(*args, **kwargs)
result.function_output = output
result.complete(success=True)
logger.success(f"✅ Test {test_func.__name__} passed successfully")
logger.success(
f"✅ Test {test_func.__name__} passed successfully"
)
except Exception as e:
result.complete(success=False, error=e)
logger.error(f"❌ Test {test_func.__name__} failed with error: {str(e)}")
logger.error(
f"❌ Test {test_func.__name__} failed with error: {str(e)}"
)
logger.error(f"Traceback: {traceback.format_exc()}")
logger.info(f"Test duration: {result.duration():.2f} seconds\n")
logger.info(
f"Test duration: {result.duration():.2f} seconds\n"
)
return result
return wrapper
def create_functional_agents() -> List[Agent]:
"""
Create a list of functional agents with real LLM integration for testing.
@ -73,16 +87,19 @@ def create_functional_agents() -> List[Agent]:
# Initialize OpenAI Chat model
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
logger.warning("No OpenAI API key found. Using mock agents instead.")
return [create_mock_agent("TestAgent1"), create_mock_agent("TestAgent2")]
logger.warning(
"No OpenAI API key found. Using mock agents instead."
)
return [
create_mock_agent("TestAgent1"),
create_mock_agent("TestAgent2"),
]
try:
model = OpenAIChat(
api_key=api_key,
model_name="gpt-4o",
temperature=0.1
api_key=api_key, model_name="gpt-4o", temperature=0.1
)
# Create boss agent
boss_agent = Agent(
agent_name="BossAgent",
@ -101,7 +118,7 @@ def create_functional_agents() -> List[Agent]:
state_save_file_type="json",
saved_state_path="test_boss_agent.json",
)
# Create analysis agent
analysis_agent = Agent(
agent_name="AnalysisAgent",
@ -119,7 +136,7 @@ def create_functional_agents() -> List[Agent]:
state_save_file_type="json",
saved_state_path="test_analysis_agent.json",
)
# Create summary agent
summary_agent = Agent(
agent_name="SummaryAgent",
@ -137,83 +154,102 @@ def create_functional_agents() -> List[Agent]:
state_save_file_type="json",
saved_state_path="test_summary_agent.json",
)
logger.info("Successfully created functional agents with LLM integration")
logger.info(
"Successfully created functional agents with LLM integration"
)
return [boss_agent, analysis_agent, summary_agent]
except Exception as e:
logger.error(f"Failed to create functional agents: {str(e)}")
logger.warning("Falling back to mock agents")
return [create_mock_agent("TestAgent1"), create_mock_agent("TestAgent2")]
return [
create_mock_agent("TestAgent1"),
create_mock_agent("TestAgent2"),
]
def create_mock_agent(name: str) -> Agent:
"""Create a mock agent for testing when LLM integration is not available"""
return Agent(
agent_name=name,
system_prompt=f"You are a test agent named {name}",
llm=None
llm=None,
)
@run_test
def test_init():
"""Test AgentRearrange initialization with functional agents"""
logger.info("Creating agents for initialization test")
agents = create_functional_agents()
rearrange = AgentRearrange(
name="TestRearrange",
agents=agents,
flow=f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}"
flow=f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}",
)
assert rearrange.name == "TestRearrange"
assert len(rearrange.agents) == 3
assert rearrange.flow == f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}"
logger.info(f"Initialized AgentRearrange with {len(agents)} agents")
assert (
rearrange.flow
== f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}"
)
logger.info(
f"Initialized AgentRearrange with {len(agents)} agents"
)
return True
@run_test
def test_validate_flow():
"""Test flow validation logic"""
agents = create_functional_agents()
rearrange = AgentRearrange(
agents=agents,
flow=f"{agents[0].agent_name} -> {agents[1].agent_name}"
flow=f"{agents[0].agent_name} -> {agents[1].agent_name}",
)
logger.info("Testing valid flow pattern")
valid = rearrange.validate_flow()
assert valid is True
logger.info("Testing invalid flow pattern")
rearrange.flow = f"{agents[0].agent_name} {agents[1].agent_name}" # Missing arrow
try:
rearrange.validate_flow()
assert False, "Should have raised ValueError"
except ValueError as e:
logger.info(f"Successfully caught invalid flow error: {str(e)}")
logger.info(
f"Successfully caught invalid flow error: {str(e)}"
)
assert True
return True
@run_test
def test_add_remove_agent():
"""Test adding and removing agents from the swarm"""
agents = create_functional_agents()
rearrange = AgentRearrange(agents=agents[:2]) # Start with first two agents
rearrange = AgentRearrange(
agents=agents[:2]
) # Start with first two agents
logger.info("Testing agent addition")
new_agent = agents[2] # Use the third agent as new agent
rearrange.add_agent(new_agent)
assert new_agent.agent_name in rearrange.agents
logger.info("Testing agent removal")
rearrange.remove_agent(new_agent.agent_name)
assert new_agent.agent_name not in rearrange.agents
return True
@run_test
def test_basic_run():
"""Test basic task execution with the swarm"""
@ -222,25 +258,30 @@ def test_basic_run():
name="TestSwarm",
agents=agents,
flow=f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}",
max_loops=1
max_loops=1,
)
test_task = (
"Analyze this test message and provide a brief summary."
)
test_task = "Analyze this test message and provide a brief summary."
logger.info(f"Running test task: {test_task}")
try:
result = rearrange.run(test_task)
assert result is not None
logger.info(f"Successfully executed task with result length: {len(str(result))}")
logger.info(
f"Successfully executed task with result length: {len(str(result))}"
)
return True
except Exception as e:
logger.error(f"Task execution failed: {str(e)}")
raise
def run_all_tests() -> Dict[str, TestResult]:
"""
Run all test cases and collect results
Returns:
Dict[str, TestResult]: Dictionary mapping test names to their results
"""
@ -249,26 +290,26 @@ def run_all_tests() -> Dict[str, TestResult]:
test_init,
test_validate_flow,
test_add_remove_agent,
test_basic_run
test_basic_run,
]
results = {}
for test in test_functions:
result = test()
results[test.__name__] = result
# Log summary
total_tests = len(results)
passed_tests = sum(1 for r in results.values() if r.success)
failed_tests = total_tests - passed_tests
logger.info("\n📊 Test Suite Summary:")
logger.info(f"Total Tests: {total_tests}")
print(f"✅ Passed: {passed_tests}")
if failed_tests > 0:
logger.error(f"❌ Failed: {failed_tests}")
# Detailed failure information
if failed_tests > 0:
logger.error("\n❌ Failed Tests Details:")
@ -277,10 +318,11 @@ def run_all_tests() -> Dict[str, TestResult]:
logger.error(f"\n{name}:")
logger.error(f"Error: {result.error}")
logger.error(f"Traceback: {result.traceback}")
return results
if __name__ == "__main__":
print("🌟 Starting AgentRearrange Test Suite")
results = run_all_tests()
print("🏁 Test Suite Execution Completed")
print("🏁 Test Suite Execution Completed")

@ -12,6 +12,7 @@ from swarms.structs.rearrange import AgentRearrange
class TestResult:
"""Class to store test results and metadata"""
def __init__(self, test_name: str):
self.test_name = test_name
self.start_time = datetime.now()
@ -21,7 +22,9 @@ class TestResult:
self.traceback = None
self.function_output = None
def complete(self, success: bool, error: Optional[Exception] = None):
def complete(
self, success: bool, error: Optional[Exception] = None
):
"""Complete the test execution with results"""
self.end_time = datetime.now()
self.success = success
@ -35,36 +38,47 @@ class TestResult:
return (self.end_time - self.start_time).total_seconds()
return 0
def run_test(test_func: Callable) -> TestResult:
"""
Decorator to run tests with error handling and logging
Args:
test_func (Callable): Test function to execute
Returns:
TestResult: Object containing test execution details
"""
def wrapper(*args, **kwargs) -> TestResult:
result = TestResult(test_func.__name__)
logger.info(f"\n{'='*20} Running test: {test_func.__name__} {'='*20}")
logger.info(
f"\n{'='*20} Running test: {test_func.__name__} {'='*20}"
)
try:
output = test_func(*args, **kwargs)
result.function_output = output
result.complete(success=True)
logger.success(f"✅ Test {test_func.__name__} passed successfully")
logger.success(
f"✅ Test {test_func.__name__} passed successfully"
)
except Exception as e:
result.complete(success=False, error=e)
logger.error(f"❌ Test {test_func.__name__} failed with error: {str(e)}")
logger.error(
f"❌ Test {test_func.__name__} failed with error: {str(e)}"
)
logger.error(f"Traceback: {traceback.format_exc()}")
logger.info(f"Test duration: {result.duration():.2f} seconds\n")
logger.info(
f"Test duration: {result.duration():.2f} seconds\n"
)
return result
return wrapper
def create_functional_agents() -> List[Agent]:
"""
Create a list of functional agents with real LLM integration for testing.
@ -73,16 +87,19 @@ def create_functional_agents() -> List[Agent]:
# Initialize OpenAI Chat model
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
logger.warning("No OpenAI API key found. Using mock agents instead.")
return [create_mock_agent("TestAgent1"), create_mock_agent("TestAgent2")]
logger.warning(
"No OpenAI API key found. Using mock agents instead."
)
return [
create_mock_agent("TestAgent1"),
create_mock_agent("TestAgent2"),
]
try:
model = OpenAIChat(
api_key=api_key,
model_name="gpt-4o",
temperature=0.1
api_key=api_key, model_name="gpt-4o", temperature=0.1
)
# Create boss agent
boss_agent = Agent(
agent_name="BossAgent",
@ -101,7 +118,7 @@ def create_functional_agents() -> List[Agent]:
state_save_file_type="json",
saved_state_path="test_boss_agent.json",
)
# Create analysis agent
analysis_agent = Agent(
agent_name="AnalysisAgent",
@ -119,7 +136,7 @@ def create_functional_agents() -> List[Agent]:
state_save_file_type="json",
saved_state_path="test_analysis_agent.json",
)
# Create summary agent
summary_agent = Agent(
agent_name="SummaryAgent",
@ -137,83 +154,102 @@ def create_functional_agents() -> List[Agent]:
state_save_file_type="json",
saved_state_path="test_summary_agent.json",
)
logger.info("Successfully created functional agents with LLM integration")
logger.info(
"Successfully created functional agents with LLM integration"
)
return [boss_agent, analysis_agent, summary_agent]
except Exception as e:
logger.error(f"Failed to create functional agents: {str(e)}")
logger.warning("Falling back to mock agents")
return [create_mock_agent("TestAgent1"), create_mock_agent("TestAgent2")]
return [
create_mock_agent("TestAgent1"),
create_mock_agent("TestAgent2"),
]
def create_mock_agent(name: str) -> Agent:
"""Create a mock agent for testing when LLM integration is not available"""
return Agent(
agent_name=name,
system_prompt=f"You are a test agent named {name}",
llm=None
llm=None,
)
@run_test
def test_init():
"""Test AgentRearrange initialization with functional agents"""
logger.info("Creating agents for initialization test")
agents = create_functional_agents()
rearrange = AgentRearrange(
name="TestRearrange",
agents=agents,
flow=f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}"
flow=f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}",
)
assert rearrange.name == "TestRearrange"
assert len(rearrange.agents) == 3
assert rearrange.flow == f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}"
logger.info(f"Initialized AgentRearrange with {len(agents)} agents")
assert (
rearrange.flow
== f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}"
)
logger.info(
f"Initialized AgentRearrange with {len(agents)} agents"
)
return True
@run_test
def test_validate_flow():
"""Test flow validation logic"""
agents = create_functional_agents()
rearrange = AgentRearrange(
agents=agents,
flow=f"{agents[0].agent_name} -> {agents[1].agent_name}"
flow=f"{agents[0].agent_name} -> {agents[1].agent_name}",
)
logger.info("Testing valid flow pattern")
valid = rearrange.validate_flow()
assert valid is True
logger.info("Testing invalid flow pattern")
rearrange.flow = f"{agents[0].agent_name} {agents[1].agent_name}" # Missing arrow
try:
rearrange.validate_flow()
assert False, "Should have raised ValueError"
except ValueError as e:
logger.info(f"Successfully caught invalid flow error: {str(e)}")
logger.info(
f"Successfully caught invalid flow error: {str(e)}"
)
assert True
return True
@run_test
def test_add_remove_agent():
"""Test adding and removing agents from the swarm"""
agents = create_functional_agents()
rearrange = AgentRearrange(agents=agents[:2]) # Start with first two agents
rearrange = AgentRearrange(
agents=agents[:2]
) # Start with first two agents
logger.info("Testing agent addition")
new_agent = agents[2] # Use the third agent as new agent
rearrange.add_agent(new_agent)
assert new_agent.agent_name in rearrange.agents
logger.info("Testing agent removal")
rearrange.remove_agent(new_agent.agent_name)
assert new_agent.agent_name not in rearrange.agents
return True
@run_test
def test_basic_run():
"""Test basic task execution with the swarm"""
@ -222,25 +258,30 @@ def test_basic_run():
name="TestSwarm",
agents=agents,
flow=f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}",
max_loops=1
max_loops=1,
)
test_task = (
"Analyze this test message and provide a brief summary."
)
test_task = "Analyze this test message and provide a brief summary."
logger.info(f"Running test task: {test_task}")
try:
result = rearrange.run(test_task)
assert result is not None
logger.info(f"Successfully executed task with result length: {len(str(result))}")
logger.info(
f"Successfully executed task with result length: {len(str(result))}"
)
return True
except Exception as e:
logger.error(f"Task execution failed: {str(e)}")
raise
def run_all_tests() -> Dict[str, TestResult]:
"""
Run all test cases and collect results
Returns:
Dict[str, TestResult]: Dictionary mapping test names to their results
"""
@ -249,26 +290,26 @@ def run_all_tests() -> Dict[str, TestResult]:
test_init,
test_validate_flow,
test_add_remove_agent,
test_basic_run
test_basic_run,
]
results = {}
for test in test_functions:
result = test()
results[test.__name__] = result
# Log summary
total_tests = len(results)
passed_tests = sum(1 for r in results.values() if r.success)
failed_tests = total_tests - passed_tests
logger.info("\n📊 Test Suite Summary:")
logger.info(f"Total Tests: {total_tests}")
print(f"✅ Passed: {passed_tests}")
if failed_tests > 0:
logger.error(f"❌ Failed: {failed_tests}")
# Detailed failure information
if failed_tests > 0:
logger.error("\n❌ Failed Tests Details:")
@ -277,10 +318,11 @@ def run_all_tests() -> Dict[str, TestResult]:
logger.error(f"\n{name}:")
logger.error(f"Error: {result.error}")
logger.error(f"Traceback: {result.traceback}")
return results
if __name__ == "__main__":
print("🌟 Starting AgentRearrange Test Suite")
results = run_all_tests()
print("🏁 Test Suite Execution Completed")
print("🏁 Test Suite Execution Completed")

@ -0,0 +1,198 @@
from swarms.structs.auto_swarm_builder import AutoSwarmBuilder
from dotenv import load_dotenv
load_dotenv()
def print_separator():
print("\n" + "=" * 50)
def test_initialization():
"""Test basic initialization of AutoSwarmBuilder"""
print_separator()
print("Testing AutoSwarmBuilder Initialization")
try:
swarm = AutoSwarmBuilder(
name="TestSwarm",
description="A test swarm for validation",
verbose=True,
max_loops=2,
)
print("✓ Created swarm with configuration:")
print(f" - Name: {swarm.name}")
print(f" - Description: {swarm.description}")
print(f" - Max loops: {swarm.max_loops}")
print(f" - Verbose: {swarm.verbose}")
print("✓ Initialization test passed")
return swarm
except Exception as e:
print(f"✗ Initialization test failed: {str(e)}")
raise
def test_agent_building():
"""Test building individual agents"""
print_separator()
print("Testing Agent Building")
try:
swarm = AutoSwarmBuilder()
agent = swarm.build_agent(
agent_name="TestAgent",
agent_description="A test agent",
agent_system_prompt="You are a test agent",
max_loops=1,
)
print("✓ Built agent with configuration:")
print(f" - Name: {agent.agent_name}")
print(f" - Description: {agent.description}")
print(f" - Max loops: {agent.max_loops}")
print("✓ Agent building test passed")
return agent
except Exception as e:
print(f"✗ Agent building test failed: {str(e)}")
raise
def test_agent_creation():
"""Test creating multiple agents for a task"""
print_separator()
print("Testing Agent Creation from Task")
try:
swarm = AutoSwarmBuilder(
name="ResearchSwarm",
description="A swarm for research tasks",
)
task = "Research the latest developments in quantum computing"
agents = swarm._create_agents(task)
print("✓ Created agents for research task:")
for i, agent in enumerate(agents, 1):
print(f" Agent {i}:")
print(f" - Name: {agent.agent_name}")
print(f" - Description: {agent.description}")
print(f"✓ Created {len(agents)} agents successfully")
return agents
except Exception as e:
print(f"✗ Agent creation test failed: {str(e)}")
raise
def test_swarm_routing():
"""Test routing tasks through the swarm"""
print_separator()
print("Testing Swarm Routing")
try:
swarm = AutoSwarmBuilder(
name="RouterTestSwarm",
description="Testing routing capabilities",
)
agents = (
test_agent_creation()
) # Get agents from previous test
task = "Analyze the impact of AI on healthcare"
print("Starting task routing...")
result = swarm.swarm_router(agents, task)
print("✓ Task routed successfully")
print(
f" - Result length: {len(str(result)) if result else 0} characters"
)
print("✓ Swarm routing test passed")
return result
except Exception as e:
print(f"✗ Swarm routing test failed: {str(e)}")
raise
def test_full_swarm_execution():
"""Test complete swarm execution with a real task"""
print_separator()
print("Testing Full Swarm Execution")
try:
swarm = AutoSwarmBuilder(
name="FullTestSwarm",
description="Testing complete swarm functionality",
max_loops=1,
)
task = (
"Create a summary of recent advances in renewable energy"
)
print("Starting full swarm execution...")
result = swarm.run(task)
print("✓ Full swarm execution completed:")
print(f" - Output generated: {bool(result)}")
print(
f" - Output length: {len(str(result)) if result else 0} characters"
)
print("✓ Full swarm execution test passed")
return result
except Exception as e:
print(f"✗ Full swarm execution test failed: {str(e)}")
raise
def test_error_handling():
"""Test error handling in swarm operations"""
print_separator()
print("Testing Error Handling")
try:
swarm = AutoSwarmBuilder()
# Test with invalid agent configuration
print("Testing invalid agent configuration...")
try:
swarm.build_agent("", "", "")
print(
"✗ Should have raised an error for empty agent configuration"
)
except Exception as e:
print(
f"✓ Correctly handled invalid agent configuration: {type(e).__name__}"
)
# Test with None task
print("\nTesting None task...")
try:
swarm.run(None)
print("✗ Should have raised an error for None task")
except Exception as e:
print(
f"✓ Correctly handled None task: {type(e).__name__}"
)
print("✓ Error handling test passed")
except Exception as e:
print(f"✗ Error handling test failed: {str(e)}")
raise
def run_all_tests():
"""Run complete test suite"""
print("\n=== Starting AutoSwarmBuilder Test Suite ===\n")
try:
# Run all tests in sequence
test_initialization()
test_agent_building()
test_agent_creation()
test_swarm_routing()
test_full_swarm_execution()
test_error_handling()
print_separator()
print("🎉 All tests completed successfully!")
except Exception as e:
print_separator()
print(f"❌ Test suite failed: {str(e)}")
raise
if __name__ == "__main__":
run_all_tests()

@ -12,6 +12,7 @@ ceo = Agent(llm=llm, name="CEO")
dev = Agent(llm=llm, name="Developer")
va = Agent(llm=llm, name="VA")
hr = Agent(llm=llm, name="HR")
shared_instructions = "Listen to your boss"

@ -0,0 +1,177 @@
import asyncio
import time
from swarms.structs.agent import Agent
from swarms.structs.multi_process_workflow import MultiProcessWorkflow
def create_test_agent(name: str) -> Agent:
"""Create a test agent that simply returns its input with a timestamp"""
return Agent(
agent_name=name,
system_prompt=f"Test prompt for {name}",
model_name="gpt-4o-mini",
max_loops=1,
)
def test_initialization():
"""Test basic workflow initialization"""
print("\n=== Testing Workflow Initialization ===")
try:
agents = [create_test_agent(f"agent{i}") for i in range(3)]
workflow = MultiProcessWorkflow(max_workers=2, agents=agents)
print("✓ Created workflow with configuration:")
print(f" - Max workers: {workflow.max_workers}")
print(f" - Number of agents: {len(workflow.agents)}")
print(f" - Autosave: {workflow.autosave}")
print("✓ Initialization test passed")
except Exception as e:
print(f"✗ Initialization test failed: {str(e)}")
raise
def test_execute_task():
"""Test execution of a single task"""
print("\n=== Testing Task Execution ===")
try:
agents = [create_test_agent("test_agent")]
workflow = MultiProcessWorkflow(agents=agents)
test_task = "Return this message with timestamp"
result = workflow.execute_task(test_task)
print("✓ Task executed successfully")
print(f" - Input task: {test_task}")
print(f" - Result: {result}")
print("✓ Task execution test passed")
except Exception as e:
print(f"✗ Task execution test failed: {str(e)}")
raise
def test_parallel_run():
"""Test parallel execution of tasks"""
print("\n=== Testing Parallel Run ===")
try:
agents = [create_test_agent(f"agent{i}") for i in range(3)]
workflow = MultiProcessWorkflow(max_workers=2, agents=agents)
test_task = "Process this in parallel"
results = workflow.run(test_task)
print("✓ Parallel execution completed")
# print(f" - Number of results: {len(results)}")
print(f" - Results: {results}")
print("✓ Parallel run test passed")
except Exception as e:
print(f"✗ Parallel run test failed: {str(e)}")
raise
async def test_async_run():
"""Test asynchronous execution of tasks"""
print("\n=== Testing Async Run ===")
try:
agents = [create_test_agent(f"agent{i}") for i in range(3)]
workflow = MultiProcessWorkflow(max_workers=2, agents=agents)
test_task = "Process this asynchronously"
results = await workflow.async_run(test_task)
print("✓ Async execution completed")
print(f" - Number of results: {len(results)}")
print(f" - Results: {results}")
print("✓ Async run test passed")
except Exception as e:
print(f"✗ Async run test failed: {str(e)}")
raise
def test_batched_run():
"""Test batch execution of tasks"""
print("\n=== Testing Batched Run ===")
try:
agents = [create_test_agent(f"agent{i}") for i in range(2)]
workflow = MultiProcessWorkflow(max_workers=2, agents=agents)
tasks = [f"Batch task {i}" for i in range(5)]
results = workflow.batched_run(tasks, batch_size=2)
print("✓ Batch execution completed")
print(f" - Number of tasks: {len(tasks)}")
print(" - Batch size: 2")
print(f" - Results: {results}")
print("✓ Batched run test passed")
except Exception as e:
print(f"✗ Batched run test failed: {str(e)}")
raise
def test_concurrent_run():
"""Test concurrent execution of tasks"""
print("\n=== Testing Concurrent Run ===")
try:
agents = [create_test_agent(f"agent{i}") for i in range(2)]
workflow = MultiProcessWorkflow(max_workers=2, agents=agents)
tasks = [f"Concurrent task {i}" for i in range(4)]
results = workflow.concurrent_run(tasks)
print("✓ Concurrent execution completed")
print(f" - Number of tasks: {len(tasks)}")
print(f" - Results: {results}")
print("✓ Concurrent run test passed")
except Exception as e:
print(f"✗ Concurrent run test failed: {str(e)}")
raise
def test_error_handling():
"""Test error handling in workflow"""
print("\n=== Testing Error Handling ===")
try:
# Create workflow with no agents to trigger error
workflow = MultiProcessWorkflow(max_workers=2, agents=None)
result = workflow.execute_task(
"This should handle the error gracefully"
)
print("✓ Error handled gracefully")
print(f" - Result when no agents: {result}")
print("✓ Error handling test passed")
except Exception as e:
print(f"✗ Error handling test failed: {str(e)}")
raise
async def run_all_tests():
"""Run all tests"""
print("\n=== Starting MultiProcessWorkflow Test Suite ===")
start_time = time.time()
try:
# Run synchronous tests
test_initialization()
test_execute_task()
test_parallel_run()
test_batched_run()
test_concurrent_run()
test_error_handling()
# Run async test
await test_async_run()
end_time = time.time()
duration = round(end_time - start_time, 2)
print("\n=== Test Suite Completed Successfully ===")
print(f"Time taken: {duration} seconds")
except Exception as e:
print("\n=== Test Suite Failed ===")
print(f"Error: {str(e)}")
raise
if __name__ == "__main__":
asyncio.run(run_all_tests())

@ -0,0 +1,226 @@
import os
import asyncio
from loguru import logger
from swarms.structs.agent import Agent
from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm
def create_test_csv() -> str:
"""Create a test CSV file with agent configurations."""
print("\nStarting creation of test CSV file")
try:
csv_content = """agent_name,description,system_prompt,task
test_agent_1,Test Agent 1,System prompt 1,Task 1
test_agent_2,Test Agent 2,System prompt 2,Task 2"""
file_path = "test_agents.csv"
with open(file_path, "w") as f:
f.write(csv_content)
print(f"Created CSV with content:\n{csv_content}")
print(f"CSV file created at: {file_path}")
return file_path
except Exception as e:
logger.error(f"Failed to create test CSV: {str(e)}")
raise
def create_test_agent(name: str) -> Agent:
"""Create a test agent with specified name."""
print(f"\nCreating test agent: {name}")
try:
agent = Agent(
agent_name=name,
system_prompt=f"Test prompt for {name}",
model_name="gpt-4o-mini",
max_loops=1,
autosave=True,
verbose=True,
)
print(f"Created agent: {name}")
return agent
except Exception as e:
logger.error(f"Failed to create agent {name}: {str(e)}")
raise
def test_swarm_initialization() -> None:
"""Test basic swarm initialization."""
print("\n[TEST] Starting swarm initialization test")
try:
print("Creating test agents...")
agents = [
create_test_agent("agent1"),
create_test_agent("agent2"),
]
print("Initializing swarm...")
swarm = SpreadSheetSwarm(
name="Test Swarm",
description="Test Description",
agents=agents,
max_loops=2,
)
print("Verifying swarm configuration...")
assert swarm.name == "Test Swarm"
assert swarm.description == "Test Description"
assert len(swarm.agents) == 2
assert swarm.max_loops == 2
print("✅ Swarm initialization test PASSED")
except Exception as e:
logger.error(f"❌ Swarm initialization test FAILED: {str(e)}")
raise
async def test_load_from_csv() -> None:
"""Test loading agent configurations from CSV."""
print("\n[TEST] Starting CSV loading test")
try:
csv_path = create_test_csv()
print("Initializing swarm with CSV...")
swarm = SpreadSheetSwarm(load_path=csv_path)
print("Loading configurations...")
await swarm._load_from_csv()
print("Verifying loaded configurations...")
assert len(swarm.agents) == 2
assert len(swarm.agent_configs) == 2
assert "test_agent_1" in swarm.agent_configs
assert "test_agent_2" in swarm.agent_configs
os.remove(csv_path)
print(f"Cleaned up test file: {csv_path}")
print("✅ CSV loading test PASSED")
except Exception as e:
logger.error(f"❌ CSV loading test FAILED: {str(e)}")
raise
async def test_run_tasks() -> None:
"""Test running tasks with multiple agents."""
print("\n[TEST] Starting task execution test")
try:
print("Setting up test swarm...")
agents = [
create_test_agent("agent1"),
create_test_agent("agent2"),
]
swarm = SpreadSheetSwarm(agents=agents, max_loops=1)
test_task = "Test task for all agents"
print(f"Running test task: {test_task}")
await swarm._run_tasks(test_task)
print("Verifying task execution...")
assert swarm.metadata.tasks_completed == 2
assert len(swarm.metadata.outputs) == 2
print("✅ Task execution test PASSED")
except Exception as e:
logger.error(f"❌ Task execution test FAILED: {str(e)}")
raise
def test_output_tracking() -> None:
"""Test tracking of task outputs."""
print("\n[TEST] Starting output tracking test")
try:
print("Creating test swarm...")
swarm = SpreadSheetSwarm(agents=[create_test_agent("agent1")])
print("Tracking test output...")
swarm._track_output("agent1", "Test task", "Test result")
print("Verifying output tracking...")
assert swarm.metadata.tasks_completed == 1
assert len(swarm.metadata.outputs) == 1
assert swarm.metadata.outputs[0].agent_name == "agent1"
print("✅ Output tracking test PASSED")
except Exception as e:
logger.error(f"❌ Output tracking test FAILED: {str(e)}")
raise
async def test_save_to_csv() -> None:
"""Test saving metadata to CSV."""
print("\n[TEST] Starting CSV saving test")
try:
print("Setting up test data...")
swarm = SpreadSheetSwarm(
agents=[create_test_agent("agent1")],
save_file_path="test_output.csv",
)
swarm._track_output("agent1", "Test task", "Test result")
print("Saving to CSV...")
await swarm._save_to_csv()
print("Verifying file creation...")
assert os.path.exists(swarm.save_file_path)
os.remove(swarm.save_file_path)
print("Cleaned up test file")
print("✅ CSV saving test PASSED")
except Exception as e:
logger.error(f"❌ CSV saving test FAILED: {str(e)}")
raise
def test_json_export() -> None:
"""Test JSON export functionality."""
print("\n[TEST] Starting JSON export test")
try:
print("Creating test data...")
swarm = SpreadSheetSwarm(agents=[create_test_agent("agent1")])
swarm._track_output("agent1", "Test task", "Test result")
print("Exporting to JSON...")
json_output = swarm.export_to_json()
print("Verifying JSON output...")
assert isinstance(json_output, str)
assert "run_id" in json_output
assert "tasks_completed" in json_output
print("✅ JSON export test PASSED")
except Exception as e:
logger.error(f"❌ JSON export test FAILED: {str(e)}")
raise
async def run_all_tests() -> None:
"""Run all test functions."""
print("\n" + "=" * 50)
print("Starting SpreadsheetSwarm Test Suite")
print("=" * 50 + "\n")
try:
# Run synchronous tests
print("Running synchronous tests...")
test_swarm_initialization()
test_output_tracking()
test_json_export()
# Run asynchronous tests
print("\nRunning asynchronous tests...")
await test_load_from_csv()
await test_run_tasks()
await test_save_to_csv()
print("\n🎉 All tests completed successfully!")
print("=" * 50)
except Exception as e:
logger.error(f"\n❌ Test suite failed: {str(e)}")
print("=" * 50)
raise
if __name__ == "__main__":
# Run all tests
asyncio.run(run_all_tests())

@ -0,0 +1,301 @@
import asyncio
import time
from typing import List
from swarms.structs.agent import Agent
from swarms.structs.swarming_architectures import (
broadcast,
circular_swarm,
exponential_swarm,
geometric_swarm,
grid_swarm,
harmonic_swarm,
linear_swarm,
log_swarm,
mesh_swarm,
one_to_one,
one_to_three,
power_swarm,
pyramid_swarm,
sigmoid_swarm,
sinusoidal_swarm,
staircase_swarm,
star_swarm,
)
def create_test_agent(name: str) -> Agent:
"""Create a test agent with specified name"""
return Agent(
agent_name=name,
system_prompt=f"You are {name}. Respond with your name and the task you received.",
model_name="gpt-4o-mini",
max_loops=1,
)
def create_test_agents(num_agents: int) -> List[Agent]:
"""Create specified number of test agents"""
return [
create_test_agent(f"Agent{i+1}") for i in range(num_agents)
]
def print_separator():
print("\n" + "=" * 50 + "\n")
def test_circular_swarm():
"""Test and display circular swarm outputs"""
print_separator()
print("CIRCULAR SWARM TEST")
try:
agents = create_test_agents(3)
tasks = [
"Analyze data",
"Generate report",
"Summarize findings",
]
print("Running circular swarm with:")
print(f"Tasks: {tasks}\n")
result = circular_swarm(agents, tasks)
print("Circular Swarm Outputs:")
for log in result["history"]:
print(f"\nAgent: {log['agent_name']}")
print(f"Task: {log['task']}")
print(f"Response: {log['response']}")
except Exception as e:
print(f"Error: {str(e)}")
def test_grid_swarm():
"""Test and display grid swarm outputs"""
print_separator()
print("GRID SWARM TEST")
try:
agents = create_test_agents(4) # 2x2 grid
tasks = ["Task A", "Task B", "Task C", "Task D"]
print("Running grid swarm with 2x2 grid")
print(f"Tasks: {tasks}\n")
print(grid_swarm(agents, tasks))
print(
"Grid Swarm completed - each agent processed tasks in its grid position"
)
except Exception as e:
print(f"Error: {str(e)}")
def test_linear_swarm():
"""Test and display linear swarm outputs"""
print_separator()
print("LINEAR SWARM TEST")
try:
agents = create_test_agents(3)
tasks = ["Research task", "Write content", "Review output"]
print("Running linear swarm with:")
print(f"Tasks: {tasks}\n")
result = linear_swarm(agents, tasks)
print("Linear Swarm Outputs:")
for log in result["history"]:
print(f"\nAgent: {log['agent_name']}")
print(f"Task: {log['task']}")
print(f"Response: {log['response']}")
except Exception as e:
print(f"Error: {str(e)}")
def test_star_swarm():
"""Test and display star swarm outputs"""
print_separator()
print("STAR SWARM TEST")
try:
agents = create_test_agents(4) # 1 center + 3 peripheral
tasks = ["Coordinate workflow", "Process data"]
print("Running star swarm with:")
print(f"Center agent: {agents[0].agent_name}")
print(
f"Peripheral agents: {[agent.agent_name for agent in agents[1:]]}"
)
print(f"Tasks: {tasks}\n")
result = star_swarm(agents, tasks)
print("Star Swarm Outputs:")
for log in result["history"]:
print(f"\nAgent: {log['agent_name']}")
print(f"Task: {log['task']}")
print(f"Response: {log['response']}")
except Exception as e:
print(f"Error: {str(e)}")
def test_mesh_swarm():
"""Test and display mesh swarm outputs"""
print_separator()
print("MESH SWARM TEST")
try:
agents = create_test_agents(3)
tasks = [
"Analyze data",
"Process information",
"Generate insights",
]
print("Running mesh swarm with:")
print(f"Tasks: {tasks}\n")
result = mesh_swarm(agents, tasks)
print(f"Mesh Swarm Outputs: {result}")
for log in result["history"]:
print(f"\nAgent: {log['agent_name']}")
print(f"Task: {log['task']}")
print(f"Response: {log['response']}")
except Exception as e:
print(f"Error: {str(e)}")
def test_pyramid_swarm():
"""Test and display pyramid swarm outputs"""
print_separator()
print("PYRAMID SWARM TEST")
try:
agents = create_test_agents(6) # 1-2-3 pyramid
tasks = [
"Top task",
"Middle task 1",
"Middle task 2",
"Bottom task 1",
"Bottom task 2",
"Bottom task 3",
]
print("Running pyramid swarm with:")
print(f"Tasks: {tasks}\n")
result = pyramid_swarm(agents, tasks)
print(f"Pyramid Swarm Outputs: {result}")
for log in result["history"]:
print(f"\nAgent: {log['agent_name']}")
print(f"Task: {log['task']}")
print(f"Response: {log['response']}")
except Exception as e:
print(f"Error: {str(e)}")
async def test_communication_patterns():
"""Test and display agent communication patterns"""
print_separator()
print("COMMUNICATION PATTERNS TEST")
try:
sender = create_test_agent("Sender")
receiver = create_test_agent("Receiver")
task = "Process and relay this message"
print("Testing One-to-One Communication:")
result = one_to_one(sender, receiver, task)
print(f"\nOne-to-One Communication Outputs: {result}")
for log in result["history"]:
print(f"\nAgent: {log['agent_name']}")
print(f"Task: {log['task']}")
print(f"Response: {log['response']}")
print("\nTesting One-to-Three Communication:")
receivers = create_test_agents(3)
await one_to_three(sender, receivers, task)
print("\nTesting Broadcast Communication:")
broadcast_receivers = create_test_agents(5)
await broadcast(sender, broadcast_receivers, task)
except Exception as e:
print(f"Error: {str(e)}")
def test_mathematical_swarms():
"""Test and display mathematical swarm patterns"""
print_separator()
print("MATHEMATICAL SWARMS TEST")
try:
agents = create_test_agents(8)
base_tasks = ["Calculate", "Process", "Analyze"]
# Test each mathematical swarm
for swarm_type, swarm_func in [
("Power Swarm", power_swarm),
("Log Swarm", log_swarm),
("Exponential Swarm", exponential_swarm),
("Geometric Swarm", geometric_swarm),
("Harmonic Swarm", harmonic_swarm),
]:
print(f"\nTesting {swarm_type}:")
tasks = [f"{task} in {swarm_type}" for task in base_tasks]
print(f"Tasks: {tasks}")
swarm_func(agents, tasks.copy())
except Exception as e:
print(f"Error: {str(e)}")
def test_pattern_swarms():
"""Test and display pattern-based swarms"""
print_separator()
print("PATTERN-BASED SWARMS TEST")
try:
agents = create_test_agents(10)
task = "Process according to pattern"
for swarm_type, swarm_func in [
("Staircase Swarm", staircase_swarm),
("Sigmoid Swarm", sigmoid_swarm),
("Sinusoidal Swarm", sinusoidal_swarm),
]:
print(f"\nTesting {swarm_type}:")
print(f"Task: {task}")
swarm_func(agents, task)
except Exception as e:
print(f"Error: {str(e)}")
def run_all_tests():
"""Run all swarm architecture tests"""
print(
"\n=== Starting Swarm Architectures Test Suite with Outputs ==="
)
start_time = time.time()
try:
# Test basic swarm patterns
test_circular_swarm()
test_grid_swarm()
test_linear_swarm()
test_star_swarm()
test_mesh_swarm()
test_pyramid_swarm()
# Test mathematical and pattern swarms
test_mathematical_swarms()
test_pattern_swarms()
# Test communication patterns
asyncio.run(test_communication_patterns())
end_time = time.time()
duration = round(end_time - start_time, 2)
print("\n=== Test Suite Completed Successfully ===")
print(f"Time taken: {duration} seconds")
except Exception as e:
print("\n=== Test Suite Failed ===")
print(f"Error: {str(e)}")
raise
if __name__ == "__main__":
run_all_tests()

@ -1,52 +0,0 @@
from unittest.mock import Mock, patch
import pytest
from swarms.structs.agent import Agent
from swarms.structs.swarm_net import SwarmNetwork
@pytest.fixture
def swarm_network():
agents = [Agent(id=f"Agent_{i}") for i in range(5)]
return SwarmNetwork(agents=agents)
def test_swarm_network_init(swarm_network):
assert isinstance(swarm_network.agents, list)
assert len(swarm_network.agents) == 5
@patch("swarms.structs.swarm_net.SwarmNetwork.logger")
def test_run(mock_logger, swarm_network):
swarm_network.run()
assert (
mock_logger.info.call_count == 10
) # 2 log messages per agent
def test_run_with_mocked_agents(mocker, swarm_network):
mock_agents = [Mock(spec=Agent) for _ in range(5)]
mocker.patch.object(swarm_network, "agents", mock_agents)
swarm_network.run()
for mock_agent in mock_agents:
assert mock_agent.run.called
def test_swarm_network_with_no_agents():
swarm_network = SwarmNetwork(agents=[])
assert swarm_network.agents == []
def test_swarm_network_add_agent(swarm_network):
new_agent = Agent(id="Agent_5")
swarm_network.add_agent(new_agent)
assert len(swarm_network.agents) == 6
assert swarm_network.agents[-1] == new_agent
def test_swarm_network_remove_agent(swarm_network):
agent_to_remove = swarm_network.agents[0]
swarm_network.remove_agent(agent_to_remove)
assert len(swarm_network.agents) == 4
assert agent_to_remove not in swarm_network.agents
Loading…
Cancel
Save