parent 54dfa692a8
commit 14fc0a9f8a
@@ -1,393 +0,0 @@
from typing import Callable, List, Optional, Tuple, Union

from loguru import logger

from swarms.structs.base_swarm import BaseSwarm

from queue import PriorityQueue
from concurrent.futures import (
    ThreadPoolExecutor,
    as_completed,
)
import time
from pydantic import BaseModel, Field


class SwarmRunData(BaseModel):
    """
    Pydantic model to capture metadata about each swarm's execution.
    """

    swarm_name: str
    task: str
    priority: int
    start_time: Optional[float] = None
    end_time: Optional[float] = None
    duration: Optional[float] = None
    status: str = "Pending"
    retries: int = 0
    result: Optional[str] = None
    exception: Optional[str] = None


class FederatedSwarmModel(BaseModel):
    """
    Pydantic base model to capture and log data for the FederatedSwarm system.
    """

    task: str
    swarms_data: List[SwarmRunData] = Field(default_factory=list)

    def add_swarm(self, swarm_name: str, task: str, priority: int):
        swarm_data = SwarmRunData(
            swarm_name=swarm_name, task=task, priority=priority
        )
        self.swarms_data.append(swarm_data)

    def update_swarm_status(
        self,
        swarm_name: str,
        status: str,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
        retries: int = 0,
        result: Optional[str] = None,
        exception: Optional[str] = None,
    ):
        for swarm in self.swarms_data:
            if swarm.swarm_name == swarm_name:
                swarm.status = status
                if start_time:
                    swarm.start_time = start_time
                if end_time:
                    swarm.end_time = end_time
                    if swarm.start_time is not None:
                        swarm.duration = end_time - swarm.start_time
                swarm.retries = retries
                swarm.result = result
                swarm.exception = exception
                break
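

# A minimal illustrative sketch (the names are hypothetical) of driving the
# metadata model above through one swarm's lifecycle:
#
#     metadata = FederatedSwarmModel(task="Process data")
#     metadata.add_swarm("ExampleSwarm", task="Process data", priority=1)
#     metadata.update_swarm_status(
#         "ExampleSwarm", status="Running", start_time=time.time()
#     )
#     metadata.update_swarm_status(
#         "ExampleSwarm", status="Completed", end_time=time.time(), result="ok"
#     )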


class FederatedSwarm:
    def __init__(
        self,
        swarms: List[Tuple[Union[BaseSwarm, Callable], int]],
        max_workers: int = 4,
    ):
        """
        Initializes the FederatedSwarm with a list of (swarm, priority) pairs and
        sets up a priority queue and thread pool for concurrency.

        Args:
            swarms (List[Tuple[Union[BaseSwarm, Callable], int]]): A list of
                (swarm, priority) pairs, where each swarm is a BaseSwarm or a callable.
            max_workers (int): The maximum number of concurrent workers (threads) to run swarms in parallel.
        """
        self.swarms = PriorityQueue()
        self.max_workers = max_workers
        self.thread_pool = ThreadPoolExecutor(
            max_workers=self.max_workers
        )
        self.task_queue = []
        self.future_to_swarm = {}
        self.results = {}
        self.validate_swarms(swarms)

    def init_metadata(self, task: str):
        """
        Initializes the Pydantic base model to capture metadata about the current task and swarms.
        """
        self.metadata = FederatedSwarmModel(task=task)
        for priority, swarm in list(self.swarms.queue):
            # Prefer the function name for plain callables; fall back to the class name.
            swarm_name = getattr(
                swarm, "__name__", swarm.__class__.__name__
            )
            self.metadata.add_swarm(
                swarm_name=swarm_name, task=task, priority=priority
            )
        logger.info(f"Metadata initialized for task '{task}'.")

    def validate_swarms(
        self, swarms: List[Tuple[Union[BaseSwarm, Callable], int]]
    ):
        """
        Validates and adds swarms to the priority queue. Each swarm must either
        expose a `run(task)` method or be directly callable.

        Args:
            swarms (List[Tuple[Union[BaseSwarm, Callable], int]]): List of
                (swarm, priority) pairs.
        """
        for swarm, priority in swarms:
            if hasattr(swarm, "run"):
                logger.info(f"{swarm} has a 'run' method.")
            elif callable(swarm):
                logger.info(
                    f"{swarm} is callable and will be invoked directly."
                )
            else:
                raise TypeError(
                    f"{swarm} is neither callable nor has a 'run(task)' method."
                )

            self.swarms.put((priority, swarm))
            logger.info(
                f"Swarm {swarm} added with priority {priority}."
            )

    def run_parallel(
        self,
        task: str,
        timeout: Optional[float] = None,
        retries: int = 0,
    ):
        """
        Runs all swarms in parallel with prioritization and optional timeout.

        Args:
            task (str): The task to be passed to the `run` method of each swarm.
            timeout (Optional[float]): Maximum time allowed for each swarm to run.
            retries (int): Number of retries allowed for failed swarms.
        """
        logger.info(
            f"Running task '{task}' in parallel with timeout: {timeout}, retries: {retries}"
        )
        self.init_metadata(task)

        while not self.swarms.empty():
            priority, swarm = self.swarms.get()
            swarm_name = getattr(
                swarm, "__name__", swarm.__class__.__name__
            )
            future = self.thread_pool.submit(
                self._run_with_retry,
                swarm,
                task,
                retries,
                timeout,
                swarm_name,
            )
            self.future_to_swarm[future] = swarm

        for future in as_completed(self.future_to_swarm):
            swarm = self.future_to_swarm[future]
            swarm_name = getattr(
                swarm, "__name__", swarm.__class__.__name__
            )
            try:
                result = future.result()
                self.metadata.update_swarm_status(
                    swarm_name=swarm_name,
                    status="Completed",
                    result=result,
                )
                self.results[swarm] = result
                logger.info(
                    f"Swarm {swarm_name} completed successfully."
                )
            except Exception as e:
                self.metadata.update_swarm_status(
                    swarm_name=swarm_name,
                    status="Failed",
                    exception=str(e),
                )
                logger.error(f"Swarm {swarm_name} failed: {e}")
                self.results[swarm] = "Failed"

    def run_sequentially(
        self,
        task: str,
        retries: int = 0,
        timeout: Optional[float] = None,
    ):
        """
        Runs all swarms sequentially in order of priority.

        Args:
            task (str): The task to pass to the `run` method of each swarm.
            retries (int): Number of retries for failed swarms.
            timeout (Optional[float]): Optional time limit for each swarm.
        """
        logger.info(f"Running task '{task}' sequentially.")
        self.init_metadata(task)

        while not self.swarms.empty():
            priority, swarm = self.swarms.get()
            swarm_name = getattr(
                swarm, "__name__", swarm.__class__.__name__
            )
            try:
                logger.info(
                    f"Running swarm {swarm} with priority {priority}."
                )
                self._run_with_retry(
                    swarm, task, retries, timeout, swarm_name
                )
                logger.info(f"Swarm {swarm} completed successfully.")
            except Exception as e:
                logger.error(f"Swarm {swarm} failed with error: {e}")

    def _run_with_retry(
        self,
        swarm: Union[BaseSwarm, Callable],
        task: str,
        retries: int,
        timeout: Optional[float],
        swarm_name: str,
    ):
        """
        Helper function to run a swarm with a retry mechanism and optional timeout.

        Note that the timeout is checked only after the run returns: a swarm that
        exceeds it is reported as timed out but is not interrupted mid-run.

        Args:
            swarm (Union[BaseSwarm, Callable]): The swarm to run.
            task (str): The task to pass to the swarm.
            retries (int): The number of retries allowed for the swarm in case of failure.
            timeout (Optional[float]): Maximum time allowed for the swarm to run.
            swarm_name (str): Name of the swarm (used for metadata).
        """
        attempts = 0
        start_time = time.time()
        while attempts <= retries:
            try:
                logger.info(
                    f"Running swarm {swarm}. Attempt: {attempts + 1}"
                )
                self.metadata.update_swarm_status(
                    swarm_name=swarm_name,
                    status="Running",
                    start_time=start_time,
                )
                if hasattr(swarm, "run"):
                    if timeout:
                        run_start = time.time()
                        swarm.run(task)
                        duration = time.time() - run_start
                        if duration > timeout:
                            raise TimeoutError(
                                f"Swarm {swarm} timed out after {duration:.2f}s."
                            )
                    else:
                        swarm.run(task)
                else:
                    # Plain callables are invoked directly with the task.
                    swarm(task)
                end_time = time.time()
                self.metadata.update_swarm_status(
                    swarm_name=swarm_name,
                    status="Completed",
                    end_time=end_time,
                    retries=attempts,
                )
                return "Success"
            except Exception as e:
                logger.error(f"Swarm {swarm} failed: {e}")
                attempts += 1
                if attempts > retries:
                    end_time = time.time()
                    self.metadata.update_swarm_status(
                        swarm_name=swarm_name,
                        status="Failed",
                        end_time=end_time,
                        retries=attempts,
                        exception=str(e),
                    )
                    logger.error(f"Swarm {swarm} exhausted retries.")
                    raise

    def add_swarm(
        self, swarm: Union[BaseSwarm, Callable], priority: int
    ):
        """
        Adds a new swarm to the FederatedSwarm at runtime.

        Args:
            swarm (Union[BaseSwarm, Callable]): The swarm to add.
            priority (int): The priority level for the swarm.
        """
        self.swarms.put((priority, swarm))
        logger.info(
            f"Swarm {swarm} added dynamically with priority {priority}."
        )

    def queue_task(self, task: str):
        """
        Adds a task to the internal task queue for batch processing.

        Args:
            task (str): The task to queue.
        """
        self.task_queue.append(task)
        logger.info(f"Task '{task}' added to the queue.")

    def process_task_queue(self):
        """
        Processes all tasks in the task queue.
        """
        for task in self.task_queue:
            logger.info(f"Processing task: {task}")
            self.run_parallel(task)
        self.task_queue = []

    def log_swarm_results(self):
        """
        Logs the results of all swarms after execution.
        """
        logger.info("Logging swarm results...")
        for swarm, result in self.results.items():
            logger.info(f"Swarm {swarm}: {result}")

    def get_swarm_status(self) -> dict:
        """
        Retrieves the status of each swarm (completed, failed, running, or pending).

        Returns:
            dict: Dictionary mapping each swarm to its status.
        """
        status = {}
        for future, swarm in self.future_to_swarm.items():
            if future.done():
                status[swarm] = (
                    "Failed" if future.exception() else "Completed"
                )
            elif future.running():
                status[swarm] = "Running"
            else:
                status[swarm] = "Pending"
        return status

    def cancel_running_swarms(self):
        """
        Cancels all pending swarms by shutting down the thread pool.

        Note: `shutdown(wait=False)` prevents queued work from starting but
        does not interrupt swarms that are already running.
        """
        logger.warning("Cancelling all running swarms...")
        self.thread_pool.shutdown(wait=False)
        logger.info("All running swarms cancelled.")


# Example Usage:


# class ExampleSwarm(BaseSwarm):
#     def run(self, task: str):
#         logger.info(f"ExampleSwarm is processing task: {task}")


# def example_callable(task: str):
#     logger.info(f"Callable is processing task: {task}")


# if __name__ == "__main__":
#     swarms = [(ExampleSwarm(), 1), (example_callable, 2)]
#     federated_swarm = FederatedSwarm(swarms)

#     # Run in parallel
#     federated_swarm.run_parallel(
#         "Process data", timeout=10, retries=3
#     )

#     # Run sequentially
#     federated_swarm.run_sequentially("Process data sequentially")

#     # Log results
#     federated_swarm.log_swarm_results()

#     # Get status of swarms
#     status = federated_swarm.get_swarm_status()
#     logger.info(f"Swarm statuses: {status}")

#     # Cancel running swarms (if needed)
#     # federated_swarm.cancel_running_swarms()
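
# A minimal additional sketch (assuming the same ExampleSwarm/example_callable
# definitions as above) of the batch API, which the example above does not
# exercise:
#
#     federated_swarm.queue_task("Task A")
#     federated_swarm.queue_task("Task B")
#     federated_swarm.process_task_queue()  # runs each queued task via run_parallel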

@@ -1,98 +0,0 @@
from functools import wraps
from loguru import logger
import tracemalloc
import psutil
import time
from typing import Callable, Any


def profile_all(func: Callable) -> Callable:
    """
    A decorator to profile memory usage, CPU usage, and I/O operations
    of a function and log the data using loguru.

    It combines tracemalloc for memory profiling, psutil for CPU and I/O
    operations, and measures wall-clock execution time.

    Args:
        func (Callable): The function to be profiled.

    Returns:
        Callable: The wrapped function with profiling enabled.
    """

    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Start memory tracking
        tracemalloc.start()

        # Get initial CPU stats
        process = psutil.Process()
        initial_cpu_times = process.cpu_times()

        # Get initial I/O stats if available
        try:
            initial_io_counters = process.io_counters()
            io_tracking_available = True
        except AttributeError:
            logger.warning(
                "I/O counters not available on this platform."
            )
            io_tracking_available = False

        # Start timing the function execution
        start_time = time.time()

        # Execute the function; ensure tracemalloc is stopped even on failure
        try:
            result = func(*args, **kwargs)
        except Exception:
            tracemalloc.stop()
            raise

        # Stop timing
        end_time = time.time()
        execution_time = end_time - start_time

        # Get final CPU stats
        final_cpu_times = process.cpu_times()

        # Get final I/O stats if available
        if io_tracking_available:
            final_io_counters = process.io_counters()
            io_read_count = (
                final_io_counters.read_count
                - initial_io_counters.read_count
            )
            io_write_count = (
                final_io_counters.write_count
                - initial_io_counters.write_count
            )
        else:
            io_read_count = io_write_count = 0

        # Get memory usage statistics
        snapshot = tracemalloc.take_snapshot()
        top_stats = snapshot.statistics("lineno")

        # Calculate CPU usage (user + system time consumed during the call)
        cpu_usage = (
            final_cpu_times.user
            - initial_cpu_times.user
            + final_cpu_times.system
            - initial_cpu_times.system
        )

        # Log the data
        logger.info(f"Execution time: {execution_time:.4f} seconds")
        logger.info(f"CPU usage: {cpu_usage:.2f} seconds")
        if io_tracking_available:
            logger.info(
                f"I/O Operations - Read: {io_read_count}, Write: {io_write_count}"
            )
        logger.info("Top memory usage:")
        for stat in top_stats[:10]:
            logger.info(stat)

        # Stop memory tracking
        tracemalloc.stop()

        return result

    return wrapper
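

# Example usage (a minimal sketch; `compute` is a hypothetical function):
#
# @profile_all
# def compute(n: int) -> int:
#     return sum(i * i for i in range(n))
#
# compute(1_000_000)
# # Logs execution time, CPU seconds, I/O counts (where supported),
# # and the top-10 memory-allocating lines.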