diff --git a/concurrent_example_dashboard.py b/concurrent_example_dashboard.py index 17177b6e..9bdc12aa 100644 --- a/concurrent_example_dashboard.py +++ b/concurrent_example_dashboard.py @@ -1,3 +1,4 @@ +# Get your packages from swarms import Agent, ConcurrentWorkflow # Initialize market research agent @@ -11,7 +12,7 @@ market_researcher = Agent( 5. Providing actionable market insights""", model_name="claude-3-sonnet-20240229", max_loops=1, - temperature=0.7, + dynamic_temperature_enabled=True, # streaming_on=True, ) @@ -24,10 +25,9 @@ financial_analyst = Agent( 3. Assessing risk factors 4. Providing financial forecasts 5. Recommending financial strategies""", - model_name="claude-3-sonnet-20240229", + model_name="gpt-4.1", max_loops=1, - # streaming_on=True, - temperature=0.7, + dynamic_temperature_enabled=True, ) # Initialize technical analyst agent @@ -51,14 +51,15 @@ agents = [market_researcher, financial_analyst, technical_analyst] router = ConcurrentWorkflow( name="market-analysis-router", + description="This concurrent workflow is used to analyze the market, financial, and technical aspects of a stock.", agents=agents, max_loops=1, - # output_type="all", + output_type="all", show_dashboard=True, ) result = router.run( - "Analyze Tesla (TSLA) stock from market, financial, and technical perspectives" + task="What are the best 3 ETFS for energy sector in the US?" ) print(result) diff --git a/docs/swarms_cloud/rate_limits.md b/docs/swarms_cloud/rate_limits.md index d0fb9759..69f68688 100644 --- a/docs/swarms_cloud/rate_limits.md +++ b/docs/swarms_cloud/rate_limits.md @@ -1,59 +1,196 @@ -# Swarms API Rate Limits +# Swarms API Rate Limits -The Swarms API implements rate limiting to ensure fair usage and system stability. Here are the current limits: +The Swarms API implements a comprehensive rate limiting system that tracks API requests across multiple time windows and enforces various limits to ensure fair usage and system stability. -## Standard Rate Limits +## Rate Limits Summary -- **General API Requests**: 100 requests per minute -- **Batch Operations**: Maximum 10 requests per batch for agent/swarm batch operations +| Rate Limit Type | Free Tier | Premium Tier | Time Window | Description | +|----------------|-----------|--------------|-------------|-------------| +| **Requests per Minute** | 100 | 2,000 | 1 minute | Maximum API calls per minute | +| **Requests per Hour** | 50 | 10,000 | 1 hour | Maximum API calls per hour | +| **Requests per Day** | 1,200 | 100,000 | 24 hours | Maximum API calls per day | +| **Tokens per Agent** | 200,000 | 2,000,000 | Per request | Maximum tokens per agent | +| **Prompt Length** | 200,000 | 200,000 | Per request | Maximum input tokens per request | +| **Batch Size** | 10 | 10 | Per request | Maximum agents in batch requests | +| **IP-based Fallback** | 100 | 100 | 60 seconds | For requests without API keys | -## Rate Limit Response +## Detailed Rate Limit Explanations -When you exceed the rate limit, the API will return a 429 (Too Many Requests) status code with the following message: -```json -{ - "detail": "Rate limit exceeded. Please try again later." -} -``` +### 1. **Request Rate Limits** + +These limits control how many API calls you can make within specific time windows. + +#### **Per-Minute Limit** + +| Tier | Requests per Minute | Reset Interval | Applies To | +|--------------|--------------------|------------------------|--------------------| +| Free | 100 | Every minute (sliding) | All API endpoints | +| Premium | 2,000 | Every minute (sliding) | All API endpoints | + +#### **Per-Hour Limit** + +- **Free Tier**: 50 requests per hour +- **Premium Tier**: 10,000 requests per hour +- **Reset**: Every hour (sliding window) +- **Applies to**: All API endpoints + +#### **Per-Day Limit** + +- **Free Tier**: 1,200 requests per day (50 × 24) + +- **Premium Tier**: 100,000 requests per day + +- **Reset**: Every 24 hours (sliding window) + +- **Applies to**: All API endpoints + +### 2. **Token Limits** + +These limits control the amount of text processing allowed per request. + +#### **Tokens per Agent** + +- **Free Tier**: 200,000 tokens per agent + +- **Premium Tier**: 2,000,000 tokens per agent + +- **Applies to**: Individual agent configurations + +- **Includes**: System prompts, task descriptions, and agent names + +#### **Prompt Length Limit** + +- **All Tiers**: 200,000 tokens maximum + +- **Applies to**: Combined input text (task + history + system prompts) + +- **Error**: Returns 400 error if exceeded + +- **Message**: "Prompt is too long. Please provide a prompt that is less than 10000 tokens." -## Batch Operation Limits +### 3. **Batch Processing Limits** -For batch operations (`/v1/agent/batch/completions` and `/v1/swarm/batch/completions`): +These limits control concurrent processing capabilities. -- Maximum 10 concurrent requests per batch +#### **Batch Size Limit** -- Exceeding this limit will result in a 400 (Bad Request) error +- **All Tiers**: 10 agents maximum per batch -## Increasing Your Rate Limits +- **Applies to**: `/v1/agent/batch/completions` endpoint -Need higher rate limits for your application? You can increase your limits by subscribing to a higher tier plan at [swarms.world/pricing](https://swarms.world/pricing). +- **Error**: Returns 400 error if exceeded -Higher tier plans offer: +- **Message**: "ERROR: BATCH SIZE EXCEEDED - You can only run up to 10 batch agents at a time." -- Increased rate limits +## How Rate Limiting Works -- Higher batch operation limits +### Database-Based Tracking -- Priority processing +The system uses a database-based approach for API key requests: + +1. **Request Logging**: Every API request is logged to the `swarms_api_logs` table +2. **Time Window Queries**: The system queries for requests in the last minute, hour, and day +3. **Limit Comparison**: Current counts are compared against configured limits +4. **Request Blocking**: Requests are blocked if any limit is exceeded + +### Sliding Windows + +Rate limits use sliding windows rather than fixed windows: + +- **Minute**: Counts requests in the last 60 seconds + +- **Hour**: Counts requests in the last 60 minutes + +- **Day**: Counts requests in the last 24 hours + +This provides more accurate rate limiting compared to fixed time windows. + +## Checking Your Rate Limits + +### API Endpoint + +Use the `/v1/rate/limits` endpoint to check your current usage: + +```bash +curl -H "x-api-key: your-api-key" \ + https://api.swarms.world/v1/rate/limits +``` + +### Response Format + +```json +{ + "success": true, + "rate_limits": { + "minute": { + "count": 5, + "limit": 100, + "exceeded": false, + "remaining": 95, + "reset_time": "2024-01-15T10:30:00Z" + }, + "hour": { + "count": 25, + "limit": 50, + "exceeded": false, + "remaining": 25, + "reset_time": "2024-01-15T11:00:00Z" + }, + "day": { + "count": 150, + "limit": 1200, + "exceeded": false, + "remaining": 1050, + "reset_time": "2024-01-16T10:00:00Z" + } + }, + "limits": { + "maximum_requests_per_minute": 100, + "maximum_requests_per_hour": 50, + "maximum_requests_per_day": 1200, + "tokens_per_agent": 200000 + }, + "timestamp": "2024-01-15T10:29:30Z" +} +``` + +## Handling Rate Limit Errors + +### Error Response + +When rate limits are exceeded, you'll receive a 429 status code: + +```json +{ + "detail": "Rate limit exceeded for minute window(s). Upgrade to Premium for increased limits (2,000/min, 10,000/hour, 100,000/day) at https://swarms.world/platform/account for just $99/month." +} +``` -- Dedicated support +### Best Practices -## Best Practices +1. **Monitor Usage**: Regularly check your rate limits using the `/v1/rate/limits` endpoint +2. **Implement Retry Logic**: Use exponential backoff when hitting rate limits +3. **Optimize Requests**: Combine multiple operations into single requests when possible +4. **Upgrade When Needed**: Consider upgrading to Premium for higher limits -To make the most of your rate limits: +## Premium Tier Benefits -1. Implement proper error handling for rate limit responses +Upgrade to Premium for significantly higher limits: -2. Use batch operations when processing multiple requests +- **20x more requests per minute** (2,000 vs 100) -3. Add appropriate retry logic with exponential backoff +- **200x more requests per hour** (10,000 vs 50) -4. Monitor your API usage to stay within limits +- **83x more requests per day** (100,000 vs 1,200) -## Rate Limit Headers +- **10x more tokens per agent** (2M vs 200K) -The API does not currently expose rate limit headers. We recommend implementing your own request tracking to stay within the limits. +Visit [Swarms Platform Account](https://swarms.world/platform/account) to upgrade for just $99/month. ---- +## Performance Considerations -For questions about rate limits or to request a custom plan for higher limits, please contact our support team or visit [swarms.world/pricing](https://swarms.world/pricing). \ No newline at end of file +- Database queries are optimized to only count request IDs +- Rate limit checks are cached per request +- Fallback mechanisms ensure system reliability +- Minimal impact on request latency +- Persistent tracking across server restarts \ No newline at end of file diff --git a/examples/misc/concurrent_wrapper_examples.py b/examples/misc/concurrent_wrapper_examples.py new file mode 100644 index 00000000..45d734bc --- /dev/null +++ b/examples/misc/concurrent_wrapper_examples.py @@ -0,0 +1,354 @@ +""" +Examples demonstrating the concurrent wrapper decorator functionality. + +This file shows how to use the concurrent and concurrent_class_executor +decorators to enable concurrent execution of functions and class methods. +""" + +import time +import asyncio +from typing import Dict, Any +import requests + +from swarms.utils.concurrent_wrapper import ( + concurrent, + concurrent_class_executor, + thread_executor, + process_executor, + async_executor, + batch_executor, + ExecutorType, +) + + +# Example 1: Basic concurrent function execution +@concurrent( + name="data_processor", + description="Process data concurrently", + max_workers=4, + timeout=30, + retry_on_failure=True, + max_retries=2, +) +def process_data(data: str) -> str: + """Simulate data processing with a delay.""" + time.sleep(1) # Simulate work + return f"processed_{data}" + + +# Example 2: Thread-based executor for I/O bound tasks +@thread_executor(max_workers=8, timeout=60) +def fetch_url(url: str) -> Dict[str, Any]: + """Fetch data from a URL.""" + try: + response = requests.get(url, timeout=10) + return { + "url": url, + "status_code": response.status_code, + "content_length": len(response.content), + "success": response.status_code == 200, + } + except Exception as e: + return {"url": url, "error": str(e), "success": False} + + +# Example 3: Process-based executor for CPU-intensive tasks +@process_executor(max_workers=2, timeout=120) +def cpu_intensive_task(n: int) -> float: + """Perform CPU-intensive computation.""" + result = 0.0 + for i in range(n): + result += (i**0.5) * (i**0.3) + return result + + +# Example 4: Async executor for async functions +@async_executor(max_workers=5) +async def async_task(task_id: int) -> str: + """Simulate an async task.""" + await asyncio.sleep(0.5) # Simulate async work + return f"async_result_{task_id}" + + +# Example 5: Batch processing +@batch_executor(batch_size=10, max_workers=3) +def process_item(item: str) -> str: + """Process a single item.""" + time.sleep(0.1) # Simulate work + return item.upper() + + +# Example 6: Class with concurrent methods +@concurrent_class_executor( + name="DataProcessor", + max_workers=4, + methods=["process_batch", "validate_data"], +) +class DataProcessor: + """A class with concurrent processing capabilities.""" + + def __init__(self, config: Dict[str, Any]): + self.config = config + + def process_batch(self, data: str) -> str: + """Process a batch of data.""" + time.sleep(0.5) # Simulate processing + return f"processed_{data}" + + def validate_data(self, data: str) -> bool: + """Validate data.""" + time.sleep(0.2) # Simulate validation + return len(data) > 0 + + def normal_method(self, x: int) -> int: + """A normal method (not concurrent).""" + return x * 2 + + +# Example 7: Function with custom configuration +@concurrent( + name="custom_processor", + description="Custom concurrent processor", + max_workers=6, + timeout=45, + executor_type=ExecutorType.THREAD, + return_exceptions=True, + ordered=False, + retry_on_failure=True, + max_retries=3, + retry_delay=0.5, +) +def custom_processor(item: str, multiplier: int = 1) -> str: + """Custom processor with parameters.""" + time.sleep(0.3) + return f"{item}_{multiplier}" * multiplier + + +def example_1_basic_concurrent_execution(): + """Example 1: Basic concurrent execution.""" + print("=== Example 1: Basic Concurrent Execution ===") + + # Prepare data + data_items = [f"item_{i}" for i in range(10)] + + # Execute concurrently + results = process_data.concurrent_execute(*data_items) + + # Process results + successful_results = [r.value for r in results if r.success] + failed_results = [r for r in results if not r.success] + + print(f"Successfully processed: {len(successful_results)} items") + print(f"Failed: {len(failed_results)} items") + print(f"Sample results: {successful_results[:3]}") + print() + + +def example_2_thread_based_execution(): + """Example 2: Thread-based execution for I/O bound tasks.""" + print("=== Example 2: Thread-based Execution ===") + + # URLs to fetch + urls = [ + "https://httpbin.org/get", + "https://httpbin.org/status/200", + "https://httpbin.org/status/404", + "https://httpbin.org/delay/1", + "https://httpbin.org/delay/2", + ] + + # Execute concurrently + results = fetch_url.concurrent_execute(*urls) + + # Process results + successful_fetches = [ + r.value + for r in results + if r.success and r.value.get("success") + ] + failed_fetches = [ + r.value + for r in results + if r.success and not r.value.get("success") + ] + + print(f"Successful fetches: {len(successful_fetches)}") + print(f"Failed fetches: {len(failed_fetches)}") + print( + f"Sample successful result: {successful_fetches[0] if successful_fetches else 'None'}" + ) + print() + + +def example_3_process_based_execution(): + """Example 3: Process-based execution for CPU-intensive tasks.""" + print("=== Example 3: Process-based Execution ===") + + # CPU-intensive tasks + tasks = [100000, 200000, 300000, 400000] + + # Execute concurrently + results = cpu_intensive_task.concurrent_execute(*tasks) + + # Process results + successful_results = [r.value for r in results if r.success] + execution_times = [r.execution_time for r in results if r.success] + + print(f"Completed {len(successful_results)} CPU-intensive tasks") + print( + f"Average execution time: {sum(execution_times) / len(execution_times):.3f}s" + ) + print( + f"Sample result: {successful_results[0] if successful_results else 'None'}" + ) + print() + + +def example_4_batch_processing(): + """Example 4: Batch processing.""" + print("=== Example 4: Batch Processing ===") + + # Items to process + items = [f"item_{i}" for i in range(25)] + + # Process in batches + results = process_item.concurrent_batch(items, batch_size=5) + + # Process results + successful_results = [r.value for r in results if r.success] + + print(f"Processed {len(successful_results)} items in batches") + print(f"Sample results: {successful_results[:5]}") + print() + + +def example_5_class_concurrent_execution(): + """Example 5: Class with concurrent methods.""" + print("=== Example 5: Class Concurrent Execution ===") + + # Create processor instance + processor = DataProcessor({"batch_size": 10}) + + # Prepare data + data_items = [f"data_{i}" for i in range(8)] + + # Execute concurrent methods + process_results = processor.process_batch.concurrent_execute( + *data_items + ) + validate_results = processor.validate_data.concurrent_execute( + *data_items + ) + + # Process results + processed_items = [r.value for r in process_results if r.success] + valid_items = [r.value for r in validate_results if r.success] + + print(f"Processed {len(processed_items)} items") + print(f"Validated {len(valid_items)} items") + print(f"Sample processed: {processed_items[:3]}") + print(f"Sample validation: {valid_items[:3]}") + print() + + +def example_6_custom_configuration(): + """Example 6: Custom configuration with exceptions and retries.""" + print("=== Example 6: Custom Configuration ===") + + # Items with different multipliers + items = [f"item_{i}" for i in range(6)] + multipliers = [1, 2, 3, 1, 2, 3] + + # Execute with custom configuration + results = custom_processor.concurrent_execute( + *items, **{"multiplier": multipliers} + ) + + # Process results + successful_results = [r.value for r in results if r.success] + failed_results = [r for r in results if not r.success] + + print(f"Successful: {len(successful_results)}") + print(f"Failed: {len(failed_results)}") + print(f"Sample results: {successful_results[:3]}") + print() + + +def example_7_concurrent_mapping(): + """Example 7: Concurrent mapping over a list.""" + print("=== Example 7: Concurrent Mapping ===") + + # Items to map over + items = [f"map_item_{i}" for i in range(15)] + + # Map function over items + results = process_data.concurrent_map(items) + + # Process results + mapped_results = [r.value for r in results if r.success] + + print(f"Mapped over {len(mapped_results)} items") + print(f"Sample mapped results: {mapped_results[:5]}") + print() + + +def example_8_error_handling(): + """Example 8: Error handling and retries.""" + print("=== Example 8: Error Handling ===") + + @concurrent( + max_workers=3, + return_exceptions=True, + retry_on_failure=True, + max_retries=2, + ) + def unreliable_function(x: int) -> int: + """A function that sometimes fails.""" + if x % 3 == 0: + raise ValueError(f"Failed for {x}") + time.sleep(0.1) + return x * 2 + + # Execute with potential failures + results = unreliable_function.concurrent_execute(*range(10)) + + # Process results + successful_results = [r.value for r in results if r.success] + failed_results = [r.exception for r in results if not r.success] + + print(f"Successful: {len(successful_results)}") + print(f"Failed: {len(failed_results)}") + print(f"Sample successful: {successful_results[:3]}") + print( + f"Sample failures: {[type(e).__name__ for e in failed_results[:3]]}" + ) + print() + + +def main(): + """Run all examples.""" + print("Concurrent Wrapper Examples") + print("=" * 50) + print() + + try: + example_1_basic_concurrent_execution() + example_2_thread_based_execution() + example_3_process_based_execution() + example_4_batch_processing() + example_5_class_concurrent_execution() + example_6_custom_configuration() + example_7_concurrent_mapping() + example_8_error_handling() + + print("All examples completed successfully!") + + except Exception as e: + print(f"Error running examples: {e}") + import traceback + + traceback.print_exc() + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index 1ad3588f..30d58aa0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "7.9.9" +version = "8.0.0" description = "Swarms - TGSC" license = "MIT" authors = ["Kye Gomez "] diff --git a/simple_agent.py b/simple_agent.py index 6964dac2..38bfc59f 100644 --- a/simple_agent.py +++ b/simple_agent.py @@ -3,15 +3,14 @@ from swarms import Agent agent = Agent( name="Research Agent", description="A research agent that can answer questions", - model_name="groq/moonshotai/kimi-k2-instruct", - verbose=True, + model_name="claude-3-5-sonnet-20241022", streaming_on=True, - max_loops=2, + max_loops=1, interactive=True, ) out = agent.run( - "What are the best AI wechat groups in hangzhou and beijing? give me the links" + "What are the best arbitrage trading strategies for altcoins? Give me research papers and articles on the topic." ) print(out) diff --git a/swarms/utils/concurrent_wrapper.py b/swarms/utils/concurrent_wrapper.py new file mode 100644 index 00000000..41615d02 --- /dev/null +++ b/swarms/utils/concurrent_wrapper.py @@ -0,0 +1,629 @@ +import asyncio +import concurrent.futures +import inspect +import time +from concurrent.futures import ( + ThreadPoolExecutor, + ProcessPoolExecutor, + as_completed, +) +from functools import wraps +from typing import ( + Any, + Callable, + List, + Optional, + TypeVar, + Generic, +) +from dataclasses import dataclass +from enum import Enum + +from swarms.utils.loguru_logger import initialize_logger + +logger = initialize_logger("concurrent_wrapper") + +T = TypeVar("T") +R = TypeVar("R") + + +# Global function for process pool execution (must be picklable) +def _execute_task_in_process(task_data): + """ + Execute a task in a separate process. + This function must be at module level to be picklable. + """ + ( + func, + task_args, + task_kwargs, + task_id, + max_retries, + retry_on_failure, + retry_delay, + return_exceptions, + ) = task_data + + start_time = time.time() + + for attempt in range(max_retries + 1): + try: + result = func(*task_args, **task_kwargs) + execution_time = time.time() - start_time + return ConcurrentResult( + value=result, + execution_time=execution_time, + worker_id=task_id, + ) + except Exception as e: + if attempt == max_retries or not retry_on_failure: + execution_time = time.time() - start_time + if return_exceptions: + return ConcurrentResult( + exception=e, + execution_time=execution_time, + worker_id=task_id, + ) + else: + raise + else: + time.sleep(retry_delay * (2**attempt)) + + # This should never be reached, but just in case + return ConcurrentResult( + exception=Exception("Max retries exceeded") + ) + + +class ExecutorType(Enum): + """Enum for different types of executors.""" + + THREAD = "thread" + PROCESS = "process" + ASYNC = "async" + + +@dataclass +class ConcurrentConfig: + """Configuration for concurrent execution.""" + + name: Optional[str] = None + description: Optional[str] = None + max_workers: int = 4 + timeout: Optional[float] = None + executor_type: ExecutorType = ExecutorType.THREAD + return_exceptions: bool = False + chunk_size: Optional[int] = None + ordered: bool = True + retry_on_failure: bool = False + max_retries: int = 3 + retry_delay: float = 1.0 + + +class ConcurrentResult(Generic[T]): + """Result wrapper for concurrent execution.""" + + def __init__( + self, + value: T = None, + exception: Exception = None, + execution_time: float = 0.0, + worker_id: Optional[int] = None, + ): + self.value = value + self.exception = exception + self.execution_time = execution_time + self.worker_id = worker_id + self.success = exception is None + + def __repr__(self): + if self.success: + return f"ConcurrentResult(value={self.value}, time={self.execution_time:.3f}s)" + else: + return f"ConcurrentResult(exception={type(self.exception).__name__}: {self.exception})" + + +def concurrent( + name: Optional[str] = None, + description: Optional[str] = None, + max_workers: int = 4, + timeout: Optional[float] = None, + executor_type: ExecutorType = ExecutorType.THREAD, + return_exceptions: bool = False, + chunk_size: Optional[int] = None, + ordered: bool = True, + retry_on_failure: bool = False, + max_retries: int = 3, + retry_delay: float = 1.0, +): + """ + A decorator that enables concurrent execution of functions. + + Args: + name (Optional[str]): Name for the concurrent operation + description (Optional[str]): Description of the operation + max_workers (int): Maximum number of worker threads/processes + timeout (Optional[float]): Timeout in seconds for each task + executor_type (ExecutorType): Type of executor (thread, process, async) + return_exceptions (bool): Whether to return exceptions instead of raising + chunk_size (Optional[int]): Size of chunks for batch processing + ordered (bool): Whether to maintain order of results + retry_on_failure (bool): Whether to retry failed tasks + max_retries (int): Maximum number of retries per task + retry_delay (float): Delay between retries in seconds + + Returns: + Callable: Decorated function that can execute concurrently + """ + + def decorator(func: Callable[..., T]) -> Callable[..., T]: + config = ConcurrentConfig( + name=name or func.__name__, + description=description + or f"Concurrent execution of {func.__name__}", + max_workers=max_workers, + timeout=timeout, + executor_type=executor_type, + return_exceptions=return_exceptions, + chunk_size=chunk_size, + ordered=ordered, + retry_on_failure=retry_on_failure, + max_retries=max_retries, + retry_delay=retry_delay, + ) + + @wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + def _execute_single_task( + task_args, task_kwargs, task_id=None + ): + """Execute a single task with retry logic.""" + start_time = time.time() + + for attempt in range(config.max_retries + 1): + try: + result = func(*task_args, **task_kwargs) + execution_time = time.time() - start_time + return ConcurrentResult( + value=result, + execution_time=execution_time, + worker_id=task_id, + ) + except Exception as e: + if ( + attempt == config.max_retries + or not config.retry_on_failure + ): + execution_time = time.time() - start_time + if config.return_exceptions: + return ConcurrentResult( + exception=e, + execution_time=execution_time, + worker_id=task_id, + ) + else: + raise + else: + logger.warning( + f"Task {task_id} failed (attempt {attempt + 1}/{config.max_retries + 1}): {e}" + ) + time.sleep(config.retry_delay * (2**attempt)) + + def concurrent_execute(*args_list, **kwargs_list): + """Execute the function concurrently with multiple argument sets.""" + if not args_list and not kwargs_list: + raise ValueError( + "At least one set of arguments must be provided" + ) + + # Prepare tasks + tasks = [] + if args_list: + for args in args_list: + if isinstance(args, (list, tuple)): + tasks.append((args, {})) + else: + tasks.append(([args], {})) + + if kwargs_list: + for kwargs in kwargs_list: + if isinstance(kwargs, dict): + tasks.append(((), kwargs)) + else: + raise ValueError( + "kwargs_list must contain dictionaries" + ) + + logger.info( + f"Starting concurrent execution of {len(tasks)} tasks with {config.max_workers} workers" + ) + start_time = time.time() + + try: + if config.executor_type == ExecutorType.THREAD: + results = _execute_with_thread_pool(tasks) + elif config.executor_type == ExecutorType.PROCESS: + results = _execute_with_process_pool(tasks) + elif config.executor_type == ExecutorType.ASYNC: + results = _execute_with_async(tasks) + else: + raise ValueError( + f"Unsupported executor type: {config.executor_type}" + ) + + total_time = time.time() - start_time + successful_tasks = sum( + 1 for r in results if r.success + ) + + logger.info( + f"Completed {len(tasks)} tasks in {total_time:.3f}s " + f"({successful_tasks}/{len(tasks)} successful)" + ) + + return results + + except Exception as e: + logger.error(f"Concurrent execution failed: {e}") + raise + + def _execute_with_thread_pool(tasks): + """Execute tasks using ThreadPoolExecutor.""" + results = [] + + with ThreadPoolExecutor( + max_workers=config.max_workers + ) as executor: + if config.ordered: + future_to_task = { + executor.submit( + _execute_single_task, task[0], task[1], i + ): i + for i, task in enumerate(tasks) + } + + for future in as_completed( + future_to_task, timeout=config.timeout + ): + try: + result = future.result( + timeout=config.timeout + ) + results.append(result) + except Exception as e: + if config.return_exceptions: + results.append( + ConcurrentResult(exception=e) + ) + else: + raise + else: + futures = [ + executor.submit( + _execute_single_task, task[0], task[1], i + ) + for i, task in enumerate(tasks) + ] + + for future in as_completed( + futures, timeout=config.timeout + ): + try: + result = future.result( + timeout=config.timeout + ) + results.append(result) + except Exception as e: + if config.return_exceptions: + results.append( + ConcurrentResult(exception=e) + ) + else: + raise + + return results + + def _execute_with_process_pool(tasks): + """Execute tasks using ProcessPoolExecutor.""" + results = [] + + # Prepare task data for process execution + task_data_list = [] + for i, task in enumerate(tasks): + task_data = ( + func, # The function to execute + task[0], # args + task[1], # kwargs + i, # task_id + config.max_retries, + config.retry_on_failure, + config.retry_delay, + config.return_exceptions, + ) + task_data_list.append(task_data) + + with ProcessPoolExecutor( + max_workers=config.max_workers + ) as executor: + if config.ordered: + future_to_task = { + executor.submit( + _execute_task_in_process, task_data + ): i + for i, task_data in enumerate(task_data_list) + } + + for future in as_completed( + future_to_task, timeout=config.timeout + ): + try: + result = future.result( + timeout=config.timeout + ) + results.append(result) + except Exception as e: + if config.return_exceptions: + results.append( + ConcurrentResult(exception=e) + ) + else: + raise + else: + futures = [ + executor.submit( + _execute_task_in_process, task_data + ) + for task_data in task_data_list + ] + + for future in as_completed( + futures, timeout=config.timeout + ): + try: + result = future.result( + timeout=config.timeout + ) + results.append(result) + except Exception as e: + if config.return_exceptions: + results.append( + ConcurrentResult(exception=e) + ) + else: + raise + + return results + + async def _execute_with_async(tasks): + """Execute tasks using asyncio.""" + + async def _async_task( + task_args, task_kwargs, task_id=None + ): + start_time = time.time() + + for attempt in range(config.max_retries + 1): + try: + loop = asyncio.get_event_loop() + result = await loop.run_in_executor( + None, + lambda: func(*task_args, **task_kwargs), + ) + execution_time = time.time() - start_time + return ConcurrentResult( + value=result, + execution_time=execution_time, + worker_id=task_id, + ) + except Exception as e: + if ( + attempt == config.max_retries + or not config.retry_on_failure + ): + execution_time = time.time() - start_time + if config.return_exceptions: + return ConcurrentResult( + exception=e, + execution_time=execution_time, + worker_id=task_id, + ) + else: + raise + else: + logger.warning( + f"Async task {task_id} failed (attempt {attempt + 1}/{config.max_retries + 1}): {e}" + ) + await asyncio.sleep( + config.retry_delay * (2**attempt) + ) + + semaphore = asyncio.Semaphore(config.max_workers) + + async def _limited_task(task_args, task_kwargs, task_id): + async with semaphore: + return await _async_task( + task_args, task_kwargs, task_id + ) + + tasks_coros = [ + _limited_task(task[0], task[1], i) + for i, task in enumerate(tasks) + ] + + if config.ordered: + results = [] + for coro in asyncio.as_completed(tasks_coros): + try: + result = await coro + results.append(result) + except Exception as e: + if config.return_exceptions: + results.append( + ConcurrentResult(exception=e) + ) + else: + raise + return results + else: + return await asyncio.gather( + *tasks_coros, + return_exceptions=config.return_exceptions, + ) + + def concurrent_batch( + items: List[Any], + batch_size: Optional[int] = None, + **kwargs, + ) -> List[ConcurrentResult]: + """Execute the function concurrently on a batch of items.""" + batch_size = batch_size or config.chunk_size or len(items) + + tasks = [] + for item in items: + if isinstance(item, (list, tuple)): + tasks.append((item, kwargs)) + else: + tasks.append(([item], kwargs)) + + return concurrent_execute( + *[task[0] for task in tasks], + **[task[1] for task in tasks], + ) + + def concurrent_map( + items: List[Any], **kwargs + ) -> List[ConcurrentResult]: + """Map the function over a list of items concurrently.""" + return concurrent_batch(items, **kwargs) + + # Attach methods to the wrapper + wrapper.concurrent_execute = concurrent_execute + wrapper.concurrent_batch = concurrent_batch + wrapper.concurrent_map = concurrent_map + wrapper.config = config + + # Add metadata + wrapper.__concurrent_config__ = config + wrapper.__concurrent_enabled__ = True + + return wrapper + + return decorator + + +def concurrent_class_executor( + name: Optional[str] = None, + description: Optional[str] = None, + max_workers: int = 4, + timeout: Optional[float] = None, + executor_type: ExecutorType = ExecutorType.THREAD, + return_exceptions: bool = False, + chunk_size: Optional[int] = None, + ordered: bool = True, + retry_on_failure: bool = False, + max_retries: int = 3, + retry_delay: float = 1.0, + methods: Optional[List[str]] = None, +): + """ + A decorator that enables concurrent execution for class methods. + + Args: + name (Optional[str]): Name for the concurrent operation + description (Optional[str]): Description of the operation + max_workers (int): Maximum number of worker threads/processes + timeout (Optional[float]): Timeout in seconds for each task + executor_type (ExecutorType): Type of executor (thread, process, async) + return_exceptions (bool): Whether to return exceptions instead of raising + chunk_size (Optional[int]): Size of chunks for batch processing + ordered (bool): Whether to maintain order of results + retry_on_failure (bool): Whether to retry failed tasks + max_retries (int): Maximum number of retries per task + retry_delay (float): Delay between retries in seconds + methods (Optional[List[str]]): List of method names to make concurrent + + Returns: + Class: Class with concurrent execution capabilities + """ + + def decorator(cls): + config = ConcurrentConfig( + name=name or f"{cls.__name__}_concurrent", + description=description + or f"Concurrent execution for {cls.__name__}", + max_workers=max_workers, + timeout=timeout, + executor_type=executor_type, + return_exceptions=return_exceptions, + chunk_size=chunk_size, + ordered=ordered, + retry_on_failure=retry_on_failure, + max_retries=max_retries, + retry_delay=retry_delay, + ) + + # Get methods to make concurrent + target_methods = methods or [ + name + for name, method in inspect.getmembers( + cls, inspect.isfunction + ) + if not name.startswith("_") + ] + + for method_name in target_methods: + if hasattr(cls, method_name): + original_method = getattr(cls, method_name) + + # Create concurrent version of the method + concurrent_decorator = concurrent( + name=f"{cls.__name__}.{method_name}", + description=f"Concurrent execution of {cls.__name__}.{method_name}", + max_workers=config.max_workers, + timeout=config.timeout, + executor_type=config.executor_type, + return_exceptions=config.return_exceptions, + chunk_size=config.chunk_size, + ordered=config.ordered, + retry_on_failure=config.retry_on_failure, + max_retries=config.max_retries, + retry_delay=config.retry_delay, + ) + + # Apply the concurrent decorator to the method + setattr( + cls, + method_name, + concurrent_decorator(original_method), + ) + + # Add class-level concurrent configuration + cls.__concurrent_config__ = config + cls.__concurrent_enabled__ = True + + return cls + + return decorator + + +# Convenience functions for common use cases +def thread_executor(**kwargs): + """Convenience decorator for thread-based concurrent execution.""" + return concurrent(executor_type=ExecutorType.THREAD, **kwargs) + + +def process_executor(**kwargs): + """Convenience decorator for process-based concurrent execution.""" + return concurrent(executor_type=ExecutorType.PROCESS, **kwargs) + + +def async_executor(**kwargs): + """Convenience decorator for async-based concurrent execution.""" + return concurrent(executor_type=ExecutorType.ASYNC, **kwargs) + + +def batch_executor(batch_size: int = 10, **kwargs): + """Convenience decorator for batch processing.""" + return concurrent(chunk_size=batch_size, **kwargs)