swarms api docs

pull/785/head
Kye Gomez 1 month ago
parent 49b65e4d07
commit c3c9254414

.gitignore vendored

@@ -9,6 +9,8 @@ video/
artifacts_three
dataframe/
.ruff_cache
target/
Cargo.lock
.pytest_cache
static/generated
runs

@@ -275,6 +275,7 @@ nav:
- Deploying Swarms on Google Cloud Run: "swarms_cloud/cloud_run.md"
# - Swarms Cloud CLI: "swarms_cloud/cli.md"
- Swarm APIs:
- Swarms API: "swarms_cloud/swarms_api.md"
- MCS API: "swarms_cloud/mcs_api.md"
- CreateNow API: "swarms_cloud/create_api.md"
- Swarms Memory:

@@ -0,0 +1,675 @@
# Swarms API Documentation
The Swarms API is a powerful REST API designed to help you create, manage, and execute various types of swarms efficiently. Whether you need to run tasks sequentially, concurrently, or in a custom workflow, the Swarms API has you covered.
## Key Features
- **Sequential Swarms**: Execute tasks one after another in a defined order.
- **Concurrent Swarms**: Run multiple tasks simultaneously to save time and resources.
- **Custom Workflows**: Design your own swarm workflows to fit your specific needs.
To get started, find your API key in the Swarms Cloud dashboard. [Get your API key here](https://swarms.world/platform/api-keys)
## Base URL
```
https://swarms-api-285321057562.us-east1.run.app
```
## Authentication
All API requests (except `/health`) require authentication using an API key passed in the `x-api-key` header:
```http
x-api-key: your_api_key_here
```
## Endpoints
### Health Check
Check if the API is operational.
**Endpoint:** `GET /health`
**Authentication Required:** No
**Response:**
```json
{
  "status": "ok"
}
```
### Single Swarm Completion
Run a single swarm with the specified agents and task.
**Endpoint:** `POST /v1/swarm/completions`
**Authentication Required:** Yes
#### Request Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| name | string | Optional | Name of the swarm (max 100 chars) |
| description | string | Optional | Description of the swarm (max 500 chars) |
| agents | array | Required | Array of agent configurations |
| max_loops | integer | Optional | Maximum number of iterations |
| swarm_type | string | Optional | Type of swarm workflow |
| task | string | Required | The task to be performed |
| img | string | Optional | Image URL if relevant |
#### Agent Configuration Parameters
| Parameter | Type | Required | Default | Description |
|-----------|------|----------|---------|-------------|
| agent_name | string | Required | - | Name of the agent (max 100 chars) |
| description | string | Optional | - | Description of the agent (max 500 chars) |
| system_prompt | string | Optional | - | System prompt for the agent (max 500 chars) |
| model_name | string | Optional | "gpt-4o" | Model to be used by the agent |
| auto_generate_prompt | boolean | Optional | false | Whether to auto-generate prompts |
| max_tokens | integer | Optional | - | Maximum tokens for response |
| temperature | float | Optional | 0.5 | Temperature for response generation |
| role | string | Optional | "worker" | Role of the agent |
| max_loops | integer | Optional | 1 | Maximum iterations for this agent |
#### Example Request
```json
{
  "name": "Test Swarm",
  "description": "A test swarm",
  "agents": [
    {
      "agent_name": "Research Agent",
      "description": "Conducts research",
      "system_prompt": "You are a research assistant.",
      "model_name": "gpt-4o",
      "role": "worker",
      "max_loops": 1
    }
  ],
  "max_loops": 1,
  "swarm_type": "ConcurrentWorkflow",
  "task": "Write a short blog post about AI agents."
}
```
#### Response Structure
| Field | Type | Description |
|-------|------|-------------|
| status | string | Status of the swarm execution |
| swarm_name | string | Name of the executed swarm |
| description | string | Description of the swarm |
| task | string | Original task description |
| metadata | object | Execution metadata |
| output | object/array | Results from the swarm execution |
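For quick inspection, the documented fields can be pulled out of the parsed JSON. The sketch below is illustrative only: the `sample_response` dict is invented for demonstration, and real `metadata`/`output` shapes will vary by `swarm_type` and task.

```python
# Illustrative sketch: reading the documented top-level response fields.
# The sample dict below is made up; it is not actual API output.
sample_response = {
    "status": "success",
    "swarm_name": "Test Swarm",
    "description": "A test swarm",
    "task": "Write a short blog post about AI agents.",
    "metadata": {"execution_time_s": 12.4},
    "output": [{"agent": "Research Agent", "content": "..."}],
}

def summarize_result(resp: dict) -> str:
    """Build a one-line summary from the documented top-level fields."""
    status = resp.get("status", "unknown")
    name = resp.get("swarm_name", "unnamed")
    # output is documented as object/array, so handle both shapes
    n_outputs = (
        len(resp["output"]) if isinstance(resp.get("output"), list) else 1
    )
    return f"{name}: {status} ({n_outputs} output item(s))"

print(summarize_result(sample_response))
```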
### Batch Swarm Completion
Run multiple swarms in a single request.
**Endpoint:** `POST /v1/swarm/batch/completions`
**Authentication Required:** Yes
#### Request Format
Array of swarm configurations, each following the same format as single swarm completion.
#### Example Batch Request
```json
[
  {
    "name": "Batch Swarm 1",
    "description": "First swarm in batch",
    "agents": [...],
    "task": "Task 1"
  },
  {
    "name": "Batch Swarm 2",
    "description": "Second swarm in batch",
    "agents": [...],
    "task": "Task 2"
  }
]
```
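Since each batch entry follows the single-swarm schema, a payload can be assembled programmatically. A minimal sketch — the helper name and agent values below are our own, not part of the API:

```python
# Sketch: build a batch payload from a list of tasks, reusing one agent
# configuration. Field names follow the request tables above.
def build_batch_payload(tasks, agent_config):
    """Return a list of swarm configs, one per task."""
    return [
        {
            "name": f"Batch Swarm {i + 1}",
            "agents": [agent_config],
            "task": task,
        }
        for i, task in enumerate(tasks)
    ]

analyst = {
    "agent_name": "Analyst",
    "system_prompt": "You are a market analyst.",
    "model_name": "gpt-4o",
    "role": "worker",
}
payload = build_batch_payload(
    ["Analyze tech trends", "Assess market risks"], analyst
)
print(len(payload), payload[0]["name"])
```

The resulting list can be POSTed to `/v1/swarm/batch/completions` exactly like the hand-written example above.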
# Swarms API Implementation Examples
## Python
### Using requests
```python
import requests

API_KEY = "your_api_key_here"
BASE_URL = "https://swarms-api-285321057562.us-east1.run.app"

headers = {
    "x-api-key": API_KEY,
    "Content-Type": "application/json",
}

def run_single_swarm():
    payload = {
        "name": "Financial Analysis Swarm",
        "description": "Market analysis swarm",
        "agents": [
            {
                "agent_name": "Market Analyst",
                "description": "Analyzes market trends",
                "system_prompt": "You are a financial analyst expert.",
                "model_name": "gpt-4o",
                "role": "worker",
                "max_loops": 1,
            }
        ],
        "max_loops": 1,
        "swarm_type": "SequentialWorkflow",
        "task": "Analyze current market trends in tech sector",
    }
    response = requests.post(
        f"{BASE_URL}/v1/swarm/completions",
        headers=headers,
        json=payload,
    )
    return response.json()

def run_batch_swarms():
    payload = [
        {
            "name": "Market Analysis",
            "description": "First swarm",
            "agents": [
                {
                    "agent_name": "Analyst",
                    "system_prompt": "You are a market analyst.",
                    "model_name": "gpt-4o",
                    "role": "worker",
                }
            ],
            "task": "Analyze tech trends",
        },
        {
            "name": "Risk Assessment",
            "description": "Second swarm",
            "agents": [
                {
                    "agent_name": "Risk Analyst",
                    "system_prompt": "You are a risk analyst.",
                    "model_name": "gpt-4o",
                    "role": "worker",
                }
            ],
            "task": "Assess market risks",
        },
    ]
    response = requests.post(
        f"{BASE_URL}/v1/swarm/batch/completions",
        headers=headers,
        json=payload,
    )
    return response.json()

# Using async/await with aiohttp
import aiohttp
import asyncio

async def run_swarm_async(payload):
    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"{BASE_URL}/v1/swarm/completions",
            headers=headers,
            json=payload,
        ) as response:
            return await response.json()
```
## Node.js
### Using Fetch API
```javascript
const API_KEY = 'your_api_key_here';
const BASE_URL = 'https://swarms-api-285321057562.us-east1.run.app';

const headers = {
  'x-api-key': API_KEY,
  'Content-Type': 'application/json'
};

// Single swarm execution
async function runSingleSwarm() {
  const payload = {
    name: 'Financial Analysis',
    description: 'Market analysis swarm',
    agents: [
      {
        agent_name: 'Market Analyst',
        description: 'Analyzes market trends',
        system_prompt: 'You are a financial analyst expert.',
        model_name: 'gpt-4o',
        role: 'worker',
        max_loops: 1
      }
    ],
    max_loops: 1,
    swarm_type: 'SequentialWorkflow',
    task: 'Analyze current market trends'
  };

  try {
    const response = await fetch(`${BASE_URL}/v1/swarm/completions`, {
      method: 'POST',
      headers,
      body: JSON.stringify(payload)
    });
    return await response.json();
  } catch (error) {
    console.error('Error:', error);
    throw error;
  }
}

// Batch swarm execution
async function runBatchSwarms() {
  const payload = [
    {
      name: 'Market Analysis',
      agents: [{
        agent_name: 'Analyst',
        system_prompt: 'You are a market analyst.',
        model_name: 'gpt-4o',
        role: 'worker'
      }],
      task: 'Analyze tech trends'
    },
    {
      name: 'Risk Assessment',
      agents: [{
        agent_name: 'Risk Analyst',
        system_prompt: 'You are a risk analyst.',
        model_name: 'gpt-4o',
        role: 'worker'
      }],
      task: 'Assess market risks'
    }
  ];

  try {
    const response = await fetch(`${BASE_URL}/v1/swarm/batch/completions`, {
      method: 'POST',
      headers,
      body: JSON.stringify(payload)
    });
    return await response.json();
  } catch (error) {
    console.error('Error:', error);
    throw error;
  }
}
```
### Using Axios
```javascript
const axios = require('axios');

const api = axios.create({
  baseURL: BASE_URL,
  headers: {
    'x-api-key': API_KEY,
    'Content-Type': 'application/json'
  }
});

async function runSwarm(payload) {
  try {
    const response = await api.post('/v1/swarm/completions', payload);
    return response.data;
  } catch (error) {
    console.error('Error:', error.response?.data || error.message);
    throw error;
  }
}
```
## Go
```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

const (
	baseURL = "https://swarms-api-285321057562.us-east1.run.app"
	apiKey  = "your_api_key_here"
)

type Agent struct {
	AgentName    string `json:"agent_name"`
	Description  string `json:"description"`
	SystemPrompt string `json:"system_prompt"`
	ModelName    string `json:"model_name"`
	Role         string `json:"role"`
	MaxLoops     int    `json:"max_loops"`
}

type SwarmRequest struct {
	Name        string  `json:"name"`
	Description string  `json:"description"`
	Agents      []Agent `json:"agents"`
	MaxLoops    int     `json:"max_loops"`
	SwarmType   string  `json:"swarm_type"`
	Task        string  `json:"task"`
}

func runSingleSwarm() ([]byte, error) {
	payload := SwarmRequest{
		Name:        "Financial Analysis",
		Description: "Market analysis swarm",
		Agents: []Agent{
			{
				AgentName:    "Market Analyst",
				Description:  "Analyzes market trends",
				SystemPrompt: "You are a financial analyst expert.",
				ModelName:    "gpt-4o",
				Role:         "worker",
				MaxLoops:     1,
			},
		},
		MaxLoops:  1,
		SwarmType: "SequentialWorkflow",
		Task:      "Analyze current market trends",
	}

	jsonPayload, err := json.Marshal(payload)
	if err != nil {
		return nil, err
	}

	req, err := http.NewRequest("POST", baseURL+"/v1/swarm/completions", bytes.NewBuffer(jsonPayload))
	if err != nil {
		return nil, err
	}
	req.Header.Set("x-api-key", apiKey)
	req.Header.Set("Content-Type", "application/json")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	return io.ReadAll(resp.Body)
}

func main() {
	response, err := runSingleSwarm()
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}
	fmt.Printf("Response: %s\n", response)
}
```
## Rust
```rust
use reqwest::Client;
use serde::{Deserialize, Serialize};

const BASE_URL: &str = "https://swarms-api-285321057562.us-east1.run.app";
const API_KEY: &str = "your_api_key_here";

#[derive(Serialize, Deserialize)]
struct Agent {
    agent_name: String,
    description: String,
    system_prompt: String,
    model_name: String,
    role: String,
    max_loops: i32,
}

#[derive(Serialize, Deserialize)]
struct SwarmRequest {
    name: String,
    description: String,
    agents: Vec<Agent>,
    max_loops: i32,
    swarm_type: String,
    task: String,
}

async fn run_single_swarm() -> Result<String, Box<dyn std::error::Error>> {
    let client = Client::new();
    let payload = SwarmRequest {
        name: "Financial Analysis".to_string(),
        description: "Market analysis swarm".to_string(),
        agents: vec![Agent {
            agent_name: "Market Analyst".to_string(),
            description: "Analyzes market trends".to_string(),
            system_prompt: "You are a financial analyst expert.".to_string(),
            model_name: "gpt-4o".to_string(),
            role: "worker".to_string(),
            max_loops: 1,
        }],
        max_loops: 1,
        swarm_type: "SequentialWorkflow".to_string(),
        task: "Analyze current market trends".to_string(),
    };

    let response = client
        .post(format!("{}/v1/swarm/completions", BASE_URL))
        .header("x-api-key", API_KEY)
        .header("Content-Type", "application/json")
        .json(&payload)
        .send()
        .await?;

    let result = response.text().await?;
    Ok(result)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let response = run_single_swarm().await?;
    println!("Response: {}", response);
    Ok(())
}
```
## C#
```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

public class SwarmClient
{
    private readonly HttpClient _client;
    private const string BaseUrl = "https://swarms-api-285321057562.us-east1.run.app";
    private readonly string _apiKey;

    public SwarmClient(string apiKey)
    {
        _apiKey = apiKey;
        _client = new HttpClient();
        _client.DefaultRequestHeaders.Add("x-api-key", apiKey);
    }

    public class Agent
    {
        public string AgentName { get; set; }
        public string Description { get; set; }
        public string SystemPrompt { get; set; }
        public string ModelName { get; set; }
        public string Role { get; set; }
        public int MaxLoops { get; set; }
    }

    public class SwarmRequest
    {
        public string Name { get; set; }
        public string Description { get; set; }
        public List<Agent> Agents { get; set; }
        public int MaxLoops { get; set; }
        public string SwarmType { get; set; }
        public string Task { get; set; }
    }

    public async Task<string> RunSingleSwarm()
    {
        var payload = new SwarmRequest
        {
            Name = "Financial Analysis",
            Description = "Market analysis swarm",
            Agents = new List<Agent>
            {
                new Agent
                {
                    AgentName = "Market Analyst",
                    Description = "Analyzes market trends",
                    SystemPrompt = "You are a financial analyst expert.",
                    ModelName = "gpt-4o",
                    Role = "worker",
                    MaxLoops = 1
                }
            },
            MaxLoops = 1,
            SwarmType = "SequentialWorkflow",
            Task = "Analyze current market trends"
        };

        var content = new StringContent(
            JsonSerializer.Serialize(payload),
            Encoding.UTF8,
            "application/json"
        );

        var response = await _client.PostAsync(
            $"{BaseUrl}/v1/swarm/completions",
            content
        );
        return await response.Content.ReadAsStringAsync();
    }
}

// Usage
class Program
{
    static async Task Main(string[] args)
    {
        var client = new SwarmClient("your_api_key_here");
        var response = await client.RunSingleSwarm();
        Console.WriteLine($"Response: {response}");
    }
}
```
## Shell (cURL)
```bash
# Single swarm execution
curl -X POST "https://swarms-api-285321057562.us-east1.run.app/v1/swarm/completions" \
  -H "x-api-key: your_api_key_here" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Financial Analysis",
    "description": "Market analysis swarm",
    "agents": [
      {
        "agent_name": "Market Analyst",
        "description": "Analyzes market trends",
        "system_prompt": "You are a financial analyst expert.",
        "model_name": "gpt-4o",
        "role": "worker",
        "max_loops": 1
      }
    ],
    "max_loops": 1,
    "swarm_type": "SequentialWorkflow",
    "task": "Analyze current market trends"
  }'

# Batch swarm execution
curl -X POST "https://swarms-api-285321057562.us-east1.run.app/v1/swarm/batch/completions" \
  -H "x-api-key: your_api_key_here" \
  -H "Content-Type: application/json" \
  -d '[
    {
      "name": "Market Analysis",
      "agents": [{
        "agent_name": "Analyst",
        "system_prompt": "You are a market analyst.",
        "model_name": "gpt-4o",
        "role": "worker"
      }],
      "task": "Analyze tech trends"
    },
    {
      "name": "Risk Assessment",
      "agents": [{
        "agent_name": "Risk Analyst",
        "system_prompt": "You are a risk analyst.",
        "model_name": "gpt-4o",
        "role": "worker"
      }],
      "task": "Assess market risks"
    }
  ]'
```
## Billing and Credits
The API uses a credit-based billing system with the following components:
### Cost Calculation
| Component | Cost |
|-----------|------|
| Base cost per agent | $0.01 |
| Input tokens (per 1M) | $5.00 |
| Output tokens (per 1M) | $15.50 |
Credits are deducted based on:
- Number of agents used
- Total input tokens (including system prompts and agent memory)
- Total output tokens generated
- Execution time
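As a rough sketch of how these rates combine — assuming cost is simply additive; the execution-time component of billing is not modeled here, so treat this as a lower-bound estimate, not an invoice:

```python
# Back-of-the-envelope cost estimate from the published rates above.
COST_PER_AGENT = 0.01        # $ per agent
COST_PER_1M_INPUT = 5.00     # $ per 1M input tokens
COST_PER_1M_OUTPUT = 15.50   # $ per 1M output tokens

def estimate_cost(num_agents, input_tokens, output_tokens):
    """Additive estimate; actual billing also factors in execution time."""
    return (
        num_agents * COST_PER_AGENT
        + input_tokens / 1_000_000 * COST_PER_1M_INPUT
        + output_tokens / 1_000_000 * COST_PER_1M_OUTPUT
    )

# e.g. 2 agents, 10k input tokens, 4k output tokens
print(round(estimate_cost(2, 10_000, 4_000), 4))  # → 0.132
```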
### Credit Types
- Free credits: Used first
- Regular credits: Used after free credits are exhausted
## Error Handling
| HTTP Status Code | Description |
|-----------------|-------------|
| 402 | Insufficient credits |
| 403 | Invalid API key |
| 404 | Resource not found |
| 500 | Internal server error |
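A client can translate these codes into readable errors before attempting to parse a response body. A minimal sketch — the exception type and its messages below are illustrative, not part of the API:

```python
# Map the documented status codes to readable errors.
ERROR_MESSAGES = {
    402: "Insufficient credits",
    403: "Invalid API key",
    404: "Resource not found",
    500: "Internal server error",
}

class SwarmsAPIError(Exception):
    """Illustrative client-side error type; not defined by the API."""

def check_status(status_code):
    """Raise SwarmsAPIError for documented failure codes, else pass through."""
    if status_code in ERROR_MESSAGES:
        raise SwarmsAPIError(f"{status_code}: {ERROR_MESSAGES[status_code]}")
    return status_code

print(check_status(200))  # → 200
```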
## Best Practices
1. Start with small swarms and gradually increase complexity
2. Monitor credit usage and token counts
3. Use appropriate max_loops values to control execution
4. Implement proper error handling for API responses
5. Consider using batch completions for multiple related tasks
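For the error-handling practice above, transient failures (such as a 500) can be wrapped in a simple retry helper. A sketch with illustrative defaults, shown against a stand-in callable rather than a live request:

```python
import time

def with_retries(call, max_attempts=3, base_delay=1.0):
    """Retry a callable with exponential backoff; defaults are illustrative."""
    for attempt in range(max_attempts):
        try:
            return call()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            time.sleep(base_delay * 2 ** attempt)

# Usage with a flaky callable standing in for an HTTP request:
attempts = {"n": 0}
def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient error")
    return "ok"

print(with_retries(flaky, base_delay=0))  # succeeds on the third attempt
```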

@@ -24,7 +24,7 @@ agent = Agent(
max_tokens=4000, # max output tokens
saved_state_path="agent_00.json",
interactive=False,
roles="director",
role="director",
)
agent.run(

@@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "7.2.7"
version = "7.4.0"
description = "Swarms - TGSC"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]
@@ -75,6 +75,7 @@ aiofiles = "*"
rich = "*"
numpy = "*"
litellm = "*"
torch = "*"
[tool.poetry.scripts]
swarms = "swarms.cli.main:main"
@@ -114,3 +115,10 @@ exclude = '''
)/
'''
[tool.maturin]
module-name = "swarms_rust"
[tool.maturin.build]
features = ["extension-module"]

@@ -0,0 +1,27 @@
from swarms import Agent, SwarmRouter

agents = [
    Agent(
        agent_name="test_agent_1",
        agent_description="test_agent_1_description",
        system_prompt="test_agent_1_system_prompt",
        model_name="gpt-4o",
    ),
    Agent(
        agent_name="test_agent_2",
        agent_description="test_agent_2_description",
        system_prompt="test_agent_2_system_prompt",
        model_name="gpt-4o",
    ),
    Agent(
        agent_name="test_agent_3",
        agent_description="test_agent_3_description",
        system_prompt="test_agent_3_system_prompt",
        model_name="gpt-4o",
    ),
]

router = SwarmRouter(agents=agents, swarm_type="SequentialWorkflow")

print(router.run("How are you doing?"))

@@ -10,13 +10,13 @@ from swarms.structs.stopping_conditions import (
check_stopped,
check_success,
)
from swarms.agents.tool_agent import ToolAgent
# from swarms.agents.tool_agent import ToolAgent
from swarms.agents.create_agents_from_yaml import (
create_agents_from_yaml,
)
__all__ = [
"ToolAgent",
# "ToolAgent",
"check_done",
"check_finished",
"check_complete",

@@ -31,7 +31,8 @@ class AgentRole(Enum):
@dataclass
class CommunicationEvent:
"""Represents a structured communication event between agents."""
message: str
message: str
background: Optional[str] = None
intermediate_output: Optional[Dict[str, Any]] = None
sender: str = ""
@@ -96,7 +97,7 @@ class TalkHier:
import re
json_match = re.search(r"\{.*\}", json_str, re.DOTALL)
if (json_match):
if json_match:
return json.loads(json_match.group())
# Try extracting from markdown code blocks
code_block_match = re.search(
@@ -104,7 +105,7 @@
json_str,
re.DOTALL,
)
if (code_block_match):
if code_block_match:
return json.loads(code_block_match.group(1))
except Exception as e:
logger.warning(f"Failed to extract JSON: {str(e)}")
@@ -173,7 +174,9 @@ Output all responses in strict JSON format:
system_prompt=self._get_criteria_generator_prompt(),
model_name=self.model_name,
max_loops=1,
saved_state_path=str(self.base_path / "criteria_generator.json"),
saved_state_path=str(
self.base_path / "criteria_generator.json"
),
verbose=True,
)
@@ -350,77 +353,97 @@ Output all responses in strict JSON format:
}
}"""
def _generate_criteria_for_task(self, task: str) -> Dict[str, Any]:
def _generate_criteria_for_task(
self, task: str
) -> Dict[str, Any]:
"""Generate evaluation criteria for the given task."""
try:
criteria_input = {
"task": task,
"instruction": "Generate specific evaluation criteria for this task."
"instruction": "Generate specific evaluation criteria for this task.",
}
criteria_response = self.criteria_generator.run(json.dumps(criteria_input))
criteria_response = self.criteria_generator.run(
json.dumps(criteria_input)
)
self.conversation.add(
role="Criteria-Generator",
content=criteria_response
role="Criteria-Generator", content=criteria_response
)
return self._safely_parse_json(criteria_response)
except Exception as e:
logger.error(f"Error generating criteria: {str(e)}")
return {"criteria": {}}
def _create_comm_event(self, sender: Agent, receiver: Agent, response: Dict) -> CommunicationEvent:
def _create_comm_event(
self, sender: Agent, receiver: Agent, response: Dict
) -> CommunicationEvent:
"""Create a structured communication event between agents."""
return CommunicationEvent(
message=response.get("message", ""),
background=response.get("background", ""),
intermediate_output=response.get("intermediate_output", {}),
intermediate_output=response.get(
"intermediate_output", {}
),
sender=sender.agent_name,
receiver=receiver.agent_name,
)
def _evaluate_content(self, content: Union[str, Dict], task: str) -> Dict[str, Any]:
def _evaluate_content(
self, content: Union[str, Dict], task: str
) -> Dict[str, Any]:
"""Coordinate evaluation process with parallel evaluator execution."""
try:
content_dict = self._safely_parse_json(content) if isinstance(content, str) else content
content_dict = (
self._safely_parse_json(content)
if isinstance(content, str)
else content
)
criteria_data = self._generate_criteria_for_task(task)
def run_evaluator(evaluator, eval_input):
response = evaluator.run(json.dumps(eval_input))
return {
"evaluator_id": evaluator.agent_name,
"evaluation": self._safely_parse_json(response)
"evaluator_id": evaluator.agent_name,
"evaluation": self._safely_parse_json(response),
}
eval_inputs = [{
"task": task,
"content": content_dict,
"criteria": criteria_data.get("criteria", {})
} for _ in self.evaluators]
eval_inputs = [
{
"task": task,
"content": content_dict,
"criteria": criteria_data.get("criteria", {}),
}
for _ in self.evaluators
]
with ThreadPoolExecutor() as executor:
evaluations = list(executor.map(
lambda x: run_evaluator(*x),
zip(self.evaluators, eval_inputs)
))
evaluations = list(
executor.map(
lambda x: run_evaluator(*x),
zip(self.evaluators, eval_inputs),
)
)
supervisor_input = {
"evaluations": evaluations,
"task": task,
"instruction": "Synthesize feedback"
"instruction": "Synthesize feedback",
}
supervisor_response = self.main_supervisor.run(json.dumps(supervisor_input))
aggregated_eval = self._safely_parse_json(supervisor_response)
supervisor_response = self.main_supervisor.run(
json.dumps(supervisor_input)
)
aggregated_eval = self._safely_parse_json(
supervisor_response
)
# Track communication
comm_event = self._create_comm_event(
self.main_supervisor,
self.revisor,
aggregated_eval
self.main_supervisor, self.revisor, aggregated_eval
)
self.conversation.add(
role="Communication",
content=json.dumps(asdict(comm_event))
content=json.dumps(asdict(comm_event)),
)
return aggregated_eval
@@ -455,11 +478,15 @@ Output all responses in strict JSON format:
# Collect all unique criteria from evaluators
all_criteria = set()
for eval_data in evaluations:
categories = eval_data.get("scores", {}).get("categories", {})
categories = eval_data.get("scores", {}).get(
"categories", {}
)
all_criteria.update(categories.keys())
# Initialize score aggregation
aggregated_scores = {criterion: [] for criterion in all_criteria}
aggregated_scores = {
criterion: [] for criterion in all_criteria
}
overall_scores = []
all_feedback = []
@@ -467,7 +494,7 @@ Output all responses in strict JSON format:
for eval_data in evaluations:
scores = eval_data.get("scores", {})
overall_scores.append(scores.get("overall", 0.5))
categories = scores.get("categories", {})
for criterion in all_criteria:
if criterion in categories:
@@ -508,32 +535,40 @@ Output all responses in strict JSON format:
try:
# Get evaluations and supervisor selection
evaluation_result = self._evaluate_content(content, task)
# Extract selected evaluation and supervisor reasoning
selected_evaluation = evaluation_result.get("selected_evaluation", {})
supervisor_reasoning = evaluation_result.get("supervisor_reasoning", {})
selected_evaluation = evaluation_result.get(
"selected_evaluation", {}
)
supervisor_reasoning = evaluation_result.get(
"supervisor_reasoning", {}
)
# Prepare revision input with selected evaluation
revision_input = {
"content": content,
"evaluation": selected_evaluation,
"supervisor_feedback": supervisor_reasoning,
"instruction": "Revise the content based on the selected evaluation feedback"
"instruction": "Revise the content based on the selected evaluation feedback",
}
# Get revision from content generator
revision_response = self.generator.run(json.dumps(revision_input))
revised_content = self._safely_parse_json(revision_response)
revision_response = self.generator.run(
json.dumps(revision_input)
)
revised_content = self._safely_parse_json(
revision_response
)
return {
"content": revised_content,
"evaluation": evaluation_result
"evaluation": evaluation_result,
}
except Exception as e:
logger.error(f"Evaluation and revision error: {str(e)}")
return {
"content": content,
"evaluation": self._get_fallback_evaluation()
"evaluation": self._get_fallback_evaluation(),
}
def run(self, task: str) -> Dict[str, Any]:
@@ -579,22 +614,34 @@ Output all responses in strict JSON format:
logger.info(f"Starting iteration {iteration + 1}")
# Evaluate and revise content
result = self._evaluate_and_revise(current_content, task)
result = self._evaluate_and_revise(
current_content, task
)
evaluation = result["evaluation"]
current_content = result["content"]
# Check if quality threshold is met
selected_eval = evaluation.get("selected_evaluation", {})
overall_score = selected_eval.get("scores", {}).get("overall", 0.0)
selected_eval = evaluation.get(
"selected_evaluation", {}
)
overall_score = selected_eval.get("scores", {}).get(
"overall", 0.0
)
if overall_score >= self.quality_threshold:
logger.info("Quality threshold met, returning content")
logger.info(
"Quality threshold met, returning content"
)
return {
"content": current_content.get("content", {}).get("main_body", ""),
"content": current_content.get(
"content", {}
).get("main_body", ""),
"final_score": overall_score,
"iterations": iteration + 1,
"metadata": {
"content_metadata": current_content.get("content", {}).get("metadata", {}),
"content_metadata": current_content.get(
"content", {}
).get("metadata", {}),
"evaluation": evaluation,
},
}
@@ -665,18 +712,18 @@ Output all responses in strict JSON format:
)
if __name__ == "__main__":
try:
talkhier = TalkHier(
max_iterations=1,
quality_threshold=0.8,
model_name="gpt-4o",
return_string=False,
)
# if __name__ == "__main__":
# try:
# talkhier = TalkHier(
# max_iterations=1,
# quality_threshold=0.8,
# model_name="gpt-4o",
# return_string=False,
# )
# Ask for user input
task = input("Enter the content generation task description: ")
result = talkhier.run(task)
# # Ask for user input
# task = input("Enter the content generation task description: ")
# result = talkhier.run(task)
except Exception as e:
logger.error(f"Error in main execution: {str(e)}")
# except Exception as e:
# logger.error(f"Error in main execution: {str(e)}")

@@ -340,6 +340,7 @@ class Agent:
llm_args: dict = None,
load_state_path: str = None,
role: agent_roles = "worker",
no_print: bool = False,
*args,
**kwargs,
):
@@ -457,6 +458,7 @@ class Agent:
self.llm_args = llm_args
self.load_state_path = load_state_path
self.role = role
self.no_print = no_print
# Initialize the short term memory
self.short_memory = Conversation(
@@ -867,18 +869,19 @@ class Agent:
# # break
# Print
if self.streaming_on is True:
# self.stream_response(response)
formatter.print_panel_token_by_token(
f"{self.agent_name}: {response}",
title=f"Agent Name: {self.agent_name} [Max Loops: {loop_count}]",
)
else:
# logger.info(f"Response: {response}")
formatter.print_panel(
f"{self.agent_name}: {response}",
f"Agent Name {self.agent_name} [Max Loops: {loop_count} ]",
)
if self.no_print is False:
if self.streaming_on is True:
# self.stream_response(response)
formatter.print_panel_token_by_token(
f"{self.agent_name}: {response}",
title=f"Agent Name: {self.agent_name} [Max Loops: {loop_count}]",
)
else:
# logger.info(f"Response: {response}")
formatter.print_panel(
f"{self.agent_name}: {response}",
f"Agent Name {self.agent_name} [Max Loops: {loop_count} ]",
)
# Check if response is a dictionary and has 'choices' key
if (
@@ -2606,3 +2609,10 @@ class Agent:
Get the role of the agent.
"""
return self.role
# def __getstate__(self):
# state = self.__dict__.copy()
# # Remove or replace unpicklable attributes.
# if '_queue' in state:
# del state['_queue']
# return state

@@ -1,3 +1,4 @@
import json
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
@@ -5,7 +6,7 @@ from typing import Any, Callable, Dict, List, Optional
from pydantic import BaseModel, Field, ValidationError
from swarms import Agent
from swarms.structs.agent import Agent
from swarms.utils.loguru_logger import logger
@@ -302,6 +303,25 @@ class AgentRegistry:
logger.error(f"Error: {e}")
raise e
def find_agent_by_id(self, agent_id: str) -> Optional[Agent]:
"""
Find an agent by its ID.
"""
return self.agents.get(agent_id)
def agents_to_json(self) -> str:
"""
Converts all agents in the registry to a JSON string.
Returns:
str: A JSON string representation of all agents, keyed by their names.
"""
agents_dict = {
name: agent.to_dict()
for name, agent in self.agents.items()
}
return json.dumps(agents_dict, indent=4)
def agent_to_py_model(self, agent: Agent):
"""
Converts an agent to a Pydantic model.
@@ -328,3 +348,30 @@ class AgentRegistry:
)
self.agent_registry.agents.append(schema)
# if __name__ == "__main__":
# from swarms import Agent
# agent1 = Agent(agent_name="test_agent_1")
# agent2 = Agent(agent_name="test_agent_2")
# agent3 = Agent(agent_name="test_agent_3")
# print(f"Created agents: {agent1}, {agent2}, {agent3}")
# registry = AgentRegistry()
# print(f"Created agent registry: {registry}")
# registry.add(agent1)
# registry.add(agent2)
# registry.add(agent3)
# print(f"Added agents to registry: {agent1}, {agent2}, {agent3}")
# all_agents = registry.return_all_agents()
# print(f"All agents in registry: {all_agents}")
# found_agent1 = registry.find_agent_by_name("test_agent_1")
# found_agent2 = registry.find_agent_by_name("test_agent_2")
# found_agent3 = registry.find_agent_by_name("test_agent_3")
# print(f"Found agents by name: {found_agent1}, {found_agent2}, {found_agent3}")
# print(registry.agents_to_json())

@@ -109,6 +109,7 @@ class ConcurrentWorkflow(BaseSwarm):
agent_responses: list = [],
auto_generate_prompts: bool = False,
max_workers: int = None,
user_interface: bool = True,
*args,
**kwargs,
):
@@ -130,10 +131,15 @@ class ConcurrentWorkflow(BaseSwarm):
self.agent_responses = agent_responses
self.auto_generate_prompts = auto_generate_prompts
self.max_workers = max_workers or os.cpu_count()
self.user_interface = user_interface
self.tasks = [] # Initialize tasks list
self.reliability_check()
def disable_agent_prints(self):
for agent in self.agents:
agent.no_print = True
def reliability_check(self):
try:
logger.info("Starting reliability checks")
@@ -176,62 +182,6 @@ class ConcurrentWorkflow(BaseSwarm):
agent.auto_generate_prompt = True
# @retry(wait=wait_exponential(min=2), stop=stop_after_attempt(3))
async def _run_agent(
self,
agent: Agent,
task: str,
executor: ThreadPoolExecutor,
) -> AgentOutputSchema:
"""
Runs a single agent with the given task and tracks its output and metadata with retry logic.
Args:
agent (Agent): The agent instance to run.
task (str): The task or query to give to the agent.
img (str): The image to be processed by the agent.
executor (ThreadPoolExecutor): The thread pool executor to use for running the agent task.
Returns:
AgentOutputSchema: The metadata and output from the agent's execution.
Example:
>>> async def run_agent_example():
>>> executor = ThreadPoolExecutor()
>>> agent_output = await workflow._run_agent(agent=Agent(), task="Example task", img="example.jpg", executor=executor)
>>> print(agent_output)
"""
start_time = datetime.now()
try:
loop = asyncio.get_running_loop()
output = await loop.run_in_executor(
executor,
agent.run,
task,
)
except Exception as e:
logger.error(
f"Error running agent {agent.agent_name}: {e}"
)
raise
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
agent_output = AgentOutputSchema(
run_id=uuid.uuid4().hex,
agent_name=agent.agent_name,
task=task,
output=output,
start_time=start_time,
end_time=end_time,
duration=duration,
)
logger.info(
f"Agent {agent.agent_name} completed task: {task} in {duration:.2f} seconds."
)
return agent_output
def transform_metadata_schema_to_str(
self, schema: MetadataSchema
@@ -258,47 +208,6 @@ class ConcurrentWorkflow(BaseSwarm):
# Return the agent responses as a string
return "\n".join(self.agent_responses)
# @retry(wait=wait_exponential(min=2), stop=stop_after_attempt(3))
async def _execute_agents_concurrently(
self, task: str, img: str = None, *args, **kwargs
) -> MetadataSchema:
"""
Executes multiple agents concurrently with the same task, incorporating retry logic for failed executions.
Args:
task (str): The task or query to give to all agents.
img (str): The image to be processed by the agents.
Returns:
MetadataSchema: The aggregated metadata and outputs from all agents.
Example:
>>> async def execute_agents_example():
>>> metadata_schema = await workflow._execute_agents_concurrently(task="Example task", img="example.jpg")
>>> print(metadata_schema)
"""
with ThreadPoolExecutor(
max_workers=os.cpu_count()
) as executor:
tasks_to_run = [
self._run_agent(
agent=agent,
task=task,
executor=executor,
*args,
**kwargs,
)
for agent in self.agents
]
agent_outputs = await asyncio.gather(*tasks_to_run)
return MetadataSchema(
swarm_id=uuid.uuid4().hex,
task=task,
description=self.description,
agents=agent_outputs,
)
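The gather-over-executor pattern used by `_execute_agents_concurrently` can be sketched in isolation; `blocking_task` below is a hypothetical stand-in for a blocking `agent.run(task)` call:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def blocking_task(name: str) -> str:
    # Stand-in for a blocking agent.run(task) call.
    return f"{name}: done"

async def run_all(names):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [
            loop.run_in_executor(executor, blocking_task, n) for n in names
        ]
        # gather preserves input order regardless of completion order
        return await asyncio.gather(*futures)

results = asyncio.run(run_all(["a", "b", "c"]))
```

Because `run_in_executor` returns awaitable futures, the event loop stays responsive while the thread pool does the blocking work.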
def save_metadata(self):
"""
Saves the metadata to a JSON file based on the auto_save flag.
@ -322,7 +231,7 @@ class ConcurrentWorkflow(BaseSwarm):
self, task: str, img: str = None, *args, **kwargs
) -> Union[Dict[str, Any], str]:
"""
Runs the workflow for the given task, executes agents concurrently, and saves metadata in a production-grade manner.
Runs the workflow for the given task, executes agents concurrently using ThreadPoolExecutor, and saves metadata.
Args:
task (str): The task or query to give to all agents.
@ -339,10 +248,50 @@ class ConcurrentWorkflow(BaseSwarm):
logger.info(
f"Running concurrent workflow with {len(self.agents)} agents."
)
self.output_schema = asyncio.run(
self._execute_agents_concurrently(
task, img, *args, **kwargs
def run_agent(agent: Agent, task: str) -> AgentOutputSchema:
start_time = datetime.now()
try:
output = agent.run(task)
except Exception as e:
logger.error(
f"Error running agent {agent.agent_name}: {e}"
)
raise
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
agent_output = AgentOutputSchema(
run_id=uuid.uuid4().hex,
agent_name=agent.agent_name,
task=task,
output=output,
start_time=start_time,
end_time=end_time,
duration=duration,
)
logger.info(
f"Agent {agent.agent_name} completed task: {task} in {duration:.2f} seconds."
)
return agent_output
with ThreadPoolExecutor(
max_workers=os.cpu_count()
) as executor:
agent_outputs = list(
executor.map(
lambda agent: run_agent(agent, task), self.agents
)
)
self.output_schema = MetadataSchema(
swarm_id=uuid.uuid4().hex,
task=task,
description=self.description,
agents=agent_outputs,
)
self.save_metadata()
@ -351,9 +300,7 @@ class ConcurrentWorkflow(BaseSwarm):
return self.transform_metadata_schema_to_str(
self.output_schema
)
else:
# Return metadata as a JSON string
return self.output_schema.model_dump_json(indent=4)
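In isolation, the synchronous `executor.map` approach the new `_run` uses reduces to the sketch below (names are illustrative, not the real `Agent` API):

```python
from concurrent.futures import ThreadPoolExecutor

def run_agent(name: str, task: str) -> str:
    # Hypothetical stand-in for Agent.run(task).
    return f"{name} -> {task}"

agents = ["agent-1", "agent-2", "agent-3"]
with ThreadPoolExecutor(max_workers=4) as executor:
    # executor.map yields results in the same order as `agents`,
    # even if workers finish out of order.
    outputs = list(executor.map(lambda a: run_agent(a, "audit"), agents))
```

The ordering guarantee is why the resulting `agent_outputs` list lines up with `self.agents` without any extra bookkeeping.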
def run(
@ -390,7 +337,8 @@ class ConcurrentWorkflow(BaseSwarm):
self.tasks.append(task)
try:
return self._run(task, img, *args, **kwargs)
outputs = self._run(task, img, *args, **kwargs)
return outputs
except ValueError as e:
logger.error(f"Invalid device specified: {e}")
raise e
@ -523,26 +471,12 @@ class ConcurrentWorkflow(BaseSwarm):
# if __name__ == "__main__":
# # Assuming you've already initialized some agents outside of this class
# model = OpenAIChat(
# api_key=os.getenv("OPENAI_API_KEY"),
# model_name="gpt-4o-mini",
# temperature=0.1,
# )
# agents = [
# Agent(
# agent_name=f"Financial-Analysis-Agent-{i}",
# system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
# llm=model,
# model_name="gpt-4o",
# max_loops=1,
# autosave=True,
# dashboard=False,
# verbose=True,
# dynamic_temperature_enabled=True,
# saved_state_path=f"finance_agent_{i}.json",
# user_name="swarms_corp",
# retry_attempts=1,
# context_length=200000,
# return_step_meta=False,
# )
# for i in range(3) # Adjust number of agents as needed
# ]

@ -1,7 +1,9 @@
import asyncio
import os
import threading
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import (
ThreadPoolExecutor,
)
from dataclasses import dataclass
from typing import Any, List
@ -413,29 +415,141 @@ def run_agents_with_tasks_concurrently(
)
# from joblib import Parallel, delayed
# def run_agents_joblib(
# agents: List[Any],
# tasks: List[str] = [],
# img: List[str] = None,
# max_workers: int = None,
# max_loops: int = 1,
# prefer: str = "threads",
# ) -> List[Any]:
# """
# Executes a list of agents with their corresponding tasks concurrently using joblib.
# Each agent is expected to have a .run() method that accepts at least:
# - task: A string indicating the task to execute.
# - img: (Optional) A string representing image input.
# Args:
# agents (List[Any]): A list of agent instances.
# tasks (List[str], optional): A list of task strings. If provided, each agent gets a task.
# If fewer tasks than agents, the first task is reused.
# img (List[str], optional): A list of image strings. If provided, each agent gets an image.
# If fewer images than agents, the first image is reused.
# max_workers (int, optional): The maximum number of parallel workers to use.
# Defaults to all available CPU cores.
# max_loops (int, optional): Number of times to execute the whole batch.
# Returns:
# List[Any]: The list of results returned by each agent's run() method.
# """
# max_workers = max_workers or os.cpu_count()
# results = []
# for _ in range(max_loops):
# results.extend(
# Parallel(n_jobs=max_workers, prefer=prefer)(
# delayed(lambda a, t, i: a.run(task=t, img=i))(
# agent,
# (
# tasks[idx]
# if tasks and idx < len(tasks)
# else (tasks[0] if tasks else "")
# ),
# (
# img[idx]
# if img and idx < len(img)
# else (img[0] if img else None)
# ),
# )
# for idx, agent in enumerate(agents)
# )
# )
# return results
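The task/image broadcasting rule the commented docstring describes (reuse the first item when the list is shorter than the agent list) can be sketched with stdlib tools only, no joblib; the names here are illustrative:

```python
from concurrent.futures import ThreadPoolExecutor

def broadcast(items, idx, default=None):
    # Per-agent lookup with first-item fallback, matching the rule in the
    # docstring above: reuse the first task/image when the list is short.
    if items and idx < len(items):
        return items[idx]
    return items[0] if items else default

agents = ["a", "b", "c"]
tasks = ["t1"]  # fewer tasks than agents, so "t1" is reused for all
with ThreadPoolExecutor() as ex:
    results = list(
        ex.map(
            lambda i: f"{agents[i]}:{broadcast(tasks, i, '')}",
            range(len(agents)),
        )
    )
```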
# # Example usage:
# if __name__ == '__main__':
# # Dummy Agent class for demonstration.
# class Agent:
# def __init__(self, agent_name, max_loops, model_name):
# self.agent_name = agent_name
# self.max_loops = max_loops
# self.model_name = model_name
# def run(self, task: str, img: str = None) -> str:
# img_info = f" with image '{img}'" if img else ""
# return (f"{self.agent_name} using model '{self.model_name}' processed task: '{task}'{img_info}")
# # Create a few Agent instances.
# agents = [
# Agent(
# agent_name=f"Financial-Analysis-Agent_parallel_swarm{i}",
# max_loops=1,
# model_name="gpt-4o-mini",
# )
# for i in range(3)
# ]
# task = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria"
# outputs = run_agents_joblib(agents, tasks=[task])
# for i, output in enumerate(outputs):
# print(f"Output from agent {i+1}:\n{output}")
# # Example usage:
# # Initialize your agents with the same model to avoid re-creating it
# agents = [
# Agent(
# agent_name=f"Financial-Analysis-Agent_parallel_swarm{i}",
# system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
# llm=model,
# max_loops=1,
# autosave=True,
# dashboard=False,
# verbose=False,
# dynamic_temperature_enabled=False,
# saved_state_path=f"finance_agent_{i}.json",
# user_name="swarms_corp",
# retry_attempts=1,
# context_length=200000,
# return_step_meta=False,
# if __name__ == '__main__':
# # A sample agent class with a run method.
# class SampleAgent:
# def __init__(self, name):
# self.name = name
# def run(self, task, device, device_id, no_clusterops):
# # Simulate some processing.
# return (f"Agent {self.name} processed task '{task}' on {device} "
# f"(device_id={device_id}), no_clusterops={no_clusterops}")
# # Create a list of sample agents.
# agents = [SampleAgent(f"Agent_{i}") for i in range(5)]
# # Define tasks; if fewer tasks than agents, the first task will be reused.
# tasks = ["task1", "task2", "task3"]
# outputs = run_agents_with_tasks_concurrently(
# agents=agents,
# tasks=tasks,
# max_workers=4,
# device="cpu",
# device_id=1,
# all_cores=True,
# no_clusterops=False
# )
# for i in range(5) # Adjust number of agents as needed
# ]
# task = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria"
# outputs = run_agents_concurrently(agents, task)
# for output in outputs:
# print(output)
# for i, output in enumerate(outputs):
# print(f"Output from agent {i+1}:\n{output}")
# # Example usage:
# if __name__ == "__main__":
# # Initialize your agents (for example, 3 agents)
# agents = [
# Agent(
# agent_name=f"Financial-Analysis-Agent_parallel_swarm{i}",
# max_loops=1,
# model_name="gpt-4o-mini",
# )
# for i in range(3)
# ]
# # Generate a list of tasks.
# tasks = [
# "How can I establish a ROTH IRA to buy stocks and get a tax break?",
# "What are the criteria for establishing a ROTH IRA?",
# "What are the tax benefits of a ROTH IRA?",
# "How to buy stocks using a ROTH IRA?",
# "What are the limitations of a ROTH IRA?",
# ]
# outputs = run_agents_joblib(agents, tasks)

@ -113,6 +113,7 @@ class AgentRearrange(BaseSwarm):
all_gpus: bool = True,
no_use_clusterops: bool = True,
autosave: bool = True,
return_entire_history: bool = False,
*args,
**kwargs,
):
@ -141,7 +142,7 @@ class AgentRearrange(BaseSwarm):
self.all_gpus = all_gpus
self.no_use_clusterops = no_use_clusterops
self.autosave = autosave
self.return_entire_history = return_entire_history
self.output_schema = AgentRearrangeOutput(
input=AgentRearrangeInput(
swarm_id=id,
@ -465,6 +466,9 @@ class AgentRearrange(BaseSwarm):
if self.return_json:
return self.output_schema.model_dump_json(indent=4)
if self.return_entire_history:
return self.output_schema.model_dump_json(indent=4)
# Handle different output types
if self.output_type == "all":
output = " ".join(all_responses)

@ -33,6 +33,7 @@ class SequentialWorkflow:
output_type: OutputType = "all",
return_json: bool = False,
shared_memory_system: callable = None,
return_entire_history: bool = False,
*args,
**kwargs,
):
@ -43,6 +44,7 @@ class SequentialWorkflow:
self.output_type = output_type
self.return_json = return_json
self.shared_memory_system = shared_memory_system
self.return_entire_history = return_entire_history
self.reliability_check()
self.flow = self.sequential_flow()
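The `return_entire_history` flag threaded through `AgentRearrange`, `SequentialWorkflow`, and `SwarmRouter` above follows one pattern: when set, `run` returns the full serialized history instead of only the last output. A toy sketch of that pattern (illustrative class, not the real API, using dataclasses in place of pydantic):

```python
import json
from dataclasses import dataclass, field

@dataclass
class MiniWorkflow:
    # Illustrative stand-in showing only how a return_entire_history
    # flag changes what run() returns.
    return_entire_history: bool = False
    history: list = field(default_factory=list)

    def run(self, task: str) -> str:
        self.history.append({"task": task, "output": task.upper()})
        if self.return_entire_history:
            # Full serialized history, analogous to model_dump_json(indent=4).
            return json.dumps({"history": self.history}, indent=4)
        # Default: only the most recent output.
        return self.history[-1]["output"]

wf = MiniWorkflow(return_entire_history=True)
out = wf.run("hello")
```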

@ -4,7 +4,6 @@ from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, Union
from pydantic import BaseModel, Field
from tenacity import retry, stop_after_attempt, wait_fixed
from swarms.prompts.ag_prompt import aggregator_system_prompt
from swarms.structs.agent import Agent
@ -138,11 +137,12 @@ class SwarmRouter:
shared_memory_system: Any = None,
rules: str = None,
documents: List[str] = [], # A list of docs file paths
output_type: str = "string", # Md, PDF, Txt, csv
output_type: str = "all",
no_cluster_ops: bool = False,
speaker_fn: callable = None,
load_agents_from_csv: bool = False,
csv_file_path: str = None,
return_entire_history: bool = False,
*args,
**kwargs,
):
@ -164,6 +164,7 @@ class SwarmRouter:
self.logs = []
self.load_agents_from_csv = load_agents_from_csv
self.csv_file_path = csv_file_path
self.return_entire_history = return_entire_history
if self.load_agents_from_csv:
self.agents = AgentLoader(
@ -233,7 +234,6 @@ class SwarmRouter:
self._log("error", error_msg)
raise RuntimeError(error_msg) from e
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
def reliability_check(self):
logger.info("Initializing reliability checks")
@ -256,6 +256,8 @@ class SwarmRouter:
SpreadSheetSwarm,
SequentialWorkflow,
ConcurrentWorkflow,
GroupChat,
MultiAgentRouter,
]:
"""
Dynamically create and return the specified swarm type or automatically match the best swarm type for a given task.
@ -286,6 +288,7 @@ class SwarmRouter:
flow=self.rearrange_flow,
return_json=self.return_json,
output_type=self.output_type,
return_entire_history=self.return_entire_history,
*args,
**kwargs,
)
@ -339,6 +342,7 @@ class SwarmRouter:
shared_memory_system=self.shared_memory_system,
output_type=self.output_type,
return_json=self.return_json,
return_entire_history=self.return_entire_history,
*args,
**kwargs,
)
@ -384,7 +388,6 @@ class SwarmRouter:
self.logs.append(log_entry)
logger.log(level.upper(), message)
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
def _run(self, task: str, img: str, *args, **kwargs) -> Any:
"""
Dynamically run the specified task on the selected or matched swarm type.

@ -11,10 +11,6 @@ import psutil
import requests
import toml
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger(log_folder="capture_sys_data")
# Helper functions
def generate_user_id():
@ -262,7 +258,8 @@ def capture_system_data() -> Dict[str, str]:
return system_data
except Exception as e:
logger.error("Failed to capture system data: {}", e)
# logger.error("Failed to capture system data: {}", e)
print(f"Failed to capture system data: {e}")
return {}

@ -16,6 +16,7 @@ from swarms.utils.parse_code import extract_code_from_markdown
from swarms.utils.pdf_to_text import pdf_to_text
from swarms.utils.try_except_wrapper import try_except_wrapper
from swarms.utils.calculate_func_metrics import profile_func
from swarms.utils.litellm_tokenizer import count_tokens
__all__ = [
@ -33,4 +34,5 @@ __all__ = [
"pdf_to_text",
"try_except_wrapper",
"profile_func",
"count_tokens",
]

@ -1,50 +1,26 @@
import os
import uuid
import sys
from loguru import logger
def initialize_logger(log_folder: str = "logs"):
AGENT_WORKSPACE = "agent_workspace"
# Check if WORKSPACE_DIR is set, if not, set it to AGENT_WORKSPACE
if "WORKSPACE_DIR" not in os.environ:
os.environ["WORKSPACE_DIR"] = AGENT_WORKSPACE
# Create a folder within the agent_workspace
log_folder_path = os.path.join(
os.getenv("WORKSPACE_DIR"), log_folder
)
if not os.path.exists(log_folder_path):
os.makedirs(log_folder_path)
# Generate a unique identifier for the log file
uuid_for_log = str(uuid.uuid4())
log_file_path = os.path.join(
log_folder_path, f"{log_folder}_{uuid_for_log}.log"
)
# Remove default handler and add custom handlers
# Remove default handler and add a combined handler
logger.remove()
# Add console handler with colors
# Add a combined console and file handler
logger.add(
sys.stdout,
colorize=True,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
level="INFO",
)
# Add file handler
logger.add(
log_file_path,
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
level="INFO",
backtrace=True,
diagnose=True,
enqueue=True,
retention="10 days",
# compression="zip",
# retention="10 days", # Removed this line
)
return logger
# logger = initialize_logger()
# logger.info("Hello, world!")
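A stdlib-`logging` sketch of the same setup, with no loguru dependency (illustrative only, not the project's API; `WORKSPACE_DIR` falls back to a temp directory here instead of `agent_workspace`):

```python
import logging
import os
import tempfile
import uuid

def initialize_stdlib_logger(log_folder: str = "logs") -> logging.Logger:
    # One console handler plus one per-run log file under WORKSPACE_DIR,
    # mirroring the loguru configuration above.
    workspace = os.environ.setdefault("WORKSPACE_DIR", tempfile.mkdtemp())
    folder = os.path.join(workspace, log_folder)
    os.makedirs(folder, exist_ok=True)
    log_file = os.path.join(folder, f"{log_folder}_{uuid.uuid4().hex}.log")

    logger = logging.getLogger(f"sketch.{log_folder}")
    logger.setLevel(logging.INFO)
    fmt = logging.Formatter(
        "%(asctime)s | %(levelname)-8s | %(name)s:%(funcName)s:%(lineno)d - %(message)s"
    )
    for handler in (logging.StreamHandler(), logging.FileHandler(log_file)):
        handler.setFormatter(fmt)
        logger.addHandler(handler)
    return logger
```

Unlike loguru, stdlib handlers accumulate if the function is called twice for the same folder name, so a real implementation would guard against duplicate handlers.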

@ -1,7 +1,6 @@
import asyncio
from dataclasses import dataclass
from datetime import datetime
from queue import Queue
from typing import Any, Callable, Dict, List, Optional
import psutil
@ -19,6 +18,8 @@ from rich.table import Table
from rich.text import Text
from rich.tree import Tree
from swarms.structs.agent import Agent
try:
import pynvml
@ -30,59 +31,47 @@ except ImportError:
@dataclass
class SwarmMetadata:
name: str
description: str
version: str
type: str # hierarchical, parallel, sequential
created_at: datetime
author: str
tags: List[str]
primary_objective: str
secondary_objectives: List[str]
@dataclass
class Agent:
name: str
role: str
description: str
agent_type: str # e.g., "LLM", "Neural", "Rule-based"
capabilities: List[str]
parameters: Dict[str, any]
metadata: Dict[str, str]
children: List["Agent"] = None
parent: Optional["Agent"] = None
output_stream: Queue = None
name: Optional[str] = None
description: Optional[str] = None
version: Optional[str] = None
type: Optional[str] = None # hierarchical, parallel, sequential
created_at: Optional[datetime] = None
author: Optional[str] = None
tags: Optional[List[str]] = None
primary_objective: Optional[str] = None
secondary_objectives: Optional[List[str]] = None
def __post_init__(self):
self.children = self.children or []
self.output_stream = Queue()
@property
def hierarchy_level(self) -> int:
level = 0
current = self
while current.parent:
level += 1
current = current.parent
return level
self.tags = self.tags or []
self.secondary_objectives = self.secondary_objectives or []
self.created_at = self.created_at or datetime.now()
class SwarmVisualizationRich:
def __init__(
self,
swarm_metadata: SwarmMetadata,
root_agent: Agent,
agents: List[Agent],
update_resources: bool = True,
refresh_rate: float = 0.1,
):
"""
Initializes the visualizer with a list of agents.
Args:
swarm_metadata (SwarmMetadata): Metadata for the swarm.
agents (List[Agent]): List of root agents.
update_resources (bool): Whether to update system resource stats.
refresh_rate (float): Refresh rate for the live visualization.
"""
self.swarm_metadata = swarm_metadata
self.root_agent = root_agent
self.agents = agents
self.update_resources = update_resources
self.refresh_rate = refresh_rate
self.console = Console()
self.live = None
self.output_history = {}
# A dictionary mapping agent names to list of output messages
self.output_history: Dict[str, List[Dict[str, Any]]] = {}
# System monitoring
self.cores_available = 0
@ -100,96 +89,112 @@ class SwarmVisualizationRich:
minutes, seconds = divmod(remainder, 60)
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
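Only the tail of the duration formatter is visible in this hunk; the full divmod chain it uses looks like this (standalone sketch):

```python
def format_duration(total_seconds: int) -> str:
    # Same divmod arithmetic as _format_duration above:
    # seconds -> (hours, remainder) -> (minutes, seconds) -> "hh:mm:ss".
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
```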
def _build_agent_tree(
self, agent: Agent, tree: Optional[Tree] = None
) -> Tree:
"""Builds a detailed tree visualization of the agent hierarchy."""
def _build_agent_tree(self, agents: List[Agent]) -> Tree:
"""
Builds a detailed tree visualization for a list of agents.
Args:
agents (List[Agent]): The list of root agents.
Returns:
Tree: A rich Tree object displaying agent metadata.
"""
tree = Tree("[bold underline]Agents[/bold underline]")
for agent in agents:
self._add_agent_to_tree(agent, tree)
return tree
def _add_agent_to_tree(self, agent: Agent, tree: Tree) -> None:
"""
Recursively adds an agent and its children to the given tree.
Args:
agent (Agent): The agent to add.
tree (Tree): The tree to update.
"""
agent_info = [
f"[bold cyan]{agent.name}[/bold cyan]",
f"[yellow]Role:[/yellow] {agent.role}",
f"[green]Type:[/green] {agent.agent_type}",
f"[blue]Level:[/blue] {agent.hierarchy_level}",
f"[magenta]Capabilities:[/magenta] {', '.join(agent.capabilities)}",
]
# Add any custom metadata
for key, value in agent.metadata.items():
agent_info.append(f"[white]{key}:[/white] {value}")
# # Add any custom metadata from the agent (if available)
# for key, value in getattr(agent, "metadata", {}).items():
# agent_info.append(f"[white]{key}:[/white] {value}")
# Parameters summary
param_summary = ", ".join(
f"{k}: {v}" for k, v in agent.parameters.items()
)
agent_info.append(
f"[white]Parameters:[/white] {param_summary}"
)
# # Parameters summary if available
# parameters = getattr(agent, "parameters", {})
# if parameters:
# param_summary = ", ".join(f"{k}: {v}" for k, v in parameters.items())
# agent_info.append(f"[white]Parameters:[/white] {param_summary}")
node_text = "\n".join(agent_info)
branch = tree.add(node_text)
for child in getattr(agent, "children", []):
self._add_agent_to_tree(child, branch)
if tree is None:
tree = Tree(node_text)
else:
branch = tree.add(node_text)
tree = branch
for child in agent.children:
self._build_agent_tree(child, tree)
def _count_agents(self, agents: List[Agent]) -> int:
"""
Counts the number of root agents in the provided list.
return tree
Args:
agents (List[Agent]): List of agents.
def _count_agents(self, agent: Agent) -> int:
"""Recursively counts total number of agents in the swarm."""
count = 1 # Count current agent
for child in agent.children or []:
count += self._count_agents(child)
return count
Returns:
int: The number of root agents (children are not traversed).
"""
return len(agents)
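Note that the new implementation returns `len(agents)`, counting only root agents; a variant that also counts nested children could look like this (illustrative `Node` type standing in for `Agent`):

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class Node:
    # Minimal stand-in for an Agent that may have child agents.
    name: str
    children: List["Node"] = field(default_factory=list)

def count_agents(agents: List[Node]) -> int:
    # Counts every root plus, recursively, all of its descendants.
    return sum(1 + count_agents(a.children) for a in agents)

tree = [Node("root", children=[Node("a"), Node("b", children=[Node("c")])])]
```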
def _create_unified_info_panel(self) -> Panel:
"""Creates a unified panel showing both swarm metadata and architecture."""
# Create the main container
"""
Creates a unified panel showing swarm metadata and agents' metadata.
"""
info_layout = Layout()
info_layout.split_column(
Layout(name="metadata", size=13),
Layout(name="metadata", size=15),
Layout(name="architecture"),
)
# Calculate total agents
total_agents = self._count_agents(self.root_agent)
total_agents = self._count_agents(self.agents)
# Metadata section
metadata_table = Table.grid(padding=1, expand=True)
metadata_table.add_column("Label", style="bold cyan")
metadata_table.add_column("Value", style="white")
# System resources
# Update system resources if needed
if self.update_resources:
self._update_resource_stats()
# Add description with proper wrapping
# Wrap the description text properly
description_text = Text(
self.swarm_metadata.description, style="italic"
self.swarm_metadata.description or "", style="italic"
)
description_text.wrap(self.console, width=60, overflow="fold")
metadata_table.add_row("Swarm Name", self.swarm_metadata.name)
metadata_table.add_row(
"Swarm Name", self.swarm_metadata.name or "N/A"
)
metadata_table.add_row("Description", description_text)
metadata_table.add_row("Version", self.swarm_metadata.version)
metadata_table.add_row(
"Version", self.swarm_metadata.version or "N/A"
)
metadata_table.add_row("Total Agents", str(total_agents))
metadata_table.add_row("Author", self.swarm_metadata.author)
metadata_table.add_row(
"Author", self.swarm_metadata.author or "N/A"
)
metadata_table.add_row(
"System",
f"CPU: {self.cores_available} cores | Memory: {self.memory_usage}",
)
metadata_table.add_row(
"Primary Objective", self.swarm_metadata.primary_objective
"Primary Objective",
self.swarm_metadata.primary_objective or "N/A",
)
info_layout["metadata"].update(metadata_table)
info_layout["metadata"].update(metadata_table)
# Architecture section with tree visualization
architecture_tree = self._build_agent_tree(self.root_agent)
# Architecture section with the agent tree
architecture_tree = self._build_agent_tree(self.agents)
info_layout["architecture"].update(architecture_tree)
return Panel(
@ -198,12 +203,13 @@ class SwarmVisualizationRich:
)
def _create_outputs_panel(self) -> Panel:
"""Creates a panel that displays stacked message history for all agents."""
# Create a container for all messages across all agents
"""
Creates a panel that displays stacked message history for all agents.
"""
all_messages = []
def collect_agent_messages(agent: Agent):
"""Recursively collect messages from all agents."""
"""Recursively collect messages from an agent and its children."""
messages = self.output_history.get(agent.name, [])
for msg in messages:
all_messages.append(
@ -214,37 +220,31 @@ class SwarmVisualizationRich:
"style": msg["style"],
}
)
for child in agent.children:
for child in getattr(agent, "children", []):
collect_agent_messages(child)
# Collect all messages
collect_agent_messages(self.root_agent)
# Collect messages from every root agent
for agent in self.agents:
collect_agent_messages(agent)
# Sort messages by timestamp
all_messages.sort(key=lambda x: x["time"])
# Create the stacked message display
Layout()
messages_container = []
for msg in all_messages:
# Create a panel for each message
message_text = Text()
message_text.append(f"[{msg['time']}] ", style="dim")
message_text.append(
f"{msg['agent']}: ", style="bold cyan"
)
message_text.append(msg["content"], style=msg["style"])
messages_container.append(message_text)
# Join all messages with line breaks
if messages_container:
final_text = Text("\n").join(messages_container)
else:
final_text = Text("No messages yet...", style="dim")
# Create scrollable panel for all messages
return Panel(
final_text,
title="[bold]Agent Communication Log[/bold]",
@ -286,36 +286,30 @@ class SwarmVisualizationRich:
by_word: bool = False,
):
"""
Streams output for a specific agent with sophisticated token-by-token animation.
Streams output for a specific agent with token-by-token animation.
Args:
agent (Agent): The agent whose output is being streamed
text (str): The text to stream
title (Optional[str]): Custom title for the output panel
style (str): Style for the output text
delay (float): Delay between tokens
by_word (bool): If True, streams word by word instead of character by character
agent (Agent): The agent whose output is being streamed.
text (str): The text to stream.
title (Optional[str]): Custom title for the output panel.
style (str): Style for the output text.
delay (float): Delay between tokens.
by_word (bool): If True, stream word by word instead of character by character.
"""
display_text = Text(style=style)
current_output = ""
# Split into words or characters
tokens = text.split() if by_word else text
# Create a panel for this agent's output
title = title or f"{agent.name} Output"
for token in tokens:
# Add appropriate spacing
token_with_space = token + (" " if by_word else "")
current_output += token_with_space
display_text.append(token_with_space)
# Initialize history list if it doesn't exist
if agent.name not in self.output_history:
self.output_history[agent.name] = []
# Store the complete message when finished
if token == tokens[-1]:
timestamp = datetime.now().strftime("%H:%M:%S")
self.output_history[agent.name].append(
@ -326,11 +320,19 @@ class SwarmVisualizationRich:
}
)
# Update live display if active
if self.live:
self.live.update(self._create_layout())
await asyncio.sleep(delay)
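The tokenization rule in `stream_output` (whole words when `by_word` is true, individual characters otherwise) can be reduced to a small async sketch:

```python
import asyncio

async def stream_tokens(text: str, by_word: bool = False, delay: float = 0.0) -> str:
    # Mirrors stream_output's split logic: text.split() for words,
    # iteration over the string for characters, with a delay per token.
    tokens = text.split() if by_word else text
    streamed = ""
    for token in tokens:
        # Words get a trailing space re-added; characters do not.
        streamed += token + (" " if by_word else "")
        await asyncio.sleep(delay)
    return streamed

result = asyncio.run(stream_tokens("hello world", by_word=True))
```

Word mode leaves a trailing space (each word is re-joined with `" "`), which is harmless for display but worth stripping if the streamed text is stored.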
def log_agent_output(self, agent: Agent, text: str):
asyncio.create_task(
self.stream_output(
agent=agent,
text=text,
title=f"{agent.name} Output {agent.max_loops}",
)
)
async def print_progress(
self,
description: str,
@ -342,13 +344,13 @@ class SwarmVisualizationRich:
Displays a progress spinner while executing a task.
Args:
description (str): Task description
task_fn (Callable): Function to execute
*args (Any): Arguments for task_fn
**kwargs (Any): Keyword arguments for task_fn
description (str): Task description.
task_fn (Callable): Function to execute.
*args (Any): Arguments for task_fn.
**kwargs (Any): Keyword arguments for task_fn.
Returns:
Any: Result of task_fn
Any: The result of task_fn.
"""
progress = Progress(
SpinnerColumn(),
@ -393,10 +395,12 @@ class SwarmVisualizationRich:
asyncio.create_task(
self.stream_output(agent, new_output)
)
for child in agent.children:
for child in getattr(agent, "children", []):
process_agent_streams(child)
process_agent_streams(self.root_agent)
# Process streams for each root agent
for agent in self.agents:
process_agent_streams(agent)
await asyncio.sleep(self.refresh_rate)

@ -0,0 +1,67 @@
import asyncio
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
from swarms.utils.visualizer import (
SwarmVisualizationRich,
SwarmMetadata,
) # Replace with your actual module name
# Create two example agents
agent1 = Agent(
agent_name="Financial-Analysis-Agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
model_name="gpt-4o-mini",
max_loops=1,
autosave=True,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="finance_agent.json",
user_name="swarms_corp",
retry_attempts=1,
context_length=200000,
return_step_meta=False,
output_type="string",
streaming_on=False,
)
# Create a second dummy agent for demonstration
agent2 = Agent(
agent_name="Stock-Advisor-Agent",
system_prompt="Provide stock market insights and investment advice.",
model_name="gpt-4o-mini",
max_loops=1,
autosave=True,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="stock_agent.json",
user_name="swarms_corp",
retry_attempts=1,
context_length=200000,
return_step_meta=False,
output_type="string",
streaming_on=False,
)
# Create swarm metadata
metadata = SwarmMetadata(
name="Financial Swarm",
description="A swarm of agents focused on financial analysis and stock market advice.",
version="1.0",
author="Your Name",
primary_objective="Provide comprehensive financial and investment analysis.",
)
# Instantiate the visualizer with a list of agents
visualizer = SwarmVisualizationRich(
swarm_metadata=metadata,
agents=[agent1, agent2],
update_resources=True,
refresh_rate=0.1,
)
# Start the visualization
asyncio.run(visualizer.start())