diff --git a/.gitignore b/.gitignore index 28e8b8d5..313a28f5 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,8 @@ video/ artifacts_three dataframe/ .ruff_cache +target/ +Cargo.lock .pytest_cache static/generated runs diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 4f1e4bb3..ce410146 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -275,6 +275,7 @@ nav: - Deploying Swarms on Google Cloud Run: "swarms_cloud/cloud_run.md" # - Swarms Cloud CLI: "swarms_cloud/cli.md" - Swarm APIs: + - Swarms API: "swarms_cloud/swarms_api.md" - MCS API: "swarms_cloud/mcs_api.md" - CreateNow API: "swarms_cloud/create_api.md" - Swarms Memory: diff --git a/docs/swarms_cloud/swarms_api.md b/docs/swarms_cloud/swarms_api.md new file mode 100644 index 00000000..5ae7f2d7 --- /dev/null +++ b/docs/swarms_cloud/swarms_api.md @@ -0,0 +1,675 @@ +# Swarms API Documentation + +The Swarms API is a powerful REST API designed to help you create, manage, and execute various types of swarms efficiently. Whether you need to run tasks sequentially, concurrently, or in a custom workflow, the Swarms API has you covered. + +### Key Features: +- **Sequential Swarms**: Execute tasks one after another in a defined order. +- **Concurrent Swarms**: Run multiple tasks simultaneously to save time and resources. +- **Custom Workflows**: Design your own swarm workflows to fit your specific needs. + +To get started, find your API key in the Swarms Cloud dashboard. [Get your API key here](https://swarms.world/platform/api-keys) + +## Base URL +``` +https://swarms-api-285321057562.us-east1.run.app +``` + +## Authentication +All API requests (except `/health`) require authentication using an API key passed in the `x-api-key` header: + +```http +x-api-key: your_api_key_here +``` + +## Endpoints + +### Health Check +Check if the API is operational. + +**Endpoint:** `GET /health` +**Authentication Required:** No +**Response:** +```json +{ + "status": "ok" +} +``` + +### Single Swarm Completion +Run a single swarm with specified agents and tasks. + +**Endpoint:** `POST /v1/swarm/completions` +**Authentication Required:** Yes + +#### Request Parameters + +| Parameter | Type | Required | Description | +|-----------|------|----------|-------------| +| name | string | Optional | Name of the swarm (max 100 chars) | +| description | string | Optional | Description of the swarm (max 500 chars) | +| agents | array | Required | Array of agent configurations | +| max_loops | integer | Optional | Maximum number of iterations | +| swarm_type | string | Optional | Type of swarm workflow | +| task | string | Required | The task to be performed | +| img | string | Optional | Image URL if relevant | + +#### Agent Configuration Parameters + +| Parameter | Type | Required | Default | Description | +|-----------|------|----------|---------|-------------| +| agent_name | string | Required | - | Name of the agent (max 100 chars) | +| description | string | Optional | - | Description of the agent (max 500 chars) | +| system_prompt | string | Optional | - | System prompt for the agent (max 500 chars) | +| model_name | string | Optional | "gpt-4o" | Model to be used by the agent | +| auto_generate_prompt | boolean | Optional | false | Whether to auto-generate prompts | +| max_tokens | integer | Optional | - | Maximum tokens for response | +| temperature | float | Optional | 0.5 | Temperature for response generation | +| role | string | Optional | "worker" | Role of the agent | +| max_loops | integer | Optional | 1 | Maximum iterations for this agent | + +#### Example Request +```json +{ + "name": "Test Swarm", + "description": "A test swarm", + "agents": [ + { + "agent_name": "Research Agent", + "description": "Conducts research", + "system_prompt": "You are a research assistant.", + "model_name": "gpt-4o", + "role": "worker", + "max_loops": 1 + } + ], + "max_loops": 1, + "swarm_type": "ConcurrentWorkflow", + "task": "Write a short blog post about AI agents." +} +``` + +#### Response Structure + +| Field | Type | Description | +|-------|------|-------------| +| status | string | Status of the swarm execution | +| swarm_name | string | Name of the executed swarm | +| description | string | Description of the swarm | +| task | string | Original task description | +| metadata | object | Execution metadata | +| output | object/array | Results from the swarm execution | + +### Batch Swarm Completion +Run multiple swarms in a single request. + +**Endpoint:** `POST /v1/swarm/batch/completions` +**Authentication Required:** Yes + +#### Request Format +Array of swarm configurations, each following the same format as single swarm completion. + +#### Example Batch Request +```json +[ + { + "name": "Batch Swarm 1", + "description": "First swarm in batch", + "agents": [...], + "task": "Task 1" + }, + { + "name": "Batch Swarm 2", + "description": "Second swarm in batch", + "agents": [...], + "task": "Task 2" + } +] +``` +# Swarms API Implementation Examples + +## Python +### Using requests + +```python +import requests +import json + +API_KEY = "your_api_key_here" +BASE_URL = "https://swarms-api-285321057562.us-east1.run.app" + +headers = { + "x-api-key": API_KEY, + "Content-Type": "application/json" +} + +def run_single_swarm(): + payload = { + "name": "Financial Analysis Swarm", + "description": "Market analysis swarm", + "agents": [ + { + "agent_name": "Market Analyst", + "description": "Analyzes market trends", + "system_prompt": "You are a financial analyst expert.", + "model_name": "gpt-4o", + "role": "worker", + "max_loops": 1 + } + ], + "max_loops": 1, + "swarm_type": "SequentialWorkflow", + "task": "Analyze current market trends in tech sector" + } + + response = requests.post( + f"{BASE_URL}/v1/swarm/completions", + headers=headers, + json=payload + ) + + return response.json() + +def run_batch_swarms(): + payload = [ + { + "name": "Market Analysis", + "description": "First swarm", + "agents": [ + { + "agent_name": "Analyst", + "system_prompt": "You are a market analyst.", + "model_name": "gpt-4o", + "role": "worker" + } + ], + "task": "Analyze tech trends" + }, + { + "name": "Risk Assessment", + "description": "Second swarm", + "agents": [ + { + "agent_name": "Risk Analyst", + "system_prompt": "You are a risk analyst.", + "model_name": "gpt-4o", + "role": "worker" + } + ], + "task": "Assess market risks" + } + ] + + response = requests.post( + f"{BASE_URL}/v1/swarm/batch/completions", + headers=headers, + json=payload + ) + + return response.json() + +# Using async/await with aiohttp +import aiohttp +import asyncio + +async def run_swarm_async(): + async with aiohttp.ClientSession() as session: + async with session.post( + f"{BASE_URL}/v1/swarm/completions", + headers=headers, + json=payload + ) as response: + return await response.json() +``` + +## Node.js +### Using Fetch API + +```javascript +const API_KEY = 'your_api_key_here'; +const BASE_URL = 'https://swarms-api-285321057562.us-east1.run.app'; + +const headers = { + 'x-api-key': API_KEY, + 'Content-Type': 'application/json' +}; + +// Single swarm execution +async function runSingleSwarm() { + const payload = { + name: 'Financial Analysis', + description: 'Market analysis swarm', + agents: [ + { + agent_name: 'Market Analyst', + description: 'Analyzes market trends', + system_prompt: 'You are a financial analyst expert.', + model_name: 'gpt-4o', + role: 'worker', + max_loops: 1 + } + ], + max_loops: 1, + swarm_type: 'SequentialWorkflow', + task: 'Analyze current market trends' + }; + + try { + const response = await fetch(`${BASE_URL}/v1/swarm/completions`, { + method: 'POST', + headers, + body: JSON.stringify(payload) + }); + + return await response.json(); + } catch (error) { + console.error('Error:', error); + throw error; + } +} + +// Batch swarm execution +async function runBatchSwarms() { + const payload = [ + { + name: 'Market Analysis', + agents: [{ + agent_name: 'Analyst', + system_prompt: 'You are a market analyst.', + model_name: 'gpt-4o', + role: 'worker' + }], + task: 'Analyze tech trends' + }, + { + name: 'Risk Assessment', + agents: [{ + agent_name: 'Risk Analyst', + system_prompt: 'You are a risk analyst.', + model_name: 'gpt-4o', + role: 'worker' + }], + task: 'Assess market risks' + } + ]; + + try { + const response = await fetch(`${BASE_URL}/v1/swarm/batch/completions`, { + method: 'POST', + headers, + body: JSON.stringify(payload) + }); + + return await response.json(); + } catch (error) { + console.error('Error:', error); + throw error; + } +} +``` + +### Using Axios + +```javascript +const axios = require('axios'); + +const api = axios.create({ + baseURL: BASE_URL, + headers: { + 'x-api-key': API_KEY, + 'Content-Type': 'application/json' + } +}); + +async function runSwarm() { + try { + const response = await api.post('/v1/swarm/completions', payload); + return response.data; + } catch (error) { + console.error('Error:', error.response?.data || error.message); + throw error; + } +} +``` + +## Go + +```go +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" +) + +const ( + baseURL = "https://swarms-api-285321057562.us-east1.run.app" + apiKey = "your_api_key_here" +) + +type Agent struct { + AgentName string `json:"agent_name"` + Description string `json:"description"` + SystemPrompt string `json:"system_prompt"` + ModelName string `json:"model_name"` + Role string `json:"role"` + MaxLoops int `json:"max_loops"` +} + +type SwarmRequest struct { + Name string `json:"name"` + Description string `json:"description"` + Agents []Agent `json:"agents"` + MaxLoops int `json:"max_loops"` + SwarmType string `json:"swarm_type"` + Task string `json:"task"` +} + +func runSingleSwarm() ([]byte, error) { + payload := SwarmRequest{ + Name: "Financial Analysis", + Description: "Market analysis swarm", + Agents: []Agent{ + { + AgentName: "Market Analyst", + Description: "Analyzes market trends", + SystemPrompt: "You are a financial analyst expert.", + ModelName: "gpt-4o", + Role: "worker", + MaxLoops: 1, + }, + }, + MaxLoops: 1, + SwarmType: "SequentialWorkflow", + Task: "Analyze current market trends", + } + + jsonPayload, err := json.Marshal(payload) + if err != nil { + return nil, err + } + + client := &http.Client{} + req, err := http.NewRequest("POST", baseURL+"/v1/swarm/completions", bytes.NewBuffer(jsonPayload)) + if err != nil { + return nil, err + } + + req.Header.Set("x-api-key", apiKey) + req.Header.Set("Content-Type", "application/json") + + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + return ioutil.ReadAll(resp.Body) +} + +func main() { + response, err := runSingleSwarm() + if err != nil { + fmt.Printf("Error: %v\n", err) + return + } + + fmt.Printf("Response: %s\n", response) +} +``` + +## Rust + +```rust +use reqwest::{Client, header}; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +const BASE_URL: &str = "https://swarms-api-285321057562.us-east1.run.app"; +const API_KEY: &str = "your_api_key_here"; + +#[derive(Serialize, Deserialize)] +struct Agent { + agent_name: String, + description: String, + system_prompt: String, + model_name: String, + role: String, + max_loops: i32, +} + +#[derive(Serialize, Deserialize)] +struct SwarmRequest { + name: String, + description: String, + agents: Vec, + max_loops: i32, + swarm_type: String, + task: String, +} + +async fn run_single_swarm() -> Result> { + let client = Client::new(); + + let payload = SwarmRequest { + name: "Financial Analysis".to_string(), + description: "Market analysis swarm".to_string(), + agents: vec![Agent { + agent_name: "Market Analyst".to_string(), + description: "Analyzes market trends".to_string(), + system_prompt: "You are a financial analyst expert.".to_string(), + model_name: "gpt-4o".to_string(), + role: "worker".to_string(), + max_loops: 1, + }], + max_loops: 1, + swarm_type: "SequentialWorkflow".to_string(), + task: "Analyze current market trends".to_string(), + }; + + let response = client + .post(format!("{}/v1/swarm/completions", BASE_URL)) + .header("x-api-key", API_KEY) + .header("Content-Type", "application/json") + .json(&payload) + .send() + .await?; + + let result = response.text().await?; + Ok(result) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let response = run_single_swarm().await?; + println!("Response: {}", response); + Ok(()) +} +``` + +## C# + +```csharp +using System; +using System.Net.Http; +using System.Text; +using System.Text.Json; +using System.Threading.Tasks; + +public class SwarmClient +{ + private readonly HttpClient _client; + private const string BaseUrl = "https://swarms-api-285321057562.us-east1.run.app"; + private readonly string _apiKey; + + public SwarmClient(string apiKey) + { + _apiKey = apiKey; + _client = new HttpClient(); + _client.DefaultRequestHeaders.Add("x-api-key", apiKey); + } + + public class Agent + { + public string AgentName { get; set; } + public string Description { get; set; } + public string SystemPrompt { get; set; } + public string ModelName { get; set; } + public string Role { get; set; } + public int MaxLoops { get; set; } + } + + public class SwarmRequest + { + public string Name { get; set; } + public string Description { get; set; } + public List Agents { get; set; } + public int MaxLoops { get; set; } + public string SwarmType { get; set; } + public string Task { get; set; } + } + + public async Task RunSingleSwarm() + { + var payload = new SwarmRequest + { + Name = "Financial Analysis", + Description = "Market analysis swarm", + Agents = new List + { + new Agent + { + AgentName = "Market Analyst", + Description = "Analyzes market trends", + SystemPrompt = "You are a financial analyst expert.", + ModelName = "gpt-4o", + Role = "worker", + MaxLoops = 1 + } + }, + MaxLoops = 1, + SwarmType = "SequentialWorkflow", + Task = "Analyze current market trends" + }; + + var content = new StringContent( + JsonSerializer.Serialize(payload), + Encoding.UTF8, + "application/json" + ); + + var response = await _client.PostAsync( + $"{BaseUrl}/v1/swarm/completions", + content + ); + + return await response.Content.ReadAsStringAsync(); + } +} + +// Usage +class Program +{ + static async Task Main(string[] args) + { + var client = new SwarmClient("your_api_key_here"); + var response = await client.RunSingleSwarm(); + Console.WriteLine($"Response: {response}"); + } +} +``` + +## Shell (cURL) + +```bash +# Single swarm execution +curl -X POST "https://swarms-api-285321057562.us-east1.run.app/v1/swarm/completions" \ + -H "x-api-key: your_api_key_here" \ + -H "Content-Type: application/json" \ + -d '{ + "name": "Financial Analysis", + "description": "Market analysis swarm", + "agents": [ + { + "agent_name": "Market Analyst", + "description": "Analyzes market trends", + "system_prompt": "You are a financial analyst expert.", + "model_name": "gpt-4o", + "role": "worker", + "max_loops": 1 + } + ], + "max_loops": 1, + "swarm_type": "SequentialWorkflow", + "task": "Analyze current market trends" + }' + +# Batch swarm execution +curl -X POST "https://swarms-api-285321057562.us-east1.run.app/v1/swarm/batch/completions" \ + -H "x-api-key: your_api_key_here" \ + -H "Content-Type: application/json" \ + -d '[ + { + "name": "Market Analysis", + "agents": [{ + "agent_name": "Analyst", + "system_prompt": "You are a market analyst.", + "model_name": "gpt-4o", + "role": "worker" + }], + "task": "Analyze tech trends" + }, + { + "name": "Risk Assessment", + "agents": [{ + "agent_name": "Risk Analyst", + "system_prompt": "You are a risk analyst.", + "model_name": "gpt-4o", + "role": "worker" + }], + "task": "Assess market risks" + } + ]' +``` + + +## Billing and Credits + +The API uses a credit-based billing system with the following components: + +### Cost Calculation + +| Component | Cost | +|-----------|------| +| Base cost per agent | $0.01 | +| Input tokens (per 1M) | $5.00 | +| Output tokens (per 1M) | $15.50 | + +Credits are deducted based on: +- Number of agents used +- Total input tokens (including system prompts and agent memory) +- Total output tokens generated +- Execution time + +### Credit Types +- Free credits: Used first +- Regular credits: Used after free credits are exhausted + +## Error Handling + +| HTTP Status Code | Description | +|-----------------|-------------| +| 402 | Insufficient credits | +| 403 | Invalid API key | +| 404 | Resource not found | +| 500 | Internal server error | + +## Best Practices + +1. Start with small swarms and gradually increase complexity +2. Monitor credit usage and token counts +3. Use appropriate max_loops values to control execution +4. Implement proper error handling for API responses +5. Consider using batch completions for multiple related tasks \ No newline at end of file diff --git a/example.py b/example.py index c52f49ae..f9182e14 100644 --- a/example.py +++ b/example.py @@ -24,7 +24,7 @@ agent = Agent( max_tokens=4000, # max output tokens saved_state_path="agent_00.json", interactive=False, - roles="director", + role="director", ) agent.run( diff --git a/pyproject.toml b/pyproject.toml index 6afd6211..4e8d63d6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "7.2.7" +version = "7.4.0" description = "Swarms - TGSC" license = "MIT" authors = ["Kye Gomez "] @@ -75,6 +75,7 @@ aiofiles = "*" rich = "*" numpy = "*" litellm = "*" +torch = "*" [tool.poetry.scripts] swarms = "swarms.cli.main:main" @@ -114,3 +115,10 @@ exclude = ''' )/ ''' + + +[tool.maturin] +module-name = "swarms_rust" + +[tool.maturin.build] +features = ["extension-module"] diff --git a/swarm_router.py b/swarm_router.py new file mode 100644 index 00000000..acec8d55 --- /dev/null +++ b/swarm_router.py @@ -0,0 +1,27 @@ +from swarms import Agent, SwarmRouter + + +agents = [ + Agent( + agent_name="test_agent_1", + agent_description="test_agent_1_description", + system_prompt="test_agent_1_system_prompt", + model_name="gpt-4o", + ), + Agent( + agent_name="test_agent_2", + agent_description="test_agent_2_description", + system_prompt="test_agent_2_system_prompt", + model_name="gpt-4o", + ), + Agent( + agent_name="test_agent_3", + agent_description="test_agent_3_description", + system_prompt="test_agent_3_system_prompt", + model_name="gpt-4o", + ), +] + +router = SwarmRouter(agents=agents, swarm_type="SequentialWorkflow") + +print(router.run("How are you doing?")) diff --git a/swarms/agents/__init__.py b/swarms/agents/__init__.py index 68f75f99..835954de 100644 --- a/swarms/agents/__init__.py +++ b/swarms/agents/__init__.py @@ -10,13 +10,13 @@ from swarms.structs.stopping_conditions import ( check_stopped, check_success, ) -from swarms.agents.tool_agent import ToolAgent +# from swarms.agents.tool_agent import ToolAgent from swarms.agents.create_agents_from_yaml import ( create_agents_from_yaml, ) __all__ = [ - "ToolAgent", + # "ToolAgent", "check_done", "check_finished", "check_complete", diff --git a/swarms/structs/Talk_Hier.py b/swarms/structs/Talk_Hier.py index 23048ce8..e06d86c9 100644 --- a/swarms/structs/Talk_Hier.py +++ b/swarms/structs/Talk_Hier.py @@ -31,7 +31,8 @@ class AgentRole(Enum): @dataclass class CommunicationEvent: """Represents a structured communication event between agents.""" - message: str + + message: str background: Optional[str] = None intermediate_output: Optional[Dict[str, Any]] = None sender: str = "" @@ -96,7 +97,7 @@ class TalkHier: import re json_match = re.search(r"\{.*\}", json_str, re.DOTALL) - if (json_match): + if json_match: return json.loads(json_match.group()) # Try extracting from markdown code blocks code_block_match = re.search( @@ -104,7 +105,7 @@ class TalkHier: json_str, re.DOTALL, ) - if (code_block_match): + if code_block_match: return json.loads(code_block_match.group(1)) except Exception as e: logger.warning(f"Failed to extract JSON: {str(e)}") @@ -173,7 +174,9 @@ Output all responses in strict JSON format: system_prompt=self._get_criteria_generator_prompt(), model_name=self.model_name, max_loops=1, - saved_state_path=str(self.base_path / "criteria_generator.json"), + saved_state_path=str( + self.base_path / "criteria_generator.json" + ), verbose=True, ) @@ -350,77 +353,97 @@ Output all responses in strict JSON format: } }""" - def _generate_criteria_for_task(self, task: str) -> Dict[str, Any]: + def _generate_criteria_for_task( + self, task: str + ) -> Dict[str, Any]: """Generate evaluation criteria for the given task.""" try: criteria_input = { "task": task, - "instruction": "Generate specific evaluation criteria for this task." + "instruction": "Generate specific evaluation criteria for this task.", } - - criteria_response = self.criteria_generator.run(json.dumps(criteria_input)) + + criteria_response = self.criteria_generator.run( + json.dumps(criteria_input) + ) self.conversation.add( - role="Criteria-Generator", - content=criteria_response + role="Criteria-Generator", content=criteria_response ) - + return self._safely_parse_json(criteria_response) except Exception as e: logger.error(f"Error generating criteria: {str(e)}") return {"criteria": {}} - def _create_comm_event(self, sender: Agent, receiver: Agent, response: Dict) -> CommunicationEvent: + def _create_comm_event( + self, sender: Agent, receiver: Agent, response: Dict + ) -> CommunicationEvent: """Create a structured communication event between agents.""" return CommunicationEvent( message=response.get("message", ""), background=response.get("background", ""), - intermediate_output=response.get("intermediate_output", {}), + intermediate_output=response.get( + "intermediate_output", {} + ), sender=sender.agent_name, receiver=receiver.agent_name, ) - def _evaluate_content(self, content: Union[str, Dict], task: str) -> Dict[str, Any]: + def _evaluate_content( + self, content: Union[str, Dict], task: str + ) -> Dict[str, Any]: """Coordinate evaluation process with parallel evaluator execution.""" try: - content_dict = self._safely_parse_json(content) if isinstance(content, str) else content + content_dict = ( + self._safely_parse_json(content) + if isinstance(content, str) + else content + ) criteria_data = self._generate_criteria_for_task(task) def run_evaluator(evaluator, eval_input): response = evaluator.run(json.dumps(eval_input)) return { - "evaluator_id": evaluator.agent_name, - "evaluation": self._safely_parse_json(response) + "evaluator_id": evaluator.agent_name, + "evaluation": self._safely_parse_json(response), } - eval_inputs = [{ - "task": task, - "content": content_dict, - "criteria": criteria_data.get("criteria", {}) - } for _ in self.evaluators] + eval_inputs = [ + { + "task": task, + "content": content_dict, + "criteria": criteria_data.get("criteria", {}), + } + for _ in self.evaluators + ] with ThreadPoolExecutor() as executor: - evaluations = list(executor.map( - lambda x: run_evaluator(*x), - zip(self.evaluators, eval_inputs) - )) + evaluations = list( + executor.map( + lambda x: run_evaluator(*x), + zip(self.evaluators, eval_inputs), + ) + ) supervisor_input = { "evaluations": evaluations, "task": task, - "instruction": "Synthesize feedback" + "instruction": "Synthesize feedback", } - supervisor_response = self.main_supervisor.run(json.dumps(supervisor_input)) - aggregated_eval = self._safely_parse_json(supervisor_response) + supervisor_response = self.main_supervisor.run( + json.dumps(supervisor_input) + ) + aggregated_eval = self._safely_parse_json( + supervisor_response + ) # Track communication comm_event = self._create_comm_event( - self.main_supervisor, - self.revisor, - aggregated_eval + self.main_supervisor, self.revisor, aggregated_eval ) self.conversation.add( role="Communication", - content=json.dumps(asdict(comm_event)) + content=json.dumps(asdict(comm_event)), ) return aggregated_eval @@ -455,11 +478,15 @@ Output all responses in strict JSON format: # Collect all unique criteria from evaluators all_criteria = set() for eval_data in evaluations: - categories = eval_data.get("scores", {}).get("categories", {}) + categories = eval_data.get("scores", {}).get( + "categories", {} + ) all_criteria.update(categories.keys()) # Initialize score aggregation - aggregated_scores = {criterion: [] for criterion in all_criteria} + aggregated_scores = { + criterion: [] for criterion in all_criteria + } overall_scores = [] all_feedback = [] @@ -467,7 +494,7 @@ Output all responses in strict JSON format: for eval_data in evaluations: scores = eval_data.get("scores", {}) overall_scores.append(scores.get("overall", 0.5)) - + categories = scores.get("categories", {}) for criterion in all_criteria: if criterion in categories: @@ -508,32 +535,40 @@ Output all responses in strict JSON format: try: # Get evaluations and supervisor selection evaluation_result = self._evaluate_content(content, task) - + # Extract selected evaluation and supervisor reasoning - selected_evaluation = evaluation_result.get("selected_evaluation", {}) - supervisor_reasoning = evaluation_result.get("supervisor_reasoning", {}) - + selected_evaluation = evaluation_result.get( + "selected_evaluation", {} + ) + supervisor_reasoning = evaluation_result.get( + "supervisor_reasoning", {} + ) + # Prepare revision input with selected evaluation revision_input = { "content": content, "evaluation": selected_evaluation, "supervisor_feedback": supervisor_reasoning, - "instruction": "Revise the content based on the selected evaluation feedback" + "instruction": "Revise the content based on the selected evaluation feedback", } # Get revision from content generator - revision_response = self.generator.run(json.dumps(revision_input)) - revised_content = self._safely_parse_json(revision_response) + revision_response = self.generator.run( + json.dumps(revision_input) + ) + revised_content = self._safely_parse_json( + revision_response + ) return { "content": revised_content, - "evaluation": evaluation_result + "evaluation": evaluation_result, } except Exception as e: logger.error(f"Evaluation and revision error: {str(e)}") return { "content": content, - "evaluation": self._get_fallback_evaluation() + "evaluation": self._get_fallback_evaluation(), } def run(self, task: str) -> Dict[str, Any]: @@ -579,22 +614,34 @@ Output all responses in strict JSON format: logger.info(f"Starting iteration {iteration + 1}") # Evaluate and revise content - result = self._evaluate_and_revise(current_content, task) + result = self._evaluate_and_revise( + current_content, task + ) evaluation = result["evaluation"] current_content = result["content"] # Check if quality threshold is met - selected_eval = evaluation.get("selected_evaluation", {}) - overall_score = selected_eval.get("scores", {}).get("overall", 0.0) - + selected_eval = evaluation.get( + "selected_evaluation", {} + ) + overall_score = selected_eval.get("scores", {}).get( + "overall", 0.0 + ) + if overall_score >= self.quality_threshold: - logger.info("Quality threshold met, returning content") + logger.info( + "Quality threshold met, returning content" + ) return { - "content": current_content.get("content", {}).get("main_body", ""), + "content": current_content.get( + "content", {} + ).get("main_body", ""), "final_score": overall_score, "iterations": iteration + 1, "metadata": { - "content_metadata": current_content.get("content", {}).get("metadata", {}), + "content_metadata": current_content.get( + "content", {} + ).get("metadata", {}), "evaluation": evaluation, }, } @@ -665,18 +712,18 @@ Output all responses in strict JSON format: ) -if __name__ == "__main__": - try: - talkhier = TalkHier( - max_iterations=1, - quality_threshold=0.8, - model_name="gpt-4o", - return_string=False, - ) +# if __name__ == "__main__": +# try: +# talkhier = TalkHier( +# max_iterations=1, +# quality_threshold=0.8, +# model_name="gpt-4o", +# return_string=False, +# ) - # Ask for user input - task = input("Enter the content generation task description: ") - result = talkhier.run(task) +# # Ask for user input +# task = input("Enter the content generation task description: ") +# result = talkhier.run(task) - except Exception as e: - logger.error(f"Error in main execution: {str(e)}") \ No newline at end of file +# except Exception as e: +# logger.error(f"Error in main execution: {str(e)}") diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index db77e562..5a1bfb3d 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -340,6 +340,7 @@ class Agent: llm_args: dict = None, load_state_path: str = None, role: agent_roles = "worker", + no_print: bool = False, *args, **kwargs, ): @@ -457,6 +458,7 @@ class Agent: self.llm_args = llm_args self.load_state_path = load_state_path self.role = role + self.no_print = no_print # Initialize the short term memory self.short_memory = Conversation( @@ -867,18 +869,19 @@ class Agent: # # break # Print - if self.streaming_on is True: - # self.stream_response(response) - formatter.print_panel_token_by_token( - f"{self.agent_name}: {response}", - title=f"Agent Name: {self.agent_name} [Max Loops: {loop_count}]", - ) - else: - # logger.info(f"Response: {response}") - formatter.print_panel( - f"{self.agent_name}: {response}", - f"Agent Name {self.agent_name} [Max Loops: {loop_count} ]", - ) + if self.no_print is False: + if self.streaming_on is True: + # self.stream_response(response) + formatter.print_panel_token_by_token( + f"{self.agent_name}: {response}", + title=f"Agent Name: {self.agent_name} [Max Loops: {loop_count}]", + ) + else: + # logger.info(f"Response: {response}") + formatter.print_panel( + f"{self.agent_name}: {response}", + f"Agent Name {self.agent_name} [Max Loops: {loop_count} ]", + ) # Check if response is a dictionary and has 'choices' key if ( @@ -2606,3 +2609,10 @@ class Agent: Get the role of the agent. """ return self.role + + # def __getstate__(self): + # state = self.__dict__.copy() + # # Remove or replace unpicklable attributes. + # if '_queue' in state: + # del state['_queue'] + # return state diff --git a/swarms/structs/agent_registry.py b/swarms/structs/agent_registry.py index 09348622..a32b5865 100644 --- a/swarms/structs/agent_registry.py +++ b/swarms/structs/agent_registry.py @@ -1,3 +1,4 @@ +import json import time from concurrent.futures import ThreadPoolExecutor, as_completed from threading import Lock @@ -5,7 +6,7 @@ from typing import Any, Callable, Dict, List, Optional from pydantic import BaseModel, Field, ValidationError -from swarms import Agent +from swarms.structs.agent import Agent from swarms.utils.loguru_logger import logger @@ -302,6 +303,25 @@ class AgentRegistry: logger.error(f"Error: {e}") raise e + def find_agent_by_id(self, agent_id: str) -> Optional[Agent]: + """ + Find an agent by its ID. + """ + return self.agents.get(agent_id) + + def agents_to_json(self) -> str: + """ + Converts all agents in the registry to a JSON string. + + Returns: + str: A JSON string representation of all agents, keyed by their names. + """ + agents_dict = { + name: agent.to_dict() + for name, agent in self.agents.items() + } + return json.dumps(agents_dict, indent=4) + def agent_to_py_model(self, agent: Agent): """ Converts an agent to a Pydantic model. @@ -328,3 +348,30 @@ class AgentRegistry: ) self.agent_registry.agents.append(schema) + + +# if __name__ == "__main__": +# from swarms import Agent + +# agent1 = Agent(agent_name="test_agent_1") +# agent2 = Agent(agent_name="test_agent_2") +# agent3 = Agent(agent_name="test_agent_3") +# print(f"Created agents: {agent1}, {agent2}, {agent3}") + +# registry = AgentRegistry() +# print(f"Created agent registry: {registry}") + +# registry.add(agent1) +# registry.add(agent2) +# registry.add(agent3) +# print(f"Added agents to registry: {agent1}, {agent2}, {agent3}") + +# all_agents = registry.return_all_agents() +# print(f"All agents in registry: {all_agents}") + +# found_agent1 = registry.find_agent_by_name("test_agent_1") +# found_agent2 = registry.find_agent_by_name("test_agent_2") +# found_agent3 = registry.find_agent_by_name("test_agent_3") +# print(f"Found agents by name: {found_agent1}, {found_agent2}, {found_agent3}") + +# print(registry.agents_to_json()) diff --git a/swarms/structs/concurrent_workflow.py b/swarms/structs/concurrent_workflow.py index 7e197b51..0a86e676 100644 --- a/swarms/structs/concurrent_workflow.py +++ b/swarms/structs/concurrent_workflow.py @@ -109,6 +109,7 @@ class ConcurrentWorkflow(BaseSwarm): agent_responses: list = [], auto_generate_prompts: bool = False, max_workers: int = None, + user_interface: bool = True, *args, **kwargs, ): @@ -130,10 +131,15 @@ class ConcurrentWorkflow(BaseSwarm): self.agent_responses = agent_responses self.auto_generate_prompts = auto_generate_prompts self.max_workers = max_workers or os.cpu_count() + self.user_interface = user_interface self.tasks = [] # Initialize tasks list self.reliability_check() + def disable_agent_prints(self): + for agent in self.agents: + agent.no_print = False + def reliability_check(self): try: logger.info("Starting reliability checks") @@ -176,62 +182,6 @@ class ConcurrentWorkflow(BaseSwarm): agent.auto_generate_prompt = True # @retry(wait=wait_exponential(min=2), stop=stop_after_attempt(3)) - async def _run_agent( - self, - agent: Agent, - task: str, - executor: ThreadPoolExecutor, - ) -> AgentOutputSchema: - """ - Runs a single agent with the given task and tracks its output and metadata with retry logic. - - Args: - agent (Agent): The agent instance to run. - task (str): The task or query to give to the agent. - img (str): The image to be processed by the agent. - executor (ThreadPoolExecutor): The thread pool executor to use for running the agent task. - - Returns: - AgentOutputSchema: The metadata and output from the agent's execution. - - Example: - >>> async def run_agent_example(): - >>> executor = ThreadPoolExecutor() - >>> agent_output = await workflow._run_agent(agent=Agent(), task="Example task", img="example.jpg", executor=executor) - >>> print(agent_output) - """ - start_time = datetime.now() - try: - loop = asyncio.get_running_loop() - output = await loop.run_in_executor( - executor, - agent.run, - task, - ) - except Exception as e: - logger.error( - f"Error running agent {agent.agent_name}: {e}" - ) - raise - - end_time = datetime.now() - duration = (end_time - start_time).total_seconds() - - agent_output = AgentOutputSchema( - run_id=uuid.uuid4().hex, - agent_name=agent.agent_name, - task=task, - output=output, - start_time=start_time, - end_time=end_time, - duration=duration, - ) - - logger.info( - f"Agent {agent.agent_name} completed task: {task} in {duration:.2f} seconds." - ) - - return agent_output def transform_metadata_schema_to_str( self, schema: MetadataSchema @@ -258,47 +208,6 @@ class ConcurrentWorkflow(BaseSwarm): # Return the agent responses as a string return "\n".join(self.agent_responses) - # @retry(wait=wait_exponential(min=2), stop=stop_after_attempt(3)) - async def _execute_agents_concurrently( - self, task: str, img: str = None, *args, **kwargs - ) -> MetadataSchema: - """ - Executes multiple agents concurrently with the same task, incorporating retry logic for failed executions. - - Args: - task (str): The task or query to give to all agents. - img (str): The image to be processed by the agents. - - Returns: - MetadataSchema: The aggregated metadata and outputs from all agents. - - Example: - >>> async def execute_agents_example(): - >>> metadata_schema = await workflow._execute_agents_concurrently(task="Example task", img="example.jpg") - >>> print(metadata_schema) - """ - with ThreadPoolExecutor( - max_workers=os.cpu_count() - ) as executor: - tasks_to_run = [ - self._run_agent( - agent=agent, - task=task, - executor=executor, - *args, - **kwargs, - ) - for agent in self.agents - ] - - agent_outputs = await asyncio.gather(*tasks_to_run) - return MetadataSchema( - swarm_id=uuid.uuid4().hex, - task=task, - description=self.description, - agents=agent_outputs, - ) - def save_metadata(self): """ Saves the metadata to a JSON file based on the auto_save flag. @@ -322,7 +231,7 @@ class ConcurrentWorkflow(BaseSwarm): self, task: str, img: str = None, *args, **kwargs ) -> Union[Dict[str, Any], str]: """ - Runs the workflow for the given task, executes agents concurrently, and saves metadata in a production-grade manner. + Runs the workflow for the given task, executes agents concurrently using ThreadPoolExecutor, and saves metadata. Args: task (str): The task or query to give to all agents. @@ -339,10 +248,50 @@ class ConcurrentWorkflow(BaseSwarm): logger.info( f"Running concurrent workflow with {len(self.agents)} agents." ) - self.output_schema = asyncio.run( - self._execute_agents_concurrently( - task, img, *args, **kwargs + + def run_agent(agent: Agent, task: str) -> AgentOutputSchema: + start_time = datetime.now() + try: + output = agent.run(task) + except Exception as e: + logger.error( + f"Error running agent {agent.agent_name}: {e}" + ) + raise + + end_time = datetime.now() + duration = (end_time - start_time).total_seconds() + + agent_output = AgentOutputSchema( + run_id=uuid.uuid4().hex, + agent_name=agent.agent_name, + task=task, + output=output, + start_time=start_time, + end_time=end_time, + duration=duration, ) + + logger.info( + f"Agent {agent.agent_name} completed task: {task} in {duration:.2f} seconds." + ) + + return agent_output + + with ThreadPoolExecutor( + max_workers=os.cpu_count() + ) as executor: + agent_outputs = list( + executor.map( + lambda agent: run_agent(agent, task), self.agents + ) + ) + + self.output_schema = MetadataSchema( + swarm_id=uuid.uuid4().hex, + task=task, + description=self.description, + agents=agent_outputs, ) self.save_metadata() @@ -351,9 +300,7 @@ class ConcurrentWorkflow(BaseSwarm): return self.transform_metadata_schema_to_str( self.output_schema ) - else: - # Return metadata as a dictionary return self.output_schema.model_dump_json(indent=4) def run( @@ -390,7 +337,8 @@ class ConcurrentWorkflow(BaseSwarm): self.tasks.append(task) try: - return self._run(task, img, *args, **kwargs) + outputs = self._run(task, img, *args, **kwargs) + return outputs except ValueError as e: logger.error(f"Invalid device specified: {e}") raise e @@ -523,26 +471,12 @@ class ConcurrentWorkflow(BaseSwarm): # if __name__ == "__main__": # # Assuming you've already initialized some agents outside of this class -# model = OpenAIChat( -# api_key=os.getenv("OPENAI_API_KEY"), -# model_name="gpt-4o-mini", -# temperature=0.1, -# ) # agents = [ # Agent( # agent_name=f"Financial-Analysis-Agent-{i}", # system_prompt=FINANCIAL_AGENT_SYS_PROMPT, -# llm=model, +# model_name="gpt-4o", # max_loops=1, -# autosave=True, -# dashboard=False, -# verbose=True, -# dynamic_temperature_enabled=True, -# saved_state_path=f"finance_agent_{i}.json", -# user_name="swarms_corp", -# retry_attempts=1, -# context_length=200000, -# return_step_meta=False, # ) # for i in range(3) # Adjust number of agents as needed # ] diff --git a/swarms/structs/multi_agent_exec.py b/swarms/structs/multi_agent_exec.py index 28b6c421..6bb3a09d 100644 --- a/swarms/structs/multi_agent_exec.py +++ b/swarms/structs/multi_agent_exec.py @@ -1,7 +1,9 @@ import asyncio import os import threading -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ( + ThreadPoolExecutor, +) from dataclasses import dataclass from typing import Any, List @@ -413,29 +415,141 @@ def run_agents_with_tasks_concurrently( ) +# from joblib import Parallel, delayed + + +# def run_agents_joblib( +# agents: List[Any], +# tasks: List[str] = [], +# img: List[str] = None, +# max_workers: int = None, +# max_loops: int = 1, +# prefer: str = "threads", +# ) -> List[Any]: +# """ +# Executes a list of agents with their corresponding tasks concurrently using joblib. + +# Each agent is expected to have a .run() method that accepts at least: +# - task: A string indicating the task to execute. +# - img: (Optional) A string representing image input. + +# Args: +# agents (List[Any]): A list of agent instances. +# tasks (List[str], optional): A list of task strings. If provided, each agent gets a task. +# If fewer tasks than agents, the first task is reused. +# img (List[str], optional): A list of image strings. If provided, each agent gets an image. +# If fewer images than agents, the first image is reused. +# max_workers (int, optional): The maximum number of processes to use. +# Defaults to all available CPU cores. +# max_loops (int, optional): Number of times to execute the whole batch. + +# Returns: +# List[Any]: The list of results returned by each agent’s run() method. +# """ +# max_workers = max_workers or os.cpu_count() +# results = [] + +# for _ in range(max_loops): +# results.extend( +# Parallel(n_jobs=max_workers, prefer=prefer)( +# delayed(lambda a, t, i: a.run(task=t, img=i))( +# agent, +# ( +# tasks[idx] +# if tasks and idx < len(tasks) +# else (tasks[0] if tasks else "") +# ), +# ( +# img[idx] +# if img and idx < len(img) +# else (img[0] if img else None) +# ), +# ) +# for idx, agent in enumerate(agents) +# ) +# ) + +# return results + + +# # Example usage: +# if __name__ == '__main__': +# # Dummy Agent class for demonstration. +# class Agent: +# def __init__(self, agent_name, max_loops, model_name): +# self.agent_name = agent_name +# self.max_loops = max_loops +# self.model_name = model_name + +# def run(self, task: str, img: str = None) -> str: +# img_info = f" with image '{img}'" if img else "" +# return (f"{self.agent_name} using model '{self.model_name}' processed task: '{task}'{img_info}") + +# # Create a few Agent instances. +# agents = [ +# Agent( +# agent_name=f"Financial-Analysis-Agent_parallel_swarm{i}", +# max_loops=1, +# model_name="gpt-4o-mini", +# ) +# for i in range(3) +# ] + +# task = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria" +# outputs = run_agents_process_pool(agents, tasks=[task]) + +# for i, output in enumerate(outputs): +# print(f"Output from agent {i+1}:\n{output}") + # # Example usage: -# # Initialize your agents with the same model to avoid re-creating it -# agents = [ -# Agent( -# agent_name=f"Financial-Analysis-Agent_parallel_swarm{i}", -# system_prompt=FINANCIAL_AGENT_SYS_PROMPT, -# llm=model, -# max_loops=1, -# autosave=True, -# dashboard=False, -# verbose=False, -# dynamic_temperature_enabled=False, -# saved_state_path=f"finance_agent_{i}.json", -# user_name="swarms_corp", -# retry_attempts=1, -# context_length=200000, -# return_step_meta=False, +# if __name__ == '__main__': +# # A sample agent class with a run method. +# class SampleAgent: +# def __init__(self, name): +# self.name = name + +# def run(self, task, device, device_id, no_clusterops): +# # Simulate some processing. +# return (f"Agent {self.name} processed task '{task}' on {device} " +# f"(device_id={device_id}), no_clusterops={no_clusterops}") + +# # Create a list of sample agents. +# agents = [SampleAgent(f"Agent_{i}") for i in range(5)] +# # Define tasks; if fewer tasks than agents, the first task will be reused. +# tasks = ["task1", "task2", "task3"] + +# outputs = run_agents_with_tasks_concurrently( +# agents=agents, +# tasks=tasks, +# max_workers=4, +# device="cpu", +# device_id=1, +# all_cores=True, +# no_clusterops=False # ) -# for i in range(5) # Assuming you want 10 agents -# ] -# task = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria" -# outputs = run_agents_concurrently(agents, task) +# for output in outputs: +# print(output) + -# for i, output in enumerate(outputs): -# print(f"Output from agent {i+1}:\n{output}") +# # Example usage: +# if __name__ == "__main__": +# # Initialize your agents (for example, 3 agents) +# agents = [ +# Agent( +# agent_name=f"Financial-Analysis-Agent_parallel_swarm{i}", +# max_loops=1, +# model_name="gpt-4o-mini", +# ) +# for i in range(3) +# ] + +# # Generate a list of tasks. +# tasks = [ +# "How can I establish a ROTH IRA to buy stocks and get a tax break?", +# "What are the criteria for establishing a ROTH IRA?", +# "What are the tax benefits of a ROTH IRA?", +# "How to buy stocks using a ROTH IRA?", +# "What are the limitations of a ROTH IRA?", +# ] +# outputs = run_agents_joblib(agents, tasks) diff --git a/swarms/structs/rearrange.py b/swarms/structs/rearrange.py index ff15993b..3e89e58e 100644 --- a/swarms/structs/rearrange.py +++ b/swarms/structs/rearrange.py @@ -113,6 +113,7 @@ class AgentRearrange(BaseSwarm): all_gpus: bool = True, no_use_clusterops: bool = True, autosave: bool = True, + return_entire_history: bool = False, *args, **kwargs, ): @@ -141,7 +142,7 @@ class AgentRearrange(BaseSwarm): self.all_gpus = all_gpus self.no_use_clusterops = no_use_clusterops self.autosave = autosave - + self.return_entire_history = return_entire_history self.output_schema = AgentRearrangeOutput( input=AgentRearrangeInput( swarm_id=id, @@ -465,6 +466,9 @@ class AgentRearrange(BaseSwarm): if self.return_json: return self.output_schema.model_dump_json(indent=4) + if self.return_entire_history: + return self.output_schema.model_dump_json(indent=4) + # Handle different output types if self.output_type == "all": output = " ".join(all_responses) diff --git a/swarms/structs/sequential_workflow.py b/swarms/structs/sequential_workflow.py index 7d919c72..d10f178a 100644 --- a/swarms/structs/sequential_workflow.py +++ b/swarms/structs/sequential_workflow.py @@ -33,6 +33,7 @@ class SequentialWorkflow: output_type: OutputType = "all", return_json: bool = False, shared_memory_system: callable = None, + return_entire_history: bool = False, *args, **kwargs, ): @@ -43,6 +44,7 @@ class SequentialWorkflow: self.output_type = output_type self.return_json = return_json self.shared_memory_system = shared_memory_system + self.return_entire_history = return_entire_history self.reliability_check() self.flow = self.sequential_flow() diff --git a/swarms/structs/swarm_router.py b/swarms/structs/swarm_router.py index bf829357..0aaa17be 100644 --- a/swarms/structs/swarm_router.py +++ b/swarms/structs/swarm_router.py @@ -4,7 +4,6 @@ from datetime import datetime from typing import Any, Callable, Dict, List, Literal, Union from pydantic import BaseModel, Field -from tenacity import retry, stop_after_attempt, wait_fixed from swarms.prompts.ag_prompt import aggregator_system_prompt from swarms.structs.agent import Agent @@ -138,11 +137,12 @@ class SwarmRouter: shared_memory_system: Any = None, rules: str = None, documents: List[str] = [], # A list of docs file paths - output_type: str = "string", # Md, PDF, Txt, csv + output_type: str = "all", no_cluster_ops: bool = False, speaker_fn: callable = None, load_agents_from_csv: bool = False, csv_file_path: str = None, + return_entire_history: bool = False, *args, **kwargs, ): @@ -164,6 +164,7 @@ class SwarmRouter: self.logs = [] self.load_agents_from_csv = load_agents_from_csv self.csv_file_path = csv_file_path + self.return_entire_history = return_entire_history if self.load_agents_from_csv: self.agents = AgentLoader( @@ -233,7 +234,6 @@ class SwarmRouter: self._log("error", error_msg) raise RuntimeError(error_msg) from e - @retry(stop=stop_after_attempt(3), wait=wait_fixed(1)) def reliability_check(self): logger.info("Initializing reliability checks") @@ -256,6 +256,8 @@ class SwarmRouter: SpreadSheetSwarm, SequentialWorkflow, ConcurrentWorkflow, + GroupChat, + MultiAgentRouter, ]: """ Dynamically create and return the specified swarm type or automatically match the best swarm type for a given task. @@ -286,6 +288,7 @@ class SwarmRouter: flow=self.rearrange_flow, return_json=self.return_json, output_type=self.output_type, + return_entire_history=self.return_entire_history, *args, **kwargs, ) @@ -339,6 +342,7 @@ class SwarmRouter: shared_memory_system=self.shared_memory_system, output_type=self.output_type, return_json=self.return_json, + return_entire_history=self.return_entire_history, *args, **kwargs, ) @@ -384,7 +388,6 @@ class SwarmRouter: self.logs.append(log_entry) logger.log(level.upper(), message) - @retry(stop=stop_after_attempt(3), wait=wait_fixed(1)) def _run(self, task: str, img: str, *args, **kwargs) -> Any: """ Dynamically run the specified task on the selected or matched swarm type. diff --git a/swarms/telemetry/main.py b/swarms/telemetry/main.py index 1b2396f2..380ece40 100644 --- a/swarms/telemetry/main.py +++ b/swarms/telemetry/main.py @@ -11,10 +11,6 @@ import psutil import requests import toml -from swarms.utils.loguru_logger import initialize_logger - -logger = initialize_logger(log_folder="capture_sys_data") - # Helper functions def generate_user_id(): @@ -262,7 +258,8 @@ def capture_system_data() -> Dict[str, str]: return system_data except Exception as e: - logger.error("Failed to capture system data: {}", e) + # logger.error("Failed to capture system data: {}", e) + print(f"Failed to capture system data: {e}") return {} diff --git a/swarms/utils/__init__.py b/swarms/utils/__init__.py index 5b321ba3..6632b26b 100644 --- a/swarms/utils/__init__.py +++ b/swarms/utils/__init__.py @@ -16,6 +16,7 @@ from swarms.utils.parse_code import extract_code_from_markdown from swarms.utils.pdf_to_text import pdf_to_text from swarms.utils.try_except_wrapper import try_except_wrapper from swarms.utils.calculate_func_metrics import profile_func +from swarms.utils.litellm_tokenizer import count_tokens __all__ = [ @@ -33,4 +34,5 @@ __all__ = [ "pdf_to_text", "try_except_wrapper", "profile_func", + "count_tokens", ] diff --git a/swarms/utils/loguru_logger.py b/swarms/utils/loguru_logger.py index 9295e9eb..b0906c5b 100644 --- a/swarms/utils/loguru_logger.py +++ b/swarms/utils/loguru_logger.py @@ -1,50 +1,26 @@ -import os -import uuid import sys from loguru import logger def initialize_logger(log_folder: str = "logs"): - AGENT_WORKSPACE = "agent_workspace" - - # Check if WORKSPACE_DIR is set, if not, set it to AGENT_WORKSPACE - if "WORKSPACE_DIR" not in os.environ: - os.environ["WORKSPACE_DIR"] = AGENT_WORKSPACE - - # Create a folder within the agent_workspace - log_folder_path = os.path.join( - os.getenv("WORKSPACE_DIR"), log_folder - ) - if not os.path.exists(log_folder_path): - os.makedirs(log_folder_path) - - # Generate a unique identifier for the log file - uuid_for_log = str(uuid.uuid4()) - log_file_path = os.path.join( - log_folder_path, f"{log_folder}_{uuid_for_log}.log" - ) - - # Remove default handler and add custom handlers + # Remove default handler and add a combined handler logger.remove() - # Add console handler with colors + # Add a combined console and file handler logger.add( sys.stdout, colorize=True, format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}", level="INFO", - ) - - # Add file handler - logger.add( - log_file_path, - format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}", - level="INFO", backtrace=True, diagnose=True, enqueue=True, - retention="10 days", - # compression="zip", + # retention="10 days", # Removed this line ) return logger + + +# logger = initialize_logger() + +# logger.info("Hello, world!") diff --git a/swarms/utils/loguru_test.py b/swarms/utils/visualizer.py similarity index 70% rename from swarms/utils/loguru_test.py rename to swarms/utils/visualizer.py index 6d3d777f..849cfe28 100644 --- a/swarms/utils/loguru_test.py +++ b/swarms/utils/visualizer.py @@ -1,7 +1,6 @@ import asyncio from dataclasses import dataclass from datetime import datetime -from queue import Queue from typing import Any, Callable, Dict, List, Optional import psutil @@ -19,6 +18,8 @@ from rich.table import Table from rich.text import Text from rich.tree import Tree +from swarms.structs.agent import Agent + try: import pynvml @@ -30,59 +31,47 @@ except ImportError: @dataclass class SwarmMetadata: - name: str - description: str - version: str - type: str # hierarchical, parallel, sequential - created_at: datetime - author: str - tags: List[str] - primary_objective: str - secondary_objectives: List[str] - - -@dataclass -class Agent: - name: str - role: str - description: str - agent_type: str # e.g., "LLM", "Neural", "Rule-based" - capabilities: List[str] - parameters: Dict[str, any] - metadata: Dict[str, str] - children: List["Agent"] = None - parent: Optional["Agent"] = None - output_stream: Queue = None + name: Optional[str] = None + description: Optional[str] = None + version: Optional[str] = None + type: Optional[str] = None # hierarchical, parallel, sequential + created_at: Optional[datetime] = None + author: Optional[str] = None + tags: Optional[List[str]] = None + primary_objective: Optional[str] = None + secondary_objectives: Optional[List[str]] = None def __post_init__(self): - self.children = self.children or [] - self.output_stream = Queue() - - @property - def hierarchy_level(self) -> int: - level = 0 - current = self - while current.parent: - level += 1 - current = current.parent - return level + self.tags = self.tags or [] + self.secondary_objectives = self.secondary_objectives or [] + self.created_at = self.created_at or datetime.now() class SwarmVisualizationRich: def __init__( self, swarm_metadata: SwarmMetadata, - root_agent: Agent, + agents: List[Agent], update_resources: bool = True, refresh_rate: float = 0.1, ): + """ + Initializes the visualizer with a list of agents. + + Args: + swarm_metadata (SwarmMetadata): Metadata for the swarm. + agents (List[Agent]): List of root agents. + update_resources (bool): Whether to update system resource stats. + refresh_rate (float): Refresh rate for the live visualization. + """ self.swarm_metadata = swarm_metadata - self.root_agent = root_agent + self.agents = agents self.update_resources = update_resources self.refresh_rate = refresh_rate self.console = Console() self.live = None - self.output_history = {} + # A dictionary mapping agent names to list of output messages + self.output_history: Dict[str, List[Dict[str, Any]]] = {} # System monitoring self.cores_available = 0 @@ -100,96 +89,112 @@ class SwarmVisualizationRich: minutes, seconds = divmod(remainder, 60) return f"{hours:02d}:{minutes:02d}:{seconds:02d}" - def _build_agent_tree( - self, agent: Agent, tree: Optional[Tree] = None - ) -> Tree: - """Builds a detailed tree visualization of the agent hierarchy.""" + def _build_agent_tree(self, agents: List[Agent]) -> Tree: + """ + Builds a detailed tree visualization for a list of agents. + + Args: + agents (List[Agent]): The list of root agents. + + Returns: + Tree: A rich Tree object displaying agent metadata. + """ + tree = Tree("[bold underline]Agents[/bold underline]") + for agent in agents: + self._add_agent_to_tree(agent, tree) + return tree + + def _add_agent_to_tree(self, agent: Agent, tree: Tree) -> None: + """ + Recursively adds an agent and its children to the given tree. + + Args: + agent (Agent): The agent to add. + tree (Tree): The tree to update. + """ agent_info = [ f"[bold cyan]{agent.name}[/bold cyan]", f"[yellow]Role:[/yellow] {agent.role}", - f"[green]Type:[/green] {agent.agent_type}", - f"[blue]Level:[/blue] {agent.hierarchy_level}", - f"[magenta]Capabilities:[/magenta] {', '.join(agent.capabilities)}", ] - # Add any custom metadata - for key, value in agent.metadata.items(): - agent_info.append(f"[white]{key}:[/white] {value}") + # # Add any custom metadata from the agent (if available) + # for key, value in getattr(agent, "metadata", {}).items(): + # agent_info.append(f"[white]{key}:[/white] {value}") - # Parameters summary - param_summary = ", ".join( - f"{k}: {v}" for k, v in agent.parameters.items() - ) - agent_info.append( - f"[white]Parameters:[/white] {param_summary}" - ) + # # Parameters summary if available + # parameters = getattr(agent, "parameters", {}) + # if parameters: + # param_summary = ", ".join(f"{k}: {v}" for k, v in parameters.items()) + # agent_info.append(f"[white]Parameters:[/white] {param_summary}") node_text = "\n".join(agent_info) + branch = tree.add(node_text) + for child in getattr(agent, "children", []): + self._add_agent_to_tree(child, branch) - if tree is None: - tree = Tree(node_text) - else: - branch = tree.add(node_text) - tree = branch - - for child in agent.children: - self._build_agent_tree(child, tree) + def _count_agents(self, agents: List[Agent]) -> int: + """ + Recursively counts total number of agents from a list of root agents. - return tree + Args: + agents (List[Agent]): List of agents. - def _count_agents(self, agent: Agent) -> int: - """Recursively counts total number of agents in the swarm.""" - count = 1 # Count current agent - for child in agent.children or []: - count += self._count_agents(child) - return count + Returns: + int: Total count of agents including children. + """ + return len(agents) def _create_unified_info_panel(self) -> Panel: - """Creates a unified panel showing both swarm metadata and architecture.""" - # Create the main container + """ + Creates a unified panel showing swarm metadata and agents' metadata. + """ info_layout = Layout() info_layout.split_column( - Layout(name="metadata", size=13), + Layout(name="metadata", size=15), Layout(name="architecture"), ) - # Calculate total agents - total_agents = self._count_agents(self.root_agent) + total_agents = self._count_agents(self.agents) # Metadata section metadata_table = Table.grid(padding=1, expand=True) metadata_table.add_column("Label", style="bold cyan") metadata_table.add_column("Value", style="white") - # System resources + # Update system resources if needed if self.update_resources: self._update_resource_stats() - # Add description with proper wrapping + # Wrap the description text properly description_text = Text( - self.swarm_metadata.description, style="italic" + self.swarm_metadata.description or "", style="italic" ) description_text.wrap(self.console, width=60, overflow="fold") - metadata_table.add_row("Swarm Name", self.swarm_metadata.name) + metadata_table.add_row( + "Swarm Name", self.swarm_metadata.name or "N/A" + ) metadata_table.add_row("Description", description_text) - metadata_table.add_row("Version", self.swarm_metadata.version) + metadata_table.add_row( + "Version", self.swarm_metadata.version or "N/A" + ) metadata_table.add_row("Total Agents", str(total_agents)) - metadata_table.add_row("Author", self.swarm_metadata.author) + metadata_table.add_row( + "Author", self.swarm_metadata.author or "N/A" + ) metadata_table.add_row( "System", f"CPU: {self.cores_available} cores | Memory: {self.memory_usage}", ) metadata_table.add_row( - "Primary Objective", self.swarm_metadata.primary_objective + "Primary Objective", + self.swarm_metadata.primary_objective or "N/A", ) info_layout["metadata"].update(metadata_table) - info_layout["metadata"].update(metadata_table) - - # Architecture section with tree visualization - architecture_tree = self._build_agent_tree(self.root_agent) + # Architecture section with the agent tree + architecture_tree = self._build_agent_tree(self.agents) info_layout["architecture"].update(architecture_tree) return Panel( @@ -198,12 +203,13 @@ class SwarmVisualizationRich: ) def _create_outputs_panel(self) -> Panel: - """Creates a panel that displays stacked message history for all agents.""" - # Create a container for all messages across all agents + """ + Creates a panel that displays stacked message history for all agents. + """ all_messages = [] def collect_agent_messages(agent: Agent): - """Recursively collect messages from all agents.""" + """Recursively collect messages from an agent and its children.""" messages = self.output_history.get(agent.name, []) for msg in messages: all_messages.append( @@ -214,37 +220,31 @@ class SwarmVisualizationRich: "style": msg["style"], } ) - for child in agent.children: + for child in getattr(agent, "children", []): collect_agent_messages(child) - # Collect all messages - collect_agent_messages(self.root_agent) + # Collect messages from every root agent + for agent in self.agents: + collect_agent_messages(agent) # Sort messages by timestamp all_messages.sort(key=lambda x: x["time"]) - # Create the stacked message display - Layout() messages_container = [] - for msg in all_messages: - # Create a panel for each message message_text = Text() message_text.append(f"[{msg['time']}] ", style="dim") message_text.append( f"{msg['agent']}: ", style="bold cyan" ) message_text.append(msg["content"], style=msg["style"]) - messages_container.append(message_text) - # Join all messages with line breaks if messages_container: final_text = Text("\n").join(messages_container) else: final_text = Text("No messages yet...", style="dim") - # Create scrollable panel for all messages return Panel( final_text, title="[bold]Agent Communication Log[/bold]", @@ -286,36 +286,30 @@ class SwarmVisualizationRich: by_word: bool = False, ): """ - Streams output for a specific agent with sophisticated token-by-token animation. + Streams output for a specific agent with token-by-token animation. Args: - agent (Agent): The agent whose output is being streamed - text (str): The text to stream - title (Optional[str]): Custom title for the output panel - style (str): Style for the output text - delay (float): Delay between tokens - by_word (bool): If True, streams word by word instead of character by character + agent (Agent): The agent whose output is being streamed. + text (str): The text to stream. + title (Optional[str]): Custom title for the output panel. + style (str): Style for the output text. + delay (float): Delay between tokens. + by_word (bool): If True, stream word by word instead of character by character. """ display_text = Text(style=style) current_output = "" - # Split into words or characters tokens = text.split() if by_word else text - - # Create a panel for this agent's output title = title or f"{agent.name} Output" for token in tokens: - # Add appropriate spacing token_with_space = token + (" " if by_word else "") current_output += token_with_space display_text.append(token_with_space) - # Initialize history list if it doesn't exist if agent.name not in self.output_history: self.output_history[agent.name] = [] - # Store the complete message when finished if token == tokens[-1]: timestamp = datetime.now().strftime("%H:%M:%S") self.output_history[agent.name].append( @@ -326,11 +320,19 @@ class SwarmVisualizationRich: } ) - # Update live display if active if self.live: self.live.update(self._create_layout()) await asyncio.sleep(delay) + def log_agent_output(self, agent: Agent, text: str): + asyncio.create_task( + self.stream_output( + agent=agent, + text=text, + title=f"{agent.name} Output {agent.max_loops}", + ) + ) + async def print_progress( self, description: str, @@ -342,13 +344,13 @@ class SwarmVisualizationRich: Displays a progress spinner while executing a task. Args: - description (str): Task description - task_fn (Callable): Function to execute - *args (Any): Arguments for task_fn - **kwargs (Any): Keyword arguments for task_fn + description (str): Task description. + task_fn (Callable): Function to execute. + *args (Any): Arguments for task_fn. + **kwargs (Any): Keyword arguments for task_fn. Returns: - Any: Result of task_fn + Any: The result of task_fn. """ progress = Progress( SpinnerColumn(), @@ -393,10 +395,12 @@ class SwarmVisualizationRich: asyncio.create_task( self.stream_output(agent, new_output) ) - for child in agent.children: + for child in getattr(agent, "children", []): process_agent_streams(child) - process_agent_streams(self.root_agent) + # Process streams for each root agent + for agent in self.agents: + process_agent_streams(agent) await asyncio.sleep(self.refresh_rate) diff --git a/visualizer_test.py b/visualizer_test.py new file mode 100644 index 00000000..15f95d80 --- /dev/null +++ b/visualizer_test.py @@ -0,0 +1,67 @@ +import asyncio +from swarms import Agent +from swarms.prompts.finance_agent_sys_prompt import ( + FINANCIAL_AGENT_SYS_PROMPT, +) +from swarms.utils.visualizer import ( + SwarmVisualizationRich, + SwarmMetadata, +) # Replace with your actual module name + +# Create two example agents +agent1 = Agent( + agent_name="Financial-Analysis-Agent", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT, + model_name="gpt-4o-mini", + max_loops=1, + autosave=True, + dashboard=False, + verbose=True, + dynamic_temperature_enabled=True, + saved_state_path="finance_agent.json", + user_name="swarms_corp", + retry_attempts=1, + context_length=200000, + return_step_meta=False, + output_type="string", + streaming_on=False, +) + +# Create a second dummy agent for demonstration +agent2 = Agent( + agent_name="Stock-Advisor-Agent", + system_prompt="Provide stock market insights and investment advice.", + model_name="gpt-4o-mini", + max_loops=1, + autosave=True, + dashboard=False, + verbose=True, + dynamic_temperature_enabled=True, + saved_state_path="stock_agent.json", + user_name="swarms_corp", + retry_attempts=1, + context_length=200000, + return_step_meta=False, + output_type="string", + streaming_on=False, +) + +# Create swarm metadata +metadata = SwarmMetadata( + name="Financial Swarm", + description="A swarm of agents focused on financial analysis and stock market advice.", + version="1.0", + author="Your Name", + primary_objective="Provide comprehensive financial and investment analysis.", +) + +# Instantiate the visualizer with a list of agents +visualizer = SwarmVisualizationRich( + swarm_metadata=metadata, + agents=[agent1, agent2], + update_resources=True, + refresh_rate=0.1, +) + +# Start the visualization +asyncio.run(visualizer.start())