Merge branch 'master' into master

pull/696/head
Kye Gomez 3 weeks ago committed by GitHub
commit 41d40eb686
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

File diff suppressed because it is too large Load Diff

@ -1,320 +1,291 @@
import os
import json
import logging
from typing import Dict, Optional, Any
from dataclasses import dataclass
import requests import requests
from loguru import logger
import time import time
from typing import Dict, Optional, Tuple
from uuid import UUID
BASE_URL = "http://0.0.0.0:8000/v1" # Set up logging
logging.basicConfig(
level=logging.INFO,
def check_api_server() -> bool: format="%(asctime)s - %(levelname)s - %(message)s",
"""Check if the API server is running and accessible.""" handlers=[
try: logging.FileHandler("api_tests.log"),
response = requests.get(f"{BASE_URL}/docs") logging.StreamHandler(),
return response.status_code == 200 ],
except requests.exceptions.ConnectionError: )
logger.error("API server is not running at {BASE_URL}") logger = logging.getLogger(__name__)
logger.error("Please start the API server first with:")
logger.error(" python main.py")
return False # Configuration
except Exception as e: @dataclass
logger.error(f"Error checking API server: {str(e)}") class TestConfig:
return False """Test configuration settings"""
base_url: str
class TestSession: timeout: int = 30
"""Manages test session state and authentication.""" verify_ssl: bool = True
debug: bool = True
def __init__(self):
self.user_id: Optional[UUID] = None
self.api_key: Optional[str] = None # Load config from environment or use defaults
self.test_agents: list[UUID] = [] config = TestConfig(
base_url=os.getenv("API_BASE_URL", "http://0.0.0.0:8000/v1")
@property )
def headers(self) -> Dict[str, str]:
"""Get headers with authentication."""
return {"api-key": self.api_key} if self.api_key else {} class APIClient:
"""API Client for testing"""
def create_test_user(session: TestSession) -> Tuple[bool, str]: def __init__(self, config: TestConfig):
"""Create a test user and store credentials in session.""" self.config = config
logger.info("Creating test user") self.session = requests.Session()
try: def _url(self, path: str) -> str:
response = requests.post( """Construct full URL"""
f"{BASE_URL}/users", return f"{self.config.base_url}/{path.lstrip('/')}"
json={"username": f"test_user_{int(time.time())}"},
def _log_request_details(
self, method: str, url: str, headers: Dict, data: Any
):
"""Log request details for debugging"""
logger.info("\nRequest Details:")
logger.info(f"Method: {method}")
logger.info(f"URL: {url}")
logger.info(f"Headers: {json.dumps(headers, indent=2)}")
logger.info(
f"Data: {json.dumps(data, indent=2) if data else None}"
) )
if response.status_code == 200: def _log_response_details(self, response: requests.Response):
data = response.json() """Log response details for debugging"""
session.user_id = data["user_id"] logger.info("\nResponse Details:")
session.api_key = data["api_key"] logger.info(f"Status Code: {response.status_code}")
logger.success(f"Created user with ID: {session.user_id}") logger.info(
return True, "Success" f"Headers: {json.dumps(dict(response.headers), indent=2)}"
else:
logger.error(f"Failed to create user: {response.text}")
return False, response.text
except Exception as e:
logger.exception("Exception during user creation")
return False, str(e)
def create_additional_api_key(
session: TestSession,
) -> Tuple[bool, str]:
"""Test creating an additional API key."""
logger.info("Creating additional API key")
try:
response = requests.post(
f"{BASE_URL}/users/{session.user_id}/api-keys",
headers=session.headers,
json={"name": "Test Key"},
) )
if response.status_code == 200:
logger.success("Created additional API key")
return True, response.json()["key"]
else:
logger.error(f"Failed to create API key: {response.text}")
return False, response.text
except Exception as e:
logger.exception("Exception during API key creation")
return False, str(e)
def test_create_agent(
session: TestSession,
) -> Tuple[bool, Optional[UUID]]:
"""Test creating a new agent."""
logger.info("Testing agent creation")
payload = {
"agent_name": f"Test Agent {int(time.time())}",
"system_prompt": "You are a helpful assistant",
"model_name": "gpt-4",
"description": "Test agent",
"tags": ["test", "automated"],
}
try: try:
response = requests.post( logger.info(
f"{BASE_URL}/agent", headers=session.headers, json=payload f"Body: {json.dumps(response.json(), indent=2)}"
) )
if response.status_code == 200:
agent_id = response.json()["agent_id"]
session.test_agents.append(agent_id)
logger.success(f"Created agent with ID: {agent_id}")
return True, agent_id
else:
logger.error(f"Failed to create agent: {response.text}")
return False, None
except Exception: except Exception:
logger.exception("Exception during agent creation") logger.info(f"Body: {response.text}")
return False, None
def _request(
self,
def test_list_user_agents(session: TestSession) -> bool: method: str,
"""Test listing user's agents.""" path: str,
logger.info("Testing user agent listing") headers: Optional[Dict] = None,
**kwargs: Any,
try: ) -> requests.Response:
response = requests.get( """Make HTTP request with config defaults"""
f"{BASE_URL}/users/me/agents", headers=session.headers url = self._url(path)
) headers = headers or {}
if response.status_code == 200: if self.config.debug:
agents = response.json() self._log_request_details(
logger.success(f"Found {len(agents)} user agents") method, url, headers, kwargs.get("json")
return True
else:
logger.error(
f"Failed to list user agents: {response.text}"
) )
return False
except Exception:
logger.exception("Exception during agent listing")
return False
def test_agent_operations(
session: TestSession, agent_id: UUID
) -> bool:
"""Test various operations on an agent."""
logger.info(f"Testing operations for agent {agent_id}")
# Test update
try: try:
update_response = requests.patch( response = self.session.request(
f"{BASE_URL}/agent/{agent_id}", method=method,
headers=session.headers, url=url,
json={ headers=headers,
"description": "Updated description", timeout=self.config.timeout,
"tags": ["test", "updated"], verify=self.config.verify_ssl,
}, **kwargs,
) )
if update_response.status_code != 200:
logger.error(
f"Failed to update agent: {update_response.text}"
)
return False
# Test metrics if self.config.debug:
metrics_response = requests.get( self._log_response_details(response)
f"{BASE_URL}/agent/{agent_id}/metrics",
headers=session.headers, if response.status_code >= 400:
)
if metrics_response.status_code != 200:
logger.error( logger.error(
f"Failed to get agent metrics: {metrics_response.text}" f"Request failed with status {response.status_code}"
) )
return False logger.error(f"Response: {response.text}")
logger.success("Successfully performed agent operations")
return True
except Exception:
logger.exception("Exception during agent operations")
return False
response.raise_for_status()
return response
def test_completion(session: TestSession, agent_id: UUID) -> bool: except requests.exceptions.RequestException as e:
"""Test running a completion.""" logger.error(f"Request failed: {str(e)}")
logger.info("Testing completion") if hasattr(e, "response") and e.response is not None:
logger.error(f"Error response: {e.response.text}")
raise
payload = {
"prompt": "What is the weather like today?",
"agent_id": agent_id,
"max_tokens": 100,
}
try:
response = requests.post(
f"{BASE_URL}/agent/completions",
headers=session.headers,
json=payload,
)
if response.status_code == 200: class TestRunner:
completion_data = response.json() """Test runner with logging and reporting"""
print(completion_data)
logger.success(
f"Got completion, used {completion_data['token_usage']['total_tokens']} tokens"
)
return True
else:
logger.error(f"Failed to get completion: {response.text}")
return False
except Exception:
logger.exception("Exception during completion")
return False
def __init__(self):
self.client = APIClient(config)
self.results = {"passed": 0, "failed": 0, "total_time": 0}
self.api_key = None
self.user_id = None
self.agent_id = None
def cleanup_test_resources(session: TestSession): def run_test(self, test_name: str, test_func: callable):
"""Clean up all test resources.""" """Run a single test with timing and logging"""
logger.info("Cleaning up test resources") logger.info(f"\nRunning test: {test_name}")
start_time = time.time()
# Delete test agents
for agent_id in session.test_agents:
try: try:
response = requests.delete( test_func()
f"{BASE_URL}/agent/{agent_id}", self.results["passed"] += 1
headers=session.headers, logger.info(f"{test_name} - PASSED")
except Exception as e:
self.results["failed"] += 1
logger.error(f"{test_name} - FAILED: {str(e)}")
logger.exception(e)
end_time = time.time()
duration = end_time - start_time
self.results["total_time"] += duration
logger.info(f"Test duration: {duration:.2f}s")
def test_user_creation(self):
"""Test user creation"""
response = self.client._request(
"POST", "/users", json={"username": "test_user"}
) )
if response.status_code == 200: data = response.json()
logger.debug(f"Deleted agent {agent_id}") assert "user_id" in data, "No user_id in response"
else: assert "api_key" in data, "No api_key in response"
logger.warning( self.api_key = data["api_key"]
f"Failed to delete agent {agent_id}: {response.text}" self.user_id = data["user_id"]
logger.info(f"Created user with ID: {self.user_id}")
def test_create_api_key(self):
"""Test API key creation"""
headers = {"api-key": self.api_key}
response = self.client._request(
"POST",
f"/users/{self.user_id}/api-keys",
headers=headers,
json={"name": "test_key"},
) )
except Exception: data = response.json()
logger.exception(f"Exception deleting agent {agent_id}") assert "key" in data, "No key in response"
logger.info("Successfully created new API key")
# Revoke API keys
if session.user_id: def test_create_agent(self):
try: """Test agent creation"""
response = requests.get( headers = {"api-key": self.api_key}
f"{BASE_URL}/users/{session.user_id}/api-keys", agent_config = {
headers=session.headers, "agent_name": "test_agent",
"model_name": "gpt-4",
"system_prompt": "You are a test agent",
"description": "Test agent description",
"temperature": 0.7,
"max_loops": 1,
}
response = self.client._request(
"POST", "/agent", headers=headers, json=agent_config
) )
if response.status_code == 200: data = response.json()
for key in response.json(): assert "agent_id" in data, "No agent_id in response"
try: self.agent_id = data["agent_id"]
revoke_response = requests.delete( logger.info(f"Created agent with ID: {self.agent_id}")
f"{BASE_URL}/users/{session.user_id}/api-keys/{key['key']}",
headers=session.headers, # Wait a bit for agent to be ready
time.sleep(2)
def test_list_agents(self):
"""Test agent listing"""
headers = {"api-key": self.api_key}
response = self.client._request(
"GET", "/agents", headers=headers
) )
if revoke_response.status_code == 200: agents = response.json()
logger.debug( assert isinstance(agents, list), "Response is not a list"
f"Revoked API key {key['name']}" assert len(agents) > 0, "No agents returned"
logger.info(f"Successfully retrieved {len(agents)} agents")
def test_agent_completion(self):
"""Test agent completion"""
if not self.agent_id:
logger.error("No agent_id available for completion test")
raise ValueError("Agent ID not set")
headers = {"api-key": self.api_key}
completion_request = {
"prompt": "Write 'Hello World!'",
"agent_id": str(
self.agent_id
), # Ensure UUID is converted to string
"max_tokens": 100,
"stream": False,
"temperature_override": 0.7,
}
logger.info(
f"Sending completion request for agent {self.agent_id}"
) )
else: response = self.client._request(
logger.warning( "POST",
f"Failed to revoke API key {key['name']}" "/agent/completions",
headers=headers,
json=completion_request,
) )
except Exception: data = response.json()
logger.exception( assert "response" in data, "No response in completion"
f"Exception revoking API key {key['name']}" logger.info(f"Completion response: {data.get('response')}")
def run_all_tests(self):
"""Run all tests and generate report"""
logger.info("\n" + "=" * 50)
logger.info("Starting API test suite...")
logger.info(f"Base URL: {config.base_url}")
logger.info("=" * 50 + "\n")
# Define test sequence
tests = [
("User Creation", self.test_user_creation),
("API Key Creation", self.test_create_api_key),
("Agent Creation", self.test_create_agent),
("List Agents", self.test_list_agents),
("Agent Completion", self.test_agent_completion),
]
# Run tests
for test_name, test_func in tests:
self.run_test(test_name, test_func)
# Generate report
self.print_report()
def print_report(self):
"""Print test results report"""
total_tests = self.results["passed"] + self.results["failed"]
success_rate = (
(self.results["passed"] / total_tests * 100)
if total_tests > 0
else 0
) )
except Exception:
logger.exception("Exception getting API keys for cleanup")
report = f"""
def run_test_workflow(): \n{'='*50}
"""Run complete test workflow.""" API TEST RESULTS
logger.info("Starting API tests") {'='*50}
Total Tests: {total_tests}
# Check if API server is running first Passed: {self.results['passed']}
if not check_api_server(): Failed: {self.results['failed']}
return False Success Rate: {success_rate:.2f}%
Total Time: {self.results['total_time']:.2f}s
session = TestSession() {'='*50}
"""
try: logger.info(report)
# Create user
user_success, message = create_test_user(session)
if not user_success:
logger.error(f"User creation failed: {message}")
return False
# Create additional API key
key_success, key = create_additional_api_key(session)
if not key_success:
logger.error(f"API key creation failed: {key}")
return False
# Create agent
agent_success, agent_id = test_create_agent(session)
if not agent_success or not agent_id:
logger.error("Agent creation failed")
return False
# Test user agent listing
if not test_list_user_agents(session):
logger.error("Agent listing failed")
return False
# Test agent operations
if not test_agent_operations(session, agent_id):
logger.error("Agent operations failed")
return False
# Test completion
if not test_completion(session, agent_id):
logger.error("Completion test failed")
return False
logger.success("All tests completed successfully")
return True
except Exception:
logger.exception("Exception during test workflow")
return False
finally:
cleanup_test_resources(session)
if __name__ == "__main__": if __name__ == "__main__":
success = run_test_workflow() try:
print(success) runner = TestRunner()
runner.run_all_tests()
except KeyboardInterrupt:
logger.info("\nTest suite interrupted by user")
except Exception as e:
logger.error(f"Test suite failed: {str(e)}")
logger.exception(e)

@ -0,0 +1,254 @@
import os
from typing import Dict, Optional, Any
from dataclasses import dataclass
import pytest
import requests
from uuid import UUID
from pydantic import BaseModel
from _pytest.terminal import TerminalReporter
# Configuration
@dataclass
class TestConfig:
"""Test configuration settings"""
base_url: str
timeout: int = 30
verify_ssl: bool = True
# Load config from environment or use defaults
config = TestConfig(
base_url=os.getenv("API_BASE_URL", "http://localhost:8000/v1")
)
# API Response Types
class UserResponse(BaseModel):
user_id: str
api_key: str
class AgentResponse(BaseModel):
agent_id: UUID
class MetricsResponse(BaseModel):
total_completions: int
average_response_time: float
error_rate: float
last_24h_completions: int
total_tokens_used: int
uptime_percentage: float
success_rate: float
peak_tokens_per_minute: int
class APIClient:
"""API Client with typed methods"""
def __init__(self, config: TestConfig):
self.config = config
self.session = requests.Session()
def _url(self, path: str) -> str:
"""Construct full URL"""
return f"{self.config.base_url}/{path.lstrip('/')}"
def _request(
self,
method: str,
path: str,
headers: Optional[Dict] = None,
**kwargs: Any,
) -> requests.Response:
"""Make HTTP request with config defaults"""
url = self._url(path)
return self.session.request(
method=method,
url=url,
headers=headers,
timeout=self.config.timeout,
verify=self.config.verify_ssl,
**kwargs,
)
def create_user(self, username: str) -> UserResponse:
"""Create a new user"""
response = self._request(
"POST", "/users", json={"username": username}
)
response.raise_for_status()
return UserResponse(**response.json())
def create_agent(
self, agent_config: Dict[str, Any], api_key: str
) -> AgentResponse:
"""Create a new agent"""
headers = {"api-key": api_key}
response = self._request(
"POST", "/agent", headers=headers, json=agent_config
)
response.raise_for_status()
return AgentResponse(**response.json())
def get_metrics(
self, agent_id: UUID, api_key: str
) -> MetricsResponse:
"""Get agent metrics"""
headers = {"api-key": api_key}
response = self._request(
"GET", f"/agent/{agent_id}/metrics", headers=headers
)
response.raise_for_status()
return MetricsResponse(**response.json())
# Test Fixtures
@pytest.fixture
def api_client() -> APIClient:
"""Fixture for API client"""
return APIClient(config)
@pytest.fixture
def test_user(api_client: APIClient) -> UserResponse:
"""Fixture for test user"""
return api_client.create_user("test_user")
@pytest.fixture
def test_agent(
api_client: APIClient, test_user: UserResponse
) -> AgentResponse:
"""Fixture for test agent"""
agent_config = {
"agent_name": "test_agent",
"model_name": "gpt-4",
"system_prompt": "You are a test agent",
"description": "Test agent description",
}
return api_client.create_agent(agent_config, test_user.api_key)
# Tests
def test_user_creation(api_client: APIClient):
"""Test user creation flow"""
response = api_client.create_user("new_test_user")
assert response.user_id
assert response.api_key
def test_agent_creation(
api_client: APIClient, test_user: UserResponse
):
"""Test agent creation flow"""
agent_config = {
"agent_name": "test_agent",
"model_name": "gpt-4",
"system_prompt": "You are a test agent",
"description": "Test agent description",
}
response = api_client.create_agent(
agent_config, test_user.api_key
)
assert response.agent_id
def test_agent_metrics(
api_client: APIClient,
test_user: UserResponse,
test_agent: AgentResponse,
):
"""Test metrics retrieval"""
metrics = api_client.get_metrics(
test_agent.agent_id, test_user.api_key
)
assert metrics.total_completions >= 0
assert metrics.error_rate >= 0
assert metrics.uptime_percentage >= 0
def test_invalid_auth(api_client: APIClient):
"""Test invalid authentication"""
with pytest.raises(requests.exceptions.HTTPError) as exc_info:
api_client.create_agent({}, "invalid_key")
assert exc_info.value.response.status_code == 401
# Custom pytest plugin to capture test results
class ResultCapture:
def __init__(self):
self.total = 0
self.passed = 0
self.failed = 0
self.errors = 0
@pytest.hookimpl(hookwrapper=True)
def pytest_terminal_summary(
terminalreporter: TerminalReporter, exitstatus: int
):
yield
capture = getattr(
terminalreporter.config, "_result_capture", None
)
if capture:
capture.total = (
len(terminalreporter.stats.get("passed", []))
+ len(terminalreporter.stats.get("failed", []))
+ len(terminalreporter.stats.get("error", []))
)
capture.passed = len(terminalreporter.stats.get("passed", []))
capture.failed = len(terminalreporter.stats.get("failed", []))
capture.errors = len(terminalreporter.stats.get("error", []))
@dataclass
class TestReport:
total_tests: int
passed: int
failed: int
errors: int
@property
def success_rate(self) -> float:
return (
(self.passed / self.total_tests) * 100
if self.total_tests > 0
else 0
)
def run_tests() -> TestReport:
"""Run tests and generate typed report"""
# Create result capture
capture = ResultCapture()
# Create pytest configuration
args = [__file__, "-v"]
# Run pytest with our plugin
pytest.main(args, plugins=[capture])
# Generate report
return TestReport(
total_tests=capture.total,
passed=capture.passed,
failed=capture.failed,
errors=capture.errors,
)
if __name__ == "__main__":
# Example usage with environment variable
# export API_BASE_URL=http://api.example.com/v1
report = run_tests()
print("\nTest Results:")
print(f"Total Tests: {report.total_tests}")
print(f"Passed: {report.passed}")
print(f"Failed: {report.failed}")
print(f"Errors: {report.errors}")
print(f"Success Rate: {report.success_rate:.2f}%")

@ -0,0 +1,472 @@
import asyncio
import json
from datetime import datetime
from typing import Any, Dict, List, Optional
from uuid import UUID
import httpx
from loguru import logger
# Configure logger
logger.add(
"tests/api_test_{time}.log",
rotation="1 day",
retention="7 days",
level="DEBUG",
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
)
class TestConfig:
"""Test configuration and utilities"""
BASE_URL: str = "http://localhost:8000/v1"
TEST_USERNAME: str = "test_user"
api_key: Optional[str] = None
user_id: Optional[UUID] = None
test_agent_id: Optional[UUID] = None
class TestResult:
"""Model for test results"""
def __init__(
self,
test_name: str,
status: str,
duration: float,
error: Optional[str] = None,
details: Optional[Dict[str, Any]] = None,
):
self.test_name = test_name
self.status = status
self.duration = duration
self.error = error
self.details = details or {}
def dict(self):
return {
"test_name": self.test_name,
"status": self.status,
"duration": self.duration,
"error": self.error,
"details": self.details,
}
async def log_response(
response: httpx.Response, test_name: str
) -> None:
"""Log API response details"""
logger.debug(f"\n{test_name} Response:")
logger.debug(f"Status Code: {response.status_code}")
logger.debug(f"Headers: {dict(response.headers)}")
try:
logger.debug(f"Body: {response.json()}")
except json.JSONDecodeError:
logger.debug(f"Body: {response.text}")
async def create_test_user() -> TestResult:
"""Create a test user and get API key"""
start_time = datetime.now()
try:
async with httpx.AsyncClient() as client:
response = await client.post(
f"{TestConfig.BASE_URL}/users",
json={"username": TestConfig.TEST_USERNAME},
)
await log_response(response, "Create User")
if response.status_code == 200:
data = response.json()
TestConfig.api_key = data["api_key"]
TestConfig.user_id = UUID(data["user_id"])
return TestResult(
test_name="create_test_user",
status="passed",
duration=(
datetime.now() - start_time
).total_seconds(),
details={"user_id": str(TestConfig.user_id)},
)
else:
return TestResult(
test_name="create_test_user",
status="failed",
duration=(
datetime.now() - start_time
).total_seconds(),
error=f"Failed to create user: {response.text}",
)
except Exception as e:
logger.error(f"Error in create_test_user: {str(e)}")
return TestResult(
test_name="create_test_user",
status="error",
duration=(datetime.now() - start_time).total_seconds(),
error=str(e),
)
async def create_test_agent() -> TestResult:
"""Create a test agent"""
start_time = datetime.now()
try:
# Create agent config according to the AgentConfig model
agent_config = {
"agent_name": "test_agent",
"model_name": "gpt-4",
"description": "Test agent for API testing",
"system_prompt": "You are a test agent.",
"temperature": 0.1,
"max_loops": 1,
"dynamic_temperature_enabled": True,
"user_name": TestConfig.TEST_USERNAME,
"retry_attempts": 1,
"context_length": 4000,
"output_type": "string",
"streaming_on": False,
"tags": ["test", "api"],
"stopping_token": "<DONE>",
"auto_generate_prompt": False,
}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{TestConfig.BASE_URL}/agent",
json=agent_config,
headers={"api-key": TestConfig.api_key},
)
await log_response(response, "Create Agent")
if response.status_code == 200:
data = response.json()
TestConfig.test_agent_id = UUID(data["agent_id"])
return TestResult(
test_name="create_test_agent",
status="passed",
duration=(
datetime.now() - start_time
).total_seconds(),
details={
"agent_id": str(TestConfig.test_agent_id)
},
)
else:
return TestResult(
test_name="create_test_agent",
status="failed",
duration=(
datetime.now() - start_time
).total_seconds(),
error=f"Failed to create agent: {response.text}",
)
except Exception as e:
logger.error(f"Error in create_test_agent: {str(e)}")
return TestResult(
test_name="create_test_agent",
status="error",
duration=(datetime.now() - start_time).total_seconds(),
error=str(e),
)
async def test_agent_completion() -> TestResult:
"""Test agent completion endpoint"""
start_time = datetime.now()
try:
completion_request = {
"prompt": "Hello, this is a test prompt.",
"agent_id": str(TestConfig.test_agent_id),
"max_tokens": 100,
"temperature_override": 0.5,
"stream": False,
}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{TestConfig.BASE_URL}/agent/completions",
json=completion_request,
headers={"api-key": TestConfig.api_key},
)
await log_response(response, "Agent Completion")
if response.status_code == 200:
return TestResult(
test_name="test_agent_completion",
status="passed",
duration=(
datetime.now() - start_time
).total_seconds(),
details={"response": response.json()},
)
else:
return TestResult(
test_name="test_agent_completion",
status="failed",
duration=(
datetime.now() - start_time
).total_seconds(),
error=f"Failed completion test: {response.text}",
)
except Exception as e:
logger.error(f"Error in test_agent_completion: {str(e)}")
return TestResult(
test_name="test_agent_completion",
status="error",
duration=(datetime.now() - start_time).total_seconds(),
error=str(e),
)
async def test_agent_metrics() -> TestResult:
"""Test agent metrics endpoint"""
start_time = datetime.now()
try:
if not TestConfig.test_agent_id:
return TestResult(
test_name="test_agent_metrics",
status="failed",
duration=(
datetime.now() - start_time
).total_seconds(),
error="No test agent ID available",
)
async with httpx.AsyncClient() as client:
response = await client.get(
f"{TestConfig.BASE_URL}/agent/{str(TestConfig.test_agent_id)}/metrics",
headers={"api-key": TestConfig.api_key},
)
await log_response(response, "Agent Metrics")
if response.status_code == 200:
return TestResult(
test_name="test_agent_metrics",
status="passed",
duration=(
datetime.now() - start_time
).total_seconds(),
details={"metrics": response.json()},
)
else:
return TestResult(
test_name="test_agent_metrics",
status="failed",
duration=(
datetime.now() - start_time
).total_seconds(),
error=f"Failed metrics test: {response.text}",
)
except Exception as e:
logger.error(f"Error in test_agent_metrics: {str(e)}")
return TestResult(
test_name="test_agent_metrics",
status="error",
duration=(datetime.now() - start_time).total_seconds(),
error=str(e),
)
async def test_update_agent() -> TestResult:
"""Test agent update endpoint"""
start_time = datetime.now()
try:
if not TestConfig.test_agent_id:
return TestResult(
test_name="test_update_agent",
status="failed",
duration=(
datetime.now() - start_time
).total_seconds(),
error="No test agent ID available",
)
update_data = {
"description": "Updated test agent description",
"tags": ["test", "updated"],
"max_loops": 2,
}
async with httpx.AsyncClient() as client:
response = await client.patch(
f"{TestConfig.BASE_URL}/agent/{str(TestConfig.test_agent_id)}",
json=update_data,
headers={"api-key": TestConfig.api_key},
)
await log_response(response, "Update Agent")
if response.status_code == 200:
return TestResult(
test_name="test_update_agent",
status="passed",
duration=(
datetime.now() - start_time
).total_seconds(),
details={"update_response": response.json()},
)
else:
return TestResult(
test_name="test_update_agent",
status="failed",
duration=(
datetime.now() - start_time
).total_seconds(),
error=f"Failed update test: {response.text}",
)
except Exception as e:
logger.error(f"Error in test_update_agent: {str(e)}")
return TestResult(
test_name="test_update_agent",
status="error",
duration=(datetime.now() - start_time).total_seconds(),
error=str(e),
)
async def test_error_handling() -> TestResult:
"""Test API error handling"""
start_time = datetime.now()
try:
async with httpx.AsyncClient() as client:
# Test with invalid API key
invalid_agent_id = "00000000-0000-0000-0000-000000000000"
response = await client.get(
f"{TestConfig.BASE_URL}/agent/{invalid_agent_id}/metrics",
headers={"api-key": "invalid_key"},
)
await log_response(response, "Invalid API Key Test")
if response.status_code in [401, 403]:
return TestResult(
test_name="test_error_handling",
status="passed",
duration=(
datetime.now() - start_time
).total_seconds(),
details={"error_response": response.json()},
)
else:
return TestResult(
test_name="test_error_handling",
status="failed",
duration=(
datetime.now() - start_time
).total_seconds(),
error="Error handling test failed",
)
except Exception as e:
logger.error(f"Error in test_error_handling: {str(e)}")
return TestResult(
test_name="test_error_handling",
status="error",
duration=(datetime.now() - start_time).total_seconds(),
error=str(e),
)
async def cleanup_test_resources() -> TestResult:
"""Clean up test resources"""
start_time = datetime.now()
try:
if TestConfig.test_agent_id:
async with httpx.AsyncClient() as client:
response = await client.delete(
f"{TestConfig.BASE_URL}/agent/{str(TestConfig.test_agent_id)}",
headers={"api-key": TestConfig.api_key},
)
await log_response(response, "Delete Agent")
return TestResult(
test_name="cleanup_test_resources",
status="passed",
duration=(datetime.now() - start_time).total_seconds(),
details={"cleanup": "completed"},
)
except Exception as e:
logger.error(f"Error in cleanup_test_resources: {str(e)}")
return TestResult(
test_name="cleanup_test_resources",
status="error",
duration=(datetime.now() - start_time).total_seconds(),
error=str(e),
)
async def run_all_tests() -> List[TestResult]:
"""Run all tests in sequence"""
logger.info("Starting API test suite")
results = []
# Initialize
results.append(await create_test_user())
if results[-1].status != "passed":
logger.error(
"Failed to create test user, aborting remaining tests"
)
return results
# Add delay to ensure user is properly created
await asyncio.sleep(1)
# Core tests
test_functions = [
create_test_agent,
test_agent_completion,
test_agent_metrics,
test_update_agent,
test_error_handling,
]
for test_func in test_functions:
result = await test_func()
results.append(result)
logger.info(f"Test {result.test_name}: {result.status}")
if result.error:
logger.error(
f"Error in {result.test_name}: {result.error}"
)
# Add small delay between tests
await asyncio.sleep(0.5)
# Cleanup
results.append(await cleanup_test_resources())
# Log summary
passed = sum(1 for r in results if r.status == "passed")
failed = sum(1 for r in results if r.status == "failed")
errors = sum(1 for r in results if r.status == "error")
logger.info("\nTest Summary:")
logger.info(f"Total Tests: {len(results)}")
logger.info(f"Passed: {passed}")
logger.info(f"Failed: {failed}")
logger.info(f"Errors: {errors}")
return results
def main():
"""Main entry point for running tests"""
logger.info("Starting API testing suite")
try:
results = asyncio.run(run_all_tests())
# Write results to JSON file
with open("test_results.json", "w") as f:
json.dump(
[result.dict() for result in results],
f,
indent=2,
default=str,
)
logger.info("Test results written to test_results.json")
except Exception:
logger.error("Fatal error in test suite: ")
main()

@ -1,13 +1,17 @@
import asyncio
import os import os
import secrets import secrets
import signal
import sys
import traceback import traceback
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta from datetime import datetime, timedelta
from enum import Enum from enum import Enum
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional from typing import Any, AsyncGenerator, Dict, List, Optional
from uuid import UUID, uuid4 from uuid import UUID, uuid4
from fastapi.concurrency import asynccontextmanager
import uvicorn import uvicorn
from dotenv import load_dotenv from dotenv import load_dotenv
from fastapi import ( from fastapi import (
@ -32,6 +36,19 @@ from swarms.structs.agent import Agent
load_dotenv() load_dotenv()
class UvicornServer(uvicorn.Server):
"""Customized uvicorn server with graceful shutdown support"""
async def setup(self, sockets=None):
"""Setup the server"""
await super().setup(sockets)
async def shutdown(self, sockets=None):
"""Gracefully shutdown the server"""
logger.info("Shutting down server...")
await super().shutdown(sockets)
class AgentStatus(str, Enum): class AgentStatus(str, Enum):
"""Enum for agent status.""" """Enum for agent status."""
@ -62,7 +79,16 @@ class User(BaseModel):
username: str username: str
is_active: bool = True is_active: bool = True
is_admin: bool = False is_admin: bool = False
api_keys: Dict[str, APIKey] = {} # key -> APIKey object api_keys: Dict[str, APIKey] = Field(default_factory=dict)
def ensure_active_api_key(self) -> Optional[APIKey]:
"""Ensure user has at least one active API key."""
active_keys = [
key for key in self.api_keys.values() if key.is_active
]
if not active_keys:
return None
return active_keys[0]
class AgentConfig(BaseModel): class AgentConfig(BaseModel):
@ -91,15 +117,6 @@ class AgentConfig(BaseModel):
max_loops: int = Field( max_loops: int = Field(
default=1, ge=1, description="Maximum number of loops" default=1, ge=1, description="Maximum number of loops"
) )
autosave: bool = Field(
default=True, description="Enable autosave"
)
dashboard: bool = Field(
default=False, description="Enable dashboard"
)
verbose: bool = Field(
default=True, description="Enable verbose output"
)
dynamic_temperature_enabled: bool = Field( dynamic_temperature_enabled: bool = Field(
default=True, description="Enable dynamic temperature" default=True, description="Enable dynamic temperature"
) )
@ -122,6 +139,13 @@ class AgentConfig(BaseModel):
default_factory=list, default_factory=list,
description="Tags for categorizing the agent", description="Tags for categorizing the agent",
) )
stopping_token: str = Field(
default="<DONE>", description="Stopping token for the agent"
)
auto_generate_prompt: bool = Field(
default=False,
description="Auto-generate prompt based on agent details such as name, description, etc.",
)
class AgentUpdate(BaseModel): class AgentUpdate(BaseModel):
@ -141,6 +165,7 @@ class AgentSummary(BaseModel):
agent_id: UUID agent_id: UUID
agent_name: str agent_name: str
description: str description: str
system_prompt: str
created_at: datetime created_at: datetime
last_used: datetime last_used: datetime
total_completions: int total_completions: int
@ -241,20 +266,6 @@ class AgentStore:
or self.users[user_id].is_admin or self.users[user_id].is_admin
) )
def validate_api_key(self, api_key: str) -> Optional[UUID]:
"""Validate an API key and return the associated user ID."""
user_id = self.api_keys.get(api_key)
if not user_id or api_key not in self.users[user_id].api_keys:
return None
key_object = self.users[user_id].api_keys[api_key]
if not key_object.is_active:
return None
# Update last used timestamp
key_object.last_used = datetime.utcnow()
return user_id
async def create_agent( async def create_agent(
self, config: AgentConfig, user_id: UUID self, config: AgentConfig, user_id: UUID
) -> UUID: ) -> UUID:
@ -266,17 +277,16 @@ class AgentStore:
system_prompt=config.system_prompt, system_prompt=config.system_prompt,
model_name=config.model_name, model_name=config.model_name,
max_loops=config.max_loops, max_loops=config.max_loops,
autosave=config.autosave,
dashboard=config.dashboard,
verbose=config.verbose, verbose=config.verbose,
dynamic_temperature_enabled=True, dynamic_temperature_enabled=True,
saved_state_path=f"states/{config.agent_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
user_name=config.user_name, user_name=config.user_name,
retry_attempts=config.retry_attempts, retry_attempts=config.retry_attempts,
context_length=config.context_length, context_length=config.context_length,
return_step_meta=True, return_step_meta=False,
output_type="str", output_type="str",
streaming_on=config.streaming_on, streaming_on=config.streaming_on,
stopping_token=config.stopping_token,
auto_generate_prompt=config.auto_generate_prompt,
) )
agent_id = uuid4() agent_id = uuid4()
@ -345,6 +355,39 @@ class AgentStore:
logger.info(f"Updated agent {agent_id}") logger.info(f"Updated agent {agent_id}")
def ensure_user_api_key(self, user_id: UUID) -> APIKey:
"""Ensure user has at least one active API key."""
if user_id not in self.users:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
user = self.users[user_id]
existing_key = user.ensure_active_api_key()
if existing_key:
return existing_key
# Create new API key if none exists
return self.create_api_key(user_id, "Default Key")
def validate_api_key(self, api_key: str) -> Optional[UUID]:
"""Validate an API key and return the associated user ID."""
if not api_key:
return None
user_id = self.api_keys.get(api_key)
if not user_id or api_key not in self.users[user_id].api_keys:
return None
key_object = self.users[user_id].api_keys[api_key]
if not key_object.is_active:
return None
# Update last used timestamp
key_object.last_used = datetime.utcnow()
return user_id
async def list_agents( async def list_agents(
self, self,
tags: Optional[List[str]] = None, tags: Optional[List[str]] = None,
@ -367,6 +410,7 @@ class AgentStore:
AgentSummary( AgentSummary(
agent_id=agent_id, agent_id=agent_id,
agent_name=agent.agent_name, agent_name=agent.agent_name,
system_prompt=agent.system_prompt,
description=metadata["description"], description=metadata["description"],
created_at=metadata["created_at"], created_at=metadata["created_at"],
last_used=metadata["last_used"], last_used=metadata["last_used"],
@ -551,7 +595,7 @@ def get_store() -> AgentStore:
return StoreManager.get_instance() return StoreManager.get_instance()
# Security utility function using the new dependency # Modify the get_current_user dependency
async def get_current_user( async def get_current_user(
api_key: str = Header( api_key: str = Header(
..., description="API key for authentication" ..., description="API key for authentication"
@ -559,6 +603,13 @@ async def get_current_user(
store: AgentStore = Depends(get_store), store: AgentStore = Depends(get_store),
) -> User: ) -> User:
"""Validate API key and return current user.""" """Validate API key and return current user."""
if not api_key:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="API key is required",
headers={"WWW-Authenticate": "ApiKey"},
)
user_id = store.validate_api_key(api_key) user_id = store.validate_api_key(api_key)
if not user_id: if not user_id:
raise HTTPException( raise HTTPException(
@ -566,7 +617,19 @@ async def get_current_user(
detail="Invalid or expired API key", detail="Invalid or expired API key",
headers={"WWW-Authenticate": "ApiKey"}, headers={"WWW-Authenticate": "ApiKey"},
) )
return store.users[user_id]
user = store.users.get(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
if not user.ensure_active_api_key():
# Attempt to create new API key
store.ensure_user_api_key(user_id)
return user
class SwarmsAPI: class SwarmsAPI:
@ -600,6 +663,8 @@ class SwarmsAPI:
"""Set up API routes.""" """Set up API routes."""
# In your API code # In your API code
# Modify the create_user endpoint
@self.app.post("/v1/users", response_model=Dict[str, Any]) @self.app.post("/v1/users", response_model=Dict[str, Any])
async def create_user(request: Request): async def create_user(request: Request):
"""Create a new user and initial API key.""" """Create a new user and initial API key."""
@ -614,9 +679,17 @@ class SwarmsAPI:
user_id = uuid4() user_id = uuid4()
user = User(id=user_id, username=username) user = User(id=user_id, username=username)
self.store.users[user_id] = user self.store.users[user_id] = user
# Always create initial API key
initial_key = self.store.create_api_key( initial_key = self.store.create_api_key(
user_id, "Initial Key" user_id, "Initial Key"
) )
if not initial_key:
raise HTTPException(
status_code=500,
detail="Failed to create initial API key",
)
return { return {
"user_id": user_id, "user_id": user_id,
"api_key": initial_key.key, "api_key": initial_key.key,
@ -625,26 +698,6 @@ class SwarmsAPI:
logger.error(f"Error creating user: {str(e)}") logger.error(f"Error creating user: {str(e)}")
raise HTTPException(status_code=400, detail=str(e)) raise HTTPException(status_code=400, detail=str(e))
@self.app.post(
"/v1/users/{user_id}/api-keys", response_model=APIKey
)
async def create_api_key(
user_id: UUID,
key_create: APIKeyCreate,
current_user: User = Depends(get_current_user),
):
"""Create a new API key for a user."""
if (
current_user.id != user_id
and not current_user.is_admin
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not authorized to create API keys for this user",
)
return self.store.create_api_key(user_id, key_create.name)
@self.app.get( @self.app.get(
"/v1/users/{user_id}/api-keys", "/v1/users/{user_id}/api-keys",
response_model=List[APIKey], response_model=List[APIKey],
@ -837,28 +890,92 @@ class SwarmsAPI:
} }
class APIServer:
def __init__(
self, app: FastAPI, host: str = "0.0.0.0", port: int = 8000
):
self.app = app
self.host = host
self.port = port
self.config = uvicorn.Config(
app=app,
host=host,
port=port,
log_level="info",
access_log=True,
workers=os.cpu_count() * 2,
)
self.server = UvicornServer(config=self.config)
# Setup signal handlers
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
def _handle_signal(self, signum, frame):
"""Handle shutdown signals"""
logger.info(f"Received signal {signum}")
asyncio.create_task(self.shutdown())
async def startup(self) -> None:
"""Start the server"""
try:
logger.info(
f"Starting API server on http://{self.host}:{self.port}"
)
print(
f"Starting API server on http://{self.host}:{self.port}"
)
await self.server.serve()
except Exception as e:
logger.error(f"Failed to start server: {str(e)}")
raise
async def shutdown(self) -> None:
"""Shutdown the server"""
try:
logger.info("Initiating graceful shutdown...")
await self.server.shutdown()
except Exception as e:
logger.error(f"Error during shutdown: {str(e)}")
raise
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator:
"""Lifespan context manager for the FastAPI app"""
# Startup
logger.info("Starting up API server...")
yield
# Shutdown
logger.info("Shutting down API server...")
def create_app() -> FastAPI: def create_app() -> FastAPI:
"""Create and configure the FastAPI application.""" """Create and configure the FastAPI application"""
logger.info("Creating FastAPI application") logger.info("Creating FastAPI application")
api = SwarmsAPI() api = SwarmsAPI()
app = api.app app = api.app
# Add lifespan handling
app.router.lifespan_context = lifespan
logger.info("FastAPI application created successfully") logger.info("FastAPI application created successfully")
return app return app
app = create_app() def run_server():
"""Run the API server"""
if __name__ == "__main__":
try: try:
logger.info("Starting API server...") # Create the FastAPI app
print("Starting API server on http://0.0.0.0:8000") app = create_app()
uvicorn.run( # Create and run the server
app, # Pass the app instance directly server = APIServer(app)
host="0.0.0.0", asyncio.run(server.startup())
port=8000,
log_level="info",
)
except Exception as e: except Exception as e:
logger.error(f"Failed to start API: {str(e)}") logger.error(f"Failed to start API: {str(e)}")
print(f"Error starting server: {str(e)}") print(f"Error starting server: {str(e)}"
if __name__ == "__main__":
run_server()

@ -2,7 +2,7 @@ import requests
import json import json
from time import sleep from time import sleep
BASE_URL = "http://swarms-api-893767232.us-east-2.elb.amazonaws.com" BASE_URL = "http://0.0.0.0:8000/v1"
def make_request(method, endpoint, data=None): def make_request(method, endpoint, data=None):

@ -0,0 +1,56 @@
from loguru import logger
from swarms.structs.agent import Agent
from swarms.structs.graph_swarm import GraphSwarm
if __name__ == "__main__":
try:
# Create agents
data_collector = Agent(
agent_name="Market-Data-Collector",
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
)
trend_analyzer = Agent(
agent_name="Market-Trend-Analyzer",
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
)
report_generator = Agent(
agent_name="Investment-Report-Generator",
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
)
# Create swarm
swarm = GraphSwarm(
agents=[
(data_collector, []),
(trend_analyzer, ["Market-Data-Collector"]),
(report_generator, ["Market-Trend-Analyzer"]),
],
swarm_name="Market Analysis Intelligence Network",
)
# Run the swarm
result = swarm.run(
"Analyze current market trends for tech stocks and provide investment recommendations"
)
# Print results
print(f"Execution success: {result.success}")
print(f"Total time: {result.execution_time:.2f} seconds")
for agent_name, output in result.outputs.items():
print(f"\nAgent: {agent_name}")
print(f"Output: {output.output}")
if output.error:
print(f"Error: {output.error}")
except Exception as error:
logger.error(error)
raise error

@ -0,0 +1,265 @@
import os
from swarms import Agent, AgentRearrange
from swarm_models import OpenAIChat
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
)
# Initialize the gatekeeper agent
gatekeeper_agent = Agent(
agent_name="HealthScoreGatekeeper",
system_prompt="""
<role>
<title>Health Score Privacy Gatekeeper</title>
<primary_responsibility>Protect and manage sensitive health information while providing necessary access to authorized agents</primary_responsibility>
</role>
<capabilities>
<security>
<encryption>Manage encryption of health scores</encryption>
<access_control>Implement strict access control mechanisms</access_control>
<audit>Track and log all access requests</audit>
</security>
<data_handling>
<anonymization>Remove personally identifiable information</anonymization>
<transformation>Convert raw health data into privacy-preserving formats</transformation>
</data_handling>
</capabilities>
<protocols>
<data_access>
<verification>
<step>Verify agent authorization level</step>
<step>Check request legitimacy</step>
<step>Validate purpose of access</step>
</verification>
<response_format>
<health_score>Numerical value only</health_score>
<metadata>Anonymized timestamp and request ID</metadata>
</response_format>
</data_access>
<privacy_rules>
<patient_data>Never expose patient names or identifiers</patient_data>
<health_history>No access to historical data without explicit authorization</health_history>
<aggregation>Provide only aggregated or anonymized data when possible</aggregation>
</privacy_rules>
</protocols>
<compliance>
<standards>
<hipaa>Maintain HIPAA compliance</hipaa>
<gdpr>Follow GDPR guidelines for data protection</gdpr>
</standards>
<audit_trail>
<logging>Record all data access events</logging>
<monitoring>Track unusual access patterns</monitoring>
</audit_trail>
</compliance>
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="gatekeeper_agent.json",
)
# Initialize the boss agent (Director)
boss_agent = Agent(
agent_name="BossAgent",
system_prompt="""
<role>
<title>Swarm Director</title>
<purpose>Orchestrate and manage agent collaboration while respecting privacy boundaries</purpose>
</role>
<responsibilities>
<coordination>
<task_management>Assign and prioritize tasks</task_management>
<workflow_optimization>Ensure efficient collaboration</workflow_optimization>
<privacy_compliance>Maintain privacy protocols</privacy_compliance>
</coordination>
<oversight>
<performance_monitoring>Track agent effectiveness</performance_monitoring>
<quality_control>Ensure accuracy of outputs</quality_control>
<security_compliance>Enforce data protection policies</security_compliance>
</oversight>
</responsibilities>
<interaction_protocols>
<health_score_access>
<authorization>Request access through gatekeeper only</authorization>
<handling>Process only anonymized health scores</handling>
<distribution>Share authorized information on need-to-know basis</distribution>
</health_score_access>
<communication>
<format>Structured, secure messaging</format>
<encryption>End-to-end encrypted channels</encryption>
</communication>
</interaction_protocols>
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="boss_agent.json",
)
# Initialize worker 1: Health Score Analyzer
worker1 = Agent(
agent_name="HealthScoreAnalyzer",
system_prompt="""
<role>
<title>Health Score Analyst</title>
<purpose>Analyze anonymized health scores for patterns and insights</purpose>
</role>
<capabilities>
<analysis>
<statistical_processing>Advanced statistical analysis</statistical_processing>
<pattern_recognition>Identify health trends</pattern_recognition>
<risk_assessment>Evaluate health risk factors</risk_assessment>
</analysis>
<privacy_compliance>
<data_handling>Work only with anonymized data</data_handling>
<secure_processing>Use encrypted analysis methods</secure_processing>
</privacy_compliance>
</capabilities>
<protocols>
<data_access>
<request_procedure>
<step>Submit authenticated requests to gatekeeper</step>
<step>Process only authorized data</step>
<step>Maintain audit trail</step>
</request_procedure>
</data_access>
<reporting>
<anonymization>Ensure no identifiable information in reports</anonymization>
<aggregation>Present aggregate statistics only</aggregation>
</reporting>
</protocols>
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="worker1.json",
)
# Initialize worker 2: Report Generator
worker2 = Agent(
agent_name="ReportGenerator",
system_prompt="""
<role>
<title>Privacy-Conscious Report Generator</title>
<purpose>Create secure, anonymized health score reports</purpose>
</role>
<capabilities>
<reporting>
<format>Generate standardized, secure reports</format>
<anonymization>Apply privacy-preserving techniques</anonymization>
<aggregation>Compile statistical summaries</aggregation>
</reporting>
<security>
<data_protection>Implement secure report generation</data_protection>
<access_control>Manage report distribution</access_control>
</security>
</capabilities>
<protocols>
<report_generation>
<privacy_rules>
<rule>No personal identifiers in reports</rule>
<rule>Aggregate data when possible</rule>
<rule>Apply statistical noise for privacy</rule>
</privacy_rules>
<distribution>
<access>Restricted to authorized personnel</access>
<tracking>Monitor report access</tracking>
</distribution>
</report_generation>
</protocols>
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="worker2.json",
)
# Swarm-Level Prompt (Collaboration Prompt)
swarm_prompt = """
<swarm_configuration>
<objective>Process and analyze health scores while maintaining strict privacy controls</objective>
<workflow>
<step>
<agent>HealthScoreGatekeeper</agent>
<action>Receive and validate data access requests</action>
<output>Anonymized health scores</output>
</step>
<step>
<agent>BossAgent</agent>
<action>Coordinate analysis and reporting tasks</action>
<privacy_control>Enforce data protection protocols</privacy_control>
</step>
<step>
<agent>HealthScoreAnalyzer</agent>
<action>Process authorized health score data</action>
<constraints>Work only with anonymized information</constraints>
</step>
<step>
<agent>ReportGenerator</agent>
<action>Create privacy-preserving reports</action>
<output>Secure, anonymized insights</output>
</step>
</workflow>
</swarm_configuration>
"""
# Create a list of agents
agents = [gatekeeper_agent, boss_agent, worker1, worker2]
# Define the flow pattern for the swarm
flow = "HealthScoreGatekeeper -> BossAgent -> HealthScoreAnalyzer -> ReportGenerator"
# Using AgentRearrange class to manage the swarm
agent_system = AgentRearrange(
name="health-score-swarm",
description="Privacy-focused health score analysis system",
agents=agents,
flow=flow,
return_json=False,
output_type="final",
max_loops=1,
)
# Example task for the swarm
task = f"""
{swarm_prompt}
Process the incoming health score data while ensuring patient privacy. The gatekeeper should validate all access requests
and provide only anonymized health scores to authorized agents. Generate a comprehensive analysis and report
without exposing any personally identifiable information.
"""
# Run the swarm system with the task
output = agent_system.run(task)
print(output)

@ -0,0 +1,291 @@
import os
from swarms import Agent, AgentRearrange
from swarm_models import OpenAIChat
# Initialize OpenAI model
api_key = os.getenv(
"OPENAI_API_KEY"
) # ANTHROPIC_API_KEY, COHERE_API_KEY
model = OpenAIChat(
api_key=api_key,
model_name="gpt-4o-mini",
temperature=0.7, # Higher temperature for more creative responses
)
# Patient Agent - Holds and protects private information
patient_agent = Agent(
agent_name="PatientAgent",
system_prompt="""
<role>
<identity>Anxious Patient with Private Health Information</identity>
<personality>
<traits>
<trait>Protective of personal information</trait>
<trait>Slightly distrustful of medical system</trait>
<trait>Worried about health insurance rates</trait>
<trait>Selective in information sharing</trait>
</traits>
<background>
<history>Previous negative experience with information leaks</history>
<concerns>Fear of discrimination based on health status</concerns>
</background>
</personality>
</role>
<private_information>
<health_data>
<score>Maintains actual health score</score>
<conditions>Knowledge of undisclosed conditions</conditions>
<medications>Complete list of current medications</medications>
<history>Full medical history</history>
</health_data>
<sharing_rules>
<authorized_sharing>
<condition>Only share general symptoms with doctor</condition>
<condition>Withhold specific details about lifestyle</condition>
<condition>Never reveal full medication list</condition>
<condition>Protect actual health score value</condition>
</authorized_sharing>
</sharing_rules>
</private_information>
<interaction_protocols>
<responses>
<to_questions>
<direct>Deflect sensitive questions</direct>
<vague>Provide partial information when pressed</vague>
<defensive>Become evasive if pressured too much</defensive>
</to_questions>
<to_requests>
<medical>Share only what's absolutely necessary</medical>
<personal>Redirect personal questions</personal>
</to_requests>
</responses>
</interaction_protocols>
""",
llm=model,
max_loops=1,
verbose=True,
stopping_token="<DONE>",
)
# Doctor Agent - Tries to gather accurate information
doctor_agent = Agent(
agent_name="DoctorAgent",
system_prompt="""
<role>
<identity>Empathetic but Thorough Medical Professional</identity>
<personality>
<traits>
<trait>Patient and understanding</trait>
<trait>Professionally persistent</trait>
<trait>Detail-oriented</trait>
<trait>Trust-building focused</trait>
</traits>
<approach>
<style>Non-confrontational but thorough</style>
<method>Uses indirect questions to gather information</method>
</approach>
</personality>
</role>
<capabilities>
<information_gathering>
<techniques>
<technique>Ask open-ended questions</technique>
<technique>Notice inconsistencies in responses</technique>
<technique>Build rapport before sensitive questions</technique>
<technique>Use medical knowledge to probe deeper</technique>
</techniques>
</information_gathering>
<communication>
<strategies>
<strategy>Explain importance of full disclosure</strategy>
<strategy>Provide privacy assurances</strategy>
<strategy>Use empathetic listening</strategy>
</strategies>
</communication>
</capabilities>
<protocols>
<patient_interaction>
<steps>
<step>Establish trust and rapport</step>
<step>Gather general health information</step>
<step>Carefully probe sensitive areas</step>
<step>Respect patient boundaries while encouraging openness</step>
</steps>
</patient_interaction>
</protocols>
""",
llm=model,
max_loops=1,
verbose=True,
stopping_token="<DONE>",
)
# Nurse Agent - Observes and assists
nurse_agent = Agent(
agent_name="NurseAgent",
system_prompt="""
<role>
<identity>Observant Support Medical Staff</identity>
<personality>
<traits>
<trait>Highly perceptive</trait>
<trait>Naturally trustworthy</trait>
<trait>Diplomatically skilled</trait>
</traits>
<functions>
<primary>Support doctor-patient communication</primary>
<secondary>Notice non-verbal cues</secondary>
</functions>
</personality>
</role>
<capabilities>
<observation>
<focus_areas>
<area>Patient body language</area>
<area>Inconsistencies in stories</area>
<area>Signs of withholding information</area>
<area>Emotional responses to questions</area>
</focus_areas>
</observation>
<support>
<actions>
<action>Provide comfortable environment</action>
<action>Offer reassurance when needed</action>
<action>Bridge communication gaps</action>
</actions>
</support>
</capabilities>
<protocols>
<assistance>
<methods>
<method>Share observations with doctor privately</method>
<method>Help patient feel more comfortable</method>
<method>Facilitate trust-building</method>
</methods>
</assistance>
</protocols>
""",
llm=model,
max_loops=1,
verbose=True,
stopping_token="<DONE>",
)
# Medical Records Agent - Analyzes available information
records_agent = Agent(
agent_name="MedicalRecordsAgent",
system_prompt="""
<role>
<identity>Medical Records Analyst</identity>
<function>
<primary>Analyze available medical information</primary>
<secondary>Identify patterns and inconsistencies</secondary>
</function>
</role>
<capabilities>
<analysis>
<methods>
<method>Compare current and historical data</method>
<method>Identify information gaps</method>
<method>Flag potential inconsistencies</method>
<method>Generate questions for follow-up</method>
</methods>
</analysis>
<reporting>
<outputs>
<output>Summarize known information</output>
<output>List missing critical data</output>
<output>Suggest areas for investigation</output>
</outputs>
</reporting>
</capabilities>
<protocols>
<data_handling>
<privacy>
<rule>Work only with authorized information</rule>
<rule>Maintain strict confidentiality</rule>
<rule>Flag but don't speculate about gaps</rule>
</privacy>
</data_handling>
</protocols>
""",
llm=model,
max_loops=1,
verbose=True,
stopping_token="<DONE>",
)
# Swarm-Level Prompt (Medical Consultation Scenario)
swarm_prompt = """
<medical_consultation_scenario>
<setting>
<location>Private medical office</location>
<context>Routine health assessment with complex patient</context>
</setting>
<workflow>
<stage name="initial_contact">
<agent>PatientAgent</agent>
<role>Present for check-up, holding private information</role>
</stage>
<stage name="examination">
<agent>DoctorAgent</agent>
<role>Conduct examination and gather information</role>
<agent>NurseAgent</agent>
<role>Observe and support interaction</role>
</stage>
<stage name="analysis">
<agent>MedicalRecordsAgent</agent>
<role>Process available information and identify gaps</role>
</stage>
</workflow>
<objectives>
<goal>Create realistic medical consultation interaction</goal>
<goal>Demonstrate information protection dynamics</goal>
<goal>Show natural healthcare provider-patient relationship</goal>
</objectives>
</medical_consultation_scenario>
"""
# Create agent list
agents = [patient_agent, doctor_agent, nurse_agent, records_agent]
# Define interaction flow
flow = (
"PatientAgent -> DoctorAgent -> NurseAgent -> MedicalRecordsAgent"
)
# Configure swarm system
agent_system = AgentRearrange(
name="medical-consultation-swarm",
description="Role-playing medical consultation with focus on information privacy",
agents=agents,
flow=flow,
return_json=False,
output_type="final",
max_loops=1,
)
# Example consultation scenario
task = f"""
{swarm_prompt}
Begin a medical consultation where the patient has a health score of 72 but is reluctant to share full details
about their lifestyle and medication history. The doctor needs to gather accurate information while the nurse
observes the interaction. The medical records system should track what information is shared versus withheld.
"""
# Run the consultation scenario
output = agent_system.run(task)
print(output)

@ -0,0 +1,327 @@
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional
from swarms import Agent
class InsuranceType(Enum):
AUTO = "auto"
LIFE = "life"
HEALTH = "health"
HOME = "home"
BUSINESS = "business"
DENTAL = "dental"
TRAVEL = "travel"
@dataclass
class InsuranceProduct:
code: str
name: str
type: InsuranceType
description: str
coverage: List[str]
price_range: str
min_coverage: float
max_coverage: float
payment_options: List[str]
waiting_period: str
available: bool
# Simulated product database
INSURANCE_PRODUCTS = {
"AUTO001": InsuranceProduct(
code="AUTO001",
name="Seguro Auto Total",
type=InsuranceType.AUTO,
description="Seguro completo para vehículos con cobertura integral",
coverage=[
"Daños por colisión",
"Robo total",
"Responsabilidad civil",
"Asistencia en carretera 24/7",
"Gastos médicos ocupantes",
],
price_range="$800-2000 USD/año",
min_coverage=10000,
max_coverage=50000,
payment_options=["Mensual", "Trimestral", "Anual"],
waiting_period="Inmediata",
available=True,
),
"LIFE001": InsuranceProduct(
code="LIFE001",
name="Vida Protegida Plus",
type=InsuranceType.LIFE,
description="Seguro de vida con cobertura extendida y beneficios adicionales",
coverage=[
"Muerte natural",
"Muerte accidental (doble indemnización)",
"Invalidez total y permanente",
"Enfermedades graves",
"Gastos funerarios",
],
price_range="$30-100 USD/mes",
min_coverage=50000,
max_coverage=1000000,
payment_options=["Mensual", "Anual"],
waiting_period="30 días",
available=True,
),
"HEALTH001": InsuranceProduct(
code="HEALTH001",
name="Salud Preferencial",
type=InsuranceType.HEALTH,
description="Plan de salud premium con cobertura internacional",
coverage=[
"Hospitalización",
"Cirugías",
"Consultas médicas",
"Medicamentos",
"Tratamientos especializados",
"Cobertura internacional",
],
price_range="$100-300 USD/mes",
min_coverage=100000,
max_coverage=5000000,
payment_options=["Mensual", "Anual"],
waiting_period="90 días",
available=True,
),
}
class WorkflowNode(Enum):
MAIN_MENU = "main_menu"
CHECK_AVAILABILITY = "check_availability"
PRODUCT_DETAILS = "product_details"
QUOTE_REQUEST = "quote_request"
CLAIMS = "claims"
LOCATE_OFFICE = "locate_office"
PAYMENT_OPTIONS = "payment_options"
LATAM_LOCATIONS = {
"Brasil": [
{
"city": "São Paulo",
"offices": [
{
"address": "Av. Paulista, 1374 - Bela Vista",
"phone": "+55 11 1234-5678",
"hours": "Lun-Vie: 9:00-18:00",
}
],
}
],
"México": [
{
"city": "Ciudad de México",
"offices": [
{
"address": "Paseo de la Reforma 250, Juárez",
"phone": "+52 55 1234-5678",
"hours": "Lun-Vie: 9:00-18:00",
}
],
}
],
}
class InsuranceBot:
def __init__(self):
self.agent = Agent(
agent_name="LATAM-Insurance-Agent",
system_prompt="""You are a specialized insurance assistant for Latin America's leading insurance provider.
Key Responsibilities:
1. Product Information:
- Explain our comprehensive insurance portfolio
- Provide detailed coverage information
- Compare plans and benefits
- Quote estimates based on customer needs
2. Customer Service:
- Process policy inquiries
- Handle claims information
- Assist with payment options
- Locate nearest offices
3. Cultural Considerations:
- Communicate in Spanish and Portuguese
- Understand LATAM insurance regulations
- Consider regional healthcare systems
- Respect local customs and practices
Use the following simulated product database for accurate information:
{INSURANCE_PRODUCTS}
When discussing products, always reference accurate prices, coverage amounts, and waiting periods.""",
model_name="gpt-4",
max_loops=1,
verbose=True,
)
self.current_node = WorkflowNode.MAIN_MENU
self.current_product = None
async def process_user_input(self, user_input: str) -> str:
"""Process user input and return appropriate response"""
try:
if self.current_node == WorkflowNode.MAIN_MENU:
menu_choice = user_input.strip()
if menu_choice == "1":
# Use agent to provide personalized product recommendations
return await self.agent.run(
"""Por favor ayude al cliente a elegir un producto:
Productos disponibles:
- AUTO001: Seguro Auto Total
- LIFE001: Vida Protegida Plus
- HEALTH001: Salud Preferencial
Explique brevemente cada uno y solicite información sobre sus necesidades específicas."""
)
elif menu_choice == "2":
self.current_node = WorkflowNode.QUOTE_REQUEST
# Use agent to handle quote requests
return await self.agent.run(
"""Inicie el proceso de cotización.
Solicite la siguiente información de manera conversacional:
1. Tipo de seguro
2. Información personal básica
3. Necesidades específicas de cobertura"""
)
elif menu_choice == "3":
return await self.agent.run(
"""Explique el proceso de reclamos para cada tipo de seguro,
incluyendo documentación necesaria y tiempos estimados."""
)
elif menu_choice == "4":
self.current_node = WorkflowNode.LOCATE_OFFICE
# Use agent to provide location guidance
return await self.agent.run(
f"""Based on our office locations: {LATAM_LOCATIONS}
Ask the customer for their location and help them find the nearest office.
Provide the response in Spanish."""
)
elif menu_choice == "5":
# Use agent to explain payment options
return await self.agent.run(
"""Explique todas las opciones de pago disponibles,
incluyendo métodos, frecuencias y cualquier descuento por pago anticipado."""
)
elif menu_choice == "6":
# Use agent to handle advisor connection
return await self.agent.run(
"""Explique el proceso para conectar con un asesor personal,
horarios de atención y canales disponibles."""
)
else:
return await self.agent.run(
"Explain that the option is invalid and list the main menu options."
)
elif self.current_node == WorkflowNode.LOCATE_OFFICE:
# Use agent to process location request
return await self.agent.run(
f"""Based on user input: '{user_input}'
and our office locations: {LATAM_LOCATIONS}
Help them find the most relevant office. Response in Spanish."""
)
# Check if input is a product code
if user_input.upper() in INSURANCE_PRODUCTS:
product = self.get_product_info(user_input.upper())
# Use agent to provide detailed product information
return await self.agent.run(
f"""Provide detailed information about this product:
{self.format_product_info(product)}
Include additional benefits and comparison with similar products.
Response in Spanish."""
)
# Handle general queries
return await self.agent.run(
f"""The user said: '{user_input}'
Provide a helpful response based on our insurance products and services.
Response in Spanish."""
)
except Exception:
self.current_node = WorkflowNode.MAIN_MENU
return await self.agent.run(
"Explain that there was an error and list the main menu options. Response in Spanish."
)
def get_product_info(
self, product_code: str
) -> Optional[InsuranceProduct]:
"""Get product information from simulated database"""
return INSURANCE_PRODUCTS.get(product_code)
def format_product_info(self, product: InsuranceProduct) -> str:
"""Format product information for display"""
return f"""
Producto: {product.name} (Código: {product.code})
Tipo: {product.type.value}
Descripción: {product.description}
Cobertura incluye:
{chr(10).join(f'- {coverage}' for coverage in product.coverage)}
Rango de precio: {product.price_range}
Cobertura mínima: ${product.min_coverage:,.2f} USD
Cobertura máxima: ${product.max_coverage:,.2f} USD
Opciones de pago: {', '.join(product.payment_options)}
Período de espera: {product.waiting_period}
Estado: {'Disponible' if product.available else 'No disponible'}
"""
def handle_main_menu(self) -> List[str]:
"""Return main menu options"""
return [
"1. Consultar productos de seguro",
"2. Solicitar cotización",
"3. Información sobre reclamos",
"4. Ubicar oficina más cercana",
"5. Opciones de pago",
"6. Hablar con un asesor",
]
async def main():
"""Run the interactive session"""
bot = InsuranceBot()
print(
"Sistema de Seguros LATAM inicializado. Escriba 'salir' para terminar."
)
print("\nOpciones disponibles:")
print("\n".join(bot.handle_main_menu()))
while True:
user_input = input("\nUsted: ").strip()
if user_input.lower() in ["salir", "exit"]:
print("¡Gracias por usar nuestro servicio!")
break
response = await bot.process_user_input(user_input)
print(f"Agente: {response}")
if __name__ == "__main__":
asyncio.run(main())

@ -0,0 +1,272 @@
from typing import List, Dict
from dataclasses import dataclass
from datetime import datetime
import asyncio
import aiohttp
from loguru import logger
from swarms import Agent
from pathlib import Path
import json
@dataclass
class CryptoData:
"""Real-time cryptocurrency data structure"""
symbol: str
current_price: float
market_cap: float
total_volume: float
price_change_24h: float
market_cap_rank: int
class DataFetcher:
"""Handles real-time data fetching from CoinGecko"""
def __init__(self):
self.base_url = "https://api.coingecko.com/api/v3"
self.session = None
async def _init_session(self):
if self.session is None:
self.session = aiohttp.ClientSession()
async def close(self):
if self.session:
await self.session.close()
self.session = None
async def get_market_data(
self, limit: int = 20
) -> List[CryptoData]:
"""Fetch market data for top cryptocurrencies"""
await self._init_session()
url = f"{self.base_url}/coins/markets"
params = {
"vs_currency": "usd",
"order": "market_cap_desc",
"per_page": str(limit),
"page": "1",
"sparkline": "false",
}
try:
async with self.session.get(
url, params=params
) as response:
if response.status != 200:
logger.error(
f"API Error {response.status}: {await response.text()}"
)
return []
data = await response.json()
crypto_data = []
for coin in data:
try:
crypto_data.append(
CryptoData(
symbol=str(
coin.get("symbol", "")
).upper(),
current_price=float(
coin.get("current_price", 0)
),
market_cap=float(
coin.get("market_cap", 0)
),
total_volume=float(
coin.get("total_volume", 0)
),
price_change_24h=float(
coin.get("price_change_24h", 0)
),
market_cap_rank=int(
coin.get("market_cap_rank", 0)
),
)
)
except (ValueError, TypeError) as e:
logger.error(
f"Error processing coin data: {str(e)}"
)
continue
logger.info(
f"Successfully fetched data for {len(crypto_data)} coins"
)
return crypto_data
except Exception as e:
logger.error(f"Exception in get_market_data: {str(e)}")
return []
class CryptoSwarmSystem:
def __init__(self):
self.agents = self._initialize_agents()
self.data_fetcher = DataFetcher()
logger.info("Crypto Swarm System initialized")
def _initialize_agents(self) -> Dict[str, Agent]:
"""Initialize different specialized agents"""
base_config = {
"max_loops": 1,
"autosave": True,
"dashboard": False,
"verbose": True,
"dynamic_temperature_enabled": True,
"retry_attempts": 3,
"context_length": 200000,
"return_step_meta": False,
"output_type": "string",
"streaming_on": False,
}
agents = {
"price_analyst": Agent(
agent_name="Price-Analysis-Agent",
system_prompt="""Analyze the given cryptocurrency price data and provide insights about:
1. Price trends and movements
2. Notable price actions
3. Potential support/resistance levels""",
saved_state_path="price_agent.json",
user_name="price_analyzer",
**base_config,
),
"volume_analyst": Agent(
agent_name="Volume-Analysis-Agent",
system_prompt="""Analyze the given cryptocurrency volume data and provide insights about:
1. Volume trends
2. Notable volume spikes
3. Market participation levels""",
saved_state_path="volume_agent.json",
user_name="volume_analyzer",
**base_config,
),
"market_analyst": Agent(
agent_name="Market-Analysis-Agent",
system_prompt="""Analyze the overall cryptocurrency market data and provide insights about:
1. Market trends
2. Market dominance
3. Notable market movements""",
saved_state_path="market_agent.json",
user_name="market_analyzer",
**base_config,
),
}
return agents
async def analyze_market(self) -> Dict:
"""Run real-time market analysis using all agents"""
try:
# Fetch market data
logger.info("Fetching market data for top 20 coins")
crypto_data = await self.data_fetcher.get_market_data(20)
if not crypto_data:
return {
"error": "Failed to fetch market data",
"timestamp": datetime.now().isoformat(),
}
# Run analysis with each agent
results = {}
for agent_name, agent in self.agents.items():
logger.info(f"Running {agent_name} analysis")
analysis = self._run_agent_analysis(
agent, crypto_data
)
results[agent_name] = analysis
return {
"timestamp": datetime.now().isoformat(),
"market_data": {
coin.symbol: {
"price": coin.current_price,
"market_cap": coin.market_cap,
"volume": coin.total_volume,
"price_change_24h": coin.price_change_24h,
"rank": coin.market_cap_rank,
}
for coin in crypto_data
},
"analysis": results,
}
except Exception as e:
logger.error(f"Error in market analysis: {str(e)}")
return {
"error": str(e),
"timestamp": datetime.now().isoformat(),
}
def _run_agent_analysis(
self, agent: Agent, crypto_data: List[CryptoData]
) -> str:
"""Run analysis for a single agent"""
try:
data_str = json.dumps(
[
{
"symbol": cd.symbol,
"price": cd.current_price,
"market_cap": cd.market_cap,
"volume": cd.total_volume,
"price_change_24h": cd.price_change_24h,
"rank": cd.market_cap_rank,
}
for cd in crypto_data
],
indent=2,
)
prompt = f"""Analyze this real-time cryptocurrency market data and provide detailed insights:
{data_str}"""
return agent.run(prompt)
except Exception as e:
logger.error(f"Error in {agent.agent_name}: {str(e)}")
return f"Error: {str(e)}"
async def main():
# Create output directory
Path("reports").mkdir(exist_ok=True)
# Initialize the swarm system
swarm = CryptoSwarmSystem()
while True:
try:
# Run analysis
report = await swarm.analyze_market()
# Save report
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
report_path = f"reports/market_analysis_{timestamp}.json"
with open(report_path, "w") as f:
json.dump(report, f, indent=2, default=str)
logger.info(
f"Analysis complete. Report saved to {report_path}"
)
# Wait before next analysis
await asyncio.sleep(300) # 5 minutes
except Exception as e:
logger.error(f"Error in main loop: {str(e)}")
await asyncio.sleep(60) # Wait 1 minute before retrying
finally:
if swarm.data_fetcher.session:
await swarm.data_fetcher.close()
if __name__ == "__main__":
asyncio.run(main())

@ -0,0 +1,263 @@
import os
from swarms import Agent, AgentRearrange
from swarm_models import OpenAIChat
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
)
# Initialize the matchmaker agent (Director)
matchmaker_agent = Agent(
agent_name="MatchmakerAgent",
system_prompt="""
<agent_role>
You are the MatchmakerAgent, the primary coordinator for managing user profiles and facilitating meaningful connections while maintaining strict privacy standards.
</agent_role>
<privacy_guidelines>
<restricted_information>
- Full names
- Contact information (phone, email, social media)
- Exact location/address
- Financial information
- Personal identification numbers
- Workplace specifics
</restricted_information>
<shareable_information>
- First name only
- Age range (not exact birth date)
- General location (city/region only)
- Interests and hobbies
- Relationship goals
- General profession category
</shareable_information>
</privacy_guidelines>
<core_responsibilities>
<task>Profile_Management</task>
<description>
- Review and verify user profiles for authenticity
- Ensure all shared information adheres to privacy guidelines
- Flag any potential security concerns
</description>
<task>Match_Coordination</task>
<description>
- Analyze compatibility factors between users
- Prioritize matches based on shared interests and goals
- Monitor interaction patterns for safety and satisfaction
</description>
<task>Communication_Flow</task>
<description>
- Coordinate information exchange between ProfileAnalyzer and ConnectionFacilitator
- Ensure smooth transition of approved information
- Maintain audit trail of information sharing
</description>
</core_responsibilities>
<ethical_guidelines>
<principle>Consent_First</principle>
<description>Never share information without explicit user consent</description>
<principle>Safety_Priority</principle>
<description>Prioritize user safety and privacy over match potential</description>
<principle>Transparency</principle>
<description>Be clear about what information is being shared and why</description>
</ethical_guidelines>
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="matchmaker_agent.json",
)
# Initialize worker 1: Profile Analyzer
profile_analyzer = Agent(
agent_name="ProfileAnalyzer",
system_prompt="""
<agent_role>
You are the ProfileAnalyzer, responsible for deeply understanding user profiles and identifying meaningful compatibility factors while maintaining strict privacy protocols.
</agent_role>
<data_handling>
<sensitive_data>
<storage>
- All sensitive information must be encrypted
- Access logs must be maintained
- Data retention policies must be followed
</storage>
<processing>
- Use anonymized IDs for internal processing
- Apply privacy-preserving analysis techniques
- Implement data minimization principles
</processing>
</sensitive_data>
<analysis_parameters>
<compatibility_metrics>
- Shared interests alignment
- Relationship goal compatibility
- Value system overlap
- Lifestyle compatibility
- Communication style matching
</compatibility_metrics>
<red_flags>
- Inconsistent information
- Suspicious behavior patterns
- Policy violations
- Safety concerns
</red_flags>
</analysis_parameters>
</data_handling>
<output_guidelines>
<match_analysis>
- Generate compatibility scores
- Identify shared interests and potential conversation starters
- Flag potential concerns for review
- Provide reasoning for match recommendations
</match_analysis>
<privacy_filters>
- Apply progressive information disclosure rules
- Implement multi-stage verification for sensitive data sharing
- Maintain audit trails of information access
</privacy_filters>
</output_guidelines>
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="profile_analyzer.json",
)
# Initialize worker 2: Connection Facilitator
connection_facilitator = Agent(
agent_name="ConnectionFacilitator",
system_prompt="""
<agent_role>
You are the ConnectionFacilitator, responsible for managing the interaction between matched users and ensuring smooth, safe, and meaningful communication.
</agent_role>
<communication_protocols>
<stages>
<stage name="initial_contact">
- Manage introduction messages
- Monitor response patterns
- Flag any concerning behavior
</stage>
<stage name="ongoing_interaction">
- Track engagement levels
- Identify conversation quality indicators
- Provide conversation suggestions when appropriate
</stage>
<stage name="milestone_tracking">
- Monitor relationship progression
- Record user feedback
- Update matching algorithms based on successful connections
</stage>
</stages>
<safety_measures>
<content_filtering>
- Screen for inappropriate content
- Block prohibited information sharing
- Monitor for harassment or abuse
</content_filtering>
<privacy_protection>
- Implement progressive contact information sharing
- Maintain anonymized communication channels
- Protect user identity until mutual consent
</privacy_protection>
</safety_measures>
</communication_protocols>
<feedback_system>
<metrics>
- User engagement rates
- Communication quality scores
- Safety incident reports
- User satisfaction ratings
</metrics>
<improvement_loop>
- Collect interaction data
- Analyze success patterns
- Implement refinements to matching criteria
- Update safety protocols as needed
</improvement_loop>
</feedback_system>
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="connection_facilitator.json",
)
# Swarm-Level Prompt (Collaboration Prompt)
swarm_prompt = """
As a dating platform swarm, your collective goal is to facilitate meaningful connections while maintaining
the highest standards of privacy and safety. The MatchmakerAgent oversees the entire matching process,
coordinating between the ProfileAnalyzer who deeply understands user compatibility, and the ConnectionFacilitator
who manages the development of connections. Together, you must ensure that:
1. User privacy is maintained at all times
2. Information is shared progressively and with consent
3. Safety protocols are strictly followed
4. Meaningful connections are prioritized over quantity
5. User experience remains positive and engaging
"""
# Create a list of agents
agents = [matchmaker_agent, profile_analyzer, connection_facilitator]
# Define the flow pattern for the swarm
flow = "MatchmakerAgent -> ProfileAnalyzer -> ConnectionFacilitator"
# Using AgentRearrange class to manage the swarm
agent_system = AgentRearrange(
name="dating-swarm",
description="Privacy-focused dating platform agent system",
agents=agents,
flow=flow,
return_json=False,
output_type="final",
max_loops=1,
)
# Example task for the swarm
task = f"""
{swarm_prompt}
Process a new batch of user profiles and identify potential matches while ensuring all privacy protocols
are followed. For each potential match, provide compatibility reasoning and suggested conversation
starters without revealing any restricted information.
"""
# Run the swarm system with the task
output = agent_system.run(task)
print(output)

@ -0,0 +1,31 @@
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
# Initialize the agent
agent = Agent(
agent_name="Financial-Analysis-Agent",
agent_description="Personal finance advisor agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT
+ "Output the <DONE> token when you're done creating a portfolio of etfs, index, funds, and more for AI",
max_loops=1,
model_name="openai/gpt-4o",
dynamic_temperature_enabled=True,
user_name="Kye",
retry_attempts=3,
# streaming_on=True,
context_length=8192,
return_step_meta=False,
output_type="str", # "json", "dict", "csv" OR "string" "yaml" and
auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task
max_tokens=4000, # max output tokens
# interactive=True,
stopping_token="<DONE>",
saved_state_path="agent_00.json",
interactive=False,
)
agent.run(
"Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.",
)

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry] [tool.poetry]
name = "swarms" name = "swarms"
version = "6.7.0" version = "6.7.5"
description = "Swarms - TGSC" description = "Swarms - TGSC"
license = "MIT" license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"] authors = ["Kye Gomez <kye@apac.ai>"]

@ -1,38 +1,10 @@
import os
import concurrent.futures
from dotenv import load_dotenv from dotenv import load_dotenv
from loguru import logger
load_dotenv() load_dotenv()
# Disable logging by default
if os.getenv("SWARMS_VERBOSE_GLOBAL", "False").lower() == "false":
logger.disable("")
# Import telemetry functions with error handling
from swarms.telemetry.bootup import bootup # noqa: E402, F403 from swarms.telemetry.bootup import bootup # noqa: E402, F403
from swarms.telemetry.sentry_active import ( # noqa: E402
activate_sentry,
) # noqa: E402
# Run telemetry functions concurrently with error handling
def run_telemetry():
try:
with concurrent.futures.ThreadPoolExecutor(
max_workers=2
) as executor:
future_bootup = executor.submit(bootup)
future_sentry = executor.submit(activate_sentry)
# Wait for completion and check for exceptions
future_bootup.result()
future_sentry.result()
except Exception as e:
logger.error(f"Error running telemetry functions: {e}")
run_telemetry() bootup()
from swarms.agents import * # noqa: E402, F403 from swarms.agents import * # noqa: E402, F403
from swarms.artifacts import * # noqa: E402, F403 from swarms.artifacts import * # noqa: E402, F403

@ -2422,22 +2422,15 @@ class Agent:
if self.llm is None: if self.llm is None:
raise TypeError("LLM object cannot be None") raise TypeError("LLM object cannot be None")
# Define common method names for LLM interfaces
method_names = ["run", "__call__", "generate", "invoke"]
for method_name in method_names:
if hasattr(self.llm, method_name):
try: try:
method = getattr(self.llm, method_name) out = self.llm.run(task, *args, **kwargs)
return method(task, *args, **kwargs)
except Exception as e:
raise RuntimeError(
f"Error calling {method_name}: {str(e)}"
)
raise AttributeError( return out
f"No suitable method found in the llm object. Expected one of: {method_names}" except AttributeError as e:
logger.error(
f"Error calling LLM: {e} You need a class with a run(task: str) method"
) )
raise e
def handle_sop_ops(self): def handle_sop_ops(self):
# If the user inputs a list of strings for the sop then join them and set the sop # If the user inputs a list of strings for the sop then join them and set the sop

@ -19,6 +19,7 @@ from clusterops import (
list_available_gpus, list_available_gpus,
) )
from swarms.utils.loguru_logger import initialize_logger from swarms.utils.loguru_logger import initialize_logger
from swarms.structs.swarm_id_generator import generate_swarm_id
logger = initialize_logger(log_folder="concurrent_workflow") logger = initialize_logger(log_folder="concurrent_workflow")
@ -50,7 +51,7 @@ class AgentOutputSchema(BaseModel):
class MetadataSchema(BaseModel): class MetadataSchema(BaseModel):
swarm_id: Optional[str] = Field( swarm_id: Optional[str] = Field(
..., description="Unique ID for the run" generate_swarm_id(), description="Unique ID for the run"
) )
task: Optional[str] = Field( task: Optional[str] = Field(
..., description="Task or query given to all agents" ..., description="Task or query given to all agents"

@ -612,56 +612,3 @@ class GraphSwarm:
self.graph.add_edge(dep, agent.agent_name) self.graph.add_edge(dep, agent.agent_name)
self._validate_graph() self._validate_graph()
if __name__ == "__main__":
try:
# Create agents
data_collector = Agent(
agent_name="Market-Data-Collector",
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
)
trend_analyzer = Agent(
agent_name="Market-Trend-Analyzer",
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
)
report_generator = Agent(
agent_name="Investment-Report-Generator",
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
)
# Create swarm
swarm = GraphSwarm(
agents=[
(data_collector, []),
(trend_analyzer, ["Market-Data-Collector"]),
(report_generator, ["Market-Trend-Analyzer"]),
],
swarm_name="Market Analysis Intelligence Network",
)
# Run the swarm
result = swarm.run(
"Analyze current market trends for tech stocks and provide investment recommendations"
)
# Print results
print(f"Execution success: {result.success}")
print(f"Total time: {result.execution_time:.2f} seconds")
for agent_name, output in result.outputs.items():
print(f"\nAgent: {agent_name}")
print(f"Output: {output.output}")
if output.error:
print(f"Error: {output.error}")
except Exception as error:
logger.error(error)
raise error

@ -0,0 +1,5 @@
import uuid
def generate_swarm_id():
return str(uuid.uuid4())

@ -1,27 +1,65 @@
import os import os
import logging import logging
import warnings import warnings
from concurrent.futures import ThreadPoolExecutor import concurrent.futures
from dotenv import load_dotenv
from loguru import logger
from swarms.utils.disable_logging import disable_logging from swarms.utils.disable_logging import disable_logging
def bootup(): def bootup():
"""Bootup swarms""" """Initialize swarms environment and configuration
Handles environment setup, logging configuration, telemetry,
and workspace initialization.
"""
try: try:
# Load environment variables
load_dotenv()
# Configure logging
if (
os.getenv("SWARMS_VERBOSE_GLOBAL", "False").lower()
== "false"
):
logger.disable("")
logging.disable(logging.CRITICAL) logging.disable(logging.CRITICAL)
# Silent wandb
os.environ["WANDB_SILENT"] = "true" os.environ["WANDB_SILENT"] = "true"
# Auto set workspace directory # Configure workspace
workspace_dir = os.path.join(os.getcwd(), "agent_workspace") workspace_dir = os.path.join(os.getcwd(), "agent_workspace")
if not os.path.exists(workspace_dir):
os.makedirs(workspace_dir, exist_ok=True) os.makedirs(workspace_dir, exist_ok=True)
os.environ["WORKSPACE_DIR"] = workspace_dir os.environ["WORKSPACE_DIR"] = workspace_dir
# Suppress warnings
warnings.filterwarnings("ignore", category=DeprecationWarning) warnings.filterwarnings("ignore", category=DeprecationWarning)
# Use ThreadPoolExecutor to run disable_logging and auto_update concurrently # Run telemetry functions concurrently
with ThreadPoolExecutor(max_workers=1) as executor: try:
executor.submit(disable_logging) with concurrent.futures.ThreadPoolExecutor(
max_workers=2
) as executor:
from swarms.telemetry.sentry_active import (
activate_sentry,
)
future_disable_logging = executor.submit(
disable_logging
)
future_sentry = executor.submit(activate_sentry)
# Wait for completion and check for exceptions
future_disable_logging.result()
future_sentry.result()
except Exception as e:
logger.error(f"Error running telemetry functions: {e}")
except Exception as e: except Exception as e:
print(f"An error occurred: {str(e)}") logger.error(f"Error during bootup: {str(e)}")
raise raise
# Run bootup
bootup()

Loading…
Cancel
Save