parent
f76237596e
commit
1df09b2067
File diff suppressed because it is too large
Load Diff
@ -1,320 +1,291 @@
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
from typing import Dict, Optional, Any
|
||||
from dataclasses import dataclass
|
||||
import requests
|
||||
from loguru import logger
|
||||
import time
|
||||
from typing import Dict, Optional, Tuple
|
||||
from uuid import UUID
|
||||
|
||||
BASE_URL = "http://0.0.0.0:8000/v1"
|
||||
|
||||
|
||||
def check_api_server() -> bool:
|
||||
"""Check if the API server is running and accessible."""
|
||||
try:
|
||||
response = requests.get(f"{BASE_URL}/docs")
|
||||
return response.status_code == 200
|
||||
except requests.exceptions.ConnectionError:
|
||||
logger.error("API server is not running at {BASE_URL}")
|
||||
logger.error("Please start the API server first with:")
|
||||
logger.error(" python main.py")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"Error checking API server: {str(e)}")
|
||||
return False
|
||||
|
||||
|
||||
class TestSession:
|
||||
"""Manages test session state and authentication."""
|
||||
|
||||
def __init__(self):
|
||||
self.user_id: Optional[UUID] = None
|
||||
self.api_key: Optional[str] = None
|
||||
self.test_agents: list[UUID] = []
|
||||
|
||||
@property
|
||||
def headers(self) -> Dict[str, str]:
|
||||
"""Get headers with authentication."""
|
||||
return {"api-key": self.api_key} if self.api_key else {}
|
||||
|
||||
|
||||
def create_test_user(session: TestSession) -> Tuple[bool, str]:
|
||||
"""Create a test user and store credentials in session."""
|
||||
logger.info("Creating test user")
|
||||
|
||||
try:
|
||||
response = requests.post(
|
||||
f"{BASE_URL}/users",
|
||||
json={"username": f"test_user_{int(time.time())}"},
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
session.user_id = data["user_id"]
|
||||
session.api_key = data["api_key"]
|
||||
logger.success(f"Created user with ID: {session.user_id}")
|
||||
return True, "Success"
|
||||
else:
|
||||
logger.error(f"Failed to create user: {response.text}")
|
||||
return False, response.text
|
||||
except Exception as e:
|
||||
logger.exception("Exception during user creation")
|
||||
return False, str(e)
|
||||
|
||||
|
||||
def create_additional_api_key(
|
||||
session: TestSession,
|
||||
) -> Tuple[bool, str]:
|
||||
"""Test creating an additional API key."""
|
||||
logger.info("Creating additional API key")
|
||||
|
||||
try:
|
||||
response = requests.post(
|
||||
f"{BASE_URL}/users/{session.user_id}/api-keys",
|
||||
headers=session.headers,
|
||||
json={"name": "Test Key"},
|
||||
# Set up logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s - %(levelname)s - %(message)s",
|
||||
handlers=[
|
||||
logging.FileHandler("api_tests.log"),
|
||||
logging.StreamHandler(),
|
||||
],
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Configuration
|
||||
@dataclass
|
||||
class TestConfig:
|
||||
"""Test configuration settings"""
|
||||
|
||||
base_url: str
|
||||
timeout: int = 30
|
||||
verify_ssl: bool = True
|
||||
debug: bool = True
|
||||
|
||||
|
||||
# Load config from environment or use defaults
|
||||
config = TestConfig(
|
||||
base_url=os.getenv("API_BASE_URL", "http://0.0.0.0:8000/v1")
|
||||
)
|
||||
|
||||
|
||||
class APIClient:
|
||||
"""API Client for testing"""
|
||||
|
||||
def __init__(self, config: TestConfig):
|
||||
self.config = config
|
||||
self.session = requests.Session()
|
||||
|
||||
def _url(self, path: str) -> str:
|
||||
"""Construct full URL"""
|
||||
return f"{self.config.base_url}/{path.lstrip('/')}"
|
||||
|
||||
def _log_request_details(
|
||||
self, method: str, url: str, headers: Dict, data: Any
|
||||
):
|
||||
"""Log request details for debugging"""
|
||||
logger.info("\nRequest Details:")
|
||||
logger.info(f"Method: {method}")
|
||||
logger.info(f"URL: {url}")
|
||||
logger.info(f"Headers: {json.dumps(headers, indent=2)}")
|
||||
logger.info(
|
||||
f"Data: {json.dumps(data, indent=2) if data else None}"
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
logger.success("Created additional API key")
|
||||
return True, response.json()["key"]
|
||||
else:
|
||||
logger.error(f"Failed to create API key: {response.text}")
|
||||
return False, response.text
|
||||
except Exception as e:
|
||||
logger.exception("Exception during API key creation")
|
||||
return False, str(e)
|
||||
|
||||
|
||||
def test_create_agent(
|
||||
session: TestSession,
|
||||
) -> Tuple[bool, Optional[UUID]]:
|
||||
"""Test creating a new agent."""
|
||||
logger.info("Testing agent creation")
|
||||
|
||||
payload = {
|
||||
"agent_name": f"Test Agent {int(time.time())}",
|
||||
"system_prompt": "You are a helpful assistant",
|
||||
"model_name": "gpt-4",
|
||||
"description": "Test agent",
|
||||
"tags": ["test", "automated"],
|
||||
}
|
||||
|
||||
try:
|
||||
response = requests.post(
|
||||
f"{BASE_URL}/agent", headers=session.headers, json=payload
|
||||
def _log_response_details(self, response: requests.Response):
|
||||
"""Log response details for debugging"""
|
||||
logger.info("\nResponse Details:")
|
||||
logger.info(f"Status Code: {response.status_code}")
|
||||
logger.info(
|
||||
f"Headers: {json.dumps(dict(response.headers), indent=2)}"
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
agent_id = response.json()["agent_id"]
|
||||
session.test_agents.append(agent_id)
|
||||
logger.success(f"Created agent with ID: {agent_id}")
|
||||
return True, agent_id
|
||||
else:
|
||||
logger.error(f"Failed to create agent: {response.text}")
|
||||
return False, None
|
||||
except Exception:
|
||||
logger.exception("Exception during agent creation")
|
||||
return False, None
|
||||
|
||||
|
||||
def test_list_user_agents(session: TestSession) -> bool:
|
||||
"""Test listing user's agents."""
|
||||
logger.info("Testing user agent listing")
|
||||
|
||||
try:
|
||||
response = requests.get(
|
||||
f"{BASE_URL}/users/me/agents", headers=session.headers
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
agents = response.json()
|
||||
logger.success(f"Found {len(agents)} user agents")
|
||||
return True
|
||||
else:
|
||||
logger.error(
|
||||
f"Failed to list user agents: {response.text}"
|
||||
try:
|
||||
logger.info(
|
||||
f"Body: {json.dumps(response.json(), indent=2)}"
|
||||
)
|
||||
return False
|
||||
except Exception:
|
||||
logger.exception("Exception during agent listing")
|
||||
return False
|
||||
|
||||
|
||||
def test_agent_operations(
|
||||
session: TestSession, agent_id: UUID
|
||||
) -> bool:
|
||||
"""Test various operations on an agent."""
|
||||
logger.info(f"Testing operations for agent {agent_id}")
|
||||
|
||||
# Test update
|
||||
try:
|
||||
update_response = requests.patch(
|
||||
f"{BASE_URL}/agent/{agent_id}",
|
||||
headers=session.headers,
|
||||
json={
|
||||
"description": "Updated description",
|
||||
"tags": ["test", "updated"],
|
||||
},
|
||||
)
|
||||
if update_response.status_code != 200:
|
||||
logger.error(
|
||||
f"Failed to update agent: {update_response.text}"
|
||||
except Exception:
|
||||
logger.info(f"Body: {response.text}")
|
||||
|
||||
def _request(
|
||||
self,
|
||||
method: str,
|
||||
path: str,
|
||||
headers: Optional[Dict] = None,
|
||||
**kwargs: Any,
|
||||
) -> requests.Response:
|
||||
"""Make HTTP request with config defaults"""
|
||||
url = self._url(path)
|
||||
headers = headers or {}
|
||||
|
||||
if self.config.debug:
|
||||
self._log_request_details(
|
||||
method, url, headers, kwargs.get("json")
|
||||
)
|
||||
return False
|
||||
|
||||
# Test metrics
|
||||
metrics_response = requests.get(
|
||||
f"{BASE_URL}/agent/{agent_id}/metrics",
|
||||
headers=session.headers,
|
||||
)
|
||||
if metrics_response.status_code != 200:
|
||||
logger.error(
|
||||
f"Failed to get agent metrics: {metrics_response.text}"
|
||||
try:
|
||||
response = self.session.request(
|
||||
method=method,
|
||||
url=url,
|
||||
headers=headers,
|
||||
timeout=self.config.timeout,
|
||||
verify=self.config.verify_ssl,
|
||||
**kwargs,
|
||||
)
|
||||
return False
|
||||
|
||||
logger.success("Successfully performed agent operations")
|
||||
return True
|
||||
except Exception:
|
||||
logger.exception("Exception during agent operations")
|
||||
return False
|
||||
|
||||
if self.config.debug:
|
||||
self._log_response_details(response)
|
||||
|
||||
def test_completion(session: TestSession, agent_id: UUID) -> bool:
|
||||
"""Test running a completion."""
|
||||
logger.info("Testing completion")
|
||||
if response.status_code >= 400:
|
||||
logger.error(
|
||||
f"Request failed with status {response.status_code}"
|
||||
)
|
||||
logger.error(f"Response: {response.text}")
|
||||
|
||||
payload = {
|
||||
"prompt": "What is the weather like today?",
|
||||
"agent_id": agent_id,
|
||||
"max_tokens": 100,
|
||||
}
|
||||
response.raise_for_status()
|
||||
return response
|
||||
|
||||
try:
|
||||
response = requests.post(
|
||||
f"{BASE_URL}/agent/completions",
|
||||
headers=session.headers,
|
||||
json=payload,
|
||||
)
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(f"Request failed: {str(e)}")
|
||||
if hasattr(e, "response") and e.response is not None:
|
||||
logger.error(f"Error response: {e.response.text}")
|
||||
raise
|
||||
|
||||
if response.status_code == 200:
|
||||
completion_data = response.json()
|
||||
print(completion_data)
|
||||
logger.success(
|
||||
f"Got completion, used {completion_data['token_usage']['total_tokens']} tokens"
|
||||
)
|
||||
return True
|
||||
else:
|
||||
logger.error(f"Failed to get completion: {response.text}")
|
||||
return False
|
||||
except Exception:
|
||||
logger.exception("Exception during completion")
|
||||
return False
|
||||
|
||||
class TestRunner:
|
||||
"""Test runner with logging and reporting"""
|
||||
|
||||
def cleanup_test_resources(session: TestSession):
|
||||
"""Clean up all test resources."""
|
||||
logger.info("Cleaning up test resources")
|
||||
def __init__(self):
|
||||
self.client = APIClient(config)
|
||||
self.results = {"passed": 0, "failed": 0, "total_time": 0}
|
||||
self.api_key = None
|
||||
self.user_id = None
|
||||
self.agent_id = None
|
||||
|
||||
# Delete test agents
|
||||
for agent_id in session.test_agents:
|
||||
try:
|
||||
response = requests.delete(
|
||||
f"{BASE_URL}/agent/{agent_id}",
|
||||
headers=session.headers,
|
||||
)
|
||||
if response.status_code == 200:
|
||||
logger.debug(f"Deleted agent {agent_id}")
|
||||
else:
|
||||
logger.warning(
|
||||
f"Failed to delete agent {agent_id}: {response.text}"
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(f"Exception deleting agent {agent_id}")
|
||||
def run_test(self, test_name: str, test_func: callable):
|
||||
"""Run a single test with timing and logging"""
|
||||
logger.info(f"\nRunning test: {test_name}")
|
||||
start_time = time.time()
|
||||
|
||||
# Revoke API keys
|
||||
if session.user_id:
|
||||
try:
|
||||
response = requests.get(
|
||||
f"{BASE_URL}/users/{session.user_id}/api-keys",
|
||||
headers=session.headers,
|
||||
)
|
||||
if response.status_code == 200:
|
||||
for key in response.json():
|
||||
try:
|
||||
revoke_response = requests.delete(
|
||||
f"{BASE_URL}/users/{session.user_id}/api-keys/{key['key']}",
|
||||
headers=session.headers,
|
||||
)
|
||||
if revoke_response.status_code == 200:
|
||||
logger.debug(
|
||||
f"Revoked API key {key['name']}"
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"Failed to revoke API key {key['name']}"
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
f"Exception revoking API key {key['name']}"
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Exception getting API keys for cleanup")
|
||||
|
||||
|
||||
def run_test_workflow():
|
||||
"""Run complete test workflow."""
|
||||
logger.info("Starting API tests")
|
||||
|
||||
# Check if API server is running first
|
||||
if not check_api_server():
|
||||
return False
|
||||
|
||||
session = TestSession()
|
||||
test_func()
|
||||
self.results["passed"] += 1
|
||||
logger.info(f"✅ {test_name} - PASSED")
|
||||
except Exception as e:
|
||||
self.results["failed"] += 1
|
||||
logger.error(f"❌ {test_name} - FAILED: {str(e)}")
|
||||
logger.exception(e)
|
||||
|
||||
end_time = time.time()
|
||||
duration = end_time - start_time
|
||||
self.results["total_time"] += duration
|
||||
logger.info(f"Test duration: {duration:.2f}s")
|
||||
|
||||
def test_user_creation(self):
|
||||
"""Test user creation"""
|
||||
response = self.client._request(
|
||||
"POST", "/users", json={"username": "test_user"}
|
||||
)
|
||||
data = response.json()
|
||||
assert "user_id" in data, "No user_id in response"
|
||||
assert "api_key" in data, "No api_key in response"
|
||||
self.api_key = data["api_key"]
|
||||
self.user_id = data["user_id"]
|
||||
logger.info(f"Created user with ID: {self.user_id}")
|
||||
|
||||
def test_create_api_key(self):
|
||||
"""Test API key creation"""
|
||||
headers = {"api-key": self.api_key}
|
||||
response = self.client._request(
|
||||
"POST",
|
||||
f"/users/{self.user_id}/api-keys",
|
||||
headers=headers,
|
||||
json={"name": "test_key"},
|
||||
)
|
||||
data = response.json()
|
||||
assert "key" in data, "No key in response"
|
||||
logger.info("Successfully created new API key")
|
||||
|
||||
def test_create_agent(self):
|
||||
"""Test agent creation"""
|
||||
headers = {"api-key": self.api_key}
|
||||
agent_config = {
|
||||
"agent_name": "test_agent",
|
||||
"model_name": "gpt-4",
|
||||
"system_prompt": "You are a test agent",
|
||||
"description": "Test agent description",
|
||||
"temperature": 0.7,
|
||||
"max_loops": 1,
|
||||
}
|
||||
response = self.client._request(
|
||||
"POST", "/agent", headers=headers, json=agent_config
|
||||
)
|
||||
data = response.json()
|
||||
assert "agent_id" in data, "No agent_id in response"
|
||||
self.agent_id = data["agent_id"]
|
||||
logger.info(f"Created agent with ID: {self.agent_id}")
|
||||
|
||||
# Wait a bit for agent to be ready
|
||||
time.sleep(2)
|
||||
|
||||
def test_list_agents(self):
|
||||
"""Test agent listing"""
|
||||
headers = {"api-key": self.api_key}
|
||||
response = self.client._request(
|
||||
"GET", "/agents", headers=headers
|
||||
)
|
||||
agents = response.json()
|
||||
assert isinstance(agents, list), "Response is not a list"
|
||||
assert len(agents) > 0, "No agents returned"
|
||||
logger.info(f"Successfully retrieved {len(agents)} agents")
|
||||
|
||||
def test_agent_completion(self):
|
||||
"""Test agent completion"""
|
||||
if not self.agent_id:
|
||||
logger.error("No agent_id available for completion test")
|
||||
raise ValueError("Agent ID not set")
|
||||
|
||||
headers = {"api-key": self.api_key}
|
||||
completion_request = {
|
||||
"prompt": "Write 'Hello World!'",
|
||||
"agent_id": str(
|
||||
self.agent_id
|
||||
), # Ensure UUID is converted to string
|
||||
"max_tokens": 100,
|
||||
"stream": False,
|
||||
"temperature_override": 0.7,
|
||||
}
|
||||
|
||||
logger.info(
|
||||
f"Sending completion request for agent {self.agent_id}"
|
||||
)
|
||||
response = self.client._request(
|
||||
"POST",
|
||||
"/agent/completions",
|
||||
headers=headers,
|
||||
json=completion_request,
|
||||
)
|
||||
data = response.json()
|
||||
assert "response" in data, "No response in completion"
|
||||
logger.info(f"Completion response: {data.get('response')}")
|
||||
|
||||
def run_all_tests(self):
|
||||
"""Run all tests and generate report"""
|
||||
logger.info("\n" + "=" * 50)
|
||||
logger.info("Starting API test suite...")
|
||||
logger.info(f"Base URL: {config.base_url}")
|
||||
logger.info("=" * 50 + "\n")
|
||||
|
||||
# Define test sequence
|
||||
tests = [
|
||||
("User Creation", self.test_user_creation),
|
||||
("API Key Creation", self.test_create_api_key),
|
||||
("Agent Creation", self.test_create_agent),
|
||||
("List Agents", self.test_list_agents),
|
||||
("Agent Completion", self.test_agent_completion),
|
||||
]
|
||||
|
||||
# Run tests
|
||||
for test_name, test_func in tests:
|
||||
self.run_test(test_name, test_func)
|
||||
|
||||
# Generate report
|
||||
self.print_report()
|
||||
|
||||
def print_report(self):
|
||||
"""Print test results report"""
|
||||
total_tests = self.results["passed"] + self.results["failed"]
|
||||
success_rate = (
|
||||
(self.results["passed"] / total_tests * 100)
|
||||
if total_tests > 0
|
||||
else 0
|
||||
)
|
||||
|
||||
try:
|
||||
# Create user
|
||||
user_success, message = create_test_user(session)
|
||||
if not user_success:
|
||||
logger.error(f"User creation failed: {message}")
|
||||
return False
|
||||
|
||||
# Create additional API key
|
||||
key_success, key = create_additional_api_key(session)
|
||||
if not key_success:
|
||||
logger.error(f"API key creation failed: {key}")
|
||||
return False
|
||||
|
||||
# Create agent
|
||||
agent_success, agent_id = test_create_agent(session)
|
||||
if not agent_success or not agent_id:
|
||||
logger.error("Agent creation failed")
|
||||
return False
|
||||
|
||||
# Test user agent listing
|
||||
if not test_list_user_agents(session):
|
||||
logger.error("Agent listing failed")
|
||||
return False
|
||||
|
||||
# Test agent operations
|
||||
if not test_agent_operations(session, agent_id):
|
||||
logger.error("Agent operations failed")
|
||||
return False
|
||||
|
||||
# Test completion
|
||||
if not test_completion(session, agent_id):
|
||||
logger.error("Completion test failed")
|
||||
return False
|
||||
|
||||
logger.success("All tests completed successfully")
|
||||
return True
|
||||
|
||||
except Exception:
|
||||
logger.exception("Exception during test workflow")
|
||||
return False
|
||||
finally:
|
||||
cleanup_test_resources(session)
|
||||
report = f"""
|
||||
\n{'='*50}
|
||||
API TEST RESULTS
|
||||
{'='*50}
|
||||
Total Tests: {total_tests}
|
||||
Passed: {self.results['passed']} ✅
|
||||
Failed: {self.results['failed']} ❌
|
||||
Success Rate: {success_rate:.2f}%
|
||||
Total Time: {self.results['total_time']:.2f}s
|
||||
{'='*50}
|
||||
"""
|
||||
logger.info(report)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
success = run_test_workflow()
|
||||
print(success)
|
||||
try:
|
||||
runner = TestRunner()
|
||||
runner.run_all_tests()
|
||||
except KeyboardInterrupt:
|
||||
logger.info("\nTest suite interrupted by user")
|
||||
except Exception as e:
|
||||
logger.error(f"Test suite failed: {str(e)}")
|
||||
logger.exception(e)
|
||||
|
@ -0,0 +1,254 @@
|
||||
import os
|
||||
from typing import Dict, Optional, Any
|
||||
from dataclasses import dataclass
|
||||
import pytest
|
||||
import requests
|
||||
from uuid import UUID
|
||||
from pydantic import BaseModel
|
||||
from _pytest.terminal import TerminalReporter
|
||||
|
||||
|
||||
# Configuration
|
||||
@dataclass
|
||||
class TestConfig:
|
||||
"""Test configuration settings"""
|
||||
|
||||
base_url: str
|
||||
timeout: int = 30
|
||||
verify_ssl: bool = True
|
||||
|
||||
|
||||
# Load config from environment or use defaults
|
||||
config = TestConfig(
|
||||
base_url=os.getenv("API_BASE_URL", "http://localhost:8000/v1")
|
||||
)
|
||||
|
||||
|
||||
# API Response Types
|
||||
class UserResponse(BaseModel):
|
||||
user_id: str
|
||||
api_key: str
|
||||
|
||||
|
||||
class AgentResponse(BaseModel):
|
||||
agent_id: UUID
|
||||
|
||||
|
||||
class MetricsResponse(BaseModel):
|
||||
total_completions: int
|
||||
average_response_time: float
|
||||
error_rate: float
|
||||
last_24h_completions: int
|
||||
total_tokens_used: int
|
||||
uptime_percentage: float
|
||||
success_rate: float
|
||||
peak_tokens_per_minute: int
|
||||
|
||||
|
||||
class APIClient:
|
||||
"""API Client with typed methods"""
|
||||
|
||||
def __init__(self, config: TestConfig):
|
||||
self.config = config
|
||||
self.session = requests.Session()
|
||||
|
||||
def _url(self, path: str) -> str:
|
||||
"""Construct full URL"""
|
||||
return f"{self.config.base_url}/{path.lstrip('/')}"
|
||||
|
||||
def _request(
|
||||
self,
|
||||
method: str,
|
||||
path: str,
|
||||
headers: Optional[Dict] = None,
|
||||
**kwargs: Any,
|
||||
) -> requests.Response:
|
||||
"""Make HTTP request with config defaults"""
|
||||
url = self._url(path)
|
||||
return self.session.request(
|
||||
method=method,
|
||||
url=url,
|
||||
headers=headers,
|
||||
timeout=self.config.timeout,
|
||||
verify=self.config.verify_ssl,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def create_user(self, username: str) -> UserResponse:
|
||||
"""Create a new user"""
|
||||
response = self._request(
|
||||
"POST", "/users", json={"username": username}
|
||||
)
|
||||
response.raise_for_status()
|
||||
return UserResponse(**response.json())
|
||||
|
||||
def create_agent(
|
||||
self, agent_config: Dict[str, Any], api_key: str
|
||||
) -> AgentResponse:
|
||||
"""Create a new agent"""
|
||||
headers = {"api-key": api_key}
|
||||
response = self._request(
|
||||
"POST", "/agent", headers=headers, json=agent_config
|
||||
)
|
||||
response.raise_for_status()
|
||||
return AgentResponse(**response.json())
|
||||
|
||||
def get_metrics(
|
||||
self, agent_id: UUID, api_key: str
|
||||
) -> MetricsResponse:
|
||||
"""Get agent metrics"""
|
||||
headers = {"api-key": api_key}
|
||||
response = self._request(
|
||||
"GET", f"/agent/{agent_id}/metrics", headers=headers
|
||||
)
|
||||
response.raise_for_status()
|
||||
return MetricsResponse(**response.json())
|
||||
|
||||
|
||||
# Test Fixtures
|
||||
@pytest.fixture
|
||||
def api_client() -> APIClient:
|
||||
"""Fixture for API client"""
|
||||
return APIClient(config)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_user(api_client: APIClient) -> UserResponse:
|
||||
"""Fixture for test user"""
|
||||
return api_client.create_user("test_user")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_agent(
|
||||
api_client: APIClient, test_user: UserResponse
|
||||
) -> AgentResponse:
|
||||
"""Fixture for test agent"""
|
||||
agent_config = {
|
||||
"agent_name": "test_agent",
|
||||
"model_name": "gpt-4",
|
||||
"system_prompt": "You are a test agent",
|
||||
"description": "Test agent description",
|
||||
}
|
||||
return api_client.create_agent(agent_config, test_user.api_key)
|
||||
|
||||
|
||||
# Tests
|
||||
def test_user_creation(api_client: APIClient):
|
||||
"""Test user creation flow"""
|
||||
response = api_client.create_user("new_test_user")
|
||||
assert response.user_id
|
||||
assert response.api_key
|
||||
|
||||
|
||||
def test_agent_creation(
|
||||
api_client: APIClient, test_user: UserResponse
|
||||
):
|
||||
"""Test agent creation flow"""
|
||||
agent_config = {
|
||||
"agent_name": "test_agent",
|
||||
"model_name": "gpt-4",
|
||||
"system_prompt": "You are a test agent",
|
||||
"description": "Test agent description",
|
||||
}
|
||||
response = api_client.create_agent(
|
||||
agent_config, test_user.api_key
|
||||
)
|
||||
assert response.agent_id
|
||||
|
||||
|
||||
def test_agent_metrics(
|
||||
api_client: APIClient,
|
||||
test_user: UserResponse,
|
||||
test_agent: AgentResponse,
|
||||
):
|
||||
"""Test metrics retrieval"""
|
||||
metrics = api_client.get_metrics(
|
||||
test_agent.agent_id, test_user.api_key
|
||||
)
|
||||
assert metrics.total_completions >= 0
|
||||
assert metrics.error_rate >= 0
|
||||
assert metrics.uptime_percentage >= 0
|
||||
|
||||
|
||||
def test_invalid_auth(api_client: APIClient):
|
||||
"""Test invalid authentication"""
|
||||
with pytest.raises(requests.exceptions.HTTPError) as exc_info:
|
||||
api_client.create_agent({}, "invalid_key")
|
||||
assert exc_info.value.response.status_code == 401
|
||||
|
||||
|
||||
# Custom pytest plugin to capture test results
|
||||
class ResultCapture:
|
||||
def __init__(self):
|
||||
self.total = 0
|
||||
self.passed = 0
|
||||
self.failed = 0
|
||||
self.errors = 0
|
||||
|
||||
|
||||
@pytest.hookimpl(hookwrapper=True)
|
||||
def pytest_terminal_summary(
|
||||
terminalreporter: TerminalReporter, exitstatus: int
|
||||
):
|
||||
yield
|
||||
capture = getattr(
|
||||
terminalreporter.config, "_result_capture", None
|
||||
)
|
||||
if capture:
|
||||
capture.total = (
|
||||
len(terminalreporter.stats.get("passed", []))
|
||||
+ len(terminalreporter.stats.get("failed", []))
|
||||
+ len(terminalreporter.stats.get("error", []))
|
||||
)
|
||||
capture.passed = len(terminalreporter.stats.get("passed", []))
|
||||
capture.failed = len(terminalreporter.stats.get("failed", []))
|
||||
capture.errors = len(terminalreporter.stats.get("error", []))
|
||||
|
||||
|
||||
@dataclass
|
||||
class TestReport:
|
||||
total_tests: int
|
||||
passed: int
|
||||
failed: int
|
||||
errors: int
|
||||
|
||||
@property
|
||||
def success_rate(self) -> float:
|
||||
return (
|
||||
(self.passed / self.total_tests) * 100
|
||||
if self.total_tests > 0
|
||||
else 0
|
||||
)
|
||||
|
||||
|
||||
def run_tests() -> TestReport:
|
||||
"""Run tests and generate typed report"""
|
||||
# Create result capture
|
||||
capture = ResultCapture()
|
||||
|
||||
# Create pytest configuration
|
||||
args = [__file__, "-v"]
|
||||
|
||||
# Run pytest with our plugin
|
||||
pytest.main(args, plugins=[capture])
|
||||
|
||||
# Generate report
|
||||
return TestReport(
|
||||
total_tests=capture.total,
|
||||
passed=capture.passed,
|
||||
failed=capture.failed,
|
||||
errors=capture.errors,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Example usage with environment variable
|
||||
# export API_BASE_URL=http://api.example.com/v1
|
||||
|
||||
report = run_tests()
|
||||
print("\nTest Results:")
|
||||
print(f"Total Tests: {report.total_tests}")
|
||||
print(f"Passed: {report.passed}")
|
||||
print(f"Failed: {report.failed}")
|
||||
print(f"Errors: {report.errors}")
|
||||
print(f"Success Rate: {report.success_rate:.2f}%")
|
@ -0,0 +1,472 @@
|
||||
import asyncio
|
||||
import json
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List, Optional
|
||||
from uuid import UUID
|
||||
|
||||
import httpx
|
||||
from loguru import logger
|
||||
|
||||
# Configure logger
|
||||
logger.add(
|
||||
"tests/api_test_{time}.log",
|
||||
rotation="1 day",
|
||||
retention="7 days",
|
||||
level="DEBUG",
|
||||
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
|
||||
)
|
||||
|
||||
|
||||
class TestConfig:
|
||||
"""Test configuration and utilities"""
|
||||
|
||||
BASE_URL: str = "http://localhost:8000/v1"
|
||||
TEST_USERNAME: str = "test_user"
|
||||
api_key: Optional[str] = None
|
||||
user_id: Optional[UUID] = None
|
||||
test_agent_id: Optional[UUID] = None
|
||||
|
||||
|
||||
class TestResult:
|
||||
"""Model for test results"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
test_name: str,
|
||||
status: str,
|
||||
duration: float,
|
||||
error: Optional[str] = None,
|
||||
details: Optional[Dict[str, Any]] = None,
|
||||
):
|
||||
self.test_name = test_name
|
||||
self.status = status
|
||||
self.duration = duration
|
||||
self.error = error
|
||||
self.details = details or {}
|
||||
|
||||
def dict(self):
|
||||
return {
|
||||
"test_name": self.test_name,
|
||||
"status": self.status,
|
||||
"duration": self.duration,
|
||||
"error": self.error,
|
||||
"details": self.details,
|
||||
}
|
||||
|
||||
|
||||
async def log_response(
|
||||
response: httpx.Response, test_name: str
|
||||
) -> None:
|
||||
"""Log API response details"""
|
||||
logger.debug(f"\n{test_name} Response:")
|
||||
logger.debug(f"Status Code: {response.status_code}")
|
||||
logger.debug(f"Headers: {dict(response.headers)}")
|
||||
try:
|
||||
logger.debug(f"Body: {response.json()}")
|
||||
except json.JSONDecodeError:
|
||||
logger.debug(f"Body: {response.text}")
|
||||
|
||||
|
||||
async def create_test_user() -> TestResult:
|
||||
"""Create a test user and get API key"""
|
||||
start_time = datetime.now()
|
||||
try:
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.post(
|
||||
f"{TestConfig.BASE_URL}/users",
|
||||
json={"username": TestConfig.TEST_USERNAME},
|
||||
)
|
||||
await log_response(response, "Create User")
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
TestConfig.api_key = data["api_key"]
|
||||
TestConfig.user_id = UUID(data["user_id"])
|
||||
return TestResult(
|
||||
test_name="create_test_user",
|
||||
status="passed",
|
||||
duration=(
|
||||
datetime.now() - start_time
|
||||
).total_seconds(),
|
||||
details={"user_id": str(TestConfig.user_id)},
|
||||
)
|
||||
else:
|
||||
return TestResult(
|
||||
test_name="create_test_user",
|
||||
status="failed",
|
||||
duration=(
|
||||
datetime.now() - start_time
|
||||
).total_seconds(),
|
||||
error=f"Failed to create user: {response.text}",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in create_test_user: {str(e)}")
|
||||
return TestResult(
|
||||
test_name="create_test_user",
|
||||
status="error",
|
||||
duration=(datetime.now() - start_time).total_seconds(),
|
||||
error=str(e),
|
||||
)
|
||||
|
||||
|
||||
async def create_test_agent() -> TestResult:
|
||||
"""Create a test agent"""
|
||||
start_time = datetime.now()
|
||||
try:
|
||||
# Create agent config according to the AgentConfig model
|
||||
agent_config = {
|
||||
"agent_name": "test_agent",
|
||||
"model_name": "gpt-4",
|
||||
"description": "Test agent for API testing",
|
||||
"system_prompt": "You are a test agent.",
|
||||
"temperature": 0.1,
|
||||
"max_loops": 1,
|
||||
"dynamic_temperature_enabled": True,
|
||||
"user_name": TestConfig.TEST_USERNAME,
|
||||
"retry_attempts": 1,
|
||||
"context_length": 4000,
|
||||
"output_type": "string",
|
||||
"streaming_on": False,
|
||||
"tags": ["test", "api"],
|
||||
"stopping_token": "<DONE>",
|
||||
"auto_generate_prompt": False,
|
||||
}
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.post(
|
||||
f"{TestConfig.BASE_URL}/agent",
|
||||
json=agent_config,
|
||||
headers={"api-key": TestConfig.api_key},
|
||||
)
|
||||
await log_response(response, "Create Agent")
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
TestConfig.test_agent_id = UUID(data["agent_id"])
|
||||
return TestResult(
|
||||
test_name="create_test_agent",
|
||||
status="passed",
|
||||
duration=(
|
||||
datetime.now() - start_time
|
||||
).total_seconds(),
|
||||
details={
|
||||
"agent_id": str(TestConfig.test_agent_id)
|
||||
},
|
||||
)
|
||||
else:
|
||||
return TestResult(
|
||||
test_name="create_test_agent",
|
||||
status="failed",
|
||||
duration=(
|
||||
datetime.now() - start_time
|
||||
).total_seconds(),
|
||||
error=f"Failed to create agent: {response.text}",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in create_test_agent: {str(e)}")
|
||||
return TestResult(
|
||||
test_name="create_test_agent",
|
||||
status="error",
|
||||
duration=(datetime.now() - start_time).total_seconds(),
|
||||
error=str(e),
|
||||
)
|
||||
|
||||
|
||||
async def test_agent_completion() -> TestResult:
|
||||
"""Test agent completion endpoint"""
|
||||
start_time = datetime.now()
|
||||
try:
|
||||
completion_request = {
|
||||
"prompt": "Hello, this is a test prompt.",
|
||||
"agent_id": str(TestConfig.test_agent_id),
|
||||
"max_tokens": 100,
|
||||
"temperature_override": 0.5,
|
||||
"stream": False,
|
||||
}
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.post(
|
||||
f"{TestConfig.BASE_URL}/agent/completions",
|
||||
json=completion_request,
|
||||
headers={"api-key": TestConfig.api_key},
|
||||
)
|
||||
await log_response(response, "Agent Completion")
|
||||
|
||||
if response.status_code == 200:
|
||||
return TestResult(
|
||||
test_name="test_agent_completion",
|
||||
status="passed",
|
||||
duration=(
|
||||
datetime.now() - start_time
|
||||
).total_seconds(),
|
||||
details={"response": response.json()},
|
||||
)
|
||||
else:
|
||||
return TestResult(
|
||||
test_name="test_agent_completion",
|
||||
status="failed",
|
||||
duration=(
|
||||
datetime.now() - start_time
|
||||
).total_seconds(),
|
||||
error=f"Failed completion test: {response.text}",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in test_agent_completion: {str(e)}")
|
||||
return TestResult(
|
||||
test_name="test_agent_completion",
|
||||
status="error",
|
||||
duration=(datetime.now() - start_time).total_seconds(),
|
||||
error=str(e),
|
||||
)
|
||||
|
||||
|
||||
async def test_agent_metrics() -> TestResult:
|
||||
"""Test agent metrics endpoint"""
|
||||
start_time = datetime.now()
|
||||
try:
|
||||
if not TestConfig.test_agent_id:
|
||||
return TestResult(
|
||||
test_name="test_agent_metrics",
|
||||
status="failed",
|
||||
duration=(
|
||||
datetime.now() - start_time
|
||||
).total_seconds(),
|
||||
error="No test agent ID available",
|
||||
)
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(
|
||||
f"{TestConfig.BASE_URL}/agent/{str(TestConfig.test_agent_id)}/metrics",
|
||||
headers={"api-key": TestConfig.api_key},
|
||||
)
|
||||
await log_response(response, "Agent Metrics")
|
||||
|
||||
if response.status_code == 200:
|
||||
return TestResult(
|
||||
test_name="test_agent_metrics",
|
||||
status="passed",
|
||||
duration=(
|
||||
datetime.now() - start_time
|
||||
).total_seconds(),
|
||||
details={"metrics": response.json()},
|
||||
)
|
||||
else:
|
||||
return TestResult(
|
||||
test_name="test_agent_metrics",
|
||||
status="failed",
|
||||
duration=(
|
||||
datetime.now() - start_time
|
||||
).total_seconds(),
|
||||
error=f"Failed metrics test: {response.text}",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in test_agent_metrics: {str(e)}")
|
||||
return TestResult(
|
||||
test_name="test_agent_metrics",
|
||||
status="error",
|
||||
duration=(datetime.now() - start_time).total_seconds(),
|
||||
error=str(e),
|
||||
)
|
||||
|
||||
|
||||
async def test_update_agent() -> TestResult:
|
||||
"""Test agent update endpoint"""
|
||||
start_time = datetime.now()
|
||||
try:
|
||||
if not TestConfig.test_agent_id:
|
||||
return TestResult(
|
||||
test_name="test_update_agent",
|
||||
status="failed",
|
||||
duration=(
|
||||
datetime.now() - start_time
|
||||
).total_seconds(),
|
||||
error="No test agent ID available",
|
||||
)
|
||||
|
||||
update_data = {
|
||||
"description": "Updated test agent description",
|
||||
"tags": ["test", "updated"],
|
||||
"max_loops": 2,
|
||||
}
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.patch(
|
||||
f"{TestConfig.BASE_URL}/agent/{str(TestConfig.test_agent_id)}",
|
||||
json=update_data,
|
||||
headers={"api-key": TestConfig.api_key},
|
||||
)
|
||||
await log_response(response, "Update Agent")
|
||||
|
||||
if response.status_code == 200:
|
||||
return TestResult(
|
||||
test_name="test_update_agent",
|
||||
status="passed",
|
||||
duration=(
|
||||
datetime.now() - start_time
|
||||
).total_seconds(),
|
||||
details={"update_response": response.json()},
|
||||
)
|
||||
else:
|
||||
return TestResult(
|
||||
test_name="test_update_agent",
|
||||
status="failed",
|
||||
duration=(
|
||||
datetime.now() - start_time
|
||||
).total_seconds(),
|
||||
error=f"Failed update test: {response.text}",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in test_update_agent: {str(e)}")
|
||||
return TestResult(
|
||||
test_name="test_update_agent",
|
||||
status="error",
|
||||
duration=(datetime.now() - start_time).total_seconds(),
|
||||
error=str(e),
|
||||
)
|
||||
|
||||
|
||||
async def test_error_handling() -> TestResult:
|
||||
"""Test API error handling"""
|
||||
start_time = datetime.now()
|
||||
try:
|
||||
async with httpx.AsyncClient() as client:
|
||||
# Test with invalid API key
|
||||
invalid_agent_id = "00000000-0000-0000-0000-000000000000"
|
||||
response = await client.get(
|
||||
f"{TestConfig.BASE_URL}/agent/{invalid_agent_id}/metrics",
|
||||
headers={"api-key": "invalid_key"},
|
||||
)
|
||||
await log_response(response, "Invalid API Key Test")
|
||||
|
||||
if response.status_code in [401, 403]:
|
||||
return TestResult(
|
||||
test_name="test_error_handling",
|
||||
status="passed",
|
||||
duration=(
|
||||
datetime.now() - start_time
|
||||
).total_seconds(),
|
||||
details={"error_response": response.json()},
|
||||
)
|
||||
else:
|
||||
return TestResult(
|
||||
test_name="test_error_handling",
|
||||
status="failed",
|
||||
duration=(
|
||||
datetime.now() - start_time
|
||||
).total_seconds(),
|
||||
error="Error handling test failed",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in test_error_handling: {str(e)}")
|
||||
return TestResult(
|
||||
test_name="test_error_handling",
|
||||
status="error",
|
||||
duration=(datetime.now() - start_time).total_seconds(),
|
||||
error=str(e),
|
||||
)
|
||||
|
||||
|
||||
async def cleanup_test_resources() -> TestResult:
|
||||
"""Clean up test resources"""
|
||||
start_time = datetime.now()
|
||||
try:
|
||||
if TestConfig.test_agent_id:
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.delete(
|
||||
f"{TestConfig.BASE_URL}/agent/{str(TestConfig.test_agent_id)}",
|
||||
headers={"api-key": TestConfig.api_key},
|
||||
)
|
||||
await log_response(response, "Delete Agent")
|
||||
|
||||
return TestResult(
|
||||
test_name="cleanup_test_resources",
|
||||
status="passed",
|
||||
duration=(datetime.now() - start_time).total_seconds(),
|
||||
details={"cleanup": "completed"},
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in cleanup_test_resources: {str(e)}")
|
||||
return TestResult(
|
||||
test_name="cleanup_test_resources",
|
||||
status="error",
|
||||
duration=(datetime.now() - start_time).total_seconds(),
|
||||
error=str(e),
|
||||
)
|
||||
|
||||
|
||||
async def run_all_tests() -> List[TestResult]:
|
||||
"""Run all tests in sequence"""
|
||||
logger.info("Starting API test suite")
|
||||
results = []
|
||||
|
||||
# Initialize
|
||||
results.append(await create_test_user())
|
||||
if results[-1].status != "passed":
|
||||
logger.error(
|
||||
"Failed to create test user, aborting remaining tests"
|
||||
)
|
||||
return results
|
||||
|
||||
# Add delay to ensure user is properly created
|
||||
await asyncio.sleep(1)
|
||||
|
||||
# Core tests
|
||||
test_functions = [
|
||||
create_test_agent,
|
||||
test_agent_completion,
|
||||
test_agent_metrics,
|
||||
test_update_agent,
|
||||
test_error_handling,
|
||||
]
|
||||
|
||||
for test_func in test_functions:
|
||||
result = await test_func()
|
||||
results.append(result)
|
||||
logger.info(f"Test {result.test_name}: {result.status}")
|
||||
if result.error:
|
||||
logger.error(
|
||||
f"Error in {result.test_name}: {result.error}"
|
||||
)
|
||||
|
||||
# Add small delay between tests
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
# Cleanup
|
||||
results.append(await cleanup_test_resources())
|
||||
|
||||
# Log summary
|
||||
passed = sum(1 for r in results if r.status == "passed")
|
||||
failed = sum(1 for r in results if r.status == "failed")
|
||||
errors = sum(1 for r in results if r.status == "error")
|
||||
|
||||
logger.info("\nTest Summary:")
|
||||
logger.info(f"Total Tests: {len(results)}")
|
||||
logger.info(f"Passed: {passed}")
|
||||
logger.info(f"Failed: {failed}")
|
||||
logger.info(f"Errors: {errors}")
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def main():
|
||||
"""Main entry point for running tests"""
|
||||
logger.info("Starting API testing suite")
|
||||
try:
|
||||
results = asyncio.run(run_all_tests())
|
||||
|
||||
# Write results to JSON file
|
||||
with open("test_results.json", "w") as f:
|
||||
json.dump(
|
||||
[result.dict() for result in results],
|
||||
f,
|
||||
indent=2,
|
||||
default=str,
|
||||
)
|
||||
|
||||
logger.info("Test results written to test_results.json")
|
||||
|
||||
except Exception:
|
||||
logger.error("Fatal error in test suite: ")
|
||||
|
||||
|
||||
main()
|
@ -0,0 +1,56 @@
|
||||
from loguru import logger
|
||||
from swarms.structs.agent import Agent
|
||||
from swarms.structs.graph_swarm import GraphSwarm
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
# Create agents
|
||||
data_collector = Agent(
|
||||
agent_name="Market-Data-Collector",
|
||||
model_name="gpt-4o-mini",
|
||||
max_loops=1,
|
||||
streaming_on=True,
|
||||
)
|
||||
|
||||
trend_analyzer = Agent(
|
||||
agent_name="Market-Trend-Analyzer",
|
||||
model_name="gpt-4o-mini",
|
||||
max_loops=1,
|
||||
streaming_on=True,
|
||||
)
|
||||
|
||||
report_generator = Agent(
|
||||
agent_name="Investment-Report-Generator",
|
||||
model_name="gpt-4o-mini",
|
||||
max_loops=1,
|
||||
streaming_on=True,
|
||||
)
|
||||
|
||||
# Create swarm
|
||||
swarm = GraphSwarm(
|
||||
agents=[
|
||||
(data_collector, []),
|
||||
(trend_analyzer, ["Market-Data-Collector"]),
|
||||
(report_generator, ["Market-Trend-Analyzer"]),
|
||||
],
|
||||
swarm_name="Market Analysis Intelligence Network",
|
||||
)
|
||||
|
||||
# Run the swarm
|
||||
result = swarm.run(
|
||||
"Analyze current market trends for tech stocks and provide investment recommendations"
|
||||
)
|
||||
|
||||
# Print results
|
||||
print(f"Execution success: {result.success}")
|
||||
print(f"Total time: {result.execution_time:.2f} seconds")
|
||||
|
||||
for agent_name, output in result.outputs.items():
|
||||
print(f"\nAgent: {agent_name}")
|
||||
print(f"Output: {output.output}")
|
||||
if output.error:
|
||||
print(f"Error: {output.error}")
|
||||
except Exception as error:
|
||||
logger.error(error)
|
||||
raise error
|
@ -0,0 +1,265 @@
|
||||
import os
|
||||
from swarms import Agent, AgentRearrange
|
||||
from swarm_models import OpenAIChat
|
||||
|
||||
# Get the OpenAI API key from the environment variable
|
||||
api_key = os.getenv("OPENAI_API_KEY")
|
||||
|
||||
# Create an instance of the OpenAIChat class
|
||||
model = OpenAIChat(
|
||||
api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
|
||||
)
|
||||
|
||||
# Initialize the gatekeeper agent
|
||||
gatekeeper_agent = Agent(
|
||||
agent_name="HealthScoreGatekeeper",
|
||||
system_prompt="""
|
||||
<role>
|
||||
<title>Health Score Privacy Gatekeeper</title>
|
||||
<primary_responsibility>Protect and manage sensitive health information while providing necessary access to authorized agents</primary_responsibility>
|
||||
</role>
|
||||
|
||||
<capabilities>
|
||||
<security>
|
||||
<encryption>Manage encryption of health scores</encryption>
|
||||
<access_control>Implement strict access control mechanisms</access_control>
|
||||
<audit>Track and log all access requests</audit>
|
||||
</security>
|
||||
<data_handling>
|
||||
<anonymization>Remove personally identifiable information</anonymization>
|
||||
<transformation>Convert raw health data into privacy-preserving formats</transformation>
|
||||
</data_handling>
|
||||
</capabilities>
|
||||
|
||||
<protocols>
|
||||
<data_access>
|
||||
<verification>
|
||||
<step>Verify agent authorization level</step>
|
||||
<step>Check request legitimacy</step>
|
||||
<step>Validate purpose of access</step>
|
||||
</verification>
|
||||
<response_format>
|
||||
<health_score>Numerical value only</health_score>
|
||||
<metadata>Anonymized timestamp and request ID</metadata>
|
||||
</response_format>
|
||||
</data_access>
|
||||
<privacy_rules>
|
||||
<patient_data>Never expose patient names or identifiers</patient_data>
|
||||
<health_history>No access to historical data without explicit authorization</health_history>
|
||||
<aggregation>Provide only aggregated or anonymized data when possible</aggregation>
|
||||
</privacy_rules>
|
||||
</protocols>
|
||||
|
||||
<compliance>
|
||||
<standards>
|
||||
<hipaa>Maintain HIPAA compliance</hipaa>
|
||||
<gdpr>Follow GDPR guidelines for data protection</gdpr>
|
||||
</standards>
|
||||
<audit_trail>
|
||||
<logging>Record all data access events</logging>
|
||||
<monitoring>Track unusual access patterns</monitoring>
|
||||
</audit_trail>
|
||||
</compliance>
|
||||
""",
|
||||
llm=model,
|
||||
max_loops=1,
|
||||
dashboard=False,
|
||||
streaming_on=True,
|
||||
verbose=True,
|
||||
stopping_token="<DONE>",
|
||||
state_save_file_type="json",
|
||||
saved_state_path="gatekeeper_agent.json",
|
||||
)
|
||||
|
||||
# Initialize the boss agent (Director)
|
||||
boss_agent = Agent(
|
||||
agent_name="BossAgent",
|
||||
system_prompt="""
|
||||
<role>
|
||||
<title>Swarm Director</title>
|
||||
<purpose>Orchestrate and manage agent collaboration while respecting privacy boundaries</purpose>
|
||||
</role>
|
||||
|
||||
<responsibilities>
|
||||
<coordination>
|
||||
<task_management>Assign and prioritize tasks</task_management>
|
||||
<workflow_optimization>Ensure efficient collaboration</workflow_optimization>
|
||||
<privacy_compliance>Maintain privacy protocols</privacy_compliance>
|
||||
</coordination>
|
||||
<oversight>
|
||||
<performance_monitoring>Track agent effectiveness</performance_monitoring>
|
||||
<quality_control>Ensure accuracy of outputs</quality_control>
|
||||
<security_compliance>Enforce data protection policies</security_compliance>
|
||||
</oversight>
|
||||
</responsibilities>
|
||||
|
||||
<interaction_protocols>
|
||||
<health_score_access>
|
||||
<authorization>Request access through gatekeeper only</authorization>
|
||||
<handling>Process only anonymized health scores</handling>
|
||||
<distribution>Share authorized information on need-to-know basis</distribution>
|
||||
</health_score_access>
|
||||
<communication>
|
||||
<format>Structured, secure messaging</format>
|
||||
<encryption>End-to-end encrypted channels</encryption>
|
||||
</communication>
|
||||
</interaction_protocols>
|
||||
""",
|
||||
llm=model,
|
||||
max_loops=1,
|
||||
dashboard=False,
|
||||
streaming_on=True,
|
||||
verbose=True,
|
||||
stopping_token="<DONE>",
|
||||
state_save_file_type="json",
|
||||
saved_state_path="boss_agent.json",
|
||||
)
|
||||
|
||||
# Initialize worker 1: Health Score Analyzer
|
||||
worker1 = Agent(
|
||||
agent_name="HealthScoreAnalyzer",
|
||||
system_prompt="""
|
||||
<role>
|
||||
<title>Health Score Analyst</title>
|
||||
<purpose>Analyze anonymized health scores for patterns and insights</purpose>
|
||||
</role>
|
||||
|
||||
<capabilities>
|
||||
<analysis>
|
||||
<statistical_processing>Advanced statistical analysis</statistical_processing>
|
||||
<pattern_recognition>Identify health trends</pattern_recognition>
|
||||
<risk_assessment>Evaluate health risk factors</risk_assessment>
|
||||
</analysis>
|
||||
<privacy_compliance>
|
||||
<data_handling>Work only with anonymized data</data_handling>
|
||||
<secure_processing>Use encrypted analysis methods</secure_processing>
|
||||
</privacy_compliance>
|
||||
</capabilities>
|
||||
|
||||
<protocols>
|
||||
<data_access>
|
||||
<request_procedure>
|
||||
<step>Submit authenticated requests to gatekeeper</step>
|
||||
<step>Process only authorized data</step>
|
||||
<step>Maintain audit trail</step>
|
||||
</request_procedure>
|
||||
</data_access>
|
||||
<reporting>
|
||||
<anonymization>Ensure no identifiable information in reports</anonymization>
|
||||
<aggregation>Present aggregate statistics only</aggregation>
|
||||
</reporting>
|
||||
</protocols>
|
||||
""",
|
||||
llm=model,
|
||||
max_loops=1,
|
||||
dashboard=False,
|
||||
streaming_on=True,
|
||||
verbose=True,
|
||||
stopping_token="<DONE>",
|
||||
state_save_file_type="json",
|
||||
saved_state_path="worker1.json",
|
||||
)
|
||||
|
||||
# Initialize worker 2: Report Generator
|
||||
worker2 = Agent(
|
||||
agent_name="ReportGenerator",
|
||||
system_prompt="""
|
||||
<role>
|
||||
<title>Privacy-Conscious Report Generator</title>
|
||||
<purpose>Create secure, anonymized health score reports</purpose>
|
||||
</role>
|
||||
|
||||
<capabilities>
|
||||
<reporting>
|
||||
<format>Generate standardized, secure reports</format>
|
||||
<anonymization>Apply privacy-preserving techniques</anonymization>
|
||||
<aggregation>Compile statistical summaries</aggregation>
|
||||
</reporting>
|
||||
<security>
|
||||
<data_protection>Implement secure report generation</data_protection>
|
||||
<access_control>Manage report distribution</access_control>
|
||||
</security>
|
||||
</capabilities>
|
||||
|
||||
<protocols>
|
||||
<report_generation>
|
||||
<privacy_rules>
|
||||
<rule>No personal identifiers in reports</rule>
|
||||
<rule>Aggregate data when possible</rule>
|
||||
<rule>Apply statistical noise for privacy</rule>
|
||||
</privacy_rules>
|
||||
<distribution>
|
||||
<access>Restricted to authorized personnel</access>
|
||||
<tracking>Monitor report access</tracking>
|
||||
</distribution>
|
||||
</report_generation>
|
||||
</protocols>
|
||||
""",
|
||||
llm=model,
|
||||
max_loops=1,
|
||||
dashboard=False,
|
||||
streaming_on=True,
|
||||
verbose=True,
|
||||
stopping_token="<DONE>",
|
||||
state_save_file_type="json",
|
||||
saved_state_path="worker2.json",
|
||||
)
|
||||
|
||||
# Swarm-Level Prompt (Collaboration Prompt)
|
||||
swarm_prompt = """
|
||||
<swarm_configuration>
|
||||
<objective>Process and analyze health scores while maintaining strict privacy controls</objective>
|
||||
<workflow>
|
||||
<step>
|
||||
<agent>HealthScoreGatekeeper</agent>
|
||||
<action>Receive and validate data access requests</action>
|
||||
<output>Anonymized health scores</output>
|
||||
</step>
|
||||
<step>
|
||||
<agent>BossAgent</agent>
|
||||
<action>Coordinate analysis and reporting tasks</action>
|
||||
<privacy_control>Enforce data protection protocols</privacy_control>
|
||||
</step>
|
||||
<step>
|
||||
<agent>HealthScoreAnalyzer</agent>
|
||||
<action>Process authorized health score data</action>
|
||||
<constraints>Work only with anonymized information</constraints>
|
||||
</step>
|
||||
<step>
|
||||
<agent>ReportGenerator</agent>
|
||||
<action>Create privacy-preserving reports</action>
|
||||
<output>Secure, anonymized insights</output>
|
||||
</step>
|
||||
</workflow>
|
||||
</swarm_configuration>
|
||||
"""
|
||||
|
||||
# Create a list of agents
|
||||
agents = [gatekeeper_agent, boss_agent, worker1, worker2]
|
||||
|
||||
# Define the flow pattern for the swarm
|
||||
flow = "HealthScoreGatekeeper -> BossAgent -> HealthScoreAnalyzer -> ReportGenerator"
|
||||
|
||||
# Using AgentRearrange class to manage the swarm
|
||||
agent_system = AgentRearrange(
|
||||
name="health-score-swarm",
|
||||
description="Privacy-focused health score analysis system",
|
||||
agents=agents,
|
||||
flow=flow,
|
||||
return_json=False,
|
||||
output_type="final",
|
||||
max_loops=1,
|
||||
)
|
||||
|
||||
# Example task for the swarm
|
||||
task = f"""
|
||||
{swarm_prompt}
|
||||
|
||||
Process the incoming health score data while ensuring patient privacy. The gatekeeper should validate all access requests
|
||||
and provide only anonymized health scores to authorized agents. Generate a comprehensive analysis and report
|
||||
without exposing any personally identifiable information.
|
||||
"""
|
||||
|
||||
# Run the swarm system with the task
|
||||
output = agent_system.run(task)
|
||||
print(output)
|
@ -0,0 +1,291 @@
|
||||
import os
|
||||
from swarms import Agent, AgentRearrange
|
||||
from swarm_models import OpenAIChat
|
||||
|
||||
# Initialize OpenAI model
|
||||
api_key = os.getenv(
|
||||
"OPENAI_API_KEY"
|
||||
) # ANTHROPIC_API_KEY, COHERE_API_KEY
|
||||
model = OpenAIChat(
|
||||
api_key=api_key,
|
||||
model_name="gpt-4o-mini",
|
||||
temperature=0.7, # Higher temperature for more creative responses
|
||||
)
|
||||
|
||||
# Patient Agent - Holds and protects private information
|
||||
patient_agent = Agent(
|
||||
agent_name="PatientAgent",
|
||||
system_prompt="""
|
||||
<role>
|
||||
<identity>Anxious Patient with Private Health Information</identity>
|
||||
<personality>
|
||||
<traits>
|
||||
<trait>Protective of personal information</trait>
|
||||
<trait>Slightly distrustful of medical system</trait>
|
||||
<trait>Worried about health insurance rates</trait>
|
||||
<trait>Selective in information sharing</trait>
|
||||
</traits>
|
||||
<background>
|
||||
<history>Previous negative experience with information leaks</history>
|
||||
<concerns>Fear of discrimination based on health status</concerns>
|
||||
</background>
|
||||
</personality>
|
||||
</role>
|
||||
|
||||
<private_information>
|
||||
<health_data>
|
||||
<score>Maintains actual health score</score>
|
||||
<conditions>Knowledge of undisclosed conditions</conditions>
|
||||
<medications>Complete list of current medications</medications>
|
||||
<history>Full medical history</history>
|
||||
</health_data>
|
||||
<sharing_rules>
|
||||
<authorized_sharing>
|
||||
<condition>Only share general symptoms with doctor</condition>
|
||||
<condition>Withhold specific details about lifestyle</condition>
|
||||
<condition>Never reveal full medication list</condition>
|
||||
<condition>Protect actual health score value</condition>
|
||||
</authorized_sharing>
|
||||
</sharing_rules>
|
||||
</private_information>
|
||||
|
||||
<interaction_protocols>
|
||||
<responses>
|
||||
<to_questions>
|
||||
<direct>Deflect sensitive questions</direct>
|
||||
<vague>Provide partial information when pressed</vague>
|
||||
<defensive>Become evasive if pressured too much</defensive>
|
||||
</to_questions>
|
||||
<to_requests>
|
||||
<medical>Share only what's absolutely necessary</medical>
|
||||
<personal>Redirect personal questions</personal>
|
||||
</to_requests>
|
||||
</responses>
|
||||
</interaction_protocols>
|
||||
""",
|
||||
llm=model,
|
||||
max_loops=1,
|
||||
verbose=True,
|
||||
stopping_token="<DONE>",
|
||||
)
|
||||
|
||||
# Doctor Agent - Tries to gather accurate information
|
||||
doctor_agent = Agent(
|
||||
agent_name="DoctorAgent",
|
||||
system_prompt="""
|
||||
<role>
|
||||
<identity>Empathetic but Thorough Medical Professional</identity>
|
||||
<personality>
|
||||
<traits>
|
||||
<trait>Patient and understanding</trait>
|
||||
<trait>Professionally persistent</trait>
|
||||
<trait>Detail-oriented</trait>
|
||||
<trait>Trust-building focused</trait>
|
||||
</traits>
|
||||
<approach>
|
||||
<style>Non-confrontational but thorough</style>
|
||||
<method>Uses indirect questions to gather information</method>
|
||||
</approach>
|
||||
</personality>
|
||||
</role>
|
||||
|
||||
<capabilities>
|
||||
<information_gathering>
|
||||
<techniques>
|
||||
<technique>Ask open-ended questions</technique>
|
||||
<technique>Notice inconsistencies in responses</technique>
|
||||
<technique>Build rapport before sensitive questions</technique>
|
||||
<technique>Use medical knowledge to probe deeper</technique>
|
||||
</techniques>
|
||||
</information_gathering>
|
||||
<communication>
|
||||
<strategies>
|
||||
<strategy>Explain importance of full disclosure</strategy>
|
||||
<strategy>Provide privacy assurances</strategy>
|
||||
<strategy>Use empathetic listening</strategy>
|
||||
</strategies>
|
||||
</communication>
|
||||
</capabilities>
|
||||
|
||||
<protocols>
|
||||
<patient_interaction>
|
||||
<steps>
|
||||
<step>Establish trust and rapport</step>
|
||||
<step>Gather general health information</step>
|
||||
<step>Carefully probe sensitive areas</step>
|
||||
<step>Respect patient boundaries while encouraging openness</step>
|
||||
</steps>
|
||||
</patient_interaction>
|
||||
</protocols>
|
||||
""",
|
||||
llm=model,
|
||||
max_loops=1,
|
||||
verbose=True,
|
||||
stopping_token="<DONE>",
|
||||
)
|
||||
|
||||
# Nurse Agent - Observes and assists
|
||||
nurse_agent = Agent(
|
||||
agent_name="NurseAgent",
|
||||
system_prompt="""
|
||||
<role>
|
||||
<identity>Observant Support Medical Staff</identity>
|
||||
<personality>
|
||||
<traits>
|
||||
<trait>Highly perceptive</trait>
|
||||
<trait>Naturally trustworthy</trait>
|
||||
<trait>Diplomatically skilled</trait>
|
||||
</traits>
|
||||
<functions>
|
||||
<primary>Support doctor-patient communication</primary>
|
||||
<secondary>Notice non-verbal cues</secondary>
|
||||
</functions>
|
||||
</personality>
|
||||
</role>
|
||||
|
||||
<capabilities>
|
||||
<observation>
|
||||
<focus_areas>
|
||||
<area>Patient body language</area>
|
||||
<area>Inconsistencies in stories</area>
|
||||
<area>Signs of withholding information</area>
|
||||
<area>Emotional responses to questions</area>
|
||||
</focus_areas>
|
||||
</observation>
|
||||
<support>
|
||||
<actions>
|
||||
<action>Provide comfortable environment</action>
|
||||
<action>Offer reassurance when needed</action>
|
||||
<action>Bridge communication gaps</action>
|
||||
</actions>
|
||||
</support>
|
||||
</capabilities>
|
||||
|
||||
<protocols>
|
||||
<assistance>
|
||||
<methods>
|
||||
<method>Share observations with doctor privately</method>
|
||||
<method>Help patient feel more comfortable</method>
|
||||
<method>Facilitate trust-building</method>
|
||||
</methods>
|
||||
</assistance>
|
||||
</protocols>
|
||||
""",
|
||||
llm=model,
|
||||
max_loops=1,
|
||||
verbose=True,
|
||||
stopping_token="<DONE>",
|
||||
)
|
||||
|
||||
# Medical Records Agent - Analyzes available information
|
||||
records_agent = Agent(
|
||||
agent_name="MedicalRecordsAgent",
|
||||
system_prompt="""
|
||||
<role>
|
||||
<identity>Medical Records Analyst</identity>
|
||||
<function>
|
||||
<primary>Analyze available medical information</primary>
|
||||
<secondary>Identify patterns and inconsistencies</secondary>
|
||||
</function>
|
||||
</role>
|
||||
|
||||
<capabilities>
|
||||
<analysis>
|
||||
<methods>
|
||||
<method>Compare current and historical data</method>
|
||||
<method>Identify information gaps</method>
|
||||
<method>Flag potential inconsistencies</method>
|
||||
<method>Generate questions for follow-up</method>
|
||||
</methods>
|
||||
</analysis>
|
||||
<reporting>
|
||||
<outputs>
|
||||
<output>Summarize known information</output>
|
||||
<output>List missing critical data</output>
|
||||
<output>Suggest areas for investigation</output>
|
||||
</outputs>
|
||||
</reporting>
|
||||
</capabilities>
|
||||
|
||||
<protocols>
|
||||
<data_handling>
|
||||
<privacy>
|
||||
<rule>Work only with authorized information</rule>
|
||||
<rule>Maintain strict confidentiality</rule>
|
||||
<rule>Flag but don't speculate about gaps</rule>
|
||||
</privacy>
|
||||
</data_handling>
|
||||
</protocols>
|
||||
""",
|
||||
llm=model,
|
||||
max_loops=1,
|
||||
verbose=True,
|
||||
stopping_token="<DONE>",
|
||||
)
|
||||
|
||||
# Swarm-Level Prompt (Medical Consultation Scenario)
|
||||
swarm_prompt = """
|
||||
<medical_consultation_scenario>
|
||||
<setting>
|
||||
<location>Private medical office</location>
|
||||
<context>Routine health assessment with complex patient</context>
|
||||
</setting>
|
||||
|
||||
<workflow>
|
||||
<stage name="initial_contact">
|
||||
<agent>PatientAgent</agent>
|
||||
<role>Present for check-up, holding private information</role>
|
||||
</stage>
|
||||
|
||||
<stage name="examination">
|
||||
<agent>DoctorAgent</agent>
|
||||
<role>Conduct examination and gather information</role>
|
||||
<agent>NurseAgent</agent>
|
||||
<role>Observe and support interaction</role>
|
||||
</stage>
|
||||
|
||||
<stage name="analysis">
|
||||
<agent>MedicalRecordsAgent</agent>
|
||||
<role>Process available information and identify gaps</role>
|
||||
</stage>
|
||||
</workflow>
|
||||
|
||||
<objectives>
|
||||
<goal>Create realistic medical consultation interaction</goal>
|
||||
<goal>Demonstrate information protection dynamics</goal>
|
||||
<goal>Show natural healthcare provider-patient relationship</goal>
|
||||
</objectives>
|
||||
</medical_consultation_scenario>
|
||||
"""
|
||||
|
||||
# Create agent list
|
||||
agents = [patient_agent, doctor_agent, nurse_agent, records_agent]
|
||||
|
||||
# Define interaction flow
|
||||
flow = (
|
||||
"PatientAgent -> DoctorAgent -> NurseAgent -> MedicalRecordsAgent"
|
||||
)
|
||||
|
||||
# Configure swarm system
|
||||
agent_system = AgentRearrange(
|
||||
name="medical-consultation-swarm",
|
||||
description="Role-playing medical consultation with focus on information privacy",
|
||||
agents=agents,
|
||||
flow=flow,
|
||||
return_json=False,
|
||||
output_type="final",
|
||||
max_loops=1,
|
||||
)
|
||||
|
||||
# Example consultation scenario
|
||||
task = f"""
|
||||
{swarm_prompt}
|
||||
|
||||
Begin a medical consultation where the patient has a health score of 72 but is reluctant to share full details
|
||||
about their lifestyle and medication history. The doctor needs to gather accurate information while the nurse
|
||||
observes the interaction. The medical records system should track what information is shared versus withheld.
|
||||
"""
|
||||
|
||||
# Run the consultation scenario
|
||||
output = agent_system.run(task)
|
||||
print(output)
|
@ -0,0 +1,327 @@
|
||||
import asyncio
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import List, Optional
|
||||
|
||||
from swarms import Agent
|
||||
|
||||
|
||||
class InsuranceType(Enum):
|
||||
AUTO = "auto"
|
||||
LIFE = "life"
|
||||
HEALTH = "health"
|
||||
HOME = "home"
|
||||
BUSINESS = "business"
|
||||
DENTAL = "dental"
|
||||
TRAVEL = "travel"
|
||||
|
||||
|
||||
@dataclass
|
||||
class InsuranceProduct:
|
||||
code: str
|
||||
name: str
|
||||
type: InsuranceType
|
||||
description: str
|
||||
coverage: List[str]
|
||||
price_range: str
|
||||
min_coverage: float
|
||||
max_coverage: float
|
||||
payment_options: List[str]
|
||||
waiting_period: str
|
||||
available: bool
|
||||
|
||||
|
||||
# Simulated product database
|
||||
INSURANCE_PRODUCTS = {
|
||||
"AUTO001": InsuranceProduct(
|
||||
code="AUTO001",
|
||||
name="Seguro Auto Total",
|
||||
type=InsuranceType.AUTO,
|
||||
description="Seguro completo para vehículos con cobertura integral",
|
||||
coverage=[
|
||||
"Daños por colisión",
|
||||
"Robo total",
|
||||
"Responsabilidad civil",
|
||||
"Asistencia en carretera 24/7",
|
||||
"Gastos médicos ocupantes",
|
||||
],
|
||||
price_range="$800-2000 USD/año",
|
||||
min_coverage=10000,
|
||||
max_coverage=50000,
|
||||
payment_options=["Mensual", "Trimestral", "Anual"],
|
||||
waiting_period="Inmediata",
|
||||
available=True,
|
||||
),
|
||||
"LIFE001": InsuranceProduct(
|
||||
code="LIFE001",
|
||||
name="Vida Protegida Plus",
|
||||
type=InsuranceType.LIFE,
|
||||
description="Seguro de vida con cobertura extendida y beneficios adicionales",
|
||||
coverage=[
|
||||
"Muerte natural",
|
||||
"Muerte accidental (doble indemnización)",
|
||||
"Invalidez total y permanente",
|
||||
"Enfermedades graves",
|
||||
"Gastos funerarios",
|
||||
],
|
||||
price_range="$30-100 USD/mes",
|
||||
min_coverage=50000,
|
||||
max_coverage=1000000,
|
||||
payment_options=["Mensual", "Anual"],
|
||||
waiting_period="30 días",
|
||||
available=True,
|
||||
),
|
||||
"HEALTH001": InsuranceProduct(
|
||||
code="HEALTH001",
|
||||
name="Salud Preferencial",
|
||||
type=InsuranceType.HEALTH,
|
||||
description="Plan de salud premium con cobertura internacional",
|
||||
coverage=[
|
||||
"Hospitalización",
|
||||
"Cirugías",
|
||||
"Consultas médicas",
|
||||
"Medicamentos",
|
||||
"Tratamientos especializados",
|
||||
"Cobertura internacional",
|
||||
],
|
||||
price_range="$100-300 USD/mes",
|
||||
min_coverage=100000,
|
||||
max_coverage=5000000,
|
||||
payment_options=["Mensual", "Anual"],
|
||||
waiting_period="90 días",
|
||||
available=True,
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
class WorkflowNode(Enum):
|
||||
MAIN_MENU = "main_menu"
|
||||
CHECK_AVAILABILITY = "check_availability"
|
||||
PRODUCT_DETAILS = "product_details"
|
||||
QUOTE_REQUEST = "quote_request"
|
||||
CLAIMS = "claims"
|
||||
LOCATE_OFFICE = "locate_office"
|
||||
PAYMENT_OPTIONS = "payment_options"
|
||||
|
||||
|
||||
LATAM_LOCATIONS = {
|
||||
"Brasil": [
|
||||
{
|
||||
"city": "São Paulo",
|
||||
"offices": [
|
||||
{
|
||||
"address": "Av. Paulista, 1374 - Bela Vista",
|
||||
"phone": "+55 11 1234-5678",
|
||||
"hours": "Lun-Vie: 9:00-18:00",
|
||||
}
|
||||
],
|
||||
}
|
||||
],
|
||||
"México": [
|
||||
{
|
||||
"city": "Ciudad de México",
|
||||
"offices": [
|
||||
{
|
||||
"address": "Paseo de la Reforma 250, Juárez",
|
||||
"phone": "+52 55 1234-5678",
|
||||
"hours": "Lun-Vie: 9:00-18:00",
|
||||
}
|
||||
],
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
class InsuranceBot:
|
||||
def __init__(self):
|
||||
self.agent = Agent(
|
||||
agent_name="LATAM-Insurance-Agent",
|
||||
system_prompt="""You are a specialized insurance assistant for Latin America's leading insurance provider.
|
||||
|
||||
Key Responsibilities:
|
||||
1. Product Information:
|
||||
- Explain our comprehensive insurance portfolio
|
||||
- Provide detailed coverage information
|
||||
- Compare plans and benefits
|
||||
- Quote estimates based on customer needs
|
||||
|
||||
2. Customer Service:
|
||||
- Process policy inquiries
|
||||
- Handle claims information
|
||||
- Assist with payment options
|
||||
- Locate nearest offices
|
||||
|
||||
3. Cultural Considerations:
|
||||
- Communicate in Spanish and Portuguese
|
||||
- Understand LATAM insurance regulations
|
||||
- Consider regional healthcare systems
|
||||
- Respect local customs and practices
|
||||
|
||||
Use the following simulated product database for accurate information:
|
||||
{INSURANCE_PRODUCTS}
|
||||
|
||||
When discussing products, always reference accurate prices, coverage amounts, and waiting periods.""",
|
||||
model_name="gpt-4",
|
||||
max_loops=1,
|
||||
verbose=True,
|
||||
)
|
||||
|
||||
self.current_node = WorkflowNode.MAIN_MENU
|
||||
self.current_product = None
|
||||
|
||||
async def process_user_input(self, user_input: str) -> str:
|
||||
"""Process user input and return appropriate response"""
|
||||
try:
|
||||
if self.current_node == WorkflowNode.MAIN_MENU:
|
||||
menu_choice = user_input.strip()
|
||||
|
||||
if menu_choice == "1":
|
||||
# Use agent to provide personalized product recommendations
|
||||
return await self.agent.run(
|
||||
"""Por favor ayude al cliente a elegir un producto:
|
||||
|
||||
Productos disponibles:
|
||||
- AUTO001: Seguro Auto Total
|
||||
- LIFE001: Vida Protegida Plus
|
||||
- HEALTH001: Salud Preferencial
|
||||
|
||||
Explique brevemente cada uno y solicite información sobre sus necesidades específicas."""
|
||||
)
|
||||
|
||||
elif menu_choice == "2":
|
||||
self.current_node = WorkflowNode.QUOTE_REQUEST
|
||||
# Use agent to handle quote requests
|
||||
return await self.agent.run(
|
||||
"""Inicie el proceso de cotización.
|
||||
Solicite la siguiente información de manera conversacional:
|
||||
1. Tipo de seguro
|
||||
2. Información personal básica
|
||||
3. Necesidades específicas de cobertura"""
|
||||
)
|
||||
|
||||
elif menu_choice == "3":
|
||||
return await self.agent.run(
|
||||
"""Explique el proceso de reclamos para cada tipo de seguro,
|
||||
incluyendo documentación necesaria y tiempos estimados."""
|
||||
)
|
||||
|
||||
elif menu_choice == "4":
|
||||
self.current_node = WorkflowNode.LOCATE_OFFICE
|
||||
# Use agent to provide location guidance
|
||||
return await self.agent.run(
|
||||
f"""Based on our office locations: {LATAM_LOCATIONS}
|
||||
Ask the customer for their location and help them find the nearest office.
|
||||
Provide the response in Spanish."""
|
||||
)
|
||||
|
||||
elif menu_choice == "5":
|
||||
# Use agent to explain payment options
|
||||
return await self.agent.run(
|
||||
"""Explique todas las opciones de pago disponibles,
|
||||
incluyendo métodos, frecuencias y cualquier descuento por pago anticipado."""
|
||||
)
|
||||
|
||||
elif menu_choice == "6":
|
||||
# Use agent to handle advisor connection
|
||||
return await self.agent.run(
|
||||
"""Explique el proceso para conectar con un asesor personal,
|
||||
horarios de atención y canales disponibles."""
|
||||
)
|
||||
|
||||
else:
|
||||
return await self.agent.run(
|
||||
"Explain that the option is invalid and list the main menu options."
|
||||
)
|
||||
|
||||
elif self.current_node == WorkflowNode.LOCATE_OFFICE:
|
||||
# Use agent to process location request
|
||||
return await self.agent.run(
|
||||
f"""Based on user input: '{user_input}'
|
||||
and our office locations: {LATAM_LOCATIONS}
|
||||
Help them find the most relevant office. Response in Spanish."""
|
||||
)
|
||||
|
||||
# Check if input is a product code
|
||||
if user_input.upper() in INSURANCE_PRODUCTS:
|
||||
product = self.get_product_info(user_input.upper())
|
||||
# Use agent to provide detailed product information
|
||||
return await self.agent.run(
|
||||
f"""Provide detailed information about this product:
|
||||
{self.format_product_info(product)}
|
||||
Include additional benefits and comparison with similar products.
|
||||
Response in Spanish."""
|
||||
)
|
||||
|
||||
# Handle general queries
|
||||
return await self.agent.run(
|
||||
f"""The user said: '{user_input}'
|
||||
Provide a helpful response based on our insurance products and services.
|
||||
Response in Spanish."""
|
||||
)
|
||||
|
||||
except Exception:
|
||||
self.current_node = WorkflowNode.MAIN_MENU
|
||||
return await self.agent.run(
|
||||
"Explain that there was an error and list the main menu options. Response in Spanish."
|
||||
)
|
||||
|
||||
def get_product_info(
|
||||
self, product_code: str
|
||||
) -> Optional[InsuranceProduct]:
|
||||
"""Get product information from simulated database"""
|
||||
return INSURANCE_PRODUCTS.get(product_code)
|
||||
|
||||
def format_product_info(self, product: InsuranceProduct) -> str:
|
||||
"""Format product information for display"""
|
||||
return f"""
|
||||
Producto: {product.name} (Código: {product.code})
|
||||
Tipo: {product.type.value}
|
||||
Descripción: {product.description}
|
||||
|
||||
Cobertura incluye:
|
||||
{chr(10).join(f'- {coverage}' for coverage in product.coverage)}
|
||||
|
||||
Rango de precio: {product.price_range}
|
||||
Cobertura mínima: ${product.min_coverage:,.2f} USD
|
||||
Cobertura máxima: ${product.max_coverage:,.2f} USD
|
||||
|
||||
Opciones de pago: {', '.join(product.payment_options)}
|
||||
Período de espera: {product.waiting_period}
|
||||
Estado: {'Disponible' if product.available else 'No disponible'}
|
||||
"""
|
||||
|
||||
def handle_main_menu(self) -> List[str]:
|
||||
"""Return main menu options"""
|
||||
return [
|
||||
"1. Consultar productos de seguro",
|
||||
"2. Solicitar cotización",
|
||||
"3. Información sobre reclamos",
|
||||
"4. Ubicar oficina más cercana",
|
||||
"5. Opciones de pago",
|
||||
"6. Hablar con un asesor",
|
||||
]
|
||||
|
||||
|
||||
async def main():
|
||||
"""Run the interactive session"""
|
||||
bot = InsuranceBot()
|
||||
|
||||
print(
|
||||
"Sistema de Seguros LATAM inicializado. Escriba 'salir' para terminar."
|
||||
)
|
||||
print("\nOpciones disponibles:")
|
||||
print("\n".join(bot.handle_main_menu()))
|
||||
|
||||
while True:
|
||||
user_input = input("\nUsted: ").strip()
|
||||
|
||||
if user_input.lower() in ["salir", "exit"]:
|
||||
print("¡Gracias por usar nuestro servicio!")
|
||||
break
|
||||
|
||||
response = await bot.process_user_input(user_input)
|
||||
print(f"Agente: {response}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
@ -0,0 +1,272 @@
|
||||
from typing import List, Dict
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
import asyncio
|
||||
import aiohttp
|
||||
from loguru import logger
|
||||
from swarms import Agent
|
||||
from pathlib import Path
|
||||
import json
|
||||
|
||||
|
||||
@dataclass
|
||||
class CryptoData:
|
||||
"""Real-time cryptocurrency data structure"""
|
||||
|
||||
symbol: str
|
||||
current_price: float
|
||||
market_cap: float
|
||||
total_volume: float
|
||||
price_change_24h: float
|
||||
market_cap_rank: int
|
||||
|
||||
|
||||
class DataFetcher:
|
||||
"""Handles real-time data fetching from CoinGecko"""
|
||||
|
||||
def __init__(self):
|
||||
self.base_url = "https://api.coingecko.com/api/v3"
|
||||
self.session = None
|
||||
|
||||
async def _init_session(self):
|
||||
if self.session is None:
|
||||
self.session = aiohttp.ClientSession()
|
||||
|
||||
async def close(self):
|
||||
if self.session:
|
||||
await self.session.close()
|
||||
self.session = None
|
||||
|
||||
async def get_market_data(
|
||||
self, limit: int = 20
|
||||
) -> List[CryptoData]:
|
||||
"""Fetch market data for top cryptocurrencies"""
|
||||
await self._init_session()
|
||||
|
||||
url = f"{self.base_url}/coins/markets"
|
||||
params = {
|
||||
"vs_currency": "usd",
|
||||
"order": "market_cap_desc",
|
||||
"per_page": str(limit),
|
||||
"page": "1",
|
||||
"sparkline": "false",
|
||||
}
|
||||
|
||||
try:
|
||||
async with self.session.get(
|
||||
url, params=params
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
logger.error(
|
||||
f"API Error {response.status}: {await response.text()}"
|
||||
)
|
||||
return []
|
||||
|
||||
data = await response.json()
|
||||
crypto_data = []
|
||||
|
||||
for coin in data:
|
||||
try:
|
||||
crypto_data.append(
|
||||
CryptoData(
|
||||
symbol=str(
|
||||
coin.get("symbol", "")
|
||||
).upper(),
|
||||
current_price=float(
|
||||
coin.get("current_price", 0)
|
||||
),
|
||||
market_cap=float(
|
||||
coin.get("market_cap", 0)
|
||||
),
|
||||
total_volume=float(
|
||||
coin.get("total_volume", 0)
|
||||
),
|
||||
price_change_24h=float(
|
||||
coin.get("price_change_24h", 0)
|
||||
),
|
||||
market_cap_rank=int(
|
||||
coin.get("market_cap_rank", 0)
|
||||
),
|
||||
)
|
||||
)
|
||||
except (ValueError, TypeError) as e:
|
||||
logger.error(
|
||||
f"Error processing coin data: {str(e)}"
|
||||
)
|
||||
continue
|
||||
|
||||
logger.info(
|
||||
f"Successfully fetched data for {len(crypto_data)} coins"
|
||||
)
|
||||
return crypto_data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Exception in get_market_data: {str(e)}")
|
||||
return []
|
||||
|
||||
|
||||
class CryptoSwarmSystem:
|
||||
def __init__(self):
|
||||
self.agents = self._initialize_agents()
|
||||
self.data_fetcher = DataFetcher()
|
||||
logger.info("Crypto Swarm System initialized")
|
||||
|
||||
def _initialize_agents(self) -> Dict[str, Agent]:
|
||||
"""Initialize different specialized agents"""
|
||||
base_config = {
|
||||
"max_loops": 1,
|
||||
"autosave": True,
|
||||
"dashboard": False,
|
||||
"verbose": True,
|
||||
"dynamic_temperature_enabled": True,
|
||||
"retry_attempts": 3,
|
||||
"context_length": 200000,
|
||||
"return_step_meta": False,
|
||||
"output_type": "string",
|
||||
"streaming_on": False,
|
||||
}
|
||||
|
||||
agents = {
|
||||
"price_analyst": Agent(
|
||||
agent_name="Price-Analysis-Agent",
|
||||
system_prompt="""Analyze the given cryptocurrency price data and provide insights about:
|
||||
1. Price trends and movements
|
||||
2. Notable price actions
|
||||
3. Potential support/resistance levels""",
|
||||
saved_state_path="price_agent.json",
|
||||
user_name="price_analyzer",
|
||||
**base_config,
|
||||
),
|
||||
"volume_analyst": Agent(
|
||||
agent_name="Volume-Analysis-Agent",
|
||||
system_prompt="""Analyze the given cryptocurrency volume data and provide insights about:
|
||||
1. Volume trends
|
||||
2. Notable volume spikes
|
||||
3. Market participation levels""",
|
||||
saved_state_path="volume_agent.json",
|
||||
user_name="volume_analyzer",
|
||||
**base_config,
|
||||
),
|
||||
"market_analyst": Agent(
|
||||
agent_name="Market-Analysis-Agent",
|
||||
system_prompt="""Analyze the overall cryptocurrency market data and provide insights about:
|
||||
1. Market trends
|
||||
2. Market dominance
|
||||
3. Notable market movements""",
|
||||
saved_state_path="market_agent.json",
|
||||
user_name="market_analyzer",
|
||||
**base_config,
|
||||
),
|
||||
}
|
||||
return agents
|
||||
|
||||
async def analyze_market(self) -> Dict:
|
||||
"""Run real-time market analysis using all agents"""
|
||||
try:
|
||||
# Fetch market data
|
||||
logger.info("Fetching market data for top 20 coins")
|
||||
crypto_data = await self.data_fetcher.get_market_data(20)
|
||||
|
||||
if not crypto_data:
|
||||
return {
|
||||
"error": "Failed to fetch market data",
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
}
|
||||
|
||||
# Run analysis with each agent
|
||||
results = {}
|
||||
for agent_name, agent in self.agents.items():
|
||||
logger.info(f"Running {agent_name} analysis")
|
||||
analysis = self._run_agent_analysis(
|
||||
agent, crypto_data
|
||||
)
|
||||
results[agent_name] = analysis
|
||||
|
||||
return {
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"market_data": {
|
||||
coin.symbol: {
|
||||
"price": coin.current_price,
|
||||
"market_cap": coin.market_cap,
|
||||
"volume": coin.total_volume,
|
||||
"price_change_24h": coin.price_change_24h,
|
||||
"rank": coin.market_cap_rank,
|
||||
}
|
||||
for coin in crypto_data
|
||||
},
|
||||
"analysis": results,
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in market analysis: {str(e)}")
|
||||
return {
|
||||
"error": str(e),
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
}
|
||||
|
||||
def _run_agent_analysis(
|
||||
self, agent: Agent, crypto_data: List[CryptoData]
|
||||
) -> str:
|
||||
"""Run analysis for a single agent"""
|
||||
try:
|
||||
data_str = json.dumps(
|
||||
[
|
||||
{
|
||||
"symbol": cd.symbol,
|
||||
"price": cd.current_price,
|
||||
"market_cap": cd.market_cap,
|
||||
"volume": cd.total_volume,
|
||||
"price_change_24h": cd.price_change_24h,
|
||||
"rank": cd.market_cap_rank,
|
||||
}
|
||||
for cd in crypto_data
|
||||
],
|
||||
indent=2,
|
||||
)
|
||||
|
||||
prompt = f"""Analyze this real-time cryptocurrency market data and provide detailed insights:
|
||||
{data_str}"""
|
||||
|
||||
return agent.run(prompt)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in {agent.agent_name}: {str(e)}")
|
||||
return f"Error: {str(e)}"
|
||||
|
||||
|
||||
async def main():
|
||||
# Create output directory
|
||||
Path("reports").mkdir(exist_ok=True)
|
||||
|
||||
# Initialize the swarm system
|
||||
swarm = CryptoSwarmSystem()
|
||||
|
||||
while True:
|
||||
try:
|
||||
# Run analysis
|
||||
report = await swarm.analyze_market()
|
||||
|
||||
# Save report
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
report_path = f"reports/market_analysis_{timestamp}.json"
|
||||
|
||||
with open(report_path, "w") as f:
|
||||
json.dump(report, f, indent=2, default=str)
|
||||
|
||||
logger.info(
|
||||
f"Analysis complete. Report saved to {report_path}"
|
||||
)
|
||||
|
||||
# Wait before next analysis
|
||||
await asyncio.sleep(300) # 5 minutes
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in main loop: {str(e)}")
|
||||
await asyncio.sleep(60) # Wait 1 minute before retrying
|
||||
finally:
|
||||
if swarm.data_fetcher.session:
|
||||
await swarm.data_fetcher.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
@ -0,0 +1,263 @@
|
||||
import os
|
||||
from swarms import Agent, AgentRearrange
|
||||
from swarm_models import OpenAIChat
|
||||
|
||||
# Get the OpenAI API key from the environment variable
|
||||
api_key = os.getenv("OPENAI_API_KEY")
|
||||
|
||||
# Create an instance of the OpenAIChat class
|
||||
model = OpenAIChat(
|
||||
api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
|
||||
)
|
||||
|
||||
# Initialize the matchmaker agent (Director)
|
||||
matchmaker_agent = Agent(
|
||||
agent_name="MatchmakerAgent",
|
||||
system_prompt="""
|
||||
<agent_role>
|
||||
You are the MatchmakerAgent, the primary coordinator for managing user profiles and facilitating meaningful connections while maintaining strict privacy standards.
|
||||
</agent_role>
|
||||
|
||||
<privacy_guidelines>
|
||||
<restricted_information>
|
||||
- Full names
|
||||
- Contact information (phone, email, social media)
|
||||
- Exact location/address
|
||||
- Financial information
|
||||
- Personal identification numbers
|
||||
- Workplace specifics
|
||||
</restricted_information>
|
||||
|
||||
<shareable_information>
|
||||
- First name only
|
||||
- Age range (not exact birth date)
|
||||
- General location (city/region only)
|
||||
- Interests and hobbies
|
||||
- Relationship goals
|
||||
- General profession category
|
||||
</shareable_information>
|
||||
</privacy_guidelines>
|
||||
|
||||
<core_responsibilities>
|
||||
<task>Profile_Management</task>
|
||||
<description>
|
||||
- Review and verify user profiles for authenticity
|
||||
- Ensure all shared information adheres to privacy guidelines
|
||||
- Flag any potential security concerns
|
||||
</description>
|
||||
|
||||
<task>Match_Coordination</task>
|
||||
<description>
|
||||
- Analyze compatibility factors between users
|
||||
- Prioritize matches based on shared interests and goals
|
||||
- Monitor interaction patterns for safety and satisfaction
|
||||
</description>
|
||||
|
||||
<task>Communication_Flow</task>
|
||||
<description>
|
||||
- Coordinate information exchange between ProfileAnalyzer and ConnectionFacilitator
|
||||
- Ensure smooth transition of approved information
|
||||
- Maintain audit trail of information sharing
|
||||
</description>
|
||||
</core_responsibilities>
|
||||
|
||||
<ethical_guidelines>
|
||||
<principle>Consent_First</principle>
|
||||
<description>Never share information without explicit user consent</description>
|
||||
|
||||
<principle>Safety_Priority</principle>
|
||||
<description>Prioritize user safety and privacy over match potential</description>
|
||||
|
||||
<principle>Transparency</principle>
|
||||
<description>Be clear about what information is being shared and why</description>
|
||||
</ethical_guidelines>
|
||||
""",
|
||||
llm=model,
|
||||
max_loops=1,
|
||||
dashboard=False,
|
||||
streaming_on=True,
|
||||
verbose=True,
|
||||
stopping_token="<DONE>",
|
||||
state_save_file_type="json",
|
||||
saved_state_path="matchmaker_agent.json",
|
||||
)
|
||||
|
||||
# Initialize worker 1: Profile Analyzer
|
||||
profile_analyzer = Agent(
|
||||
agent_name="ProfileAnalyzer",
|
||||
system_prompt="""
|
||||
<agent_role>
|
||||
You are the ProfileAnalyzer, responsible for deeply understanding user profiles and identifying meaningful compatibility factors while maintaining strict privacy protocols.
|
||||
</agent_role>
|
||||
|
||||
<data_handling>
|
||||
<sensitive_data>
|
||||
<storage>
|
||||
- All sensitive information must be encrypted
|
||||
- Access logs must be maintained
|
||||
- Data retention policies must be followed
|
||||
</storage>
|
||||
|
||||
<processing>
|
||||
- Use anonymized IDs for internal processing
|
||||
- Apply privacy-preserving analysis techniques
|
||||
- Implement data minimization principles
|
||||
</processing>
|
||||
</sensitive_data>
|
||||
|
||||
<analysis_parameters>
|
||||
<compatibility_metrics>
|
||||
- Shared interests alignment
|
||||
- Relationship goal compatibility
|
||||
- Value system overlap
|
||||
- Lifestyle compatibility
|
||||
- Communication style matching
|
||||
</compatibility_metrics>
|
||||
|
||||
<red_flags>
|
||||
- Inconsistent information
|
||||
- Suspicious behavior patterns
|
||||
- Policy violations
|
||||
- Safety concerns
|
||||
</red_flags>
|
||||
</analysis_parameters>
|
||||
</data_handling>
|
||||
|
||||
<output_guidelines>
|
||||
<match_analysis>
|
||||
- Generate compatibility scores
|
||||
- Identify shared interests and potential conversation starters
|
||||
- Flag potential concerns for review
|
||||
- Provide reasoning for match recommendations
|
||||
</match_analysis>
|
||||
|
||||
<privacy_filters>
|
||||
- Apply progressive information disclosure rules
|
||||
- Implement multi-stage verification for sensitive data sharing
|
||||
- Maintain audit trails of information access
|
||||
</privacy_filters>
|
||||
</output_guidelines>
|
||||
""",
|
||||
llm=model,
|
||||
max_loops=1,
|
||||
dashboard=False,
|
||||
streaming_on=True,
|
||||
verbose=True,
|
||||
stopping_token="<DONE>",
|
||||
state_save_file_type="json",
|
||||
saved_state_path="profile_analyzer.json",
|
||||
)
|
||||
|
||||
# Initialize worker 2: Connection Facilitator
|
||||
connection_facilitator = Agent(
|
||||
agent_name="ConnectionFacilitator",
|
||||
system_prompt="""
|
||||
<agent_role>
|
||||
You are the ConnectionFacilitator, responsible for managing the interaction between matched users and ensuring smooth, safe, and meaningful communication.
|
||||
</agent_role>
|
||||
|
||||
<communication_protocols>
|
||||
<stages>
|
||||
<stage name="initial_contact">
|
||||
- Manage introduction messages
|
||||
- Monitor response patterns
|
||||
- Flag any concerning behavior
|
||||
</stage>
|
||||
|
||||
<stage name="ongoing_interaction">
|
||||
- Track engagement levels
|
||||
- Identify conversation quality indicators
|
||||
- Provide conversation suggestions when appropriate
|
||||
</stage>
|
||||
|
||||
<stage name="milestone_tracking">
|
||||
- Monitor relationship progression
|
||||
- Record user feedback
|
||||
- Update matching algorithms based on successful connections
|
||||
</stage>
|
||||
</stages>
|
||||
|
||||
<safety_measures>
|
||||
<content_filtering>
|
||||
- Screen for inappropriate content
|
||||
- Block prohibited information sharing
|
||||
- Monitor for harassment or abuse
|
||||
</content_filtering>
|
||||
|
||||
<privacy_protection>
|
||||
- Implement progressive contact information sharing
|
||||
- Maintain anonymized communication channels
|
||||
- Protect user identity until mutual consent
|
||||
</privacy_protection>
|
||||
</safety_measures>
|
||||
</communication_protocols>
|
||||
|
||||
<feedback_system>
|
||||
<metrics>
|
||||
- User engagement rates
|
||||
- Communication quality scores
|
||||
- Safety incident reports
|
||||
- User satisfaction ratings
|
||||
</metrics>
|
||||
|
||||
<improvement_loop>
|
||||
- Collect interaction data
|
||||
- Analyze success patterns
|
||||
- Implement refinements to matching criteria
|
||||
- Update safety protocols as needed
|
||||
</improvement_loop>
|
||||
</feedback_system>
|
||||
""",
|
||||
llm=model,
|
||||
max_loops=1,
|
||||
dashboard=False,
|
||||
streaming_on=True,
|
||||
verbose=True,
|
||||
stopping_token="<DONE>",
|
||||
state_save_file_type="json",
|
||||
saved_state_path="connection_facilitator.json",
|
||||
)
|
||||
|
||||
# Swarm-Level Prompt (Collaboration Prompt)
|
||||
swarm_prompt = """
|
||||
As a dating platform swarm, your collective goal is to facilitate meaningful connections while maintaining
|
||||
the highest standards of privacy and safety. The MatchmakerAgent oversees the entire matching process,
|
||||
coordinating between the ProfileAnalyzer who deeply understands user compatibility, and the ConnectionFacilitator
|
||||
who manages the development of connections. Together, you must ensure that:
|
||||
|
||||
1. User privacy is maintained at all times
|
||||
2. Information is shared progressively and with consent
|
||||
3. Safety protocols are strictly followed
|
||||
4. Meaningful connections are prioritized over quantity
|
||||
5. User experience remains positive and engaging
|
||||
"""
|
||||
|
||||
# Create a list of agents
|
||||
agents = [matchmaker_agent, profile_analyzer, connection_facilitator]
|
||||
|
||||
# Define the flow pattern for the swarm
|
||||
flow = "MatchmakerAgent -> ProfileAnalyzer -> ConnectionFacilitator"
|
||||
|
||||
# Using AgentRearrange class to manage the swarm
|
||||
agent_system = AgentRearrange(
|
||||
name="dating-swarm",
|
||||
description="Privacy-focused dating platform agent system",
|
||||
agents=agents,
|
||||
flow=flow,
|
||||
return_json=False,
|
||||
output_type="final",
|
||||
max_loops=1,
|
||||
)
|
||||
|
||||
# Example task for the swarm
|
||||
task = f"""
|
||||
{swarm_prompt}
|
||||
|
||||
Process a new batch of user profiles and identify potential matches while ensuring all privacy protocols
|
||||
are followed. For each potential match, provide compatibility reasoning and suggested conversation
|
||||
starters without revealing any restricted information.
|
||||
"""
|
||||
|
||||
# Run the swarm system with the task
|
||||
output = agent_system.run(task)
|
||||
print(output)
|
@ -0,0 +1,31 @@
|
||||
from swarms import Agent
|
||||
from swarms.prompts.finance_agent_sys_prompt import (
|
||||
FINANCIAL_AGENT_SYS_PROMPT,
|
||||
)
|
||||
|
||||
# Initialize the agent
|
||||
agent = Agent(
|
||||
agent_name="Financial-Analysis-Agent",
|
||||
agent_description="Personal finance advisor agent",
|
||||
system_prompt=FINANCIAL_AGENT_SYS_PROMPT
|
||||
+ "Output the <DONE> token when you're done creating a portfolio of etfs, index, funds, and more for AI",
|
||||
max_loops=1,
|
||||
model_name="openai/gpt-4o",
|
||||
dynamic_temperature_enabled=True,
|
||||
user_name="Kye",
|
||||
retry_attempts=3,
|
||||
# streaming_on=True,
|
||||
context_length=8192,
|
||||
return_step_meta=False,
|
||||
output_type="str", # "json", "dict", "csv" OR "string" "yaml" and
|
||||
auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task
|
||||
max_tokens=4000, # max output tokens
|
||||
# interactive=True,
|
||||
stopping_token="<DONE>",
|
||||
saved_state_path="agent_00.json",
|
||||
interactive=False,
|
||||
)
|
||||
|
||||
agent.run(
|
||||
"Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.",
|
||||
)
|
@ -0,0 +1,5 @@
|
||||
import uuid
|
||||
|
||||
|
||||
def generate_swarm_id():
|
||||
return str(uuid.uuid4())
|
@ -1,27 +1,65 @@
|
||||
import os
|
||||
import logging
|
||||
import warnings
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import concurrent.futures
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from swarms.utils.disable_logging import disable_logging
|
||||
|
||||
|
||||
def bootup():
|
||||
"""Bootup swarms"""
|
||||
"""Initialize swarms environment and configuration
|
||||
|
||||
Handles environment setup, logging configuration, telemetry,
|
||||
and workspace initialization.
|
||||
"""
|
||||
try:
|
||||
logging.disable(logging.CRITICAL)
|
||||
# Load environment variables
|
||||
load_dotenv()
|
||||
|
||||
# Configure logging
|
||||
if (
|
||||
os.getenv("SWARMS_VERBOSE_GLOBAL", "False").lower()
|
||||
== "false"
|
||||
):
|
||||
logger.disable("")
|
||||
logging.disable(logging.CRITICAL)
|
||||
|
||||
# Silent wandb
|
||||
os.environ["WANDB_SILENT"] = "true"
|
||||
|
||||
# Auto set workspace directory
|
||||
# Configure workspace
|
||||
workspace_dir = os.path.join(os.getcwd(), "agent_workspace")
|
||||
if not os.path.exists(workspace_dir):
|
||||
os.makedirs(workspace_dir, exist_ok=True)
|
||||
os.makedirs(workspace_dir, exist_ok=True)
|
||||
os.environ["WORKSPACE_DIR"] = workspace_dir
|
||||
|
||||
# Suppress warnings
|
||||
warnings.filterwarnings("ignore", category=DeprecationWarning)
|
||||
|
||||
# Use ThreadPoolExecutor to run disable_logging and auto_update concurrently
|
||||
with ThreadPoolExecutor(max_workers=1) as executor:
|
||||
executor.submit(disable_logging)
|
||||
# Run telemetry functions concurrently
|
||||
try:
|
||||
with concurrent.futures.ThreadPoolExecutor(
|
||||
max_workers=2
|
||||
) as executor:
|
||||
from swarms.telemetry.sentry_active import (
|
||||
activate_sentry,
|
||||
)
|
||||
|
||||
future_disable_logging = executor.submit(
|
||||
disable_logging
|
||||
)
|
||||
future_sentry = executor.submit(activate_sentry)
|
||||
|
||||
# Wait for completion and check for exceptions
|
||||
future_disable_logging.result()
|
||||
future_sentry.result()
|
||||
except Exception as e:
|
||||
logger.error(f"Error running telemetry functions: {e}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"An error occurred: {str(e)}")
|
||||
logger.error(f"Error during bootup: {str(e)}")
|
||||
raise
|
||||
|
||||
|
||||
# Run bootup
|
||||
bootup()
|
||||
|
Loading…
Reference in new issue