swarms/swarms/structs/base.py

import asyncio
import concurrent.futures
import gzip
import json
import os
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from functools import partial
from typing import Any, Dict, List, Optional

import psutil

class BaseStructure(ABC):
    """Base structure for swarms: persists metadata, artifacts, errors,
    and events, and provides sync, async, and threaded run helpers."""

    def __init__(
        self,
        name: Optional[str] = None,
        description: Optional[str] = None,
        save_metadata: bool = True,
        save_artifact_path: Optional[str] = "./artifacts",
        save_metadata_path: Optional[str] = "./metadata",
        save_error_path: Optional[str] = "./errors",
        *args,
        **kwargs,
    ):
        self.name = name
        self.description = description
        # Stored under a distinct name so the flag does not shadow the
        # save_metadata() method on the instance.
        self.save_metadata_on = save_metadata
        self.save_artifact_path = save_artifact_path
        self.save_metadata_path = save_metadata_path
        self.save_error_path = save_error_path
        # Create the output directories up front so later writes succeed.
        for path in (save_artifact_path, save_metadata_path, save_error_path):
            if path:
                os.makedirs(path, exist_ok=True)

    @abstractmethod
    def run(self, *args, **kwargs):
        """Run the structure."""

    def save_to_file(self, data: Any, file_path: str):
        """Serialize data to a JSON file.

        Args:
            data (Any): JSON-serializable data to save.
            file_path (str): Destination file path.
        """
        with open(file_path, "w") as file:
            json.dump(data, file)

    def load_from_file(self, file_path: str) -> Any:
        """Load JSON data from a file.

        Args:
            file_path (str): Path of the file to read.

        Returns:
            Any: The parsed JSON content.
        """
        with open(file_path, "r") as file:
            return json.load(file)

    def save_metadata(self, metadata: Dict[str, Any]):
        """Save metadata to file if saving is enabled.

        Args:
            metadata (Dict[str, Any]): Metadata to persist as JSON.
        """
        if self.save_metadata_on:
            file_path = os.path.join(
                self.save_metadata_path, f"{self.name}_metadata.json"
            )
            self.save_to_file(metadata, file_path)

    def load_metadata(self) -> Dict[str, Any]:
        """Load metadata from file.

        Returns:
            Dict[str, Any]: The saved metadata.
        """
        file_path = os.path.join(
            self.save_metadata_path, f"{self.name}_metadata.json"
        )
        return self.load_from_file(file_path)

    def log_error(self, error_message: str):
        """Append an error message to the error log.

        Args:
            error_message (str): Error message to record.
        """
        file_path = os.path.join(
            self.save_error_path, f"{self.name}_errors.log"
        )
        with open(file_path, "a") as file:
            file.write(f"{error_message}\n")

    def save_artifact(self, artifact: Any, artifact_name: str):
        """Save an artifact to a JSON file.

        Args:
            artifact (Any): JSON-serializable artifact to save.
            artifact_name (str): Name used for the artifact file.
        """
        file_path = os.path.join(
            self.save_artifact_path, f"{artifact_name}.json"
        )
        self.save_to_file(artifact, file_path)

    def load_artifact(self, artifact_name: str) -> Any:
        """Load an artifact from its JSON file.

        Args:
            artifact_name (str): Name of the artifact file to load.

        Returns:
            Any: The loaded artifact.
        """
        file_path = os.path.join(
            self.save_artifact_path, f"{artifact_name}.json"
        )
        return self.load_from_file(file_path)

    def _current_timestamp(self) -> str:
        """Return the current timestamp.

        Returns:
            str: Timestamp formatted as "YYYY-MM-DD HH:MM:SS".
        """
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def log_event(
        self,
        event: str,
        event_type: str = "INFO",
    ):
        """Append a timestamped event to the event log.

        Args:
            event (str): Event message.
            event_type (str, optional): Severity label. Defaults to "INFO".
        """
        timestamp = self._current_timestamp()
        log_message = f"[{timestamp}] [{event_type}] {event}\n"
        file_path = os.path.join(
            self.save_metadata_path, f"{self.name}_events.log"
        )
        with open(file_path, "a") as file:
            file.write(log_message)

    async def run_async(self, *args, **kwargs):
        """Run the structure asynchronously in a worker thread."""
        loop = asyncio.get_running_loop()
        # run_in_executor does not forward keyword arguments, so bind
        # them with functools.partial first.
        return await loop.run_in_executor(
            None, partial(self.run, *args, **kwargs)
        )

    async def save_metadata_async(self, metadata: Dict[str, Any]):
        """Save metadata to file asynchronously.

        Args:
            metadata (Dict[str, Any]): Metadata to persist as JSON.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self.save_metadata, metadata
        )

    async def load_metadata_async(self) -> Dict[str, Any]:
        """Load metadata from file asynchronously.

        Returns:
            Dict[str, Any]: The saved metadata.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.load_metadata)

    async def log_error_async(self, error_message: str):
        """Log an error to file asynchronously.

        Args:
            error_message (str): Error message to append to the log.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self.log_error, error_message
        )

    async def save_artifact_async(self, artifact: Any, artifact_name: str):
        """Save an artifact to file asynchronously.

        Args:
            artifact (Any): JSON-serializable artifact to save.
            artifact_name (str): Name used for the artifact file.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self.save_artifact, artifact, artifact_name
        )

    async def load_artifact_async(self, artifact_name: str) -> Any:
        """Load an artifact from file asynchronously.

        Args:
            artifact_name (str): Name of the artifact file to load.

        Returns:
            Any: The loaded artifact.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self.load_artifact, artifact_name
        )

    async def log_event_async(
        self,
        event: str,
        event_type: str = "INFO",
    ):
        """Log an event to file asynchronously.

        Args:
            event (str): Event message.
            event_type (str, optional): Severity label. Defaults to "INFO".
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self.log_event, event, event_type
        )

    async def asave_to_file(self, data: Any, file: str):
        """Save data to a file asynchronously.

        Args:
            data (Any): JSON-serializable data to save.
            file (str): Destination file path.
        """
        await asyncio.to_thread(self.save_to_file, data, file)

    async def aload_from_file(self, file: str) -> Any:
        """Load JSON data from a file asynchronously.

        Args:
            file (str): Path of the file to read.

        Returns:
            Any: The parsed JSON content.
        """
        return await asyncio.to_thread(self.load_from_file, file)

    def run_in_thread(self, *args, **kwargs):
        """Run the structure in a worker thread.

        Note: the executor's context manager waits for the task on exit,
        so the returned future is already resolved.
        """
        with concurrent.futures.ThreadPoolExecutor() as executor:
            return executor.submit(self.run, *args, **kwargs)

    def save_metadata_in_thread(self, metadata: Dict[str, Any]):
        """Save metadata to file in a worker thread.

        Args:
            metadata (Dict[str, Any]): Metadata to persist as JSON.
        """
        with concurrent.futures.ThreadPoolExecutor() as executor:
            return executor.submit(self.save_metadata, metadata)

    def run_concurrent(self, *args, **kwargs):
        """Run the structure on the asyncio event loop, blocking until done."""
        return asyncio.run(self.run_async(*args, **kwargs))

    def compress_data(self, data: Any) -> bytes:
        """Compress data with gzip.

        Args:
            data (Any): JSON-serializable data to compress.

        Returns:
            bytes: Gzip-compressed JSON bytes.
        """
        return gzip.compress(json.dumps(data).encode())

    def decompress_data(self, data: bytes) -> Any:
        """Decompress gzip-compressed JSON data.

        Args:
            data (bytes): Gzip-compressed JSON bytes.

        Returns:
            Any: The decoded Python object.
        """
        return json.loads(gzip.decompress(data).decode())

    def run_batched(
        self,
        batched_data: List[Any],
        batch_size: int = 10,
        *args,
        **kwargs,
    ):
        """Run the structure over a batch of inputs in parallel threads.

        Args:
            batched_data (List[Any]): Inputs, one run() call per item.
            batch_size (int, optional): Maximum worker threads. Defaults to 10.

        Returns:
            List[Any]: Results in the same order as batched_data.
        """
        with ThreadPoolExecutor(max_workers=batch_size) as executor:
            futures = [
                executor.submit(self.run, data, *args, **kwargs)
                for data in batched_data
            ]
            return [future.result() for future in futures]

    def load_config(
        self, config: Optional[str] = None, *args, **kwargs
    ) -> Dict[str, Any]:
        """Load a JSON config file.

        Args:
            config (str, optional): Path to the config file. Defaults to None.

        Returns:
            Dict[str, Any]: The parsed configuration.
        """
        return self.load_from_file(config)

    def backup_data(
        self, data: Any, backup_path: Optional[str] = None, *args, **kwargs
    ):
        """Back up data to a timestamped JSON file.

        Args:
            data (Any): JSON-serializable data to back up.
            backup_path (str, optional): Directory for the backup file.
                Defaults to None.
        """
        timestamp = self._current_timestamp()
        backup_file_path = os.path.join(backup_path, f"{timestamp}.json")
        self.save_to_file(data, backup_file_path)

    def monitor_resources(self):
        """Log current memory and CPU usage as an event."""
        memory = psutil.virtual_memory().percent
        cpu_usage = psutil.cpu_percent(interval=1)
        self.log_event(
            f"Resource usage - Memory: {memory}%, CPU: {cpu_usage}%"
        )

    def run_with_resources(self, *args, **kwargs):
        """Run the structure after logging resource usage."""
        self.monitor_resources()
        return self.run(*args, **kwargs)

    def run_with_resources_batched(
        self,
        batched_data: List[Any],
        batch_size: int = 10,
        *args,
        **kwargs,
    ):
        """Run batched data after logging resource usage.

        Args:
            batched_data (List[Any]): Inputs, one run() call per item.
            batch_size (int, optional): Maximum worker threads. Defaults to 10.

        Returns:
            List[Any]: Results in the same order as batched_data.
        """
        self.monitor_resources()
        return self.run_batched(batched_data, batch_size, *args, **kwargs)
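

# A minimal usage sketch, assuming a concrete subclass; the
# UppercaseStructure name and the sample data below are illustrative,
# not part of the original module.
class UppercaseStructure(BaseStructure):
    def run(self, text: str) -> str:
        # Log each call, then do trivial work so the example is observable.
        self.log_event(f"processing {text!r}")
        return text.upper()


if __name__ == "__main__":
    structure = UppercaseStructure(name="demo")
    print(structure.run("hello"))                  # -> "HELLO"
    print(structure.run_batched(["a", "b", "c"]))  # -> ["A", "B", "C"]
    structure.save_metadata({"last_input": "hello"})
    packed = structure.compress_data({"k": "v"})   # gzip round-trip check
    assert structure.decompress_data(packed) == {"k": "v"}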