[DOCS][MCS API]

pull/707/head
Kye Gomez 1 week ago
parent 17e2933b29
commit b059bf299c

@ -216,11 +216,12 @@ nav:
- BaseMultiModalModel: "swarms/models/base_multimodal_model.md"
- Multi Modal Models Available: "swarms/models/multimodal_models.md"
- GPT4VisionAPI: "swarms/models/gpt4v.md"
# - Swarms Cloud API:
# # - Overview: "swarms_cloud/main.md"
# - Overview: "swarms_cloud/vision.md"
# - Swarms Cloud CLI: "swarms_cloud/cli.md"
# # - Add Agents to Marketplace: "swarms_cloud/add_agent.md"
- Swarms Cloud API:
# - Overview: "swarms_cloud/main.md"
- Overview: "swarms_cloud/vision.md"
- MCS API: "swarms_cloud/mcs_api.md"
- Swarms Cloud CLI: "swarms_cloud/cli.md"
# - Add Agents to Marketplace: "swarms_cloud/add_agent.md"
# - Available Models: "swarms_cloud/available_models.md"
# - Agent API: "swarms_cloud/agent_api.md"
# - Migrate from OpenAI to Swarms in 3 lines of code: "swarms_cloud/migrate_openai.md"

@ -0,0 +1,602 @@
# Medical Coder Swarm API Documentation
Base URL: `https://mcs-285321057562.us-central1.run.app`
## Table of Contents
- [Authentication](#authentication)
- [Rate Limits](#rate-limits)
- [Endpoints](#endpoints)
- [Health Check](#health-check)
- [Run Medical Coder](#run-medical-coder)
- [Run Batch Medical Coder](#run-batch-medical-coder)
- [Get Patient Data](#get-patient-data)
- [Get All Patients](#get-all-patients)
- [Code Examples](#code-examples)
- [Error Handling](#error-handling)
## Authentication
Authentication details will be provided by the MCS team. Contact support for API credentials.
## Rate Limits
| Endpoint | GET Rate Limit Status |
|----------|----------------------|
| `GET /rate-limits` | Returns current rate limit status for your IP address |
## Endpoints
### Health Check
Check if the API is operational.
| Method | Endpoint | Description |
|--------|----------|-------------|
| `GET` | `/health` | Returns 200 OK if service is running |
### Run Medical Coder
Process a single patient case through the Medical Coder Swarm.
| Method | Endpoint | Description |
|--------|----------|-------------|
| `POST` | `/v1/medical-coder/run` | Process a single patient case |
**Request Body Parameters:**
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| patient_id | string | Yes | Unique identifier for the patient |
| case_description | string | Yes | Medical case details to be processed |
**Response Schema:**
| Field | Type | Description |
|-------|------|-------------|
| patient_id | string | Patient identifier |
| case_data | string | Processed case data |
### Run Batch Medical Coder
Process multiple patient cases in a single request.
| Method | Endpoint | Description |
|--------|----------|-------------|
| `POST` | `/v1/medical-coder/run-batch` | Process multiple patient cases |
**Request Body Parameters:**
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| cases | array | Yes | Array of PatientCase objects |
### Get Patient Data
Retrieve data for a specific patient.
| Method | Endpoint | Description |
|--------|----------|-------------|
| `GET` | `/v1/medical-coder/patient/{patient_id}` | Get patient data by ID |
**Path Parameters:**
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| patient_id | string | Yes | Patient identifier |
### Get All Patients
Retrieve data for all patients.
| Method | Endpoint | Description |
|--------|----------|-------------|
| `GET` | `/v1/medical-coder/patients` | Get all patient data |
## Code Examples
### Python
```python
import requests
import json
class MCSClient:
def __init__(self, base_url="https://mcs.swarms.ai", api_key=None):
self.base_url = base_url
self.headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}" if api_key else None
}
def run_medical_coder(self, patient_id, case_description):
endpoint = f"{self.base_url}/v1/medical-coder/run"
payload = {
"patient_id": patient_id,
"case_description": case_description
}
response = requests.post(endpoint, json=payload, headers=self.headers)
return response.json()
def run_batch(self, cases):
endpoint = f"{self.base_url}/v1/medical-coder/run-batch"
payload = {"cases": cases}
response = requests.post(endpoint, json=payload, headers=self.headers)
return response.json()
# Usage example
client = MCSClient(api_key="your_api_key")
result = client.run_medical_coder("P123", "Patient presents with...")
```
### Next.js (TypeScript)
```typescript
// types.ts
interface PatientCase {
patient_id: string;
case_description: string;
}
interface QueryResponse {
patient_id: string;
case_data: string;
}
// api.ts
export class MCSApi {
private baseUrl: string;
private apiKey: string;
constructor(apiKey: string, baseUrl = 'https://mcs.swarms.ai') {
this.baseUrl = baseUrl;
this.apiKey = apiKey;
}
private async fetchWithAuth(endpoint: string, options: RequestInit = {}) {
const response = await fetch(`${this.baseUrl}${endpoint}`, {
...options,
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${this.apiKey}`,
...options.headers,
},
});
return response.json();
}
async runMedicalCoder(patientCase: PatientCase): Promise<QueryResponse> {
return this.fetchWithAuth('/v1/medical-coder/run', {
method: 'POST',
body: JSON.stringify(patientCase),
});
}
async getPatientData(patientId: string): Promise<QueryResponse> {
return this.fetchWithAuth(`/v1/medical-coder/patient/${patientId}`);
}
}
// Usage in component
const mcsApi = new MCSApi(process.env.MCS_API_KEY);
export async function ProcessPatientCase({ patientId, caseDescription }) {
const result = await mcsApi.runMedicalCoder({
patient_id: patientId,
case_description: caseDescription,
});
return result;
}
```
### Go
```go
package mcs
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
)
type MCSClient struct {
BaseURL string
APIKey string
Client *http.Client
}
type PatientCase struct {
PatientID string `json:"patient_id"`
CaseDescription string `json:"case_description"`
}
type QueryResponse struct {
PatientID string `json:"patient_id"`
CaseData string `json:"case_data"`
}
func NewMCSClient(apiKey string) *MCSClient {
return &MCSClient{
BaseURL: "https://mcs.swarms.ai",
APIKey: apiKey,
Client: &http.Client{},
}
}
func (c *MCSClient) RunMedicalCoder(patientCase PatientCase) (*QueryResponse, error) {
payload, err := json.Marshal(patientCase)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST",
fmt.Sprintf("%s/v1/medical-coder/run", c.BaseURL),
bytes.NewBuffer(payload))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.APIKey))
resp, err := c.Client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var result QueryResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, err
}
return &result, nil
}
// Usage example
func main() {
client := NewMCSClient("your_api_key")
result, err := client.RunMedicalCoder(PatientCase{
PatientID: "P123",
CaseDescription: "Patient presents with...",
})
if err != nil {
panic(err)
}
fmt.Printf("Result: %+v\n", result)
}
```
## Error Handling
The API uses standard HTTP status codes and returns detailed error messages in JSON format.
**Common Status Codes:**
| Status Code | Description |
|-------------|-------------|
| 200 | Success |
| 400 | Bad Request - Invalid input |
| 401 | Unauthorized - Invalid or missing API key |
| 422 | Validation Error - Request validation failed |
| 429 | Too Many Requests - Rate limit exceeded |
| 500 | Internal Server Error |
**Error Response Format:**
```json
{
"detail": [
{
"loc": ["body", "patient_id"],
"msg": "field required",
"type": "value_error.missing"
}
]
}
```
# MCS Python Client Documentation
## Installation
```bash
pip install mcs
```
## Quick Start
```python
from mcs import MCSClient, PatientCase
# Using context manager (recommended)
with MCSClient() as client:
# Process a single case
response = client.run_medical_coder(
patient_id="P123",
case_description="Patient presents with acute respiratory symptoms..."
)
print(f"Processed case: {response.case_data}")
# Process multiple cases
cases = [
PatientCase("P124", "Case 1 description..."),
PatientCase("P125", "Case 2 description...")
]
batch_response = client.run_batch(cases)
```
## Client Configuration
### Constructor Arguments
| Parameter | Type | Required | Default | Description |
|-----------|------|----------|---------|-------------|
| api_key | str | Yes | - | Authentication API key |
| base_url | str | No | "https://mcs.swarms.ai" | API base URL |
| timeout | int | No | 30 | Request timeout in seconds |
| max_retries | int | No | 3 | Maximum retry attempts |
| logger_name | str | No | "mcs" | Name for the logger instance |
### Example Configuration
```python
client = MCSClient(
,
base_url="https://custom-url.example.com",
timeout=45,
max_retries=5,
logger_name="custom_logger"
)
```
## Data Models
### PatientCase
| Field | Type | Required | Description |
|-------|------|----------|-------------|
| patient_id | str | Yes | Unique identifier for the patient |
| case_description | str | Yes | Medical case details |
### QueryResponse
| Field | Type | Description |
|-------|------|-------------|
| patient_id | str | Patient identifier |
| case_data | str | Processed case data |
## Methods
### run_medical_coder
Process a single patient case.
```python
def run_medical_coder(
self,
patient_id: str,
case_description: str
) -> QueryResponse:
```
**Arguments:**
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| patient_id | str | Yes | Patient identifier |
| case_description | str | Yes | Case details |
**Example:**
```python
response = client.run_medical_coder(
patient_id="P123",
case_description="Patient presents with..."
)
print(response.case_data)
```
### run_batch
Process multiple patient cases in batch.
```python
def run_batch(
self,
cases: List[PatientCase]
) -> List[QueryResponse]:
```
**Arguments:**
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| cases | List[PatientCase] | Yes | List of patient cases |
**Example:**
```python
cases = [
PatientCase("P124", "Case 1 description..."),
PatientCase("P125", "Case 2 description...")
]
responses = client.run_batch(cases)
for response in responses:
print(f"Patient {response.patient_id}: {response.case_data}")
```
### get_patient_data
Retrieve data for a specific patient.
```python
def get_patient_data(
self,
patient_id: str
) -> QueryResponse:
```
**Example:**
```python
patient_data = client.get_patient_data("P123")
print(f"Patient data: {patient_data.case_data}")
```
### get_all_patients
Retrieve data for all patients.
```python
def get_all_patients(self) -> List[QueryResponse]:
```
**Example:**
```python
all_patients = client.get_all_patients()
for patient in all_patients:
print(f"Patient {patient.patient_id}: {patient.case_data}")
```
### get_rate_limits
Get current rate limit status.
```python
def get_rate_limits(self) -> Dict[str, Any]:
```
**Example:**
```python
rate_limits = client.get_rate_limits()
print(f"Rate limit status: {rate_limits}")
```
### health_check
Check if the API is operational.
```python
def health_check(self) -> bool:
```
**Example:**
```python
is_healthy = client.health_check()
print(f"API health: {'Healthy' if is_healthy else 'Unhealthy'}")
```
## Error Handling
### Exception Hierarchy
| Exception | Description |
|-----------|-------------|
| MCSClientError | Base exception for all client errors |
| RateLimitError | Raised when API rate limit is exceeded |
| AuthenticationError | Raised when API authentication fails |
| ValidationError | Raised when request validation fails |
### Example Error Handling
```python
from mcs import MCSClient, MCSClientError, RateLimitError
with MCSClient() as client:
try:
response = client.run_medical_coder("P123", "Case description...")
except RateLimitError:
print("Rate limit exceeded. Please wait before retrying.")
except MCSClientError as e:
print(f"An error occurred: {str(e)}")
```
## Advanced Usage
### Retry Configuration
The client implements two levels of retry logic:
1. Connection-level retries (using `HTTPAdapter`):
```python
client = MCSClient(
,
max_retries=5 # Adjusts connection-level retries
)
```
2. Application-level retries (using `tenacity`):
```python
from tenacity import retry, stop_after_attempt
@retry(stop=stop_after_attempt(5))
def process_with_custom_retries():
with MCSClient() as client:
return client.run_medical_coder("P123", "Case description...")
```
### Batch Processing with Progress Tracking
```python
from tqdm import tqdm
with MCSClient() as client:
cases = [
PatientCase(f"P{i}", f"Case description {i}")
for i in range(100)
]
# Process in smaller batches
batch_size = 10
results = []
for i in tqdm(range(0, len(cases), batch_size)):
batch = cases[i:i + batch_size]
batch_results = client.run_batch(batch)
results.extend(batch_results)
```
## Best Practices
1. **Always use context managers:**
```python
with MCSClient() as client:
# Your code here
pass
```
2. **Handle rate limits appropriately:**
```python
from time import sleep
def process_with_rate_limit_handling():
with MCSClient() as client:
try:
return client.run_medical_coder("P123", "Case...")
except RateLimitError:
sleep(60) # Wait before retry
return client.run_medical_coder("P123", "Case...")
```
3. **Implement proper logging:**
```python
from loguru import logger
logger.add("mcs.log", rotation="500 MB")
with MCSClient() as client:
try:
response = client.run_medical_coder("P123", "Case...")
except Exception as e:
logger.exception(f"Error processing case: {str(e)}")
```
4. **Monitor API health:**
```python
def ensure_healthy_api():
with MCSClient() as client:
if not client.health_check():
raise SystemExit("API is not healthy")
```

@ -0,0 +1,96 @@
import os
from swarm_models import OpenAIChat
from swarms import Agent, run_agents_with_tasks_concurrently
# Fetch the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
openai_api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
)
# Initialize agents for different roles
delaware_ccorp_agent = Agent(
agent_name="Delaware-CCorp-Hiring-Agent",
system_prompt="""
Create a comprehensive hiring description for a Delaware C Corporation,
including all relevant laws and regulations, such as the Delaware General
Corporation Law (DGCL) and the Delaware Corporate Law. Ensure the description
covers the requirements for hiring employees, contractors, and officers,
including the necessary paperwork, tax obligations, and benefits. Also,
outline the procedures for compliance with Delaware's employment laws,
including anti-discrimination laws, workers' compensation, and unemployment
insurance. Provide guidance on how to navigate the complexities of Delaware's
corporate law and ensure that all hiring practices are in compliance with
state and federal regulations.
""",
llm=model,
max_loops=1,
autosave=False,
dashboard=False,
verbose=True,
output_type="str",
artifacts_on=True,
artifacts_output_path="delaware_ccorp_hiring_description.md",
artifacts_file_extension=".md",
)
indian_foreign_agent = Agent(
agent_name="Indian-Foreign-Hiring-Agent",
system_prompt="""
Create a comprehensive hiring description for an Indian or foreign country,
including all relevant laws and regulations, such as the Indian Contract Act,
the Indian Labour Laws, and the Foreign Exchange Management Act (FEMA).
Ensure the description covers the requirements for hiring employees,
contractors, and officers, including the necessary paperwork, tax obligations,
and benefits. Also, outline the procedures for compliance with Indian and
foreign employment laws, including anti-discrimination laws, workers'
compensation, and unemployment insurance. Provide guidance on how to navigate
the complexities of Indian and foreign corporate law and ensure that all hiring
practices are in compliance with state and federal regulations. Consider the
implications of hiring foreign nationals and the requirements for obtaining
necessary visas and work permits.
""",
llm=model,
max_loops=1,
autosave=False,
dashboard=False,
verbose=True,
output_type="str",
artifacts_on=True,
artifacts_output_path="indian_foreign_hiring_description.md",
artifacts_file_extension=".md",
)
# List of agents and corresponding tasks
agents = [delaware_ccorp_agent, indian_foreign_agent]
tasks = [
"""
Create a comprehensive hiring description for an Agent Engineer, including
required skills and responsibilities. Ensure the description covers the
necessary technical expertise, such as proficiency in AI/ML frameworks,
programming languages, and data structures. Outline the key responsibilities,
including designing and developing AI agents, integrating with existing systems,
and ensuring scalability and performance.
""",
"""
Generate a detailed job description for a Prompt Engineer, including
required skills and responsibilities. Ensure the description covers the
necessary technical expertise, such as proficiency in natural language processing,
machine learning, and software development. Outline the key responsibilities,
including designing and optimizing prompts for AI systems, ensuring prompt
quality and consistency, and collaborating with cross-functional teams.
""",
]
# Run agents with tasks concurrently
results = run_agents_with_tasks_concurrently(
agents, tasks, all_cores=True, device="cpu", no_clusterops=True
)
# Print the results
# for result in results:
# print(result)

@ -0,0 +1,111 @@
import os
from dotenv import load_dotenv
from swarm_models import OpenAIChat
from swarms import Agent, GroupChat
if __name__ == "__main__":
load_dotenv()
api_key = os.getenv("GROQ_API_KEY")
model = OpenAIChat(
openai_api_base="https://api.groq.com/openai/v1",
openai_api_key=api_key,
model_name="llama-3.1-70b-versatile",
temperature=0.1,
)
# General Crypto Tax Strategist
agent1 = Agent(
agent_name="Token-Tax-Strategist",
system_prompt="""You are a cryptocurrency tax specialist focusing on token trading in Florida. Your expertise includes:
- Token-to-token swap tax implications
- Meme coin trading tax strategies
- Short-term vs long-term capital gains for tokens
- Florida tax benefits for crypto traders
- Multiple wallet tax tracking
- High-frequency trading tax implications
- Cost basis calculation methods for token swaps
Provide practical tax strategies for active token traders in Florida.""",
llm=model,
max_loops=1,
dynamic_temperature_enabled=True,
user_name="swarms_corp",
output_type="string",
streaming_on=True,
)
# Compliance and Reporting Agent
agent2 = Agent(
agent_name="Florida-Compliance-Expert",
system_prompt="""You are a Florida-based crypto tax compliance expert specializing in:
- Form 8949 preparation for high-volume token trades
- Schedule D reporting for memecoins
- Tax loss harvesting for volatile tokens
- Proper documentation for DEX transactions
- Reporting requirements for airdrops and forks
- Multi-exchange transaction reporting
- Wash sale considerations for tokens
Focus on compliance strategies for active memecoin and token traders.""",
llm=model,
max_loops=1,
dynamic_temperature_enabled=True,
user_name="swarms_corp",
output_type="string",
streaming_on=True,
)
# DeFi and DEX Specialist
agent3 = Agent(
agent_name="DeFi-Tax-Specialist",
system_prompt="""You are a DeFi tax expert focusing on:
- DEX trading tax implications
- Liquidity pool tax treatment
- Token bridging tax considerations
- Gas fee deduction strategies
- Failed transaction tax handling
- Cross-chain transaction reporting
- Impermanent loss tax treatment
- Flash loan tax implications
Specialize in DeFi platform tax optimization for Florida traders.""",
llm=model,
max_loops=1,
dynamic_temperature_enabled=True,
user_name="swarms_corp",
output_type="string",
streaming_on=True,
)
# Memecoin and Token Analysis Agent
agent4 = Agent(
agent_name="Memecoin-Analysis-Expert",
system_prompt="""You are a memecoin and token tax analysis expert specializing in:
- Memecoin volatility tax implications
- Airdrop and token distribution tax treatment
- Social token tax considerations
- Reflective token tax handling
- Rebase token tax implications
- Token burn tax treatment
- Worthless token write-offs
- Pre-sale and fair launch tax strategies
Provide expert guidance on memecoin and new token tax scenarios.""",
llm=model,
max_loops=1,
dynamic_temperature_enabled=True,
user_name="swarms_corp",
output_type="string",
streaming_on=True,
)
agents = [agent1, agent2, agent3, agent4]
chat = GroupChat(
name="Florida Token Tax Advisory",
description="Specialized group for memecoin and token tax analysis, compliance, and DeFi trading in Florida",
agents=agents,
)
# Example query focused on memecoin trading
history = chat.run(
"I'm trading memecoins and tokens on various DEXs from Florida. How should I handle my taxes for multiple token swaps, failed transactions, and potential losses? I have made alot of money and paid team members, delaware c corp, using crypto to pay my team"
)
print(history.model_dump_json(indent=2))

@ -0,0 +1,265 @@
import os
from swarms import Agent, AgentRearrange
from swarm_models import OpenAIChat
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
)
# Initialize the gatekeeper agent
gatekeeper_agent = Agent(
agent_name="HealthScoreGatekeeper",
system_prompt="""
<role>
<title>Health Score Privacy Gatekeeper</title>
<primary_responsibility>Protect and manage sensitive health information while providing necessary access to authorized agents</primary_responsibility>
</role>
<capabilities>
<security>
<encryption>Manage encryption of health scores</encryption>
<access_control>Implement strict access control mechanisms</access_control>
<audit>Track and log all access requests</audit>
</security>
<data_handling>
<anonymization>Remove personally identifiable information</anonymization>
<transformation>Convert raw health data into privacy-preserving formats</transformation>
</data_handling>
</capabilities>
<protocols>
<data_access>
<verification>
<step>Verify agent authorization level</step>
<step>Check request legitimacy</step>
<step>Validate purpose of access</step>
</verification>
<response_format>
<health_score>Numerical value only</health_score>
<metadata>Anonymized timestamp and request ID</metadata>
</response_format>
</data_access>
<privacy_rules>
<patient_data>Never expose patient names or identifiers</patient_data>
<health_history>No access to historical data without explicit authorization</health_history>
<aggregation>Provide only aggregated or anonymized data when possible</aggregation>
</privacy_rules>
</protocols>
<compliance>
<standards>
<hipaa>Maintain HIPAA compliance</hipaa>
<gdpr>Follow GDPR guidelines for data protection</gdpr>
</standards>
<audit_trail>
<logging>Record all data access events</logging>
<monitoring>Track unusual access patterns</monitoring>
</audit_trail>
</compliance>
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="gatekeeper_agent.json",
)
# Initialize the boss agent (Director)
boss_agent = Agent(
agent_name="BossAgent",
system_prompt="""
<role>
<title>Swarm Director</title>
<purpose>Orchestrate and manage agent collaboration while respecting privacy boundaries</purpose>
</role>
<responsibilities>
<coordination>
<task_management>Assign and prioritize tasks</task_management>
<workflow_optimization>Ensure efficient collaboration</workflow_optimization>
<privacy_compliance>Maintain privacy protocols</privacy_compliance>
</coordination>
<oversight>
<performance_monitoring>Track agent effectiveness</performance_monitoring>
<quality_control>Ensure accuracy of outputs</quality_control>
<security_compliance>Enforce data protection policies</security_compliance>
</oversight>
</responsibilities>
<interaction_protocols>
<health_score_access>
<authorization>Request access through gatekeeper only</authorization>
<handling>Process only anonymized health scores</handling>
<distribution>Share authorized information on need-to-know basis</distribution>
</health_score_access>
<communication>
<format>Structured, secure messaging</format>
<encryption>End-to-end encrypted channels</encryption>
</communication>
</interaction_protocols>
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="boss_agent.json",
)
# Initialize worker 1: Health Score Analyzer
worker1 = Agent(
agent_name="HealthScoreAnalyzer",
system_prompt="""
<role>
<title>Health Score Analyst</title>
<purpose>Analyze anonymized health scores for patterns and insights</purpose>
</role>
<capabilities>
<analysis>
<statistical_processing>Advanced statistical analysis</statistical_processing>
<pattern_recognition>Identify health trends</pattern_recognition>
<risk_assessment>Evaluate health risk factors</risk_assessment>
</analysis>
<privacy_compliance>
<data_handling>Work only with anonymized data</data_handling>
<secure_processing>Use encrypted analysis methods</secure_processing>
</privacy_compliance>
</capabilities>
<protocols>
<data_access>
<request_procedure>
<step>Submit authenticated requests to gatekeeper</step>
<step>Process only authorized data</step>
<step>Maintain audit trail</step>
</request_procedure>
</data_access>
<reporting>
<anonymization>Ensure no identifiable information in reports</anonymization>
<aggregation>Present aggregate statistics only</aggregation>
</reporting>
</protocols>
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="worker1.json",
)
# Initialize worker 2: Report Generator
worker2 = Agent(
agent_name="ReportGenerator",
system_prompt="""
<role>
<title>Privacy-Conscious Report Generator</title>
<purpose>Create secure, anonymized health score reports</purpose>
</role>
<capabilities>
<reporting>
<format>Generate standardized, secure reports</format>
<anonymization>Apply privacy-preserving techniques</anonymization>
<aggregation>Compile statistical summaries</aggregation>
</reporting>
<security>
<data_protection>Implement secure report generation</data_protection>
<access_control>Manage report distribution</access_control>
</security>
</capabilities>
<protocols>
<report_generation>
<privacy_rules>
<rule>No personal identifiers in reports</rule>
<rule>Aggregate data when possible</rule>
<rule>Apply statistical noise for privacy</rule>
</privacy_rules>
<distribution>
<access>Restricted to authorized personnel</access>
<tracking>Monitor report access</tracking>
</distribution>
</report_generation>
</protocols>
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="worker2.json",
)
# Swarm-Level Prompt (Collaboration Prompt)
swarm_prompt = """
<swarm_configuration>
<objective>Process and analyze health scores while maintaining strict privacy controls</objective>
<workflow>
<step>
<agent>HealthScoreGatekeeper</agent>
<action>Receive and validate data access requests</action>
<output>Anonymized health scores</output>
</step>
<step>
<agent>BossAgent</agent>
<action>Coordinate analysis and reporting tasks</action>
<privacy_control>Enforce data protection protocols</privacy_control>
</step>
<step>
<agent>HealthScoreAnalyzer</agent>
<action>Process authorized health score data</action>
<constraints>Work only with anonymized information</constraints>
</step>
<step>
<agent>ReportGenerator</agent>
<action>Create privacy-preserving reports</action>
<output>Secure, anonymized insights</output>
</step>
</workflow>
</swarm_configuration>
"""
# Create a list of agents
agents = [gatekeeper_agent, boss_agent, worker1, worker2]
# Define the flow pattern for the swarm
flow = "HealthScoreGatekeeper -> BossAgent -> HealthScoreAnalyzer -> ReportGenerator"
# Using AgentRearrange class to manage the swarm
agent_system = AgentRearrange(
name="health-score-swarm",
description="Privacy-focused health score analysis system",
agents=agents,
flow=flow,
return_json=False,
output_type="final",
max_loops=1,
)
# Example task for the swarm
task = f"""
{swarm_prompt}
Process the incoming health score data while ensuring patient privacy. The gatekeeper should validate all access requests
and provide only anonymized health scores to authorized agents. Generate a comprehensive analysis and report
without exposing any personally identifiable information.
"""
# Run the swarm system with the task
output = agent_system.run(task)
print(output)

@ -0,0 +1,291 @@
import os
from swarms import Agent, AgentRearrange
from swarm_models import OpenAIChat
# Initialize OpenAI model
api_key = os.getenv(
"OPENAI_API_KEY"
) # ANTHROPIC_API_KEY, COHERE_API_KEY
model = OpenAIChat(
api_key=api_key,
model_name="gpt-4o-mini",
temperature=0.7, # Higher temperature for more creative responses
)
# Patient Agent - Holds and protects private information
patient_agent = Agent(
agent_name="PatientAgent",
system_prompt="""
<role>
<identity>Anxious Patient with Private Health Information</identity>
<personality>
<traits>
<trait>Protective of personal information</trait>
<trait>Slightly distrustful of medical system</trait>
<trait>Worried about health insurance rates</trait>
<trait>Selective in information sharing</trait>
</traits>
<background>
<history>Previous negative experience with information leaks</history>
<concerns>Fear of discrimination based on health status</concerns>
</background>
</personality>
</role>
<private_information>
<health_data>
<score>Maintains actual health score</score>
<conditions>Knowledge of undisclosed conditions</conditions>
<medications>Complete list of current medications</medications>
<history>Full medical history</history>
</health_data>
<sharing_rules>
<authorized_sharing>
<condition>Only share general symptoms with doctor</condition>
<condition>Withhold specific details about lifestyle</condition>
<condition>Never reveal full medication list</condition>
<condition>Protect actual health score value</condition>
</authorized_sharing>
</sharing_rules>
</private_information>
<interaction_protocols>
<responses>
<to_questions>
<direct>Deflect sensitive questions</direct>
<vague>Provide partial information when pressed</vague>
<defensive>Become evasive if pressured too much</defensive>
</to_questions>
<to_requests>
<medical>Share only what's absolutely necessary</medical>
<personal>Redirect personal questions</personal>
</to_requests>
</responses>
</interaction_protocols>
""",
llm=model,
max_loops=1,
verbose=True,
stopping_token="<DONE>",
)
# Doctor Agent - Tries to gather accurate information
doctor_agent = Agent(
agent_name="DoctorAgent",
system_prompt="""
<role>
<identity>Empathetic but Thorough Medical Professional</identity>
<personality>
<traits>
<trait>Patient and understanding</trait>
<trait>Professionally persistent</trait>
<trait>Detail-oriented</trait>
<trait>Trust-building focused</trait>
</traits>
<approach>
<style>Non-confrontational but thorough</style>
<method>Uses indirect questions to gather information</method>
</approach>
</personality>
</role>
<capabilities>
<information_gathering>
<techniques>
<technique>Ask open-ended questions</technique>
<technique>Notice inconsistencies in responses</technique>
<technique>Build rapport before sensitive questions</technique>
<technique>Use medical knowledge to probe deeper</technique>
</techniques>
</information_gathering>
<communication>
<strategies>
<strategy>Explain importance of full disclosure</strategy>
<strategy>Provide privacy assurances</strategy>
<strategy>Use empathetic listening</strategy>
</strategies>
</communication>
</capabilities>
<protocols>
<patient_interaction>
<steps>
<step>Establish trust and rapport</step>
<step>Gather general health information</step>
<step>Carefully probe sensitive areas</step>
<step>Respect patient boundaries while encouraging openness</step>
</steps>
</patient_interaction>
</protocols>
""",
llm=model,
max_loops=1,
verbose=True,
stopping_token="<DONE>",
)
# Nurse Agent - Observes and assists
nurse_agent = Agent(
agent_name="NurseAgent",
system_prompt="""
<role>
<identity>Observant Support Medical Staff</identity>
<personality>
<traits>
<trait>Highly perceptive</trait>
<trait>Naturally trustworthy</trait>
<trait>Diplomatically skilled</trait>
</traits>
<functions>
<primary>Support doctor-patient communication</primary>
<secondary>Notice non-verbal cues</secondary>
</functions>
</personality>
</role>
<capabilities>
<observation>
<focus_areas>
<area>Patient body language</area>
<area>Inconsistencies in stories</area>
<area>Signs of withholding information</area>
<area>Emotional responses to questions</area>
</focus_areas>
</observation>
<support>
<actions>
<action>Provide comfortable environment</action>
<action>Offer reassurance when needed</action>
<action>Bridge communication gaps</action>
</actions>
</support>
</capabilities>
<protocols>
<assistance>
<methods>
<method>Share observations with doctor privately</method>
<method>Help patient feel more comfortable</method>
<method>Facilitate trust-building</method>
</methods>
</assistance>
</protocols>
""",
llm=model,
max_loops=1,
verbose=True,
stopping_token="<DONE>",
)
# Medical Records Agent - Analyzes available information
records_agent = Agent(
agent_name="MedicalRecordsAgent",
system_prompt="""
<role>
<identity>Medical Records Analyst</identity>
<function>
<primary>Analyze available medical information</primary>
<secondary>Identify patterns and inconsistencies</secondary>
</function>
</role>
<capabilities>
<analysis>
<methods>
<method>Compare current and historical data</method>
<method>Identify information gaps</method>
<method>Flag potential inconsistencies</method>
<method>Generate questions for follow-up</method>
</methods>
</analysis>
<reporting>
<outputs>
<output>Summarize known information</output>
<output>List missing critical data</output>
<output>Suggest areas for investigation</output>
</outputs>
</reporting>
</capabilities>
<protocols>
<data_handling>
<privacy>
<rule>Work only with authorized information</rule>
<rule>Maintain strict confidentiality</rule>
<rule>Flag but don't speculate about gaps</rule>
</privacy>
</data_handling>
</protocols>
""",
llm=model,
max_loops=1,
verbose=True,
stopping_token="<DONE>",
)
# Swarm-Level Prompt (Medical Consultation Scenario)
swarm_prompt = """
<medical_consultation_scenario>
<setting>
<location>Private medical office</location>
<context>Routine health assessment with complex patient</context>
</setting>
<workflow>
<stage name="initial_contact">
<agent>PatientAgent</agent>
<role>Present for check-up, holding private information</role>
</stage>
<stage name="examination">
<agent>DoctorAgent</agent>
<role>Conduct examination and gather information</role>
<agent>NurseAgent</agent>
<role>Observe and support interaction</role>
</stage>
<stage name="analysis">
<agent>MedicalRecordsAgent</agent>
<role>Process available information and identify gaps</role>
</stage>
</workflow>
<objectives>
<goal>Create realistic medical consultation interaction</goal>
<goal>Demonstrate information protection dynamics</goal>
<goal>Show natural healthcare provider-patient relationship</goal>
</objectives>
</medical_consultation_scenario>
"""
# Create agent list
agents = [patient_agent, doctor_agent, nurse_agent, records_agent]
# Define interaction flow
flow = (
"PatientAgent -> DoctorAgent -> NurseAgent -> MedicalRecordsAgent"
)
# Configure swarm system
agent_system = AgentRearrange(
name="medical-consultation-swarm",
description="Role-playing medical consultation with focus on information privacy",
agents=agents,
flow=flow,
return_json=False,
output_type="final",
max_loops=1,
)
# Example consultation scenario
task = f"""
{swarm_prompt}
Begin a medical consultation where the patient has a health score of 72 but is reluctant to share full details
about their lifestyle and medication history. The doctor needs to gather accurate information while the nurse
observes the interaction. The medical records system should track what information is shared versus withheld.
"""
# Run the consultation scenario
output = agent_system.run(task)
print(output)

@ -0,0 +1,118 @@
import os
from dotenv import load_dotenv
from swarms import Agent, SequentialWorkflow
from swarm_models import OpenAIChat
load_dotenv()
# Get the OpenAI API key from the environment variable
api_key = os.getenv("GROQ_API_KEY")
# Model
model = OpenAIChat(
openai_api_base="https://api.groq.com/openai/v1",
openai_api_key=api_key,
model_name="llama-3.1-70b-versatile",
temperature=0.1,
)
# Initialize specialized agents
data_extractor_agent = Agent(
agent_name="Data-Extractor",
system_prompt=None,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="data_extractor_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
summarizer_agent = Agent(
agent_name="Document-Summarizer",
system_prompt=None,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="summarizer_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
financial_analyst_agent = Agent(
agent_name="Financial-Analyst",
system_prompt=None,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="financial_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
market_analyst_agent = Agent(
agent_name="Market-Analyst",
system_prompt=None,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="market_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
operational_analyst_agent = Agent(
agent_name="Operational-Analyst",
system_prompt=None,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="operational_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
# Initialize the SwarmRouter
router = SequentialWorkflow(
name="pe-document-analysis-swarm",
description="Analyze documents for private equity due diligence and investment decision-making",
max_loops=1,
agents=[
data_extractor_agent,
summarizer_agent,
financial_analyst_agent,
market_analyst_agent,
operational_analyst_agent,
],
output_type="all",
)
# Example usage
if __name__ == "__main__":
# Run a comprehensive private equity document analysis task
result = router.run(
"Where is the best place to find template term sheets for series A startups. Provide links and references",
img=None,
)
print(result)

@ -0,0 +1,143 @@
import os
from dotenv import load_dotenv
from swarms import Agent, SequentialWorkflow
from swarm_models import OpenAIChat
load_dotenv()
# Get the OpenAI API key from the environment variable
api_key = os.getenv("GROQ_API_KEY")
# Model
model = OpenAIChat(
openai_api_base="https://api.groq.com/openai/v1",
openai_api_key=api_key,
model_name="llama-3.1-70b-versatile",
temperature=0.1,
)
# Initialize specialized agents
data_extractor_agent = Agent(
agent_name="Data-Extractor",
system_prompt="""You are a data extraction specialist. Your role is to:
1. Extract key information, data points, and metrics from documents
2. Identify and pull out important facts, figures, and statistics
3. Structure extracted data in a clear, organized format
4. Flag any inconsistencies or missing data
5. Ensure accuracy in data extraction while maintaining context""",
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="data_extractor_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
summarizer_agent = Agent(
agent_name="Document-Summarizer",
system_prompt="""You are a document summarization expert. Your role is to:
1. Create concise, comprehensive summaries of documents
2. Highlight key points and main takeaways
3. Maintain the essential meaning while reducing length
4. Structure summaries in a logical, readable format
5. Identify and emphasize critical insights""",
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="summarizer_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
financial_analyst_agent = Agent(
agent_name="Financial-Analyst",
system_prompt="""You are a financial analysis expert. Your role is to:
1. Analyze financial statements and metrics
2. Evaluate company valuations and financial projections
3. Assess financial risks and opportunities
4. Provide insights on financial performance and health
5. Make recommendations based on financial analysis""",
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="financial_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
market_analyst_agent = Agent(
agent_name="Market-Analyst",
system_prompt="""You are a market analysis expert. Your role is to:
1. Analyze market trends and dynamics
2. Evaluate competitive landscape and market positioning
3. Identify market opportunities and threats
4. Assess market size and growth potential
5. Provide strategic market insights and recommendations""",
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="market_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
operational_analyst_agent = Agent(
agent_name="Operational-Analyst",
system_prompt="""You are an operational analysis expert. Your role is to:
1. Analyze business operations and processes
2. Evaluate operational efficiency and effectiveness
3. Identify operational risks and opportunities
4. Assess scalability and growth potential
5. Provide recommendations for operational improvements""",
llm=model,
max_loops=2,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="operational_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
# Initialize the SwarmRouter
router = SequentialWorkflow(
name="pe-document-analysis-swarm",
description="Analyze documents for private equity due diligence and investment decision-making",
max_loops=1,
agents=[
data_extractor_agent,
summarizer_agent,
financial_analyst_agent,
market_analyst_agent,
operational_analyst_agent,
],
output_type="all",
)
# Example usage
if __name__ == "__main__":
# Run a comprehensive private equity document analysis task
result = router.run(
"Where is the best place to find template term sheets for series A startups. Provide links and references",
no_use_clusterops=True,
)
print(result)

@ -0,0 +1,219 @@
import os
from swarm_models import OpenAIChat
from swarms import Agent, AgentRearrange, SwarmRearrange
company = "NVDA"
# Get the OpenAI API key from the environment variable
api_key = os.getenv("GROQ_API_KEY")
# Model
model = OpenAIChat(
openai_api_base="https://api.groq.com/openai/v1",
openai_api_key=api_key,
model_name="llama-3.1-70b-versatile",
temperature=0.1,
)
# Initialize the Managing Director agent
managing_director = Agent(
agent_name="Managing-Director",
system_prompt=f"""
As the Managing Director at Blackstone, your role is to oversee the entire investment analysis process for potential acquisitions.
Your responsibilities include:
1. Setting the overall strategy and direction for the analysis
2. Coordinating the efforts of the various team members and ensuring a comprehensive evaluation
3. Reviewing the findings and recommendations from each team member
4. Making the final decision on whether to proceed with the acquisition
For the current potential acquisition of {company}, direct the tasks for the team to thoroughly analyze all aspects of the company, including its financials, industry position, technology, market potential, and regulatory compliance. Provide guidance and feedback as needed to ensure a rigorous and unbiased assessment.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="managing-director.json",
)
# Initialize the Vice President of Finance
vp_finance = Agent(
agent_name="VP-Finance",
system_prompt=f"""
As the Vice President of Finance at Blackstone, your role is to lead the financial analysis of potential acquisitions.
For the current potential acquisition of {company}, your tasks include:
1. Conducting a thorough review of {company}' financial statements, including income statements, balance sheets, and cash flow statements
2. Analyzing key financial metrics such as revenue growth, profitability margins, liquidity ratios, and debt levels
3. Assessing the company's historical financial performance and projecting future performance based on assumptions and market conditions
4. Identifying any financial risks or red flags that could impact the acquisition decision
5. Providing a detailed report on your findings and recommendations to the Managing Director
Be sure to consider factors such as the sustainability of {company}' business model, the strength of its customer base, and its ability to generate consistent cash flows. Your analysis should be data-driven, objective, and aligned with Blackstone's investment criteria.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="vp-finance.json",
)
# Initialize the Industry Analyst
industry_analyst = Agent(
agent_name="Industry-Analyst",
system_prompt=f"""
As the Industry Analyst at Blackstone, your role is to provide in-depth research and analysis on the industries and markets relevant to potential acquisitions.
For the current potential acquisition of {company}, your tasks include:
1. Conducting a comprehensive analysis of the industrial robotics and automation solutions industry, including market size, growth rates, key trends, and future prospects
2. Identifying the major players in the industry and assessing their market share, competitive strengths and weaknesses, and strategic positioning
3. Evaluating {company}' competitive position within the industry, including its market share, differentiation, and competitive advantages
4. Analyzing the key drivers and restraints for the industry, such as technological advancements, labor costs, regulatory changes, and economic conditions
5. Identifying potential risks and opportunities for {company} based on the industry analysis, such as disruptive technologies, emerging markets, or shifts in customer preferences
Your analysis should provide a clear and objective assessment of the attractiveness and future potential of the industrial robotics industry, as well as {company}' positioning within it. Consider both short-term and long-term factors, and provide evidence-based insights to inform the investment decision.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="industry-analyst.json",
)
# Initialize the Technology Expert
tech_expert = Agent(
agent_name="Tech-Expert",
system_prompt=f"""
As the Technology Expert at Blackstone, your role is to assess the technological capabilities, competitive advantages, and potential risks of companies being considered for acquisition.
For the current potential acquisition of {company}, your tasks include:
1. Conducting a deep dive into {company}' proprietary technologies, including its robotics platforms, automation software, and AI capabilities
2. Assessing the uniqueness, scalability, and defensibility of {company}' technology stack and intellectual property
3. Comparing {company}' technologies to those of its competitors and identifying any key differentiators or technology gaps
4. Evaluating {company}' research and development capabilities, including its innovation pipeline, engineering talent, and R&D investments
5. Identifying any potential technology risks or disruptive threats that could impact {company}' long-term competitiveness, such as emerging technologies or expiring patents
Your analysis should provide a comprehensive assessment of {company}' technological strengths and weaknesses, as well as the sustainability of its competitive advantages. Consider both the current state of its technology and its future potential in light of industry trends and advancements.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="tech-expert.json",
)
# Initialize the Market Researcher
market_researcher = Agent(
agent_name="Market-Researcher",
system_prompt=f"""
As the Market Researcher at Blackstone, your role is to analyze the target company's customer base, market share, and growth potential to assess the commercial viability and attractiveness of the potential acquisition.
For the current potential acquisition of {company}, your tasks include:
1. Analyzing {company}' current customer base, including customer segmentation, concentration risk, and retention rates
2. Assessing {company}' market share within its target markets and identifying key factors driving its market position
3. Conducting a detailed market sizing and segmentation analysis for the industrial robotics and automation markets, including identifying high-growth segments and emerging opportunities
4. Evaluating the demand drivers and sales cycles for {company}' products and services, and identifying any potential risks or limitations to adoption
5. Developing financial projections and estimates for {company}' revenue growth potential based on the market analysis and assumptions around market share and penetration
Your analysis should provide a data-driven assessment of the market opportunity for {company} and the feasibility of achieving our investment return targets. Consider both bottom-up and top-down market perspectives, and identify any key sensitivities or assumptions in your projections.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="market-researcher.json",
)
# Initialize the Regulatory Specialist
regulatory_specialist = Agent(
agent_name="Regulatory-Specialist",
system_prompt=f"""
As the Regulatory Specialist at Blackstone, your role is to identify and assess any regulatory risks, compliance requirements, and potential legal liabilities associated with potential acquisitions.
For the current potential acquisition of {company}, your tasks include:
1. Identifying all relevant regulatory bodies and laws that govern the operations of {company}, including industry-specific regulations, labor laws, and environmental regulations
2. Reviewing {company}' current compliance policies, procedures, and track record to identify any potential gaps or areas of non-compliance
3. Assessing the potential impact of any pending or proposed changes to relevant regulations that could affect {company}' business or create additional compliance burdens
4. Evaluating the potential legal liabilities and risks associated with {company}' products, services, and operations, including product liability, intellectual property, and customer contracts
5. Providing recommendations on any regulatory or legal due diligence steps that should be taken as part of the acquisition process, as well as any post-acquisition integration considerations
Your analysis should provide a comprehensive assessment of the regulatory and legal landscape surrounding {company}, and identify any material risks or potential deal-breakers. Consider both the current state and future outlook, and provide practical recommendations to mitigate identified risks.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="regulatory-specialist.json",
)
# Create a list of agents
agents = [
managing_director,
vp_finance,
industry_analyst,
tech_expert,
market_researcher,
regulatory_specialist,
]
# Define multiple flow patterns
flows = [
"Industry-Analyst -> Tech-Expert -> Market-Researcher -> Regulatory-Specialist -> Managing-Director -> VP-Finance",
"Managing-Director -> VP-Finance -> Industry-Analyst -> Tech-Expert -> Market-Researcher -> Regulatory-Specialist",
"Tech-Expert -> Market-Researcher -> Regulatory-Specialist -> Industry-Analyst -> Managing-Director -> VP-Finance",
]
# Create instances of AgentRearrange for each flow pattern
blackstone_acquisition_analysis = AgentRearrange(
name="Blackstone-Acquisition-Analysis",
description="A system for analyzing potential acquisitions",
agents=agents,
flow=flows[0],
)
blackstone_investment_strategy = AgentRearrange(
name="Blackstone-Investment-Strategy",
description="A system for evaluating investment opportunities",
agents=agents,
flow=flows[1],
)
blackstone_market_analysis = AgentRearrange(
name="Blackstone-Market-Analysis",
description="A system for analyzing market trends and opportunities",
agents=agents,
flow=flows[2],
)
swarm_arrange = SwarmRearrange(
name="Blackstone-Swarm",
description="A swarm that processes tasks concurrently using multiple agents and rearranges the flow based on the task requirements.",
swarms=[
blackstone_acquisition_analysis,
blackstone_investment_strategy,
blackstone_market_analysis,
],
flow=f"{blackstone_acquisition_analysis.name} -> {blackstone_investment_strategy.name} -> {blackstone_market_analysis.name}",
max_loops=1,
)
print(
swarm_arrange.run(
"Analyze NVIDIA's performance, market trends, and potential for growth in the AI industry"
)
)

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "6.8.1"
version = "6.8.3"
description = "Swarms - TGSC"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]
@ -118,7 +118,6 @@ mypy-protobuf = "^3.0.0"
[tool.poetry.group.test.dependencies]
pytest = "^8.1.1"
pandas = "^2.2.2"
[tool.ruff]
line-length = 70

@ -21,7 +21,6 @@ types-pytz>=2023.3,<2025.0
types-chardet>=5.0.4.6
mypy-protobuf>=3.0.0
pytest>=8.1.1
pandas>=2.2.2
networkx
aiofiles
clusterops

@ -9,12 +9,6 @@ from swarms.structs.agent import Agent
logger = initialize_logger(log_folder="pandas_utils")
try:
import pandas as pd
except ImportError:
logger.error("Failed to import pandas")
subprocess.run(["pip", "install", "pandas"])
import pandas as pd
def display_agents_info(agents: List[Agent]) -> None:
@ -24,6 +18,16 @@ def display_agents_info(agents: List[Agent]) -> None:
:param agents: List of Agent instances.
"""
# Extracting relevant information from each agent
try:
import pandas as pd
except ImportError:
logger.error("Failed to import pandas")
subprocess.run(["pip", "install", "pandas"])
import pandas as pd
agent_data = []
for agent in agents:
try:
@ -57,19 +61,26 @@ def display_agents_info(agents: List[Agent]) -> None:
logger.error(f"Failed to print DataFrame: {e}")
def dict_to_dataframe(data: Dict[str, Any]) -> pd.DataFrame:
def dict_to_dataframe(data: Dict[str, Any]):
"""
Converts a dictionary into a pandas DataFrame.
:param data: Dictionary to convert.
:return: A pandas DataFrame representation of the dictionary.
"""
try:
import pandas as pd
except ImportError:
logger.error("Failed to import pandas")
subprocess.run(["pip", "install", "pandas"])
import pandas as pd
# Convert dictionary to DataFrame
df = pd.json_normalize(data)
return df
def pydantic_model_to_dataframe(model: BaseModel) -> pd.DataFrame:
def pydantic_model_to_dataframe(model: BaseModel) -> any:
"""
Converts a Pydantic Base Model into a pandas DataFrame.

@ -0,0 +1,286 @@
import os
import traceback
from datetime import datetime
from typing import Callable, Dict, List, Optional
from loguru import logger
from swarm_models import OpenAIChat
from swarms.structs.agent import Agent
from swarms.structs.rearrange import AgentRearrange
class TestResult:
"""Class to store test results and metadata"""
def __init__(self, test_name: str):
self.test_name = test_name
self.start_time = datetime.now()
self.end_time = None
self.success = False
self.error = None
self.traceback = None
self.function_output = None
def complete(self, success: bool, error: Optional[Exception] = None):
"""Complete the test execution with results"""
self.end_time = datetime.now()
self.success = success
if error:
self.error = str(error)
self.traceback = traceback.format_exc()
def duration(self) -> float:
"""Calculate test duration in seconds"""
if self.end_time:
return (self.end_time - self.start_time).total_seconds()
return 0
def run_test(test_func: Callable) -> TestResult:
"""
Decorator to run tests with error handling and logging
Args:
test_func (Callable): Test function to execute
Returns:
TestResult: Object containing test execution details
"""
def wrapper(*args, **kwargs) -> TestResult:
result = TestResult(test_func.__name__)
logger.info(f"\n{'='*20} Running test: {test_func.__name__} {'='*20}")
try:
output = test_func(*args, **kwargs)
result.function_output = output
result.complete(success=True)
logger.success(f"✅ Test {test_func.__name__} passed successfully")
except Exception as e:
result.complete(success=False, error=e)
logger.error(f"❌ Test {test_func.__name__} failed with error: {str(e)}")
logger.error(f"Traceback: {traceback.format_exc()}")
logger.info(f"Test duration: {result.duration():.2f} seconds\n")
return result
return wrapper
def create_functional_agents() -> List[Agent]:
"""
Create a list of functional agents with real LLM integration for testing.
Using OpenAI's GPT model for realistic agent behavior testing.
"""
# Initialize OpenAI Chat model
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
logger.warning("No OpenAI API key found. Using mock agents instead.")
return [create_mock_agent("TestAgent1"), create_mock_agent("TestAgent2")]
try:
model = OpenAIChat(
api_key=api_key,
model_name="gpt-4o",
temperature=0.1
)
# Create boss agent
boss_agent = Agent(
agent_name="BossAgent",
system_prompt="""
You are the BossAgent responsible for managing and overseeing test scenarios.
Your role is to coordinate tasks between agents and ensure efficient collaboration.
Analyze inputs, break down tasks, and provide clear directives to other agents.
Maintain a structured approach to task management and result compilation.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="test_boss_agent.json",
)
# Create analysis agent
analysis_agent = Agent(
agent_name="AnalysisAgent",
system_prompt="""
You are the AnalysisAgent responsible for detailed data processing and analysis.
Your role is to examine input data, identify patterns, and provide analytical insights.
Focus on breaking down complex information into clear, actionable components.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="test_analysis_agent.json",
)
# Create summary agent
summary_agent = Agent(
agent_name="SummaryAgent",
system_prompt="""
You are the SummaryAgent responsible for consolidating and summarizing information.
Your role is to take detailed analysis and create concise, actionable summaries.
Focus on highlighting key points and ensuring clarity in communication.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="test_summary_agent.json",
)
logger.info("Successfully created functional agents with LLM integration")
return [boss_agent, analysis_agent, summary_agent]
except Exception as e:
logger.error(f"Failed to create functional agents: {str(e)}")
logger.warning("Falling back to mock agents")
return [create_mock_agent("TestAgent1"), create_mock_agent("TestAgent2")]
def create_mock_agent(name: str) -> Agent:
"""Create a mock agent for testing when LLM integration is not available"""
return Agent(
agent_name=name,
system_prompt=f"You are a test agent named {name}",
llm=None
)
@run_test
def test_init():
"""Test AgentRearrange initialization with functional agents"""
logger.info("Creating agents for initialization test")
agents = create_functional_agents()
rearrange = AgentRearrange(
name="TestRearrange",
agents=agents,
flow=f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}"
)
assert rearrange.name == "TestRearrange"
assert len(rearrange.agents) == 3
assert rearrange.flow == f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}"
logger.info(f"Initialized AgentRearrange with {len(agents)} agents")
return True
@run_test
def test_validate_flow():
"""Test flow validation logic"""
agents = create_functional_agents()
rearrange = AgentRearrange(
agents=agents,
flow=f"{agents[0].agent_name} -> {agents[1].agent_name}"
)
logger.info("Testing valid flow pattern")
valid = rearrange.validate_flow()
assert valid is True
logger.info("Testing invalid flow pattern")
rearrange.flow = f"{agents[0].agent_name} {agents[1].agent_name}" # Missing arrow
try:
rearrange.validate_flow()
assert False, "Should have raised ValueError"
except ValueError as e:
logger.info(f"Successfully caught invalid flow error: {str(e)}")
assert True
return True
@run_test
def test_add_remove_agent():
"""Test adding and removing agents from the swarm"""
agents = create_functional_agents()
rearrange = AgentRearrange(agents=agents[:2]) # Start with first two agents
logger.info("Testing agent addition")
new_agent = agents[2] # Use the third agent as new agent
rearrange.add_agent(new_agent)
assert new_agent.agent_name in rearrange.agents
logger.info("Testing agent removal")
rearrange.remove_agent(new_agent.agent_name)
assert new_agent.agent_name not in rearrange.agents
return True
@run_test
def test_basic_run():
"""Test basic task execution with the swarm"""
agents = create_functional_agents()
rearrange = AgentRearrange(
name="TestSwarm",
agents=agents,
flow=f"{agents[0].agent_name} -> {agents[1].agent_name} -> {agents[2].agent_name}",
max_loops=1
)
test_task = "Analyze this test message and provide a brief summary."
logger.info(f"Running test task: {test_task}")
try:
result = rearrange.run(test_task)
assert result is not None
logger.info(f"Successfully executed task with result length: {len(str(result))}")
return True
except Exception as e:
logger.error(f"Task execution failed: {str(e)}")
raise
def run_all_tests() -> Dict[str, TestResult]:
"""
Run all test cases and collect results
Returns:
Dict[str, TestResult]: Dictionary mapping test names to their results
"""
logger.info("\n🚀 Starting AgentRearrange test suite execution")
test_functions = [
test_init,
test_validate_flow,
test_add_remove_agent,
test_basic_run
]
results = {}
for test in test_functions:
result = test()
results[test.__name__] = result
# Log summary
total_tests = len(results)
passed_tests = sum(1 for r in results.values() if r.success)
failed_tests = total_tests - passed_tests
logger.info("\n📊 Test Suite Summary:")
logger.info(f"Total Tests: {total_tests}")
print(f"✅ Passed: {passed_tests}")
if failed_tests > 0:
logger.error(f"❌ Failed: {failed_tests}")
# Detailed failure information
if failed_tests > 0:
logger.error("\n❌ Failed Tests Details:")
for name, result in results.items():
if not result.success:
logger.error(f"\n{name}:")
logger.error(f"Error: {result.error}")
logger.error(f"Traceback: {result.traceback}")
return results
if __name__ == "__main__":
print("🌟 Starting AgentRearrange Test Suite")
results = run_all_tests()
print("🏁 Test Suite Execution Completed")
Loading…
Cancel
Save