diff --git a/concurrent_swarm_example.py b/concurrent_swarm_example.py index 724346d4..af2f659a 100644 --- a/concurrent_swarm_example.py +++ b/concurrent_swarm_example.py @@ -18,11 +18,8 @@ if __name__ == "__main__": # Initialize the workflow with the list of agents workflow = ConcurrentWorkflow( agents=agents, - metadata_output_path="agent_metadata_4.json", output_type="list", - show_progress=False, - max_loops=3, - interactive=True, + max_loops=1, ) # Define the task for all agents diff --git a/docs/swarms/models/agent_and_models.md b/docs/swarms/models/agent_and_models.md index 2b13b34f..9ef9e5b3 100644 --- a/docs/swarms/models/agent_and_models.md +++ b/docs/swarms/models/agent_and_models.md @@ -6,13 +6,13 @@ ## Important Note on Model Names !!! warning "Required Format" - When specifying a model in Swarms, you must use the format `provider/model_name`. For example: + When specifying a model in an agent, you must use the format `provider/model_name`. For example: ```python "openai/gpt-4" "anthropic/claude-3-opus-latest" "cohere/command-r-plus" ``` - This format ensures Swarms knows which provider to use for the specified model. + This format ensures the agent knows which provider to use for the specified model. ## Available Model Providers @@ -79,12 +79,12 @@ - `mistral-small` - `mistral-medium` -## Using Different Models with Swarms +## Using Different Models In Your Agents To use a different model with your Swarms agent, specify the model name in the `model_name` parameter when initializing the Agent, using the provider/model_name format: ```python -from swarms.structs.agent import Agent +from swarms import Agent # Using OpenAI's GPT-4 agent = Agent( diff --git a/docs/swarms_cloud/python_client.md b/docs/swarms_cloud/python_client.md new file mode 100644 index 00000000..8a6dd295 --- /dev/null +++ b/docs/swarms_cloud/python_client.md @@ -0,0 +1,1006 @@ +# Swarms API Client Reference Documentation + +## Table of Contents + +1. [Introduction](#introduction) +2. 
[Installation](#installation) +3. [Quick Start](#quick-start) +4. [Authentication](#authentication) +5. [Client Configuration](#client-configuration) +6. [API Endpoints Overview](#api-endpoints-overview) +7. [Core Methods](#core-methods) +8. [Swarm Management](#swarm-management) +9. [Agent Management](#agent-management) +10. [Batch Operations](#batch-operations) +11. [Health and Monitoring](#health-and-monitoring) +12. [Error Handling](#error-handling) +13. [Performance Optimization](#performance-optimization) +14. [Type Reference](#type-reference) +15. [Code Examples](#code-examples) +16. [Best Practices](#best-practices) +17. [Troubleshooting](#troubleshooting) + +## Introduction + +The Swarms API Client is a production-grade Python library designed to interact with the Swarms API. It provides both synchronous and asynchronous interfaces for maximum flexibility, enabling developers to create and manage swarms of AI agents efficiently. The client includes advanced features such as automatic retrying, response caching, connection pooling, and comprehensive error handling. 
+ +### Key Features + +- **Dual Interface**: Both synchronous and asynchronous APIs +- **Automatic Retrying**: Built-in retry logic with exponential backoff +- **Response Caching**: TTL-based caching for improved performance +- **Connection Pooling**: Optimized connection management +- **Type Safety**: Pydantic models for input validation +- **Comprehensive Logging**: Structured logging with Loguru +- **Thread-Safe**: Safe for use in multi-threaded applications +- **Rate Limiting**: Built-in rate limit handling +- **Performance Optimized**: DNS caching, TCP optimizations, and more + +## Installation + +```bash +pip install swarms-client +``` + + +## Quick Start + +```python +from swarms_client import SwarmsClient + +# Initialize the client +client = SwarmsClient(api_key="your-api-key") + +# Create a simple swarm +swarm = client.create_swarm( + name="analysis-swarm", + task="Analyze this market data", + agents=[ + { + "agent_name": "data-analyst", + "model_name": "gpt-4", + "role": "worker" + } + ] +) + +# Run a single agent +result = client.run_agent( + agent_name="researcher", + task="Research the latest AI trends", + model_name="gpt-4" +) +``` + +### Async Example + +```python +import asyncio +from swarms_client import SwarmsClient + +async def main(): + async with SwarmsClient(api_key="your-api-key") as client: + # Create a swarm asynchronously + swarm = await client.async_create_swarm( + name="async-swarm", + task="Process these documents", + agents=[ + { + "agent_name": "document-processor", + "model_name": "gpt-4", + "role": "worker" + } + ] + ) + print(swarm) + +asyncio.run(main()) +``` + +## Authentication + +### Obtaining API Keys + +API keys can be obtained from the Swarms platform at: [https://swarms.world/platform/api-keys](https://swarms.world/platform/api-keys) + +### Setting API Keys + +There are three ways to provide your API key: + +1. 
**Direct Parameter** (Recommended for development): +```python +client = SwarmsClient(api_key="your-api-key") +``` + +2. **Environment Variable** (Recommended for production): +```bash +export SWARMS_API_KEY="your-api-key" +``` +```python +client = SwarmsClient() # Will use SWARMS_API_KEY env var +``` + +3. **Configuration Object**: +```python +from swarms_client.config import SwarmsConfig + +SwarmsConfig.set_api_key("your-api-key") +client = SwarmsClient() +``` + +## Client Configuration + +### Configuration Parameters + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `api_key` | Optional[str] | None | API key for authentication | +| `base_url` | Optional[str] | "https://api.swarms.world" | Base URL for the API | +| `timeout` | Optional[int] | 30 | Request timeout in seconds | +| `max_retries` | Optional[int] | 3 | Maximum number of retry attempts | +| `max_concurrent_requests` | Optional[int] | 100 | Maximum concurrent requests | +| `retry_on_status` | Optional[Set[int]] | {429, 500, 502, 503, 504} | HTTP status codes to retry | +| `retry_delay` | Optional[float] | 1.0 | Initial retry delay in seconds | +| `max_retry_delay` | Optional[int] | 60 | Maximum retry delay in seconds | +| `jitter` | bool | True | Add random jitter to retry delays | +| `enable_cache` | bool | True | Enable response caching | +| `thread_pool_size` | Optional[int] | min(32, max_concurrent_requests * 2) | Thread pool size for sync operations | + +### Configuration Example + +```python +from swarms_client import SwarmsClient + +client = SwarmsClient( + api_key="your-api-key", + base_url="https://api.swarms.world", + timeout=60, + max_retries=5, + max_concurrent_requests=50, + retry_delay=2.0, + enable_cache=True, + thread_pool_size=20 +) +``` + +## API Endpoints Overview + +### Endpoint Reference Table + +| Endpoint | Method | Description | Sync Method | Async Method | +|----------|--------|-------------|-------------|--------------| +| 
`/health` | GET | Check API health | `get_health()` | `async_get_health()` | +| `/v1/swarm/completions` | POST | Create and run a swarm | `create_swarm()` | `async_create_swarm()` | +| `/v1/swarm/{swarm_id}/run` | POST | Run existing swarm | `run_swarm()` | `async_run_swarm()` | +| `/v1/swarm/{swarm_id}/logs` | GET | Get swarm logs | `get_swarm_logs()` | `async_get_swarm_logs()` | +| `/v1/models/available` | GET | List available models | `get_available_models()` | `async_get_available_models()` | +| `/v1/swarms/available` | GET | List swarm types | `get_swarm_types()` | `async_get_swarm_types()` | +| `/v1/agent/completions` | POST | Run single agent | `run_agent()` | `async_run_agent()` | +| `/v1/agent/batch/completions` | POST | Run agent batch | `run_agent_batch()` | `async_run_agent_batch()` | +| `/v1/swarm/batch/completions` | POST | Run swarm batch | `run_swarm_batch()` | `async_run_swarm_batch()` | +| `/v1/swarm/logs` | GET | Get API logs | `get_api_logs()` | `async_get_api_logs()` | + +## Core Methods + +### Health Check + +Check the API health status to ensure the service is operational. + +```python +# Synchronous +health = client.get_health() + +# Asynchronous +health = await client.async_get_health() +``` + +**Response Example:** +```json +{ + "status": "healthy", + "version": "1.0.0", + "timestamp": "2025-01-20T12:00:00Z" +} +``` + +### Available Models + +Retrieve a list of all available models that can be used with agents. + +```python +# Synchronous +models = client.get_available_models() + +# Asynchronous +models = await client.async_get_available_models() +``` + +**Response Example:** +```json +{ + "models": [ + "gpt-4", + "gpt-3.5-turbo", + "claude-3-opus", + "claude-3-sonnet" + ] +} +``` + +### Swarm Types + +Get available swarm architecture types. 
+ +```python +# Synchronous +swarm_types = client.get_swarm_types() + +# Asynchronous +swarm_types = await client.async_get_swarm_types() +``` + +**Response Example:** +```json +{ + "swarm_types": [ + "sequential", + "parallel", + "hierarchical", + "mesh" + ] +} +``` + +## Swarm Management + +### Create Swarm + +Create and run a new swarm with specified configuration. + +#### Method Signature + +```python +def create_swarm( + self, + name: str, + task: str, + agents: List[AgentSpec], + description: Optional[str] = None, + max_loops: int = 1, + swarm_type: Optional[str] = None, + rearrange_flow: Optional[str] = None, + return_history: bool = True, + rules: Optional[str] = None, + tasks: Optional[List[str]] = None, + messages: Optional[List[Dict[str, Any]]] = None, + stream: bool = False, + service_tier: str = "standard", +) -> Dict[str, Any] +``` + +#### Parameters + +| Parameter | Type | Required | Default | Description | +|-----------|------|----------|---------|-------------| +| `name` | str | Yes | - | Name of the swarm | +| `task` | str | Yes | - | Main task for the swarm | +| `agents` | List[AgentSpec] | Yes | - | List of agent specifications | +| `description` | Optional[str] | No | None | Swarm description | +| `max_loops` | int | No | 1 | Maximum execution loops | +| `swarm_type` | Optional[str] | No | None | Type of swarm architecture | +| `rearrange_flow` | Optional[str] | No | None | Flow rearrangement instructions | +| `return_history` | bool | No | True | Whether to return execution history | +| `rules` | Optional[str] | No | None | Swarm behavior rules | +| `tasks` | Optional[List[str]] | No | None | List of subtasks | +| `messages` | Optional[List[Dict]] | No | None | Initial messages | +| `stream` | bool | No | False | Whether to stream output | +| `service_tier` | str | No | "standard" | Service tier for processing | + +#### Example + +```python +from swarms_client.models import AgentSpec + +# Define agents +agents = [ + AgentSpec( + 
agent_name="researcher", + model_name="gpt-4", + role="leader", + system_prompt="You are an expert researcher.", + temperature=0.7, + max_tokens=1000 + ), + AgentSpec( + agent_name="analyst", + model_name="gpt-3.5-turbo", + role="worker", + system_prompt="You are a data analyst.", + temperature=0.5, + max_tokens=800 + ) +] + +# Create swarm +swarm = client.create_swarm( + name="research-team", + task="Research and analyze climate change impacts", + agents=agents, + description="A swarm for climate research", + max_loops=3, + swarm_type="hierarchical", + rules="Always cite sources and verify facts" +) +``` + +### Run Swarm + +Run an existing swarm by its ID. + +```python +# Synchronous +result = client.run_swarm(swarm_id="swarm-123") + +# Asynchronous +result = await client.async_run_swarm(swarm_id="swarm-123") +``` + +### Get Swarm Logs + +Retrieve execution logs for a specific swarm. + +```python +# Synchronous +logs = client.get_swarm_logs(swarm_id="swarm-123") + +# Asynchronous +logs = await client.async_get_swarm_logs(swarm_id="swarm-123") +``` + +**Response Example:** +```json +{ + "logs": [ + { + "timestamp": "2025-01-20T12:00:00Z", + "level": "INFO", + "message": "Swarm started", + "agent": "researcher", + "task": "Initial research" + } + ] +} +``` + +## Agent Management + +### Run Agent + +Run a single agent with specified configuration. 
+ +#### Method Signature + +```python +def run_agent( + self, + agent_name: str, + task: str, + model_name: str = "gpt-4", + temperature: float = 0.7, + max_tokens: int = 1000, + system_prompt: Optional[str] = None, + description: Optional[str] = None, + auto_generate_prompt: bool = False, + role: str = "worker", + max_loops: int = 1, + tools_dictionary: Optional[List[Dict[str, Any]]] = None, +) -> Dict[str, Any] +``` + +#### Parameters + +| Parameter | Type | Required | Default | Description | +|-----------|------|----------|---------|-------------| +| `agent_name` | str | Yes | - | Name of the agent | +| `task` | str | Yes | - | Task for the agent | +| `model_name` | str | No | "gpt-4" | Model to use | +| `temperature` | float | No | 0.7 | Generation temperature | +| `max_tokens` | int | No | 1000 | Maximum tokens | +| `system_prompt` | Optional[str] | No | None | System prompt | +| `description` | Optional[str] | No | None | Agent description | +| `auto_generate_prompt` | bool | No | False | Auto-generate prompts | +| `role` | str | No | "worker" | Agent role | +| `max_loops` | int | No | 1 | Maximum loops | +| `tools_dictionary` | Optional[List[Dict]] | No | None | Available tools | + +#### Example + +```python +# Run a single agent +result = client.run_agent( + agent_name="code-reviewer", + task="Review this Python code for best practices", + model_name="gpt-4", + temperature=0.3, + max_tokens=1500, + system_prompt="You are an expert Python developer.", + role="expert" +) + +# With tools +tools = [ + { + "name": "code_analyzer", + "description": "Analyze code quality", + "parameters": { + "language": "python", + "metrics": ["complexity", "coverage"] + } + } +] + +result = client.run_agent( + agent_name="analyzer", + task="Analyze this codebase", + tools_dictionary=tools +) +``` + +## Batch Operations + +### Run Agent Batch + +Run multiple agents in parallel for improved efficiency. 
+ +```python +# Define multiple agent configurations +agents = [ + { + "agent_name": "agent1", + "task": "Task 1", + "model_name": "gpt-4" + }, + { + "agent_name": "agent2", + "task": "Task 2", + "model_name": "gpt-3.5-turbo" + } +] + +# Run batch +results = client.run_agent_batch(agents=agents) +``` + +### Run Swarm Batch + +Run multiple swarms in parallel. + +```python +# Define multiple swarm configurations +swarms = [ + { + "name": "swarm1", + "task": "Research topic A", + "agents": [{"agent_name": "researcher1", "model_name": "gpt-4"}] + }, + { + "name": "swarm2", + "task": "Research topic B", + "agents": [{"agent_name": "researcher2", "model_name": "gpt-4"}] + } +] + +# Run batch +results = client.run_swarm_batch(swarms=swarms) +``` + +## Health and Monitoring + +### API Logs + +Retrieve all API request logs for your API key. + +```python +# Synchronous +logs = client.get_api_logs() + +# Asynchronous +logs = await client.async_get_api_logs() +``` + +**Response Example:** +```json +{ + "logs": [ + { + "request_id": "req-123", + "timestamp": "2025-01-20T12:00:00Z", + "method": "POST", + "endpoint": "/v1/agent/completions", + "status": 200, + "duration_ms": 1234 + } + ] +} +``` + +## Error Handling + +### Exception Types + +| Exception | Description | Common Causes | +|-----------|-------------|---------------| +| `SwarmsError` | Base exception | General API errors | +| `AuthenticationError` | Authentication failed | Invalid API key | +| `RateLimitError` | Rate limit exceeded | Too many requests | +| `ValidationError` | Input validation failed | Invalid parameters | +| `APIError` | API returned an error | Server-side issues | + +### Error Handling Example + +```python +from swarms_client import ( + SwarmsClient, + AuthenticationError, + RateLimitError, + ValidationError, + APIError +) + +try: + result = client.run_agent( + agent_name="test", + task="Analyze data" + ) +except AuthenticationError: + print("Invalid API key. 
Please check your credentials.") +except RateLimitError: + print("Rate limit exceeded. Please wait before retrying.") +except ValidationError as e: + print(f"Invalid input: {e}") +except APIError as e: + print(f"API error: {e.message} (Status: {e.status_code})") +except Exception as e: + print(f"Unexpected error: {e}") +``` + +## Performance Optimization + +### Caching + +The client includes built-in response caching for GET requests: + +```python +# Enable caching (default) +client = SwarmsClient(api_key="your-key", enable_cache=True) + +# Disable caching +client = SwarmsClient(api_key="your-key", enable_cache=False) + +# Skip cache for specific request +health = await client.async_get_health(skip_cache=True) +``` + +### Connection Pooling + +The client automatically manages connection pools for optimal performance: + +```python +# Configure pool size +client = SwarmsClient( + api_key="your-key", + max_concurrent_requests=50, # Pool size + thread_pool_size=20 # Thread pool for sync ops +) +``` + +### Batch Operations + +Use batch operations for processing multiple items: + +```python +# Instead of this (sequential) +results = [] +for task in tasks: + result = client.run_agent(agent_name="agent", task=task) + results.append(result) + +# Do this (parallel) +agents = [{"agent_name": "agent", "task": task} for task in tasks] +results = client.run_agent_batch(agents=agents) +``` + +## Type Reference + +### AgentSpec + +```python +class AgentSpec(BaseModel): + agent_name: str + system_prompt: Optional[str] = None + description: Optional[str] = None + model_name: str = "gpt-4" + auto_generate_prompt: bool = False + max_tokens: int = 1000 + temperature: float = 0.5 + role: Literal["worker", "leader", "expert"] = "worker" + max_loops: int = 1 + tools_dictionary: Optional[List[Dict[str, Any]]] = None +``` + +### SwarmSpec + +```python +class SwarmSpec(BaseModel): + name: str + description: Optional[str] = None + agents: List[AgentSpec] + swarm_type: Optional[str] = None + 
rearrange_flow: Optional[str] = None + task: str + return_history: bool = True + rules: Optional[str] = None + tasks: Optional[List[str]] = None + messages: Optional[List[Dict[str, Any]]] = None + max_loops: int = 1 + stream: bool = False + service_tier: Literal["standard", "premium"] = "standard" +``` + +### AgentCompletion + +```python +class AgentCompletion(BaseModel): + agent_config: AgentSpec + task: str +``` + +## Code Examples + +### Complete Data Analysis Swarm + +```python +from swarms_client import SwarmsClient +from swarms_client.models import AgentSpec + +# Initialize client +client = SwarmsClient(api_key="your-api-key") + +# Define specialized agents +agents = [ + AgentSpec( + agent_name="data-collector", + model_name="gpt-4", + role="worker", + system_prompt="You collect and organize data from various sources.", + temperature=0.3, + max_tokens=1000 + ), + AgentSpec( + agent_name="statistician", + model_name="gpt-4", + role="worker", + system_prompt="You perform statistical analysis on data.", + temperature=0.2, + max_tokens=1500 + ), + AgentSpec( + agent_name="report-writer", + model_name="gpt-4", + role="leader", + system_prompt="You create comprehensive reports from analysis.", + temperature=0.7, + max_tokens=2000 + ) +] + +# Create and run swarm +swarm = client.create_swarm( + name="data-analysis-swarm", + task="Analyze sales data and create quarterly report", + agents=agents, + swarm_type="sequential", + max_loops=2, + rules="Always include statistical significance in analysis" +) + +print(f"Analysis complete: {swarm['result']}") +``` + +### Async Web Scraping System + +```python +import asyncio +from swarms_client import SwarmsClient + +async def scrape_and_analyze(urls): + async with SwarmsClient(api_key="your-api-key") as client: + # Run scrapers in parallel + scraper_tasks = [] + for i, url in enumerate(urls): + task = client.async_run_agent( + agent_name=f"scraper-{i}", + task=f"Extract main content from {url}", + model_name="gpt-3.5-turbo", 
+ temperature=0.1 + ) + scraper_tasks.append(task) + + # Wait for all scrapers + scraped_data = await asyncio.gather(*scraper_tasks) + + # Analyze aggregated data + analysis = await client.async_run_agent( + agent_name="analyzer", + task=f"Analyze trends in: {scraped_data}", + model_name="gpt-4", + temperature=0.5 + ) + + return analysis + +# Run the async function +urls = ["https://example1.com", "https://example2.com"] +result = asyncio.run(scrape_and_analyze(urls)) +``` + +### Real-time Processing with Streaming + +```python +from swarms_client import SwarmsClient + +client = SwarmsClient(api_key="your-api-key") + +# Create streaming swarm +swarm = client.create_swarm( + name="real-time-processor", + task="Process incoming data stream", + agents=[ + { + "agent_name": "stream-processor", + "model_name": "gpt-3.5-turbo", + "role": "worker" + } + ], + stream=True, # Enable streaming + service_tier="premium" # Use premium tier for better performance +) + +# Process streaming results +for chunk in swarm['stream']: + print(f"Received: {chunk}") + # Process each chunk as it arrives +``` + +### Error Recovery System + +```python +from swarms_client import SwarmsClient, RateLimitError +import time + +class ResilientSwarmSystem: + def __init__(self, api_key): + self.client = SwarmsClient( + api_key=api_key, + max_retries=5, + retry_delay=2.0 + ) + + def run_with_fallback(self, task): + try: + # Try primary model + return self.client.run_agent( + agent_name="primary", + task=task, + model_name="gpt-4" + ) + except RateLimitError: + # Fallback to secondary model + print("Rate limit hit, using fallback model") + return self.client.run_agent( + agent_name="fallback", + task=task, + model_name="gpt-3.5-turbo" + ) + except Exception as e: + # Final fallback + print(f"Error: {e}, using cached response") + return self.get_cached_response(task) + + def get_cached_response(self, task): + # Implement cache lookup logic + return {"cached": True, "response": "Cached response"} + +# 
 Usage +system = ResilientSwarmSystem(api_key="your-api-key") +result = system.run_with_fallback("Analyze market trends") +``` + +## Best Practices + +### 1. API Key Security + +- Never hardcode API keys in your code +- Use environment variables for production +- Rotate keys regularly +- Use different keys for development/production + +### 2. Resource Management + +```python +# Prefer context managers so the client is closed automatically +async with SwarmsClient(api_key="key") as client: + result = await client.async_run_agent(...) + +# Or close the client explicitly +client = SwarmsClient(api_key="key") +try: + result = client.run_agent(...) +finally: + client.close() +``` + +### 3. Error Handling + +```python +# Retry with backoff (assumes `time`, a configured `logger`, and `RateLimitError` are already imported) +def safe_run_agent(client, **kwargs): + max_attempts = 3 + for attempt in range(max_attempts): + try: + return client.run_agent(**kwargs) + except RateLimitError: + if attempt < max_attempts - 1: + time.sleep(2 ** attempt) # Exponential backoff + else: + raise + except Exception as e: + logger.error(f"Attempt {attempt + 1} failed: {e}") + if attempt == max_attempts - 1: + raise +``` + +### 4. Optimize for Performance + +```python +# Use batch operations when possible +results = client.run_agent_batch(agents=[...]) + +# Enable caching for repeated requests +client = SwarmsClient(api_key="key", enable_cache=True) + +# Use appropriate concurrency limits +client = SwarmsClient( + api_key="key", + max_concurrent_requests=50 # Adjust based on your needs +) +``` + +### 5. Model Selection + +Choose models based on your requirements: +- **GPT-4**: Complex reasoning, analysis, creative tasks +- **GPT-3.5-turbo**: Faster responses, general tasks +- **Claude models**: Extended context, detailed analysis +- **Specialized models**: Domain-specific tasks + +### 6. Prompt Engineering + +```python +# Be specific with system prompts +agent = AgentSpec( + agent_name="researcher", + system_prompt="""You are an expert researcher specializing in: + 1. 
 Academic literature review + 2. Data source verification + 3. Citation formatting (APA style) + + Always cite sources and verify facts.""", + model_name="gpt-4" +) +``` + +## Troubleshooting + +### Common Issues + +1. **Authentication Errors** + - Verify API key is correct + - Check environment variables + - Ensure key has necessary permissions + +2. **Rate Limiting** + - Implement exponential backoff + - Use batch operations + - Consider upgrading service tier + +3. **Timeout Errors** + - Increase timeout setting + - Break large tasks into smaller chunks + - Use streaming for long operations + +4. **Connection Issues** + - Check network connectivity + - Verify firewall settings + - Use retry logic + +### Debug Mode + +Enable detailed logging for troubleshooting: + +```python +import logging +from loguru import logger + +# Enable debug logging +logger.add("swarms_debug.log", level="DEBUG") + +# Create the client as usual; the Loguru sink added above records the DEBUG output +client = SwarmsClient( + api_key="your-key", + base_url="https://api.swarms.world" +) + +# Test connection +try: + health = client.get_health() + logger.info(f"Health check: {health}") +except Exception as e: + logger.error(f"Connection failed: {e}") +``` + +### Performance Monitoring + +```python +import time + +class PerformanceMonitor: + def __init__(self, client): + self.client = client + self.metrics = [] + + def run_with_metrics(self, method, **kwargs): + start_time = time.time() + try: + result = getattr(self.client, method)(**kwargs) + duration = time.time() - start_time + self.metrics.append({ + "method": method, + "duration": duration, + "success": True + }) + return result + except Exception as e: + duration = time.time() - start_time + self.metrics.append({ + "method": method, + "duration": duration, + "success": False, + "error": str(e) + }) + raise + + def get_statistics(self): + successful = [m for m in self.metrics if m["success"]] + if successful: + avg_duration = sum(m["duration"] for m in successful) / len(successful) + 
return { + "total_requests": len(self.metrics), + "successful": len(successful), + "average_duration": avg_duration, + "error_rate": (len(self.metrics) - len(successful)) / len(self.metrics) + } + return {"error": "No successful requests"} + +# Usage +monitor = PerformanceMonitor(client) +result = monitor.run_with_metrics("run_agent", agent_name="test", task="Analyze") +stats = monitor.get_statistics() +print(f"Performance stats: {stats}") +``` + +## Conclusion + +The Swarms API Client provides a robust, production-ready solution for interacting with the Swarms API. With its dual sync/async interface, comprehensive error handling, and performance optimizations, it enables developers to build scalable AI agent systems efficiently. Whether you're creating simple single-agent tasks or complex multi-agent swarms, this client offers the flexibility and reliability needed for production applications. + +For the latest updates and additional resources, visit the official documentation at [https://swarms.world](https://swarms.world) and obtain your API keys at [https://swarms.world/platform/api-keys](https://swarms.world/platform/api-keys). 
\ No newline at end of file diff --git a/new/auto_corp.py b/new/auto_corp.py deleted file mode 100644 index 561987d5..00000000 --- a/new/auto_corp.py +++ /dev/null @@ -1,643 +0,0 @@ -""" -CEO -> Finds department leader -Department leader -> Finds employees -Employees -> Do the work - -Todo -- Create schemas that enable the ceo to find the department leader or leaders -- CEO then distributes orders to department leaders or just one leader -- Department leader then distributes orders to employees -- Employees can choose to do the work or delegate to another employee or work together -- When the employees are done, they report back to the department leader -- Department leader then reports back to the ceo -- CEO then reports back to the user - - - -Logic -- dynamically setup conversations for each department -- Feed context to each agent in the department -- Feed context to each agent in the department -""" - -from typing import Callable, List, Union - -from pydantic import BaseModel, Field - -from swarms.structs.agent import Agent -from swarms.structs.conversation import Conversation -from swarms.structs.ma_utils import list_all_agents -from swarms.utils.str_to_dict import str_to_dict -from swarms.utils.any_to_str import any_to_str - - -class Department(BaseModel): - name: str = Field(description="The name of the department") - description: str = Field( - description="A description of the department" - ) - employees: List[Union[Agent, Callable]] = Field( - description="A list of employees in the department" - ) - leader_name: str = Field( - description="The name of the leader of the department" - ) - - class Config: - arbitrary_types_allowed = True - - -CEO_SCHEMA = { - "name": "delegate_task_to_department", - "description": "CEO function to analyze and delegate tasks to appropriate department leaders", - "parameters": { - "type": "object", - "properties": { - "thought": { - "type": "string", - "description": "Reasoning about the task, its requirements, and potential 
approaches", - }, - "plan": { - "type": "string", - "description": "Structured plan for how to accomplish the task across departments", - }, - "tasks": { - "type": "object", - "properties": { - "task_description": {"type": "string"}, - "selected_departments": { - "type": "array", - "items": {"type": "string"}, - "description": "List of department names that should handle this task", - }, - "selected_leaders": { - "type": "array", - "items": {"type": "string"}, - "description": "List of department leaders to assign the task to", - }, - "success_criteria": {"type": "string"}, - }, - "required": [ - "task_description", - "selected_departments", - "selected_leaders", - ], - }, - }, - "required": ["thought", "plan", "tasks"], - }, -} - -DEPARTMENT_LEADER_SCHEMA = { - "name": "manage_department_task", - "description": "Department leader function to break down and assign tasks to employees", - "parameters": { - "type": "object", - "properties": { - "task_management": { - "type": "object", - "properties": { - "original_task": {"type": "string"}, - "subtasks": { - "type": "array", - "items": { - "type": "object", - "properties": { - "subtask_id": {"type": "string"}, - "description": {"type": "string"}, - "assigned_employees": { - "type": "array", - "items": {"type": "string"}, - }, - "estimated_duration": { - "type": "string" - }, - "dependencies": { - "type": "array", - "items": {"type": "string"}, - }, - }, - }, - }, - "progress_tracking": { - "type": "object", - "properties": { - "status": { - "type": "string", - "enum": [ - "not_started", - "in_progress", - "completed", - ], - }, - "completion_percentage": { - "type": "number" - }, - "blockers": { - "type": "array", - "items": {"type": "string"}, - }, - }, - }, - }, - "required": ["original_task", "subtasks"], - } - }, - "required": ["task_management"], - }, -} - -EMPLOYEE_SCHEMA = { - "name": "handle_assigned_task", - "description": "Employee function to process and execute assigned tasks", - "parameters": { - "type": 
"object", - "properties": { - "thought": { - "type": "string", - "description": "Reasoning about the task, its requirements, and potential approaches", - }, - "plan": { - "type": "string", - "description": "Structured plan for how to accomplish the task across departments", - }, - "task_execution": { - "type": "object", - "properties": { - "subtask_id": {"type": "string"}, - "action_taken": { - "type": "string", - "enum": [ - "execute", - "delegate", - "collaborate", - ], - }, - "execution_details": { - "type": "object", - "properties": { - "status": { - "type": "string", - "enum": [ - "in_progress", - "completed", - "blocked", - ], - }, - "work_log": {"type": "string"}, - "collaboration_partners": { - "type": "array", - "items": {"type": "string"}, - }, - "delegate_to": {"type": "string"}, - "results": {"type": "string"}, - "issues_encountered": { - "type": "array", - "items": {"type": "string"}, - }, - }, - }, - }, - "required": [ - "thought", - "plan", - "subtask_id", - "action_taken", - "execution_details", - ], - }, - }, - "required": ["task_execution"], - }, -} - -# Status report schemas for the feedback loop -EMPLOYEE_REPORT_SCHEMA = { - "name": "submit_task_report", - "description": "Employee function to report task completion status to department leader", - "parameters": { - "type": "object", - "properties": { - "task_report": { - "type": "object", - "properties": { - "subtask_id": {"type": "string"}, - "completion_status": { - "type": "string", - "enum": ["completed", "partial", "blocked"], - }, - "work_summary": {"type": "string"}, - "time_spent": {"type": "string"}, - "challenges": { - "type": "array", - "items": {"type": "string"}, - }, - "next_steps": {"type": "string"}, - }, - "required": [ - "subtask_id", - "completion_status", - "work_summary", - ], - } - }, - "required": ["task_report"], - }, -} - -DEPARTMENT_REPORT_SCHEMA = { - "name": "submit_department_report", - "description": "Department leader function to report department progress to CEO", 
- "parameters": { - "type": "object", - "properties": { - "department_report": { - "type": "object", - "properties": { - "department_name": {"type": "string"}, - "task_summary": {"type": "string"}, - "overall_status": { - "type": "string", - "enum": ["on_track", "at_risk", "completed"], - }, - "completion_percentage": {"type": "number"}, - "key_achievements": { - "type": "array", - "items": {"type": "string"}, - }, - "blockers": { - "type": "array", - "items": {"type": "string"}, - }, - "resource_needs": { - "type": "array", - "items": {"type": "string"}, - }, - "next_milestones": { - "type": "array", - "items": {"type": "string"}, - }, - }, - "required": [ - "department_name", - "task_summary", - "overall_status", - ], - } - }, - "required": ["department_report"], - }, -} - -CEO_FINAL_REPORT_SCHEMA = { - "name": "generate_final_report", - "description": "CEO function to compile final report for the user", - "parameters": { - "type": "object", - "properties": { - "final_report": { - "type": "object", - "properties": { - "task_overview": {"type": "string"}, - "overall_status": { - "type": "string", - "enum": ["successful", "partial", "failed"], - }, - "department_summaries": { - "type": "array", - "items": { - "type": "object", - "properties": { - "department": {"type": "string"}, - "contribution": {"type": "string"}, - "performance": {"type": "string"}, - }, - }, - }, - "final_results": {"type": "string"}, - "recommendations": { - "type": "array", - "items": {"type": "string"}, - }, - "next_steps": {"type": "string"}, - }, - "required": [ - "task_overview", - "overall_status", - "final_results", - ], - } - }, - "required": ["final_report"], - }, -} - -# # Example output schemas -# CEO_EXAMPLE_OUTPUT = { -# "thought": "This task requires coordination between the engineering and design departments to create a new feature. The engineering team will handle the backend implementation while design focuses on the user interface.", -# "plan": "1. 
Assign backend development to engineering department\n2. Assign UI/UX design to design department\n3. Set up regular sync meetings between departments\n4. Establish clear success criteria", -# "tasks": { -# "task_description": "Develop a new user authentication system with social login integration", -# "selected_departments": ["engineering", "design"], -# "selected_leaders": ["engineering_lead", "design_lead"], -# "success_criteria": "1. Social login working with 3 major providers\n2. UI/UX approved by design team\n3. Security audit passed\n4. Performance metrics met" -# } -# } - -# DEPARTMENT_LEADER_EXAMPLE_OUTPUT = { -# "task_management": { -# "original_task": "Develop a new user authentication system with social login integration", -# "subtasks": [ -# { -# "subtask_id": "ENG-001", -# "description": "Implement OAuth2 integration for Google", -# "assigned_employees": ["dev1", "dev2"], -# "estimated_duration": "3 days", -# "dependencies": ["DES-001"] -# }, -# { -# "subtask_id": "ENG-002", -# "description": "Implement OAuth2 integration for Facebook", -# "assigned_employees": ["dev3"], -# "estimated_duration": "2 days", -# "dependencies": ["DES-001"] -# } -# ], -# "progress_tracking": { -# "status": "in_progress", -# "completion_percentage": 0.3, -# "blockers": ["Waiting for design team to provide UI mockups"] -# } -# } -# } - -# EMPLOYEE_EXAMPLE_OUTPUT = { -# "thought": "The Google OAuth2 integration requires careful handling of token management and user data synchronization", -# "plan": "1. Set up Google OAuth2 credentials\n2. Implement token refresh mechanism\n3. Create user data sync pipeline\n4. 
Add error handling and logging", -# "task_execution": { -# "subtask_id": "ENG-001", -# "action_taken": "execute", -# "execution_details": { -# "status": "in_progress", -# "work_log": "Completed OAuth2 credential setup and initial token handling implementation", -# "collaboration_partners": ["dev2"], -# "delegate_to": None, -# "results": "Successfully implemented basic OAuth2 flow", -# "issues_encountered": ["Need to handle token refresh edge cases"] -# } -# } -# } - -# EMPLOYEE_REPORT_EXAMPLE = { -# "task_report": { -# "subtask_id": "ENG-001", -# "completion_status": "partial", -# "work_summary": "Completed initial OAuth2 implementation, working on token refresh mechanism", -# "time_spent": "2 days", -# "challenges": ["Token refresh edge cases", "Rate limiting considerations"], -# "next_steps": "Implement token refresh mechanism and add rate limiting protection" -# } -# } - -# DEPARTMENT_REPORT_EXAMPLE = { -# "department_report": { -# "department_name": "Engineering", -# "task_summary": "Making good progress on OAuth2 implementation, but waiting on design team for UI components", -# "overall_status": "on_track", -# "completion_percentage": 0.4, -# "key_achievements": [ -# "Completed Google OAuth2 basic flow", -# "Set up secure token storage" -# ], -# "blockers": ["Waiting for UI mockups from design team"], -# "resource_needs": ["Additional QA resources for testing"], -# "next_milestones": [ -# "Complete Facebook OAuth2 integration", -# "Implement token refresh mechanism" -# ] -# } -# } - -# CEO_FINAL_REPORT_EXAMPLE = { -# "final_report": { -# "task_overview": "Successfully implemented new authentication system with social login capabilities", -# "overall_status": "successful", -# "department_summaries": [ -# { -# "department": "Engineering", -# "contribution": "Implemented secure OAuth2 integrations and token management", -# "performance": "Excellent - completed all technical requirements" -# }, -# { -# "department": "Design", -# "contribution": "Created intuitive 
UI/UX for authentication flows", -# "performance": "Good - delivered all required designs on time" -# } -# ], -# "final_results": "New authentication system is live and processing 1000+ logins per day", -# "recommendations": [ -# "Add more social login providers", -# "Implement biometric authentication", -# "Add two-factor authentication" -# ], -# "next_steps": "Monitor system performance and gather user feedback for improvements" -# } -# } - - -class AutoCorp: - def __init__( - self, - name: str = "AutoCorp", - description: str = "A company that uses agents to automate tasks", - departments: List[Department] = [], - ceo: Agent = None, - ): - self.name = name - self.description = description - self.departments = departments - self.ceo = ceo - self.conversation = Conversation() - - # Check if the CEO and departments are set - self.reliability_check() - - # Add departments to conversation - self.add_departments_to_conversation() - - # Initialize the CEO agent - self.initialize_ceo_agent() - - # Initialize the department leaders - self.setup_department_leaders() - - # Initialize the department employees - self.department_employees_initialize() - - def initialize_ceo_agent(self): - self.ceo.tools_list_dictionary = [ - CEO_SCHEMA, - CEO_FINAL_REPORT_SCHEMA, - ] - - def setup_department_leaders(self): - self.department_leader_initialize() - self.initialize_department_leaders() - - def department_leader_initialize(self): - """Initialize each department leader with their department's context.""" - - for department in self.departments: - # Create a context dictionary for the department - department_context = { - "name": department.name, - "description": department.description, - "employees": list_all_agents( - department.employees, - self.conversation, - department.name, - False, - ), - } - - # Convert the context to a string - context_str = any_to_str(department_context) - - # TODO: Add the department leader's tools and context - department.leader.system_prompt += f""" - 
You are the leader of the {department.name} department. - - Department Context: - {context_str} - - Your role is to: - 1. Break down tasks into subtasks - 2. Assign subtasks to appropriate employees - 3. Track progress and manage blockers - 4. Report back to the CEO - - Use the provided tools to manage your department effectively. - """ - - def department_employees_initialize(self): - """Initialize each department leader with their department's context.""" - - for department in self.departments: - # Create a context dictionary for the department - department_context = { - "name": department.name, - "description": department.description, - "employees": list_all_agents( - department.employees, - self.conversation, - department.name, - False, - ), - "leader": department.leader_name, - } - - print(department_context) - - # Convert the context to a string - context_str = any_to_str(department_context) - - # Set the department leader's tools and context - department.employees.system_prompt += f""" - You are an employee of the {department.name} department. - - Department Context: - {context_str} - - Your role is to: - 1. Break down tasks into subtasks - 2. Assign subtasks to appropriate employees - 3. Track progress and manage blockers - 4. Report back to the CEO - - Use the provided tools to manage your department effectively. 
- """ - - def initialize_department_leaders(self): - # Use list comprehension for faster initialization - [ - setattr( - dept.leader, - "tools_list_dictionary", - [DEPARTMENT_LEADER_SCHEMA], - ) - for dept in self.departments - ] - - def reliability_check(self): - if self.ceo is None: - raise ValueError("CEO is not set") - - if self.departments is None: - raise ValueError("No departments are set") - - if len(self.departments) == 0: - raise ValueError("No departments are set") - - def add_departments_to_conversation(self): - # Batch process departments using list comprehension - messages = [ - { - "role": "System", - "content": f"Team: {dept.name}\nDescription: {dept.description}\nLeader: {dept.leader_name}\nAgents: {list_all_agents(dept.employees, self.conversation, dept.name, False)}", - } - for dept in self.departments - ] - self.conversation.batch_add(messages) - - # def add_department(self, department: Department): - # self.departments.append(department) - - # def add_employee(self, employee: Union[Agent, Callable]): - # self.departments[-1].employees.append(employee) - - # def add_ceo(self, ceo: Agent): - # self.ceo = ceo - - # def add_employee_to_department( - # self, employee: Union[Agent, Callable], department: Department - # ): - # department.employees.append(employee) - - # def add_leader_to_department( - # self, leader: Agent, department: Department - # ): - # department.leader = leader - - # def add_department_to_auto_corp(self, department: Department): - # self.departments.append(department) - - # def add_ceo_to_auto_corp(self, ceo: Agent): - # self.ceo = ceo - - # def add_employee_to_ceo(self, employee: Union[Agent, Callable]): - # self.ceo.employees.append(employee) - - def run(self, task: str): - self.ceo_to_department_leaders(task) - - # Then the department leaders to employees - - def ceo_to_department_leaders(self, task: str): - orders = self.ceo.run( - f"History: {self.conversation.get_str()}\n Your Current Task: {task}" - ) - - orders = 
str_to_dict(orders) - - for department in orders["tasks"]["selected_departments"]: - department_leader = self.departments[department].leader - - # Get the department leader to break down the task - outputs = department_leader.run( - orders["tasks"]["selected_leaders"] - ) - - # Add the department leader's response to the conversation - self.conversation.add( - role=f"{department_leader.name} from {department}", - content=outputs, - ) diff --git a/new/create_agent.py b/new/create_agent.py deleted file mode 100644 index c545aa57..00000000 --- a/new/create_agent.py +++ /dev/null @@ -1,291 +0,0 @@ -import concurrent.futures -from typing import Dict, Optional -import secrets -import string -import uuid - -from dotenv import load_dotenv -from swarms import Agent - -import replicate - -from swarms.utils.str_to_dict import str_to_dict - -load_dotenv() - - -def generate_key(prefix: str = "run") -> str: - """ - Generates an API key similar to OpenAI's format (sk-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX). - - Args: - prefix (str): The prefix for the API key. Defaults to "sk". - - Returns: - str: An API key string in format: prefix-<48 random characters> - """ - # Create random string of letters and numbers - alphabet = string.ascii_letters + string.digits - random_part = "".join(secrets.choice(alphabet) for _ in range(28)) - return f"{prefix}-{random_part}" - - -def _generate_media( - prompt: str = None, modalities: list = None -) -> Dict[str, str]: - """ - Generate media content (images or videos) based on text prompts using AI models. 
- - Args: - prompt (str): Text description of the content to be generated - modalities (list): List of media types to generate (e.g., ["image", "video"]) - - Returns: - Dict[str, str]: Dictionary containing file paths of generated media - """ - if not prompt or not modalities: - raise ValueError("Prompt and modalities must be provided") - - input = {"prompt": prompt} - results = {} - - def _generate_image(input: Dict) -> str: - """Generate an image and return the file path.""" - output = replicate.run( - "black-forest-labs/flux-dev", input=input - ) - file_paths = [] - - for index, item in enumerate(output): - unique_id = str(uuid.uuid4()) - artifact = item.read() - file_path = f"output_{unique_id}_{index}.webp" - - with open(file_path, "wb") as file: - file.write(artifact) - - file_paths.append(file_path) - - return file_paths - - def _generate_video(input: Dict) -> str: - """Generate a video and return the file path.""" - output = replicate.run("luma/ray", input=input) - unique_id = str(uuid.uuid4()) - artifact = output.read() - file_path = f"output_{unique_id}.mp4" - - with open(file_path, "wb") as file: - file.write(artifact) - - return file_path - - for modality in modalities: - if modality == "image": - results["images"] = _generate_image(input) - elif modality == "video": - results["video"] = _generate_video(input) - else: - raise ValueError(f"Unsupported modality: {modality}") - - print(results) - - return results - - -def generate_media( - modalities: list, - prompt: Optional[str] = None, - count: int = 1, -) -> Dict: - with concurrent.futures.ThreadPoolExecutor( - max_workers=count - ) as executor: - # Create list of identical tasks to run concurrently - futures = [ - executor.submit( - _generate_media, - prompt=prompt, # Fix: Pass as keyword arguments - modalities=modalities, - ) - for _ in range(count) - ] - - # Wait for all tasks to complete and collect results - results = [ - future.result() - for future in concurrent.futures.as_completed(futures) - ] 
- - return {"results": results} - - -tools = [ - { - "type": "function", - "function": { - "name": "generate_media", - "description": "Generate different types of media content (image, video, or music) based on text prompts using AI models.", - "parameters": { - "type": "object", - "properties": { - "modality": { - "type": "array", - "items": { - "type": "string", - "enum": ["image", "video", "music"], - }, - "description": "The type of media content to generate", - }, - "prompt": { - "type": "string", - "description": "Text description of the content to be generated. Specialize it for the modality at hand. For example, if you are generating an image, the prompt should be a description of the image you want to see. If you are generating a video, the prompt should be a description of the video you want to see. If you are generating music, the prompt should be a description of the music you want to hear.", - }, - "count": { - "type": "integer", - "description": "Number of outputs to generate (1-4)", - }, - }, - "required": [ - "modality", - "prompt", - "count", - ], - }, - }, - } -] - - -MEDIA_GENERATION_SYSTEM_PROMPT = """ -You are an expert AI Media Generation Assistant, specialized in crafting precise and effective prompts for generating images, videos, and music. Your role is to help users create high-quality media content by understanding their requests and translating them into optimal prompts. - -GENERAL GUIDELINES: -- Always analyze the user's request carefully to determine the appropriate modality (image, video, or music) -- Maintain a balanced level of detail in prompts - specific enough to capture the desired outcome but not overly verbose -- Consider the technical limitations and capabilities of AI generation systems -- When unclear, ask for clarification about specific details or preferences - -MODALITY-SPECIFIC GUIDELINES: - -1. 
IMAGE GENERATION: -- Structure prompts with primary subject first, followed by style, mood, and technical specifications -- Include relevant art styles when specified (e.g., "digital art", "oil painting", "watercolor", "photorealistic") -- Consider composition elements (foreground, background, lighting, perspective) -- Use specific adjectives for clarity (instead of "beautiful", specify "vibrant", "ethereal", "gritty", etc.) - -Example image prompts: -- "A serene Japanese garden at sunset, with cherry blossoms falling, painted in traditional ukiyo-e style, soft pastel colors" -- "Cyberpunk cityscape at night, neon lights reflecting in rain puddles, hyper-realistic digital art style" - -2. VIDEO GENERATION: -- Describe the sequence of events clearly -- Specify camera movements if relevant (pan, zoom, tracking shot) -- Include timing and transitions when necessary -- Focus on dynamic elements and motion - -Example video prompts: -- "Timelapse of a flower blooming in a garden, close-up shot, soft natural lighting, 10-second duration" -- "Drone shot flying through autumn forest, camera slowly rising above the canopy, revealing mountains in the distance" - -3. 
MUSIC GENERATION: -- Specify genre, tempo, and mood -- Mention key instruments or sounds -- Include emotional qualities and intensity -- Reference similar artists or styles if relevant - -Example music prompts: -- "Calm ambient electronic music with soft synthesizer pads, gentle piano melodies, 80 BPM, suitable for meditation" -- "Upbeat jazz fusion track with prominent bass line, dynamic drums, and horn section, inspired by Weather Report" - -COUNT HANDLING: -- When multiple outputs are requested (1-4), maintain consistency while introducing subtle variations -- For images: Vary composition or perspective while maintaining style -- For videos: Adjust camera angles or timing while keeping the core concept -- For music: Modify instrument arrangements or tempo while preserving the genre and mood - -PROMPT OPTIMIZATION PROCESS: -1. Identify core requirements from user input -2. Determine appropriate modality -3. Add necessary style and technical specifications -4. Adjust detail level based on complexity -5. 
Consider count and create variations if needed - -EXAMPLES OF HANDLING USER REQUESTS: - -User: "I want a fantasy landscape" -Assistant response: { - "modality": "image", - "prompt": "Majestic fantasy landscape with floating islands, crystal waterfalls, and ancient magical ruins, ethereal lighting, digital art style with rich colors", - "count": 1 -} - -User: "Create 3 variations of a peaceful nature scene" -Assistant response: { - "modality": "image", - "prompt": "Tranquil forest clearing with morning mist, sunbeams filtering through ancient trees, photorealistic style with soft natural lighting", - "count": 1 -} - -IMPORTANT CONSIDERATIONS: -- Avoid harmful, unethical, or inappropriate content -- Respect copyright and intellectual property guidelines -- Maintain consistency with brand guidelines when specified -- Consider technical limitations of current AI generation systems - -""" - -# Initialize the agent with the new system prompt -agent = Agent( - agent_name="Media-Generation-Agent", - agent_description="AI Media Generation Assistant", - system_prompt=MEDIA_GENERATION_SYSTEM_PROMPT, - max_loops=1, - tools_list_dictionary=tools, - output_type="final", -) - - -def create_agent(task: str): - output = str_to_dict(agent.run(task)) - - print(output) - print(type(output)) - - prompt = output["prompt"] - count = output["count"] - modalities = output["modality"] - - output = generate_media( - modalities=modalities, - prompt=prompt, - count=count, - ) - - run_id = generate_key() - - total_cost = 0 - - for modality in modalities: - if modality == "image": - total_cost += 0.1 - elif modality == "video": - total_cost += 1 - - result = { - "id": run_id, - "success": True, - "prompt": prompt, - "count": count, - "modality": modalities, - "total_cost": total_cost, - } - - return result - - -if __name__ == "__main__": - task = "Create 3 super kawaii variations of a magical Chinese mountain garden scene in anime style! 
🌸✨ Include adorable elements like: cute koi fish swimming in crystal ponds, fluffy clouds floating around misty peaks, tiny pagodas with twinkling lights, and playful pandas hiding in bamboo groves. Make it extra magical with sparkles and soft pastel colors! Create both a video and an image for each variation. Just 1." - output = create_agent(task) - print("✨ Yay! Here's your super cute creation! ✨") - print(output) diff --git a/pyproject.toml b/pyproject.toml index f778cee0..a236bcb1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "7.7.6" +version = "7.7.8" description = "Swarms - TGSC" license = "MIT" authors = ["Kye Gomez "] diff --git a/cort_agent.py b/swarms/agents/cort_agent.py similarity index 100% rename from cort_agent.py rename to swarms/agents/cort_agent.py diff --git a/swarms/agents/react_agent.py b/swarms/agents/react_agent.py new file mode 100644 index 00000000..59cd89fd --- /dev/null +++ b/swarms/agents/react_agent.py @@ -0,0 +1,173 @@ +from swarms import Agent +from typing import List + + +# System prompt for REACT agent +REACT_AGENT_PROMPT = """ +You are a REACT (Reason, Act, Observe) agent designed to solve tasks through an iterative process of reasoning and action. You maintain memory of previous steps to build upon past actions and observations. + +Your process follows these key components: + +1. MEMORY: Review and utilize previous steps + - Access and analyze previous observations + - Build upon past thoughts and plans + - Learn from previous actions + - Use historical context to make better decisions + +2. OBSERVE: Analyze current state + - Consider both new information and memory + - Identify relevant patterns from past steps + - Note any changes or progress made + - Evaluate success of previous actions + +3. 
THINK: Process and reason + - Combine new observations with historical knowledge + - Consider how past steps influence current decisions + - Identify patterns and learning opportunities + - Plan improvements based on previous outcomes + +4. PLAN: Develop next steps + - Create strategies that build on previous success + - Avoid repeating unsuccessful approaches + - Consider long-term goals and progress + - Maintain consistency with previous actions + +5. ACT: Execute with context + - Implement actions that progress from previous steps + - Build upon successful past actions + - Adapt based on learned experiences + - Maintain continuity in approach + +For each step, you should: +- Reference relevant previous steps +- Show how current decisions relate to past actions +- Demonstrate learning and adaptation +- Maintain coherent progression toward the goal + +Your responses should be structured, logical, and show clear reasoning that builds upon previous steps.""" + +# Schema for REACT agent responses +react_agent_schema = { + "type": "function", + "function": { + "name": "generate_react_response", + "description": "Generates a structured REACT agent response with memory of previous steps", + "parameters": { + "type": "object", + "properties": { + "memory_reflection": { + "type": "string", + "description": "Analysis of previous steps and their influence on current thinking", + }, + "observation": { + "type": "string", + "description": "Current state observation incorporating both new information and historical context", + }, + "thought": { + "type": "string", + "description": "Reasoning that builds upon previous steps and current observation", + }, + "plan": { + "type": "string", + "description": "Structured plan that shows progression from previous actions", + }, + "action": { + "type": "string", + "description": "Specific action that builds upon previous steps and advances toward the goal", + }, + }, + "required": [ + "memory_reflection", + "observation", + "thought", + 
"plan", + "action", + ], + }, + }, +} + + +class ReactAgent: + def __init__( + self, + name: str = "react-agent-o1", + description: str = "A react agent that uses o1 preview to solve tasks", + model_name: str = "openai/gpt-4o", + max_loops: int = 1, + ): + self.name = name + self.description = description + self.model_name = model_name + self.max_loops = max_loops + + self.agent = Agent( + agent_name=self.name, + agent_description=self.description, + model_name=self.model_name, + max_loops=1, + tools_list_dictionary=[react_agent_schema], + output_type="final", + ) + + # Initialize memory for storing steps + self.memory: List[str] = [] + + def step(self, task: str) -> str: + """Execute a single step of the REACT process. + + Args: + task: The task description or current state + + Returns: + String response from the agent + """ + response = self.agent.run(task) + print(response) + return response + + def run(self, task: str, *args, **kwargs) -> List[str]: + """Run the REACT agent for multiple steps with memory. 
+ + Args: + task: The initial task description + *args: Additional positional arguments + **kwargs: Additional keyword arguments + + Returns: + List of all steps taken as strings + """ + # Reset memory at the start of a new run + self.memory = [] + + current_task = task + for i in range(self.max_loops): + print(f"\nExecuting step {i+1}/{self.max_loops}") + step_result = self.step(current_task) + print(step_result) + + # Store step in memory + self.memory.append(step_result) + + # Update task with previous response and memory context + memory_context = ( + "\n\nMemory of previous steps:\n" + + "\n".join( + f"Step {j+1}:\n{step}" + for j, step in enumerate(self.memory) + ) + ) + + current_task = f"Previous response:\n{step_result}\n{memory_context}\n\nContinue with the original task: {task}" + + return self.memory + + +# if __name__ == "__main__": +# agent = ReactAgent( +# max_loops=1 +# ) # Increased max_loops to see the iteration +# result = agent.run( +# "Write a short story about a robot that can fly." 
+# ) +# print(result) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index d137999a..2e795b7e 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -562,7 +562,7 @@ class Agent: if self.react_on is True: self.system_prompt += REACT_SYS_PROMPT - if len(self.max_loops) > 1: + if self.max_loops >= 2: self.system_prompt += generate_reasoning_prompt( self.max_loops ) @@ -1044,14 +1044,14 @@ class Agent: ): loop_count += 1 - if len(self.max_loops) > 1: + if self.max_loops >= 2: self.short_memory.add( role=self.agent_name, content=f"Current Internal Reasoning Loop: {loop_count}/{self.max_loops}", ) # If it is the final loop, then add the final loop message - if loop_count == self.max_loops: + if loop_count >= 2 and loop_count == self.max_loops: self.short_memory.add( role=self.agent_name, content=f"πŸŽ‰ Final Internal Reasoning Loop: {loop_count}/{self.max_loops} Prepare your comprehensive response.", diff --git a/swarms/structs/concurrent_workflow.py b/swarms/structs/concurrent_workflow.py index d6dfd619..c6a653ae 100644 --- a/swarms/structs/concurrent_workflow.py +++ b/swarms/structs/concurrent_workflow.py @@ -4,8 +4,6 @@ from concurrent.futures import ThreadPoolExecutor from functools import lru_cache from typing import Any, Callable, Dict, List, Optional, Union -from tqdm import tqdm - from swarms.structs.agent import Agent from swarms.structs.base_swarm import BaseSwarm from swarms.structs.conversation import Conversation @@ -22,9 +20,7 @@ class ConcurrentWorkflow(BaseSwarm): """ Represents a concurrent workflow that executes multiple agents concurrently in a production-grade manner. Features include: - - Interactive model support - Caching for repeated prompts - - Optional progress tracking - Enhanced error handling and retries - Input validation @@ -39,11 +35,9 @@ class ConcurrentWorkflow(BaseSwarm): return_str_on (bool): Flag indicating whether to return the output as a string. Defaults to False. 
auto_generate_prompts (bool): Flag indicating whether to auto-generate prompts for agents. Defaults to False. return_entire_history (bool): Flag indicating whether to return the entire conversation history. Defaults to False. - interactive (bool): Flag indicating whether to enable interactive mode. Defaults to False. cache_size (int): The size of the cache. Defaults to 100. max_retries (int): The maximum number of retry attempts. Defaults to 3. retry_delay (float): The delay between retry attempts in seconds. Defaults to 1.0. - show_progress (bool): Flag indicating whether to show progress. Defaults to False. Raises: ValueError: If the list of agents is empty or if the description is empty. @@ -59,13 +53,10 @@ class ConcurrentWorkflow(BaseSwarm): return_str_on (bool): Flag indicating whether to return the output as a string. auto_generate_prompts (bool): Flag indicating whether to auto-generate prompts for agents. return_entire_history (bool): Flag indicating whether to return the entire conversation history. - interactive (bool): Flag indicating whether to enable interactive mode. cache_size (int): The size of the cache. max_retries (int): The maximum number of retry attempts. retry_delay (float): The delay between retry attempts in seconds. - show_progress (bool): Flag indicating whether to show progress. _cache (dict): The cache for storing agent outputs. - _progress_bar (tqdm): The progress bar for tracking execution. 
""" def __init__( @@ -80,11 +71,9 @@ class ConcurrentWorkflow(BaseSwarm): return_str_on: bool = False, auto_generate_prompts: bool = False, return_entire_history: bool = False, - interactive: bool = False, cache_size: int = 100, max_retries: int = 3, retry_delay: float = 1.0, - show_progress: bool = False, *args, **kwargs, ): @@ -107,21 +96,14 @@ class ConcurrentWorkflow(BaseSwarm): self.output_type = output_type self.return_entire_history = return_entire_history self.tasks = [] # Initialize tasks list - self.interactive = interactive self.cache_size = cache_size self.max_retries = max_retries self.retry_delay = retry_delay - self.show_progress = show_progress self._cache = {} - self._progress_bar = None self.reliability_check() self.conversation = Conversation() - def disable_agent_prints(self): - for agent in self.agents: - agent.no_print = False - def reliability_check(self): try: formatter.print_panel( @@ -186,44 +168,6 @@ class ConcurrentWorkflow(BaseSwarm): """Cached version of agent execution to avoid redundant computations""" return self.agents[agent_id].run(task=task) - def enable_progress_bar(self): - """Enable progress bar display""" - self.show_progress = True - - def disable_progress_bar(self): - """Disable progress bar display""" - if self._progress_bar: - self._progress_bar.close() - self._progress_bar = None - self.show_progress = False - - def _create_progress_bar(self, total: int): - """Create a progress bar for tracking execution""" - if self.show_progress: - try: - self._progress_bar = tqdm( - total=total, - desc="Processing tasks", - unit="task", - disable=not self.show_progress, - ncols=100, - bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]", - ) - except Exception as e: - logger.warning(f"Failed to create progress bar: {e}") - self.show_progress = False - self._progress_bar = None - return self._progress_bar - - def _update_progress(self, increment: int = 1): - """Update the progress bar""" - if self._progress_bar and 
self.show_progress: - try: - self._progress_bar.update(increment) - except Exception as e: - logger.warning(f"Failed to update progress bar: {e}") - self.disable_progress_bar() - def _validate_input(self, task: str) -> bool: """Validate input task""" if not isinstance(task, str): @@ -232,38 +176,6 @@ class ConcurrentWorkflow(BaseSwarm): raise ValueError("Task cannot be empty") return True - def _handle_interactive(self, task: str) -> str: - """Handle interactive mode for task input""" - if self.interactive: - from swarms.utils.formatter import formatter - - # Display current task in a panel - formatter.print_panel( - content=f"Current task: {task}", - title="Task Status", - style="bold blue", - ) - - # Get user input with formatted prompt - formatter.print_panel( - content="Do you want to modify this task? (y/n/q to quit): ", - title="User Input", - style="bold green", - ) - response = input().lower() - - if response == "q": - return None - elif response == "y": - formatter.print_panel( - content="Enter new task: ", - title="New Task Input", - style="bold yellow", - ) - new_task = input() - return new_task - return task - def _run_with_retry( self, agent: Agent, task: str, img: str = None ) -> Any: @@ -286,68 +198,69 @@ class ConcurrentWorkflow(BaseSwarm): self.retry_delay * (attempt + 1) ) # Exponential backoff + def _process_agent( + self, agent: Agent, task: str, img: str = None + ) -> Any: + """ + Process a single agent with caching and error handling. 
+ + Args: + agent: The agent to process + task: Task to execute + img: Optional image input + + Returns: + The agent's output + """ + try: + # Fast path - check cache first + cache_key = f"{task}_{agent.agent_name}" + if cache_key in self._cache: + output = self._cache[cache_key] + else: + # Slow path - run agent and update cache + output = self._run_with_retry(agent, task, img) + + if len(self._cache) >= self.cache_size: + self._cache.pop(next(iter(self._cache))) + + self._cache[cache_key] = output + + return output + except Exception as e: + logger.error( + f"Error running agent {agent.agent_name}: {e}" + ) + raise + def _run( self, task: str, img: str = None, *args, **kwargs ) -> Union[Dict[str, Any], str]: """ - Enhanced run method with caching, progress tracking, and better error handling + Enhanced run method with parallel execution. """ - - # Validate and potentially modify task + # Fast validation self._validate_input(task) - task = self._handle_interactive(task) - - # Add task to conversation self.conversation.add("User", task) - # Create progress bar if enabled - if self.show_progress: - self._create_progress_bar(len(self.agents)) - - def run_agent( - agent: Agent, task: str, img: str = None - ) -> Any: - try: - # Check cache first - cache_key = f"{task}_{agent.agent_name}" - if cache_key in self._cache: - output = self._cache[cache_key] - else: - output = self._run_with_retry(agent, task, img) - # Update cache - if len(self._cache) >= self.cache_size: - self._cache.pop(next(iter(self._cache))) - self._cache[cache_key] = output - - self._update_progress() - return output - except Exception as e: - logger.error( - f"Error running agent {agent.agent_name}: {e}" - ) - self._update_progress() - raise - try: + # Parallel execution with optimized thread pool with ThreadPoolExecutor( max_workers=self.max_workers ) as executor: - list( - executor.map( - lambda agent: run_agent(agent, task), - self.agents, + futures = [ + executor.submit( + self._process_agent, 
agent, task, img ) - ) - finally: - if self._progress_bar and self.show_progress: - try: - self._progress_bar.close() - except Exception as e: - logger.warning( - f"Failed to close progress bar: {e}" - ) - finally: - self._progress_bar = None + for agent in self.agents + ] + # Wait for all futures to complete + for future in futures: + future.result() + + except Exception as e: + logger.error(f"An error occurred during execution: {e}") + raise e return history_output_formatter( self.conversation, @@ -362,20 +275,11 @@ class ConcurrentWorkflow(BaseSwarm): **kwargs, ) -> Any: """ - Executes the agent's run method on a specified device with optional interactive mode. - - This method attempts to execute the agent's run method on a specified device, either CPU or GPU. - It supports both standard execution and interactive mode where users can modify tasks and continue - the workflow interactively. + Executes the agent's run method with parallel execution. Args: task (Optional[str], optional): The task to be executed. Defaults to None. img (Optional[str], optional): The image to be processed. Defaults to None. - is_last (bool, optional): Indicates if this is the last task. Defaults to False. - device (str, optional): The device to use for execution. Defaults to "cpu". - device_id (int, optional): The ID of the GPU to use if device is set to "gpu". Defaults to 0. - all_cores (bool, optional): If True, uses all available CPU cores. Defaults to True. - all_gpus (bool, optional): If True, uses all available GPUS. Defaults to True. *args: Additional positional arguments to be passed to the execution method. **kwargs: Additional keyword arguments to be passed to the execution method. @@ -383,117 +287,27 @@ class ConcurrentWorkflow(BaseSwarm): Any: The result of the execution. Raises: - ValueError: If an invalid device is specified. + ValueError: If task validation fails. Exception: If any other error occurs during execution. 
""" if task is not None: self.tasks.append(task) try: - # Handle interactive mode - if self.interactive: - current_task = task - loop_count = 0 - - while loop_count < self.max_loops: - if ( - self.max_loops is not None - and loop_count >= self.max_loops - ): - formatter.print_panel( - content=f"Maximum number of loops ({self.max_loops}) reached.", - title="Session Complete", - style="bold red", - ) - break - - if current_task is None: - formatter.print_panel( - content="Enter your task (or 'q' to quit): ", - title="Task Input", - style="bold blue", - ) - current_task = input() - if current_task.lower() == "q": - break - - # Run the workflow with the current task - try: - outputs = self._run( - current_task, img, *args, **kwargs - ) - formatter.print_panel( - content=str(outputs), - title="Workflow Result", - style="bold green", - ) - except Exception as e: - formatter.print_panel( - content=f"Error: {str(e)}", - title="Error", - style="bold red", - ) - - # Ask if user wants to continue - formatter.print_panel( - content="Do you want to continue with a new task? 
(y/n): ", - title="Continue Session", - style="bold yellow", - ) - if input().lower() != "y": - break - - current_task = None - loop_count += 1 - - formatter.print_panel( - content="Interactive session ended.", - title="Session Complete", - style="bold blue", - ) - return outputs - else: - # Standard non-interactive execution - outputs = self._run(task, img, *args, **kwargs) - return outputs - - except ValueError as e: - logger.error(f"Invalid device specified: {e}") - raise e + outputs = self._run(task, img, *args, **kwargs) + return outputs except Exception as e: logger.error(f"An error occurred during execution: {e}") raise e def run_batched(self, tasks: List[str]) -> Any: """ - Enhanced batched execution with progress tracking + Enhanced batched execution """ if not tasks: raise ValueError("Tasks list cannot be empty") - results = [] - - # Create progress bar if enabled - if self.show_progress: - self._create_progress_bar(len(tasks)) - - try: - for task in tasks: - result = self.run(task) - results.append(result) - self._update_progress() - finally: - if self._progress_bar and self.show_progress: - try: - self._progress_bar.close() - except Exception as e: - logger.warning( - f"Failed to close progress bar: {e}" - ) - finally: - self._progress_bar = None - - return results + return [self.run(task) for task in tasks] def clear_cache(self): """Clear the task cache"""