[DOCS][Swarms API Rate Limits] [concurrent wrapper in swarms.utils]

Kye Gomez 6 days ago
parent f916c89cc1
commit 4a86e28893

@@ -1,3 +1,4 @@
+# Get your packages
 from swarms import Agent, ConcurrentWorkflow
 # Initialize market research agent
@@ -11,7 +12,7 @@ market_researcher = Agent(
     5. Providing actionable market insights""",
     model_name="claude-3-sonnet-20240229",
     max_loops=1,
-    temperature=0.7,
+    dynamic_temperature_enabled=True,
     # streaming_on=True,
 )
@@ -24,10 +25,9 @@ financial_analyst = Agent(
     3. Assessing risk factors
     4. Providing financial forecasts
     5. Recommending financial strategies""",
-    model_name="claude-3-sonnet-20240229",
+    model_name="gpt-4.1",
     max_loops=1,
-    # streaming_on=True,
-    temperature=0.7,
+    dynamic_temperature_enabled=True,
 )
 # Initialize technical analyst agent
@@ -51,14 +51,15 @@ agents = [market_researcher, financial_analyst, technical_analyst]
 router = ConcurrentWorkflow(
     name="market-analysis-router",
+    description="This concurrent workflow is used to analyze the market, financial, and technical aspects of a stock.",
     agents=agents,
     max_loops=1,
-    # output_type="all",
+    output_type="all",
     show_dashboard=True,
 )
 result = router.run(
-    "Analyze Tesla (TSLA) stock from market, financial, and technical perspectives"
+    task="What are the best 3 ETFS for energy sector in the US?"
 )
 print(result)

@@ -1,59 +1,196 @@
# Swarms API Rate Limits

The Swarms API implements a comprehensive rate limiting system that tracks API requests across multiple time windows and enforces various limits to ensure fair usage and system stability.

## Rate Limits Summary

| Rate Limit Type | Free Tier | Premium Tier | Time Window | Description |
|----------------|-----------|--------------|-------------|-------------|
| **Requests per Minute** | 100 | 2,000 | 1 minute | Maximum API calls per minute |
| **Requests per Hour** | 50 | 10,000 | 1 hour | Maximum API calls per hour |
| **Requests per Day** | 1,200 | 100,000 | 24 hours | Maximum API calls per day |
| **Tokens per Agent** | 200,000 | 2,000,000 | Per request | Maximum tokens per agent |
| **Prompt Length** | 200,000 | 200,000 | Per request | Maximum input tokens per request |
| **Batch Size** | 10 | 10 | Per request | Maximum agents in batch requests |
| **IP-based Fallback** | 100 | 100 | 60 seconds | For requests without API keys |
## Detailed Rate Limit Explanations

### 1. **Request Rate Limits**

These limits control how many API calls you can make within specific time windows.

#### **Per-Minute Limit**
| Tier | Requests per Minute | Reset Interval | Applies To |
|--------------|--------------------|------------------------|--------------------|
| Free | 100 | Every minute (sliding) | All API endpoints |
| Premium | 2,000 | Every minute (sliding) | All API endpoints |
#### **Per-Hour Limit**
- **Free Tier**: 50 requests per hour
- **Premium Tier**: 10,000 requests per hour
- **Reset**: Every hour (sliding window)
- **Applies to**: All API endpoints
#### **Per-Day Limit**
- **Free Tier**: 1,200 requests per day (50 × 24)
- **Premium Tier**: 100,000 requests per day
- **Reset**: Every 24 hours (sliding window)
- **Applies to**: All API endpoints
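Staying inside these windows is easiest if the client paces itself. The sketch below is illustrative only (it is not part of the Swarms SDK); it keeps a local sliding window of call timestamps and sleeps whenever the per-minute cap would be exceeded:

```python
import time
from collections import deque


class MinuteRateLimiter:
    """Client-side sliding-window throttle (illustrative, not part of the SDK)."""

    def __init__(self, max_per_minute: int = 100):  # 100 = Free tier cap
        self.max_per_minute = max_per_minute
        self.calls = deque()  # monotonic timestamps of recent calls

    def wait_for_slot(self) -> None:
        now = time.monotonic()
        # Discard timestamps that have aged out of the 60-second window
        while self.calls and now - self.calls[0] >= 60:
            self.calls.popleft()
        if len(self.calls) >= self.max_per_minute:
            # Sleep until the oldest call leaves the window
            time.sleep(60 - (now - self.calls[0]))
        self.calls.append(time.monotonic())
```

Call `wait_for_slot()` before each API request; the same pattern extends to the hourly and daily windows.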
### 2. **Token Limits**
These limits control the amount of text processing allowed per request.
#### **Tokens per Agent**
- **Free Tier**: 200,000 tokens per agent
- **Premium Tier**: 2,000,000 tokens per agent
- **Applies to**: Individual agent configurations
- **Includes**: System prompts, task descriptions, and agent names
#### **Prompt Length Limit**
- **All Tiers**: 200,000 tokens maximum
- **Applies to**: Combined input text (task + history + system prompts)
- **Error**: Returns 400 error if exceeded
- **Message**: "Prompt is too long. Please provide a prompt that is less than 10000 tokens."
### 3. **Batch Processing Limits**

These limits control concurrent processing capabilities.

#### **Batch Size Limit**

- **All Tiers**: 10 agents maximum per batch
- **Applies to**: `/v1/agent/batch/completions` endpoint
- **Error**: Returns 400 error if exceeded
- **Message**: "ERROR: BATCH SIZE EXCEEDED - You can only run up to 10 batch agents at a time."
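If you need to run more than 10 agents, split them into successive batches of 10 or fewer. A minimal chunking sketch (`submit_batch` is a placeholder for your own request code, not a library function):

```python
from typing import Any, Iterator, List

MAX_BATCH_SIZE = 10  # batch size limit, all tiers


def chunked(agents: List[Any], size: int = MAX_BATCH_SIZE) -> Iterator[List[Any]]:
    """Yield successive slices of at most `size` agents."""
    for i in range(0, len(agents), size):
        yield agents[i : i + size]


# for batch in chunked(all_agents):
#     submit_batch(batch)  # placeholder: POST to /v1/agent/batch/completions
```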
## How Rate Limiting Works

### Database-Based Tracking

The system uses a database-based approach for API key requests:
1. **Request Logging**: Every API request is logged to the `swarms_api_logs` table
2. **Time Window Queries**: The system queries for requests in the last minute, hour, and day
3. **Limit Comparison**: Current counts are compared against configured limits
4. **Request Blocking**: Requests are blocked if any limit is exceeded
### Sliding Windows
Rate limits use sliding windows rather than fixed windows:
- **Minute**: Counts requests in the last 60 seconds
- **Hour**: Counts requests in the last 60 minutes
- **Day**: Counts requests in the last 24 hours
This provides more accurate rate limiting compared to fixed time windows.
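Conceptually, each check counts the requests logged in the trailing window rather than in a calendar-aligned bucket. A simplified Python sketch of the idea (the helper names and in-memory timestamp list are illustrative, not the actual server code):

```python
from datetime import datetime, timedelta, timezone
from typing import List


def count_in_window(request_times: List[datetime], window: timedelta) -> int:
    """Sliding window: only requests newer than now - window are counted."""
    # Timestamps are assumed to be timezone-aware UTC
    cutoff = datetime.now(timezone.utc) - window
    return sum(1 for ts in request_times if ts >= cutoff)


def request_allowed(request_times: List[datetime]) -> bool:
    # Free-tier limits from the summary table above
    checks = [
        (timedelta(minutes=1), 100),
        (timedelta(hours=1), 50),
        (timedelta(days=1), 1200),
    ]
    return all(
        count_in_window(request_times, window) < limit
        for window, limit in checks
    )
```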
## Checking Your Rate Limits
### API Endpoint
Use the `/v1/rate/limits` endpoint to check your current usage:
```bash
curl -H "x-api-key: your-api-key" \
https://api.swarms.world/v1/rate/limits
```
### Response Format
```json
{
"success": true,
"rate_limits": {
"minute": {
"count": 5,
"limit": 100,
"exceeded": false,
"remaining": 95,
"reset_time": "2024-01-15T10:30:00Z"
},
"hour": {
"count": 25,
"limit": 50,
"exceeded": false,
"remaining": 25,
"reset_time": "2024-01-15T11:00:00Z"
},
"day": {
"count": 150,
"limit": 1200,
"exceeded": false,
"remaining": 1050,
"reset_time": "2024-01-16T10:00:00Z"
}
},
"limits": {
"maximum_requests_per_minute": 100,
"maximum_requests_per_hour": 50,
"maximum_requests_per_day": 1200,
"tokens_per_agent": 200000
},
"timestamp": "2024-01-15T10:29:30Z"
}
```
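The same check is easy to script. Below is a minimal sketch using the `requests` library (reading the key from a `SWARMS_API_KEY` environment variable is our convention here, not a requirement of the API):

```python
import os

import requests

API_KEY = os.environ["SWARMS_API_KEY"]  # assumed env var


def get_rate_limits() -> dict:
    response = requests.get(
        "https://api.swarms.world/v1/rate/limits",
        headers={"x-api-key": API_KEY},
        timeout=10,
    )
    response.raise_for_status()
    return response.json()


for window, stats in get_rate_limits()["rate_limits"].items():
    print(f"{window}: {stats['count']}/{stats['limit']} used, {stats['remaining']} remaining")
```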
## Handling Rate Limit Errors
### Error Response
When rate limits are exceeded, you'll receive a 429 status code:
```json
{
"detail": "Rate limit exceeded for minute window(s). Upgrade to Premium for increased limits (2,000/min, 10,000/hour, 100,000/day) at https://swarms.world/platform/account for just $99/month."
}
```
### Best Practices

1. **Monitor Usage**: Regularly check your rate limits using the `/v1/rate/limits` endpoint
2. **Implement Retry Logic**: Use exponential backoff when hitting rate limits (see the sketch after this list)
3. **Optimize Requests**: Combine multiple operations into single requests when possible
4. **Upgrade When Needed**: Consider upgrading to Premium for higher limits
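Here is one way to implement the retry advice above: a small wrapper that backs off exponentially whenever the API answers 429. It is a sketch built on the `requests` library, not part of the SDK:

```python
import time

import requests


def post_with_backoff(
    url: str, payload: dict, api_key: str, max_retries: int = 5
) -> dict:
    """POST to the Swarms API, retrying on 429 with exponential backoff."""
    for attempt in range(max_retries):
        response = requests.post(
            url, json=payload, headers={"x-api-key": api_key}, timeout=60
        )
        if response.status_code != 429:
            response.raise_for_status()
            return response.json()
        # Rate limited: wait 1s, 2s, 4s, 8s, ... before retrying
        time.sleep(2 ** attempt)
    raise RuntimeError(f"Still rate limited after {max_retries} retries")
```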
## Premium Tier Benefits

Upgrade to Premium for significantly higher limits:

- **20x more requests per minute** (2,000 vs 100)
- **200x more requests per hour** (10,000 vs 50)
- **83x more requests per day** (100,000 vs 1,200)
- **10x more tokens per agent** (2M vs 200K)

Visit [Swarms Platform Account](https://swarms.world/platform/account) to upgrade for just $99/month.

## Performance Considerations

- Database queries are optimized to only count request IDs
- Rate limit checks are cached per request
- Fallback mechanisms ensure system reliability
- Minimal impact on request latency
- Persistent tracking across server restarts

@@ -0,0 +1,354 @@
"""
Examples demonstrating the concurrent wrapper decorator functionality.
This file shows how to use the concurrent and concurrent_class_executor
decorators to enable concurrent execution of functions and class methods.
"""
import time
import asyncio
from typing import Dict, Any
import requests
from swarms.utils.concurrent_wrapper import (
concurrent,
concurrent_class_executor,
thread_executor,
process_executor,
async_executor,
batch_executor,
ExecutorType,
)
# Example 1: Basic concurrent function execution
@concurrent(
name="data_processor",
description="Process data concurrently",
max_workers=4,
timeout=30,
retry_on_failure=True,
max_retries=2,
)
def process_data(data: str) -> str:
"""Simulate data processing with a delay."""
time.sleep(1) # Simulate work
return f"processed_{data}"
# Example 2: Thread-based executor for I/O bound tasks
@thread_executor(max_workers=8, timeout=60)
def fetch_url(url: str) -> Dict[str, Any]:
"""Fetch data from a URL."""
try:
response = requests.get(url, timeout=10)
return {
"url": url,
"status_code": response.status_code,
"content_length": len(response.content),
"success": response.status_code == 200,
}
except Exception as e:
return {"url": url, "error": str(e), "success": False}
# Example 3: Process-based executor for CPU-intensive tasks
@process_executor(max_workers=2, timeout=120)
def cpu_intensive_task(n: int) -> float:
"""Perform CPU-intensive computation."""
result = 0.0
for i in range(n):
result += (i**0.5) * (i**0.3)
return result
# Example 4: Async executor for async functions
@async_executor(max_workers=5)
async def async_task(task_id: int) -> str:
"""Simulate an async task."""
await asyncio.sleep(0.5) # Simulate async work
return f"async_result_{task_id}"
# Example 5: Batch processing
@batch_executor(batch_size=10, max_workers=3)
def process_item(item: str) -> str:
"""Process a single item."""
time.sleep(0.1) # Simulate work
return item.upper()
# Example 6: Class with concurrent methods
@concurrent_class_executor(
name="DataProcessor",
max_workers=4,
methods=["process_batch", "validate_data"],
)
class DataProcessor:
"""A class with concurrent processing capabilities."""
def __init__(self, config: Dict[str, Any]):
self.config = config
def process_batch(self, data: str) -> str:
"""Process a batch of data."""
time.sleep(0.5) # Simulate processing
return f"processed_{data}"
def validate_data(self, data: str) -> bool:
"""Validate data."""
time.sleep(0.2) # Simulate validation
return len(data) > 0
def normal_method(self, x: int) -> int:
"""A normal method (not concurrent)."""
return x * 2
# Example 7: Function with custom configuration
@concurrent(
name="custom_processor",
description="Custom concurrent processor",
max_workers=6,
timeout=45,
executor_type=ExecutorType.THREAD,
return_exceptions=True,
ordered=False,
retry_on_failure=True,
max_retries=3,
retry_delay=0.5,
)
def custom_processor(item: str, multiplier: int = 1) -> str:
"""Custom processor with parameters."""
time.sleep(0.3)
return f"{item}_{multiplier}" * multiplier
def example_1_basic_concurrent_execution():
"""Example 1: Basic concurrent execution."""
print("=== Example 1: Basic Concurrent Execution ===")
# Prepare data
data_items = [f"item_{i}" for i in range(10)]
# Execute concurrently
results = process_data.concurrent_execute(*data_items)
# Process results
successful_results = [r.value for r in results if r.success]
failed_results = [r for r in results if not r.success]
print(f"Successfully processed: {len(successful_results)} items")
print(f"Failed: {len(failed_results)} items")
print(f"Sample results: {successful_results[:3]}")
print()
def example_2_thread_based_execution():
"""Example 2: Thread-based execution for I/O bound tasks."""
print("=== Example 2: Thread-based Execution ===")
# URLs to fetch
urls = [
"https://httpbin.org/get",
"https://httpbin.org/status/200",
"https://httpbin.org/status/404",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
]
# Execute concurrently
results = fetch_url.concurrent_execute(*urls)
# Process results
successful_fetches = [
r.value
for r in results
if r.success and r.value.get("success")
]
failed_fetches = [
r.value
for r in results
if r.success and not r.value.get("success")
]
print(f"Successful fetches: {len(successful_fetches)}")
print(f"Failed fetches: {len(failed_fetches)}")
print(
f"Sample successful result: {successful_fetches[0] if successful_fetches else 'None'}"
)
print()
def example_3_process_based_execution():
"""Example 3: Process-based execution for CPU-intensive tasks."""
print("=== Example 3: Process-based Execution ===")
# CPU-intensive tasks
tasks = [100000, 200000, 300000, 400000]
# Execute concurrently
results = cpu_intensive_task.concurrent_execute(*tasks)
# Process results
successful_results = [r.value for r in results if r.success]
execution_times = [r.execution_time for r in results if r.success]
print(f"Completed {len(successful_results)} CPU-intensive tasks")
print(
f"Average execution time: {sum(execution_times) / len(execution_times):.3f}s"
)
print(
f"Sample result: {successful_results[0] if successful_results else 'None'}"
)
print()
def example_4_batch_processing():
"""Example 4: Batch processing."""
print("=== Example 4: Batch Processing ===")
# Items to process
items = [f"item_{i}" for i in range(25)]
# Process in batches
results = process_item.concurrent_batch(items, batch_size=5)
# Process results
successful_results = [r.value for r in results if r.success]
print(f"Processed {len(successful_results)} items in batches")
print(f"Sample results: {successful_results[:5]}")
print()
def example_5_class_concurrent_execution():
"""Example 5: Class with concurrent methods."""
print("=== Example 5: Class Concurrent Execution ===")
# Create processor instance
processor = DataProcessor({"batch_size": 10})
# Prepare data
data_items = [f"data_{i}" for i in range(8)]
# Execute concurrent methods
process_results = processor.process_batch.concurrent_execute(
*data_items
)
validate_results = processor.validate_data.concurrent_execute(
*data_items
)
# Process results
processed_items = [r.value for r in process_results if r.success]
valid_items = [r.value for r in validate_results if r.success]
print(f"Processed {len(processed_items)} items")
print(f"Validated {len(valid_items)} items")
print(f"Sample processed: {processed_items[:3]}")
print(f"Sample validation: {valid_items[:3]}")
print()
def example_6_custom_configuration():
"""Example 6: Custom configuration with exceptions and retries."""
print("=== Example 6: Custom Configuration ===")
# Items with different multipliers
items = [f"item_{i}" for i in range(6)]
multipliers = [1, 2, 3, 1, 2, 3]
    # Execute with custom configuration: zip pairs each item with its
    # multiplier, so every task runs as custom_processor(item, multiplier)
    results = custom_processor.concurrent_execute(
        *zip(items, multipliers)
    )
# Process results
successful_results = [r.value for r in results if r.success]
failed_results = [r for r in results if not r.success]
print(f"Successful: {len(successful_results)}")
print(f"Failed: {len(failed_results)}")
print(f"Sample results: {successful_results[:3]}")
print()
def example_7_concurrent_mapping():
"""Example 7: Concurrent mapping over a list."""
print("=== Example 7: Concurrent Mapping ===")
# Items to map over
items = [f"map_item_{i}" for i in range(15)]
# Map function over items
results = process_data.concurrent_map(items)
# Process results
mapped_results = [r.value for r in results if r.success]
print(f"Mapped over {len(mapped_results)} items")
print(f"Sample mapped results: {mapped_results[:5]}")
print()
def example_8_error_handling():
"""Example 8: Error handling and retries."""
print("=== Example 8: Error Handling ===")
@concurrent(
max_workers=3,
return_exceptions=True,
retry_on_failure=True,
max_retries=2,
)
def unreliable_function(x: int) -> int:
"""A function that sometimes fails."""
if x % 3 == 0:
raise ValueError(f"Failed for {x}")
time.sleep(0.1)
return x * 2
# Execute with potential failures
results = unreliable_function.concurrent_execute(*range(10))
# Process results
successful_results = [r.value for r in results if r.success]
failed_results = [r.exception for r in results if not r.success]
print(f"Successful: {len(successful_results)}")
print(f"Failed: {len(failed_results)}")
print(f"Sample successful: {successful_results[:3]}")
print(
f"Sample failures: {[type(e).__name__ for e in failed_results[:3]]}"
)
print()
def main():
"""Run all examples."""
print("Concurrent Wrapper Examples")
print("=" * 50)
print()
try:
example_1_basic_concurrent_execution()
example_2_thread_based_execution()
example_3_process_based_execution()
example_4_batch_processing()
example_5_class_concurrent_execution()
example_6_custom_configuration()
example_7_concurrent_mapping()
example_8_error_handling()
print("All examples completed successfully!")
except Exception as e:
print(f"Error running examples: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()

@@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
 [tool.poetry]
 name = "swarms"
-version = "7.9.9"
+version = "8.0.0"
 description = "Swarms - TGSC"
 license = "MIT"
 authors = ["Kye Gomez <kye@apac.ai>"]

@@ -3,15 +3,14 @@ from swarms import Agent
 agent = Agent(
     name="Research Agent",
     description="A research agent that can answer questions",
-    model_name="groq/moonshotai/kimi-k2-instruct",
-    verbose=True,
+    model_name="claude-3-5-sonnet-20241022",
     streaming_on=True,
-    max_loops=2,
+    max_loops=1,
     interactive=True,
 )
 out = agent.run(
-    "What are the best AI wechat groups in hangzhou and beijing? give me the links"
+    "What are the best arbitrage trading strategies for altcoins? Give me research papers and articles on the topic."
 )
 print(out)

@@ -0,0 +1,629 @@
import asyncio
import concurrent.futures
import inspect
import time
from concurrent.futures import (
ThreadPoolExecutor,
ProcessPoolExecutor,
as_completed,
)
from functools import wraps
from typing import (
Any,
Callable,
List,
Optional,
TypeVar,
Generic,
)
from dataclasses import dataclass
from enum import Enum
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger("concurrent_wrapper")
T = TypeVar("T")
R = TypeVar("R")
# Global function for process pool execution (must be picklable)
def _execute_task_in_process(task_data):
"""
Execute a task in a separate process.
This function must be at module level to be picklable.
"""
(
func,
task_args,
task_kwargs,
task_id,
max_retries,
retry_on_failure,
retry_delay,
return_exceptions,
) = task_data
start_time = time.time()
for attempt in range(max_retries + 1):
try:
result = func(*task_args, **task_kwargs)
execution_time = time.time() - start_time
return ConcurrentResult(
value=result,
execution_time=execution_time,
worker_id=task_id,
)
except Exception as e:
if attempt == max_retries or not retry_on_failure:
execution_time = time.time() - start_time
if return_exceptions:
return ConcurrentResult(
exception=e,
execution_time=execution_time,
worker_id=task_id,
)
else:
raise
else:
time.sleep(retry_delay * (2**attempt))
# This should never be reached, but just in case
return ConcurrentResult(
exception=Exception("Max retries exceeded")
)
class ExecutorType(Enum):
"""Enum for different types of executors."""
THREAD = "thread"
PROCESS = "process"
ASYNC = "async"
@dataclass
class ConcurrentConfig:
"""Configuration for concurrent execution."""
name: Optional[str] = None
description: Optional[str] = None
max_workers: int = 4
timeout: Optional[float] = None
executor_type: ExecutorType = ExecutorType.THREAD
return_exceptions: bool = False
chunk_size: Optional[int] = None
ordered: bool = True
retry_on_failure: bool = False
max_retries: int = 3
retry_delay: float = 1.0
class ConcurrentResult(Generic[T]):
"""Result wrapper for concurrent execution."""
def __init__(
self,
value: T = None,
exception: Exception = None,
execution_time: float = 0.0,
worker_id: Optional[int] = None,
):
self.value = value
self.exception = exception
self.execution_time = execution_time
self.worker_id = worker_id
self.success = exception is None
def __repr__(self):
if self.success:
return f"ConcurrentResult(value={self.value}, time={self.execution_time:.3f}s)"
else:
return f"ConcurrentResult(exception={type(self.exception).__name__}: {self.exception})"
def concurrent(
name: Optional[str] = None,
description: Optional[str] = None,
max_workers: int = 4,
timeout: Optional[float] = None,
executor_type: ExecutorType = ExecutorType.THREAD,
return_exceptions: bool = False,
chunk_size: Optional[int] = None,
ordered: bool = True,
retry_on_failure: bool = False,
max_retries: int = 3,
retry_delay: float = 1.0,
):
"""
A decorator that enables concurrent execution of functions.
Args:
name (Optional[str]): Name for the concurrent operation
description (Optional[str]): Description of the operation
max_workers (int): Maximum number of worker threads/processes
timeout (Optional[float]): Timeout in seconds for each task
executor_type (ExecutorType): Type of executor (thread, process, async)
return_exceptions (bool): Whether to return exceptions instead of raising
chunk_size (Optional[int]): Size of chunks for batch processing
ordered (bool): Whether to maintain order of results
retry_on_failure (bool): Whether to retry failed tasks
max_retries (int): Maximum number of retries per task
retry_delay (float): Delay between retries in seconds
Returns:
Callable: Decorated function that can execute concurrently
"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
config = ConcurrentConfig(
name=name or func.__name__,
description=description
or f"Concurrent execution of {func.__name__}",
max_workers=max_workers,
timeout=timeout,
executor_type=executor_type,
return_exceptions=return_exceptions,
chunk_size=chunk_size,
ordered=ordered,
retry_on_failure=retry_on_failure,
max_retries=max_retries,
retry_delay=retry_delay,
)
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
def _execute_single_task(
task_args, task_kwargs, task_id=None
):
"""Execute a single task with retry logic."""
start_time = time.time()
for attempt in range(config.max_retries + 1):
try:
result = func(*task_args, **task_kwargs)
execution_time = time.time() - start_time
return ConcurrentResult(
value=result,
execution_time=execution_time,
worker_id=task_id,
)
except Exception as e:
if (
attempt == config.max_retries
or not config.retry_on_failure
):
execution_time = time.time() - start_time
if config.return_exceptions:
return ConcurrentResult(
exception=e,
execution_time=execution_time,
worker_id=task_id,
)
else:
raise
else:
logger.warning(
f"Task {task_id} failed (attempt {attempt + 1}/{config.max_retries + 1}): {e}"
)
time.sleep(config.retry_delay * (2**attempt))
def concurrent_execute(*args_list, **kwargs_list):
"""Execute the function concurrently with multiple argument sets."""
if not args_list and not kwargs_list:
raise ValueError(
"At least one set of arguments must be provided"
)
# Prepare tasks
tasks = []
if args_list:
for args in args_list:
if isinstance(args, (list, tuple)):
tasks.append((args, {}))
else:
tasks.append(([args], {}))
if kwargs_list:
for kwargs in kwargs_list:
if isinstance(kwargs, dict):
tasks.append(((), kwargs))
else:
raise ValueError(
"kwargs_list must contain dictionaries"
)
logger.info(
f"Starting concurrent execution of {len(tasks)} tasks with {config.max_workers} workers"
)
start_time = time.time()
try:
if config.executor_type == ExecutorType.THREAD:
results = _execute_with_thread_pool(tasks)
elif config.executor_type == ExecutorType.PROCESS:
results = _execute_with_process_pool(tasks)
                elif config.executor_type == ExecutorType.ASYNC:
                    # _execute_with_async is a coroutine; run it to
                    # completion instead of returning it unawaited
                    results = asyncio.run(
                        _execute_with_async(tasks)
                    )
else:
raise ValueError(
f"Unsupported executor type: {config.executor_type}"
)
total_time = time.time() - start_time
successful_tasks = sum(
1 for r in results if r.success
)
logger.info(
f"Completed {len(tasks)} tasks in {total_time:.3f}s "
f"({successful_tasks}/{len(tasks)} successful)"
)
return results
except Exception as e:
logger.error(f"Concurrent execution failed: {e}")
raise
def _execute_with_thread_pool(tasks):
"""Execute tasks using ThreadPoolExecutor."""
results = []
with ThreadPoolExecutor(
max_workers=config.max_workers
) as executor:
                if config.ordered:
                    # Submit every task first, then collect results in
                    # submission order so output order matches input order
                    futures = [
                        executor.submit(
                            _execute_single_task, task[0], task[1], i
                        )
                        for i, task in enumerate(tasks)
                    ]
                    for future in futures:
                        try:
                            result = future.result(
                                timeout=config.timeout
                            )
                            results.append(result)
                        except Exception as e:
                            if config.return_exceptions:
                                results.append(
                                    ConcurrentResult(exception=e)
                                )
                            else:
                                raise
else:
futures = [
executor.submit(
_execute_single_task, task[0], task[1], i
)
for i, task in enumerate(tasks)
]
for future in as_completed(
futures, timeout=config.timeout
):
try:
result = future.result(
timeout=config.timeout
)
results.append(result)
except Exception as e:
if config.return_exceptions:
results.append(
ConcurrentResult(exception=e)
)
else:
raise
return results
def _execute_with_process_pool(tasks):
"""Execute tasks using ProcessPoolExecutor."""
results = []
# Prepare task data for process execution
task_data_list = []
for i, task in enumerate(tasks):
task_data = (
func, # The function to execute
task[0], # args
task[1], # kwargs
i, # task_id
config.max_retries,
config.retry_on_failure,
config.retry_delay,
config.return_exceptions,
)
task_data_list.append(task_data)
with ProcessPoolExecutor(
max_workers=config.max_workers
) as executor:
                if config.ordered:
                    # Collect in submission order to preserve result order
                    futures = [
                        executor.submit(
                            _execute_task_in_process, task_data
                        )
                        for task_data in task_data_list
                    ]
                    for future in futures:
                        try:
                            result = future.result(
                                timeout=config.timeout
                            )
                            results.append(result)
                        except Exception as e:
                            if config.return_exceptions:
                                results.append(
                                    ConcurrentResult(exception=e)
                                )
                            else:
                                raise
else:
futures = [
executor.submit(
_execute_task_in_process, task_data
)
for task_data in task_data_list
]
for future in as_completed(
futures, timeout=config.timeout
):
try:
result = future.result(
timeout=config.timeout
)
results.append(result)
except Exception as e:
if config.return_exceptions:
results.append(
ConcurrentResult(exception=e)
)
else:
raise
return results
async def _execute_with_async(tasks):
"""Execute tasks using asyncio."""
async def _async_task(
task_args, task_kwargs, task_id=None
):
start_time = time.time()
for attempt in range(config.max_retries + 1):
try:
                        loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
None,
lambda: func(*task_args, **task_kwargs),
)
execution_time = time.time() - start_time
return ConcurrentResult(
value=result,
execution_time=execution_time,
worker_id=task_id,
)
except Exception as e:
if (
attempt == config.max_retries
or not config.retry_on_failure
):
execution_time = time.time() - start_time
if config.return_exceptions:
return ConcurrentResult(
exception=e,
execution_time=execution_time,
worker_id=task_id,
)
else:
raise
else:
logger.warning(
f"Async task {task_id} failed (attempt {attempt + 1}/{config.max_retries + 1}): {e}"
)
await asyncio.sleep(
config.retry_delay * (2**attempt)
)
semaphore = asyncio.Semaphore(config.max_workers)
async def _limited_task(task_args, task_kwargs, task_id):
async with semaphore:
return await _async_task(
task_args, task_kwargs, task_id
)
tasks_coros = [
_limited_task(task[0], task[1], i)
for i, task in enumerate(tasks)
]
            if config.ordered:
                # asyncio.gather preserves the order of its inputs, so it
                # is the right tool when ordered results are requested
                return await asyncio.gather(
                    *tasks_coros,
                    return_exceptions=config.return_exceptions,
                )
            else:
                # Unordered: yield results in completion order
                results = []
                for coro in asyncio.as_completed(tasks_coros):
                    try:
                        result = await coro
                        results.append(result)
                    except Exception as e:
                        if config.return_exceptions:
                            results.append(
                                ConcurrentResult(exception=e)
                            )
                        else:
                            raise
                return results
        def concurrent_batch(
            items: List[Any],
            batch_size: Optional[int] = None,
            **kwargs,
        ) -> List[ConcurrentResult]:
            """Execute the function concurrently on a batch of items.

            ``kwargs`` are shared by every call; ``batch_size`` is kept
            for API compatibility (concurrency is already bounded by
            ``max_workers``).
            """
            tasks = []
            for item in items:
                if isinstance(item, (list, tuple)):
                    tasks.append((tuple(item), kwargs))
                else:
                    tasks.append(((item,), kwargs))
            # Dispatch directly to the pool helpers so the shared kwargs
            # reach each call (the previous code applied `**` to a list
            # of dicts, which raises a TypeError)
            if config.executor_type == ExecutorType.THREAD:
                return _execute_with_thread_pool(tasks)
            elif config.executor_type == ExecutorType.PROCESS:
                return _execute_with_process_pool(tasks)
            return asyncio.run(_execute_with_async(tasks))
def concurrent_map(
items: List[Any], **kwargs
) -> List[ConcurrentResult]:
"""Map the function over a list of items concurrently."""
return concurrent_batch(items, **kwargs)
# Attach methods to the wrapper
wrapper.concurrent_execute = concurrent_execute
wrapper.concurrent_batch = concurrent_batch
wrapper.concurrent_map = concurrent_map
wrapper.config = config
# Add metadata
wrapper.__concurrent_config__ = config
wrapper.__concurrent_enabled__ = True
return wrapper
return decorator
def concurrent_class_executor(
name: Optional[str] = None,
description: Optional[str] = None,
max_workers: int = 4,
timeout: Optional[float] = None,
executor_type: ExecutorType = ExecutorType.THREAD,
return_exceptions: bool = False,
chunk_size: Optional[int] = None,
ordered: bool = True,
retry_on_failure: bool = False,
max_retries: int = 3,
retry_delay: float = 1.0,
methods: Optional[List[str]] = None,
):
"""
A decorator that enables concurrent execution for class methods.
Args:
name (Optional[str]): Name for the concurrent operation
description (Optional[str]): Description of the operation
max_workers (int): Maximum number of worker threads/processes
timeout (Optional[float]): Timeout in seconds for each task
executor_type (ExecutorType): Type of executor (thread, process, async)
return_exceptions (bool): Whether to return exceptions instead of raising
chunk_size (Optional[int]): Size of chunks for batch processing
ordered (bool): Whether to maintain order of results
retry_on_failure (bool): Whether to retry failed tasks
max_retries (int): Maximum number of retries per task
retry_delay (float): Delay between retries in seconds
methods (Optional[List[str]]): List of method names to make concurrent
Returns:
Class: Class with concurrent execution capabilities
"""
def decorator(cls):
config = ConcurrentConfig(
name=name or f"{cls.__name__}_concurrent",
description=description
or f"Concurrent execution for {cls.__name__}",
max_workers=max_workers,
timeout=timeout,
executor_type=executor_type,
return_exceptions=return_exceptions,
chunk_size=chunk_size,
ordered=ordered,
retry_on_failure=retry_on_failure,
max_retries=max_retries,
retry_delay=retry_delay,
)
# Get methods to make concurrent
target_methods = methods or [
name
for name, method in inspect.getmembers(
cls, inspect.isfunction
)
if not name.startswith("_")
]
        class _BoundConcurrentMethod:
            """Non-data descriptor: re-applies the concurrent decorator
            to the method bound to the accessing instance, so calls such
            as ``instance.method.concurrent_execute(...)`` receive
            ``self`` (a bare wrapper on the class would not)."""

            def __init__(self, method, decorator):
                self._method = method
                self._decorator = decorator

            def __get__(self, obj, objtype=None):
                if obj is None:
                    return self._decorator(self._method)
                return self._decorator(
                    self._method.__get__(obj, objtype)
                )

        for method_name in target_methods:
            if hasattr(cls, method_name):
                original_method = getattr(cls, method_name)
                # Create a concurrent decorator configured for this method
                concurrent_decorator = concurrent(
                    name=f"{cls.__name__}.{method_name}",
                    description=f"Concurrent execution of {cls.__name__}.{method_name}",
                    max_workers=config.max_workers,
                    timeout=config.timeout,
                    executor_type=config.executor_type,
                    return_exceptions=config.return_exceptions,
                    chunk_size=config.chunk_size,
                    ordered=config.ordered,
                    retry_on_failure=config.retry_on_failure,
                    max_retries=config.max_retries,
                    retry_delay=config.retry_delay,
                )
                # Install a descriptor rather than the bare wrapper so
                # ``self`` is bound before concurrent execution
                setattr(
                    cls,
                    method_name,
                    _BoundConcurrentMethod(
                        original_method, concurrent_decorator
                    ),
                )
# Add class-level concurrent configuration
cls.__concurrent_config__ = config
cls.__concurrent_enabled__ = True
return cls
return decorator
# Convenience functions for common use cases
def thread_executor(**kwargs):
"""Convenience decorator for thread-based concurrent execution."""
return concurrent(executor_type=ExecutorType.THREAD, **kwargs)
def process_executor(**kwargs):
"""Convenience decorator for process-based concurrent execution."""
return concurrent(executor_type=ExecutorType.PROCESS, **kwargs)
def async_executor(**kwargs):
"""Convenience decorator for async-based concurrent execution."""
return concurrent(executor_type=ExecutorType.ASYNC, **kwargs)
def batch_executor(batch_size: int = 10, **kwargs):
"""Convenience decorator for batch processing."""
return concurrent(chunk_size=batch_size, **kwargs)