From ee5786cdf2a4287e36f74cf807ff80e4e798c019 Mon Sep 17 00:00:00 2001 From: Kye Date: Tue, 12 Dec 2023 14:59:45 -0800 Subject: [PATCH] [BaseMultiModalModel] [BaseSwarm] [BaseStructure] --- pyproject.toml | 2 +- swarms/models/base_multimodal_model.py | 4 +- swarms/models/gpt4_vision_api.py | 62 +++- swarms/structs/base.py | 389 ++++++++++++++++++++++++- tests/swarms/test_base.py | 259 ++++++++++++++++ 5 files changed, 704 insertions(+), 12 deletions(-) create mode 100644 tests/swarms/test_base.py diff --git a/pyproject.toml b/pyproject.toml index faf387fa..4e1c1c7c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,7 +38,7 @@ faiss-cpu = "*" backoff = "*" marshmallow = "*" datasets = "*" -optimum = "*" +optimum = "1.15.0" diffusers = "*" PyPDF2 = "*" vllm = "*" diff --git a/swarms/models/base_multimodal_model.py b/swarms/models/base_multimodal_model.py index cbf4f163..396bd56f 100644 --- a/swarms/models/base_multimodal_model.py +++ b/swarms/models/base_multimodal_model.py @@ -317,13 +317,13 @@ class BaseMultiModalModel: content = colored(content, color) print(content) - def stream(self, content: str): + def stream_response(self, text: str): """Stream the output Args: content (str): _description_ """ - for chunk in content: + for chunk in text: print(chunk) def meta_prompt(self): diff --git a/swarms/models/gpt4_vision_api.py b/swarms/models/gpt4_vision_api.py index 6bab073f..7496cf32 100644 --- a/swarms/models/gpt4_vision_api.py +++ b/swarms/models/gpt4_vision_api.py @@ -112,10 +112,15 @@ class GPT4VisionAPI(BaseMultiModalModel): def download_img_then_encode(self, img: str): """Download image from URL then encode image to base64 using requests""" - pass + if not os.path.exists(img): + print(f"Image file not found: {img}") + return None + + response = requests.get(img) + return base64.b64encode(response.content).decode("utf-8") # Function to handle vision tasks - def run(self, img: str, task: str, *args, **kwargs): + def run(self, task: str = None, img: str = None, *args, **kwargs): """Run the model.""" try: base64_image = self.encode_image(img) @@ -162,7 +167,10 @@ class GPT4VisionAPI(BaseMultiModalModel): return None except Exception as error: - print(f"Error with the request: {error}") + print( + f"Error with the request: {error}, make sure you" + " double check input types and positions" + ) return None def video_prompt(self, frames): @@ -207,7 +215,7 @@ class GPT4VisionAPI(BaseMultiModalModel): for chunk in content: print(chunk) - def process_video(self, video: str): + def process_video(self, video: str = None): """ Process a video into a list of base64 frames @@ -252,8 +260,50 @@ class GPT4VisionAPI(BaseMultiModalModel): *args, **kwargs, ): - self.video_prompt(self.process_video(video)) - pass + prompt = self.video_prompt(self.process_video(video)) + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {openai_api_key}", + } + payload = { + "model": self.model_name, + "messages": [ + { + "role": "system", + "content": [self.system_prompt], + }, + { + "role": "user", + "content": [ + (task,), # task + *map( + lambda x: {"image": x, "resize": 768}, + prompt[0::50], + ), + ], + }, + ], + "max_tokens": self.max_tokens, + } + response = requests.post( + self.openai_proxy, + headers=headers, + json=payload, + ) + + out = response.json() + content = out["choices"][0]["message"]["content"] + + if self.streaming_enabled: + content = self.stream_response(content) + else: + pass + + if self.beautify: + content = colored(content, "cyan") + print(content) + else: + print(content) def __call__( self, diff --git a/swarms/structs/base.py b/swarms/structs/base.py index 559416f0..9f6da9af 100644 --- a/swarms/structs/base.py +++ b/swarms/structs/base.py @@ -1,5 +1,388 @@ -""" -Base Structure for all Swarm Structures +import json +import os +from abc import ABC, abstractmethod +from typing import Optional, Any, Dict, List +from datetime import datetime +import asyncio +import concurrent.futures +from concurrent.futures import ThreadPoolExecutor +import psutil +try: + import gzip +except ImportError as error: + print(f"Error importing gzip: {error}") -""" + +class BaseStructure(ABC): + + def __init__( + self, + name: Optional[str] = None, + description: Optional[str] = None, + save_metadata: bool = True, + save_artifact_path: Optional[str] = "./artifacts", + save_metadata_path: Optional[str] = "./metadata", + save_error_path: Optional[str] = "./errors", + *args, + **kwargs, + ): + self.name = name + self.description = description + self.save_metadata = save_metadata + self.save_artifact_path = save_artifact_path + self.save_metadata_path = save_metadata_path + self.save_error_path = save_error_path + + @abstractmethod + def run(self, *args, **kwargs): + """Run the structure.""" + pass + + def save_to_file(self, data: Any, file_path: str): + """Save data to file. + + Args: + data (Any): _description_ + file_path (str): _description_ + """ + with open(file_path, "w") as file: + json.dump(data, file) + + def load_from_file(self, file_path: str) -> Any: + """Load data from file. + + Args: + file_path (str): _description_ + + Returns: + Any: _description_ + """ + with open(file_path, "r") as file: + return json.load(file) + + def save_metadata(self, metadata: Dict[str, Any]): + """Save metadata to file. + + Args: + metadata (Dict[str, Any]): _description_ + """ + if self.save_metadata: + file_path = os.path.join( + self.save_metadata_path, f"{self.name}_metadata.json" + ) + self.save_to_file(metadata, file_path) + + def load_metadata(self) -> Dict[str, Any]: + """Load metadata from file. + + Returns: + Dict[str, Any]: _description_ + """ + file_path = os.path.join( + self.save_metadata_path, f"{self.name}_metadata.json" + ) + return self.load_from_file(file_path) + + def log_error(self, error_message: str): + """Log error to file. + + Args: + error_message (str): _description_ + """ + file_path = os.path.join( + self.save_error_path, f"{self.name}_errors.log" + ) + with open(file_path, "a") as file: + file.write(f"{error_message}\n") + + def save_artifact(self, artifact: Any, artifact_name: str): + """Save artifact to file. + + Args: + artifact (Any): _description_ + artifact_name (str): _description_ + """ + file_path = os.path.join( + self.save_artifact_path, f"{artifact_name}.json" + ) + self.save_to_file(artifact, file_path) + + def load_artifact(self, artifact_name: str) -> Any: + """Load artifact from file. + + Args: + artifact_name (str): _description_ + + Returns: + Any: _description_ + """ + file_path = os.path.join( + self.save_artifact_path, f"{artifact_name}.json" + ) + return self.load_from_file(file_path) + + def _current_timestamp(self): + """Current timestamp. + + Returns: + _type_: _description_ + """ + return datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + def log_event( + self, + event: str, + event_type: str = "INFO", + ): + """Log event to file. + + Args: + event (str): _description_ + event_type (str, optional): _description_. Defaults to "INFO". + """ + timestamp = self._current_timestamp() + log_message = f"[{timestamp}] [{event_type}] {event}\n" + file = os.path.join( + self.save_metadata_path, f"{self.name}_events.log" + ) + with open(file, "a") as file: + file.write(log_message) + + async def run_async(self, *args, **kwargs): + """Run the structure asynchronously.""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, self.run, *args, **kwargs + ) + + async def save_metadata_async(self, metadata: Dict[str, Any]): + """Save metadata to file asynchronously. + + Args: + metadata (Dict[str, Any]): _description_ + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, self.save_metadata, metadata + ) + + async def load_metadata_async(self) -> Dict[str, Any]: + """Load metadata from file asynchronously. + + Returns: + Dict[str, Any]: _description_ + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, self.load_metadata) + + async def log_error_async(self, error_message: str): + """Log error to file asynchronously. + + Args: + error_message (str): _description_ + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, self.log_error, error_message + ) + + async def save_artifact_async( + self, artifact: Any, artifact_name: str + ): + """Save artifact to file asynchronously. + + Args: + artifact (Any): _description_ + artifact_name (str): _description_ + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, self.save_artifact, artifact, artifact_name + ) + + async def load_artifact_async(self, artifact_name: str) -> Any: + """Load artifact from file asynchronously. + + Args: + artifact_name (str): _description_ + + Returns: + Any: _description_ + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, self.load_artifact, artifact_name + ) + + async def log_event_async( + self, + event: str, + event_type: str = "INFO", + ): + """Log event to file asynchronously. + + Args: + event (str): _description_ + event_type (str, optional): _description_. Defaults to "INFO". + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, self.log_event, event, event_type + ) + + async def asave_to_file( + self, data: Any, file: str, *args, **kwargs + ): + """Save data to file asynchronously. + + Args: + data (Any): _description_ + file (str): _description_ + """ + await asyncio.to_thread( + self.save_to_file, + data, + file, + *args, + ) + + async def aload_from_file( + self, + file: str, + ) -> Any: + """Async load data from file. + + Args: + file (str): _description_ + + Returns: + Any: _description_ + """ + return await asyncio.to_thread(self.load_from_file, file) + + def run_in_thread(self, *args, **kwargs): + """Run the structure in a thread.""" + with concurrent.futures.ThreadPoolExecutor() as executor: + return executor.submit(self.run, *args, **kwargs) + + def save_metadata_in_thread(self, metadata: Dict[str, Any]): + """Save metadata to file in a thread. + + Args: + metadata (Dict[str, Any]): _description_ + """ + with concurrent.futures.ThreadPoolExecutor() as executor: + return executor.submit(self.save_metadata, metadata) + + def run_concurrent(self, *args, **kwargs): + """Run the structure concurrently.""" + return asyncio.run(self.run_async(*args, **kwargs)) + + def compress_data( + self, + data: Any, + ) -> bytes: + """Compress data. + + Args: + data (Any): _description_ + + Returns: + bytes: _description_ + """ + return gzip.compress(json.dumps(data).encode()) + + def decompres_data(self, data: bytes) -> Any: + """Decompress data. + + Args: + data (bytes): _description_ + + Returns: + Any: _description_ + """ + return json.loads(gzip.decompress(data).decode()) + + def run_batched( + self, + batched_data: List[Any], + batch_size: int = 10, + *args, + **kwargs, + ): + """Run batched data. + + Args: + batched_data (List[Any]): _description_ + batch_size (int, optional): _description_. Defaults to 10. + + Returns: + _type_: _description_ + """ + with ThreadPoolExecutor(max_workers=batch_size) as executor: + futures = [ + executor.submit(self.run, data) + for data in batched_data + ] + return [future.result() for future in futures] + + def load_config( + self, config: str = None, *args, **kwargs + ) -> Dict[str, Any]: + """Load config from file. + + Args: + config (str, optional): _description_. Defaults to None. + + Returns: + Dict[str, Any]: _description_ + """ + return self.load_from_file(config) + + def backup_data( + self, data: Any, backup_path: str = None, *args, **kwargs + ): + """Backup data to file. + + Args: + data (Any): _description_ + backup_path (str, optional): _description_. Defaults to None. + """ + timestamp = self._current_timestamp() + backup_file_path = f"{backup_path}/{timestamp}.json" + self.save_to_file(data, backup_file_path) + + def monitor_resources(self): + """Monitor resource usage.""" + memory = psutil.virtual_memory().percent + cpu_usage = psutil.cpu_percent(interval=1) + self.log_event( + f"Resource usage - Memory: {memory}%, CPU: {cpu_usage}%" + ) + + def run_with_resources(self, *args, **kwargs): + """Run the structure with resource monitoring.""" + self.monitor_resources() + return self.run(*args, **kwargs) + + def run_with_resources_batched( + self, + batched_data: List[Any], + batch_size: int = 10, + *args, + **kwargs, + ): + """Run batched data with resource monitoring. + + Args: + batched_data (List[Any]): _description_ + batch_size (int, optional): _description_. Defaults to 10. + + Returns: + _type_: _description_ + """ + self.monitor_resources() + return self.run_batched( + batched_data, batch_size, *args, **kwargs + ) diff --git a/tests/swarms/test_base.py b/tests/swarms/test_base.py new file mode 100644 index 00000000..5ec21b3f --- /dev/null +++ b/tests/swarms/test_base.py @@ -0,0 +1,259 @@ +import pytest +import os +from datetime import datetime +from swarms.swarms.base import BaseStructure + + +class TestBaseStructure: + def test_init(self): + base_structure = BaseStructure( + name="TestStructure", + description="Test description", + save_metadata=True, + save_artifact_path="./test_artifacts", + save_metadata_path="./test_metadata", + save_error_path="./test_errors", + ) + + assert base_structure.name == "TestStructure" + assert base_structure.description == "Test description" + assert base_structure.save_metadata is True + assert base_structure.save_artifact_path == "./test_artifacts" + assert base_structure.save_metadata_path == "./test_metadata" + assert base_structure.save_error_path == "./test_errors" + + def test_save_to_file_and_load_from_file(self, tmpdir): + tmp_dir = tmpdir.mkdir("test_dir") + file_path = os.path.join(tmp_dir, "test_file.json") + + data_to_save = {"key": "value"} + base_structure = BaseStructure() + + base_structure.save_to_file(data_to_save, file_path) + loaded_data = base_structure.load_from_file(file_path) + + assert loaded_data == data_to_save + + def test_save_metadata_and_load_metadata(self, tmpdir): + tmp_dir = tmpdir.mkdir("test_dir") + base_structure = BaseStructure(save_metadata_path=tmp_dir) + + metadata = {"name": "Test", "description": "Test metadata"} + base_structure.save_metadata(metadata) + loaded_metadata = base_structure.load_metadata() + + assert loaded_metadata == metadata + + def test_log_error(self, tmpdir): + tmp_dir = tmpdir.mkdir("test_dir") + base_structure = BaseStructure(save_error_path=tmp_dir) + + error_message = "Test error message" + base_structure.log_error(error_message) + + log_file = os.path.join(tmp_dir, "TestStructure_errors.log") + with open(log_file, "r") as file: + lines = file.readlines() + assert len(lines) == 1 + assert lines[0] == f"{error_message}\n" + + def test_save_artifact_and_load_artifact(self, tmpdir): + tmp_dir = tmpdir.mkdir("test_dir") + base_structure = BaseStructure(save_artifact_path=tmp_dir) + + artifact = {"key": "value"} + artifact_name = "test_artifact" + base_structure.save_artifact(artifact, artifact_name) + loaded_artifact = base_structure.load_artifact(artifact_name) + + assert loaded_artifact == artifact + + def test_current_timestamp(self): + base_structure = BaseStructure() + current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + timestamp = base_structure._current_timestamp() + assert timestamp == current_time + + def test_log_event(self, tmpdir): + tmp_dir = tmpdir.mkdir("test_dir") + base_structure = BaseStructure(save_metadata_path=tmp_dir) + + event = "Test event" + event_type = "INFO" + base_structure.log_event(event, event_type) + + log_file = os.path.join(tmp_dir, "TestStructure_events.log") + with open(log_file, "r") as file: + lines = file.readlines() + assert len(lines) == 1 + assert lines[0] == f"[{base_structure._current_timestamp()}] [{event_type}] {event}\n" + + @pytest.mark.asyncio + async def test_run_async(self): + base_structure = BaseStructure() + + async def async_function(): + return "Async Test Result" + + result = await base_structure.run_async(async_function) + assert result == "Async Test Result" + + @pytest.mark.asyncio + async def test_save_metadata_async(self, tmpdir): + tmp_dir = tmpdir.mkdir("test_dir") + base_structure = BaseStructure(save_metadata_path=tmp_dir) + + metadata = {"name": "Test", "description": "Test metadata"} + await base_structure.save_metadata_async(metadata) + loaded_metadata = base_structure.load_metadata() + + assert loaded_metadata == metadata + + @pytest.mark.asyncio + async def test_log_error_async(self, tmpdir): + tmp_dir = tmpdir.mkdir("test_dir") + base_structure = BaseStructure(save_error_path=tmp_dir) + + error_message = "Test error message" + await base_structure.log_error_async(error_message) + + log_file = os.path.join(tmp_dir, "TestStructure_errors.log") + with open(log_file, "r") as file: + lines = file.readlines() + assert len(lines) == 1 + assert lines[0] == f"{error_message}\n" + + @pytest.mark.asyncio + async def test_save_artifact_async(self, tmpdir): + tmp_dir = tmpdir.mkdir("test_dir") + base_structure = BaseStructure(save_artifact_path=tmp_dir) + + artifact = {"key": "value"} + artifact_name = "test_artifact" + await base_structure.save_artifact_async(artifact, artifact_name) + loaded_artifact = base_structure.load_artifact(artifact_name) + + assert loaded_artifact == artifact + + @pytest.mark.asyncio + async def test_load_artifact_async(self, tmpdir): + tmp_dir = tmpdir.mkdir("test_dir") + base_structure = BaseStructure(save_artifact_path=tmp_dir) + + artifact = {"key": "value"} + artifact_name = "test_artifact" + base_structure.save_artifact(artifact, artifact_name) + loaded_artifact = await base_structure.load_artifact_async(artifact_name) + + assert loaded_artifact == artifact + + @pytest.mark.asyncio + async def test_log_event_async(self, tmpdir): + tmp_dir = tmpdir.mkdir("test_dir") + base_structure = BaseStructure(save_metadata_path=tmp_dir) + + event = "Test event" + event_type = "INFO" + await base_structure.log_event_async(event, event_type) + + log_file = os.path.join(tmp_dir, "TestStructure_events.log") + with open(log_file, "r") as file: + lines = file.readlines() + assert len(lines) == 1 + assert lines[0] == f"[{base_structure._current_timestamp()}] [{event_type}] {event}\n" + + @pytest.mark.asyncio + async def test_asave_to_file(self, tmpdir): + tmp_dir = tmpdir.mkdir("test_dir") + file_path = os.path.join(tmp_dir, "test_file.json") + data_to_save = {"key": "value"} + base_structure = BaseStructure() + + await base_structure.asave_to_file(data_to_save, file_path) + loaded_data = base_structure.load_from_file(file_path) + + assert loaded_data == data_to_save + + @pytest.mark.asyncio + async def test_aload_from_file(self, tmpdir): + tmp_dir = tmpdir.mkdir("test_dir") + file_path = os.path.join(tmp_dir, "test_file.json") + data_to_save = {"key": "value"} + base_structure = BaseStructure() + base_structure.save_to_file(data_to_save, file_path) + + loaded_data = await base_structure.aload_from_file(file_path) + assert loaded_data == data_to_save + + def test_run_in_thread(self): + base_structure = BaseStructure() + result = base_structure.run_in_thread(lambda: "Thread Test Result") + assert result.result() == "Thread Test Result" + + def test_save_and_decompress_data(self): + base_structure = BaseStructure() + data = {"key": "value"} + compressed_data = base_structure.compress_data(data) + decompressed_data = base_structure.decompres_data(compressed_data) + assert decompressed_data == data + + def test_run_batched(self): + base_structure = BaseStructure() + + def run_function(data): + return f"Processed {data}" + + batched_data = list(range(10)) + result = base_structure.run_batched(batched_data, batch_size=5, func=run_function) + + expected_result = [f"Processed {data}" for data in batched_data] + assert result == expected_result + + def test_load_config(self, tmpdir): + tmp_dir = tmpdir.mkdir("test_dir") + config_file = os.path.join(tmp_dir, "config.json") + config_data = {"key": "value"} + base_structure = BaseStructure() + + base_structure.save_to_file(config_data, config_file) + loaded_config = base_structure.load_config(config_file) + + assert loaded_config == config_data + + def test_backup_data(self, tmpdir): + tmp_dir = tmpdir.mkdir("test_dir") + base_structure = BaseStructure() + data_to_backup = {"key": "value"} + base_structure.backup_data(data_to_backup, backup_path=tmp_dir) + backup_files = os.listdir(tmp_dir) + + assert len(backup_files) == 1 + loaded_data = base_structure.load_from_file(os.path.join(tmp_dir, backup_files[0])) + assert loaded_data == data_to_backup + + def test_monitor_resources(self): + base_structure = BaseStructure() + base_structure.monitor_resources() + + def test_run_with_resources(self): + base_structure = BaseStructure() + + def run_function(): + base_structure.monitor_resources() + return "Resource Test Result" + + result = base_structure.run_with_resources(run_function) + assert result == "Resource Test Result" + + def test_run_with_resources_batched(self): + base_structure = BaseStructure() + + def run_function(data): + base_structure.monitor_resources() + return f"Processed {data}" + + batched_data = list(range(10)) + result = base_structure.run_with_resources_batched(batched_data, batch_size=5, func=run_function) + + expected_result = [f"Processed {data}" for data in batched_data] + assert result == expected_result