[BaseMultiModalModel] [BaseSwarm] [BaseStructure]

pull/294/head^2
Kye 1 year ago
parent 7c740ad19c
commit ee5786cdf2

@ -38,7 +38,7 @@ faiss-cpu = "*"
backoff = "*"
marshmallow = "*"
datasets = "*"
optimum = "*"
optimum = "1.15.0"
diffusers = "*"
PyPDF2 = "*"
vllm = "*"

@ -317,13 +317,13 @@ class BaseMultiModalModel:
content = colored(content, color)
print(content)
def stream(self, content: str):
def stream_response(self, text: str):
"""Stream the output
Args:
content (str): _description_
"""
for chunk in content:
for chunk in text:
print(chunk)
def meta_prompt(self):

@ -112,10 +112,15 @@ class GPT4VisionAPI(BaseMultiModalModel):
def download_img_then_encode(self, img: str):
"""Download image from URL then encode image to base64 using requests"""
pass
if not os.path.exists(img):
print(f"Image file not found: {img}")
return None
response = requests.get(img)
return base64.b64encode(response.content).decode("utf-8")
# Function to handle vision tasks
def run(self, img: str, task: str, *args, **kwargs):
def run(self, task: str = None, img: str = None, *args, **kwargs):
"""Run the model."""
try:
base64_image = self.encode_image(img)
@ -162,7 +167,10 @@ class GPT4VisionAPI(BaseMultiModalModel):
return None
except Exception as error:
print(f"Error with the request: {error}")
print(
f"Error with the request: {error}, make sure you"
" double check input types and positions"
)
return None
def video_prompt(self, frames):
@ -207,7 +215,7 @@ class GPT4VisionAPI(BaseMultiModalModel):
for chunk in content:
print(chunk)
def process_video(self, video: str):
def process_video(self, video: str = None):
"""
Process a video into a list of base64 frames
@ -252,9 +260,51 @@ class GPT4VisionAPI(BaseMultiModalModel):
*args,
**kwargs,
):
self.video_prompt(self.process_video(video))
prompt = self.video_prompt(self.process_video(video))
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {openai_api_key}",
}
payload = {
"model": self.model_name,
"messages": [
{
"role": "system",
"content": [self.system_prompt],
},
{
"role": "user",
"content": [
(task,), # task
*map(
lambda x: {"image": x, "resize": 768},
prompt[0::50],
),
],
},
],
"max_tokens": self.max_tokens,
}
response = requests.post(
self.openai_proxy,
headers=headers,
json=payload,
)
out = response.json()
content = out["choices"][0]["message"]["content"]
if self.streaming_enabled:
content = self.stream_response(content)
else:
pass
if self.beautify:
content = colored(content, "cyan")
print(content)
else:
print(content)
def __call__(
self,
task: Optional[str] = None,

@ -1,5 +1,388 @@
import json
import os
from abc import ABC, abstractmethod
from typing import Optional, Any, Dict, List
from datetime import datetime
import asyncio
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import psutil
try:
import gzip
except ImportError as error:
print(f"Error importing gzip: {error}")
class BaseStructure(ABC):
def __init__(
self,
name: Optional[str] = None,
description: Optional[str] = None,
save_metadata: bool = True,
save_artifact_path: Optional[str] = "./artifacts",
save_metadata_path: Optional[str] = "./metadata",
save_error_path: Optional[str] = "./errors",
*args,
**kwargs,
):
self.name = name
self.description = description
self.save_metadata = save_metadata
self.save_artifact_path = save_artifact_path
self.save_metadata_path = save_metadata_path
self.save_error_path = save_error_path
@abstractmethod
def run(self, *args, **kwargs):
"""Run the structure."""
pass
def save_to_file(self, data: Any, file_path: str):
"""Save data to file.
Args:
data (Any): _description_
file_path (str): _description_
"""
with open(file_path, "w") as file:
json.dump(data, file)
def load_from_file(self, file_path: str) -> Any:
"""Load data from file.
Args:
file_path (str): _description_
Returns:
Any: _description_
"""
with open(file_path, "r") as file:
return json.load(file)
def save_metadata(self, metadata: Dict[str, Any]):
"""Save metadata to file.
Args:
metadata (Dict[str, Any]): _description_
"""
if self.save_metadata:
file_path = os.path.join(
self.save_metadata_path, f"{self.name}_metadata.json"
)
self.save_to_file(metadata, file_path)
def load_metadata(self) -> Dict[str, Any]:
"""Load metadata from file.
Returns:
Dict[str, Any]: _description_
"""
file_path = os.path.join(
self.save_metadata_path, f"{self.name}_metadata.json"
)
return self.load_from_file(file_path)
def log_error(self, error_message: str):
"""Log error to file.
Args:
error_message (str): _description_
"""
file_path = os.path.join(
self.save_error_path, f"{self.name}_errors.log"
)
with open(file_path, "a") as file:
file.write(f"{error_message}\n")
def save_artifact(self, artifact: Any, artifact_name: str):
"""Save artifact to file.
Args:
artifact (Any): _description_
artifact_name (str): _description_
"""
file_path = os.path.join(
self.save_artifact_path, f"{artifact_name}.json"
)
self.save_to_file(artifact, file_path)
def load_artifact(self, artifact_name: str) -> Any:
"""Load artifact from file.
Args:
artifact_name (str): _description_
Returns:
Any: _description_
"""
file_path = os.path.join(
self.save_artifact_path, f"{artifact_name}.json"
)
return self.load_from_file(file_path)
def _current_timestamp(self):
"""Current timestamp.
Returns:
_type_: _description_
"""
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def log_event(
self,
event: str,
event_type: str = "INFO",
):
"""Log event to file.
Args:
event (str): _description_
event_type (str, optional): _description_. Defaults to "INFO".
"""
Base Structure for all Swarm Structures
timestamp = self._current_timestamp()
log_message = f"[{timestamp}] [{event_type}] {event}\n"
file = os.path.join(
self.save_metadata_path, f"{self.name}_events.log"
)
with open(file, "a") as file:
file.write(log_message)
async def run_async(self, *args, **kwargs):
"""Run the structure asynchronously."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None, self.run, *args, **kwargs
)
async def save_metadata_async(self, metadata: Dict[str, Any]):
"""Save metadata to file asynchronously.
Args:
metadata (Dict[str, Any]): _description_
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None, self.save_metadata, metadata
)
async def load_metadata_async(self) -> Dict[str, Any]:
"""Load metadata from file asynchronously.
Returns:
Dict[str, Any]: _description_
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self.load_metadata)
async def log_error_async(self, error_message: str):
"""Log error to file asynchronously.
Args:
error_message (str): _description_
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None, self.log_error, error_message
)
async def save_artifact_async(
self, artifact: Any, artifact_name: str
):
"""Save artifact to file asynchronously.
Args:
artifact (Any): _description_
artifact_name (str): _description_
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None, self.save_artifact, artifact, artifact_name
)
async def load_artifact_async(self, artifact_name: str) -> Any:
"""Load artifact from file asynchronously.
Args:
artifact_name (str): _description_
Returns:
Any: _description_
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None, self.load_artifact, artifact_name
)
async def log_event_async(
self,
event: str,
event_type: str = "INFO",
):
"""Log event to file asynchronously.
Args:
event (str): _description_
event_type (str, optional): _description_. Defaults to "INFO".
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None, self.log_event, event, event_type
)
async def asave_to_file(
self, data: Any, file: str, *args, **kwargs
):
"""Save data to file asynchronously.
Args:
data (Any): _description_
file (str): _description_
"""
await asyncio.to_thread(
self.save_to_file,
data,
file,
*args,
)
async def aload_from_file(
self,
file: str,
) -> Any:
"""Async load data from file.
Args:
file (str): _description_
Returns:
Any: _description_
"""
return await asyncio.to_thread(self.load_from_file, file)
def run_in_thread(self, *args, **kwargs):
"""Run the structure in a thread."""
with concurrent.futures.ThreadPoolExecutor() as executor:
return executor.submit(self.run, *args, **kwargs)
def save_metadata_in_thread(self, metadata: Dict[str, Any]):
"""Save metadata to file in a thread.
Args:
metadata (Dict[str, Any]): _description_
"""
with concurrent.futures.ThreadPoolExecutor() as executor:
return executor.submit(self.save_metadata, metadata)
def run_concurrent(self, *args, **kwargs):
"""Run the structure concurrently."""
return asyncio.run(self.run_async(*args, **kwargs))
def compress_data(
self,
data: Any,
) -> bytes:
"""Compress data.
Args:
data (Any): _description_
Returns:
bytes: _description_
"""
return gzip.compress(json.dumps(data).encode())
def decompres_data(self, data: bytes) -> Any:
"""Decompress data.
Args:
data (bytes): _description_
Returns:
Any: _description_
"""
return json.loads(gzip.decompress(data).decode())
def run_batched(
self,
batched_data: List[Any],
batch_size: int = 10,
*args,
**kwargs,
):
"""Run batched data.
Args:
batched_data (List[Any]): _description_
batch_size (int, optional): _description_. Defaults to 10.
Returns:
_type_: _description_
"""
with ThreadPoolExecutor(max_workers=batch_size) as executor:
futures = [
executor.submit(self.run, data)
for data in batched_data
]
return [future.result() for future in futures]
def load_config(
self, config: str = None, *args, **kwargs
) -> Dict[str, Any]:
"""Load config from file.
Args:
config (str, optional): _description_. Defaults to None.
Returns:
Dict[str, Any]: _description_
"""
return self.load_from_file(config)
def backup_data(
self, data: Any, backup_path: str = None, *args, **kwargs
):
"""Backup data to file.
Args:
data (Any): _description_
backup_path (str, optional): _description_. Defaults to None.
"""
timestamp = self._current_timestamp()
backup_file_path = f"{backup_path}/{timestamp}.json"
self.save_to_file(data, backup_file_path)
def monitor_resources(self):
"""Monitor resource usage."""
memory = psutil.virtual_memory().percent
cpu_usage = psutil.cpu_percent(interval=1)
self.log_event(
f"Resource usage - Memory: {memory}%, CPU: {cpu_usage}%"
)
def run_with_resources(self, *args, **kwargs):
"""Run the structure with resource monitoring."""
self.monitor_resources()
return self.run(*args, **kwargs)
def run_with_resources_batched(
self,
batched_data: List[Any],
batch_size: int = 10,
*args,
**kwargs,
):
"""Run batched data with resource monitoring.
Args:
batched_data (List[Any]): _description_
batch_size (int, optional): _description_. Defaults to 10.
Returns:
_type_: _description_
"""
self.monitor_resources()
return self.run_batched(
batched_data, batch_size, *args, **kwargs
)

@ -0,0 +1,259 @@
import pytest
import os
from datetime import datetime
from swarms.swarms.base import BaseStructure
class TestBaseStructure:
def test_init(self):
base_structure = BaseStructure(
name="TestStructure",
description="Test description",
save_metadata=True,
save_artifact_path="./test_artifacts",
save_metadata_path="./test_metadata",
save_error_path="./test_errors",
)
assert base_structure.name == "TestStructure"
assert base_structure.description == "Test description"
assert base_structure.save_metadata is True
assert base_structure.save_artifact_path == "./test_artifacts"
assert base_structure.save_metadata_path == "./test_metadata"
assert base_structure.save_error_path == "./test_errors"
def test_save_to_file_and_load_from_file(self, tmpdir):
tmp_dir = tmpdir.mkdir("test_dir")
file_path = os.path.join(tmp_dir, "test_file.json")
data_to_save = {"key": "value"}
base_structure = BaseStructure()
base_structure.save_to_file(data_to_save, file_path)
loaded_data = base_structure.load_from_file(file_path)
assert loaded_data == data_to_save
def test_save_metadata_and_load_metadata(self, tmpdir):
tmp_dir = tmpdir.mkdir("test_dir")
base_structure = BaseStructure(save_metadata_path=tmp_dir)
metadata = {"name": "Test", "description": "Test metadata"}
base_structure.save_metadata(metadata)
loaded_metadata = base_structure.load_metadata()
assert loaded_metadata == metadata
def test_log_error(self, tmpdir):
tmp_dir = tmpdir.mkdir("test_dir")
base_structure = BaseStructure(save_error_path=tmp_dir)
error_message = "Test error message"
base_structure.log_error(error_message)
log_file = os.path.join(tmp_dir, "TestStructure_errors.log")
with open(log_file, "r") as file:
lines = file.readlines()
assert len(lines) == 1
assert lines[0] == f"{error_message}\n"
def test_save_artifact_and_load_artifact(self, tmpdir):
tmp_dir = tmpdir.mkdir("test_dir")
base_structure = BaseStructure(save_artifact_path=tmp_dir)
artifact = {"key": "value"}
artifact_name = "test_artifact"
base_structure.save_artifact(artifact, artifact_name)
loaded_artifact = base_structure.load_artifact(artifact_name)
assert loaded_artifact == artifact
def test_current_timestamp(self):
base_structure = BaseStructure()
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
timestamp = base_structure._current_timestamp()
assert timestamp == current_time
def test_log_event(self, tmpdir):
tmp_dir = tmpdir.mkdir("test_dir")
base_structure = BaseStructure(save_metadata_path=tmp_dir)
event = "Test event"
event_type = "INFO"
base_structure.log_event(event, event_type)
log_file = os.path.join(tmp_dir, "TestStructure_events.log")
with open(log_file, "r") as file:
lines = file.readlines()
assert len(lines) == 1
assert lines[0] == f"[{base_structure._current_timestamp()}] [{event_type}] {event}\n"
@pytest.mark.asyncio
async def test_run_async(self):
base_structure = BaseStructure()
async def async_function():
return "Async Test Result"
result = await base_structure.run_async(async_function)
assert result == "Async Test Result"
@pytest.mark.asyncio
async def test_save_metadata_async(self, tmpdir):
tmp_dir = tmpdir.mkdir("test_dir")
base_structure = BaseStructure(save_metadata_path=tmp_dir)
metadata = {"name": "Test", "description": "Test metadata"}
await base_structure.save_metadata_async(metadata)
loaded_metadata = base_structure.load_metadata()
assert loaded_metadata == metadata
@pytest.mark.asyncio
async def test_log_error_async(self, tmpdir):
tmp_dir = tmpdir.mkdir("test_dir")
base_structure = BaseStructure(save_error_path=tmp_dir)
error_message = "Test error message"
await base_structure.log_error_async(error_message)
log_file = os.path.join(tmp_dir, "TestStructure_errors.log")
with open(log_file, "r") as file:
lines = file.readlines()
assert len(lines) == 1
assert lines[0] == f"{error_message}\n"
@pytest.mark.asyncio
async def test_save_artifact_async(self, tmpdir):
tmp_dir = tmpdir.mkdir("test_dir")
base_structure = BaseStructure(save_artifact_path=tmp_dir)
artifact = {"key": "value"}
artifact_name = "test_artifact"
await base_structure.save_artifact_async(artifact, artifact_name)
loaded_artifact = base_structure.load_artifact(artifact_name)
assert loaded_artifact == artifact
@pytest.mark.asyncio
async def test_load_artifact_async(self, tmpdir):
tmp_dir = tmpdir.mkdir("test_dir")
base_structure = BaseStructure(save_artifact_path=tmp_dir)
artifact = {"key": "value"}
artifact_name = "test_artifact"
base_structure.save_artifact(artifact, artifact_name)
loaded_artifact = await base_structure.load_artifact_async(artifact_name)
assert loaded_artifact == artifact
@pytest.mark.asyncio
async def test_log_event_async(self, tmpdir):
tmp_dir = tmpdir.mkdir("test_dir")
base_structure = BaseStructure(save_metadata_path=tmp_dir)
event = "Test event"
event_type = "INFO"
await base_structure.log_event_async(event, event_type)
log_file = os.path.join(tmp_dir, "TestStructure_events.log")
with open(log_file, "r") as file:
lines = file.readlines()
assert len(lines) == 1
assert lines[0] == f"[{base_structure._current_timestamp()}] [{event_type}] {event}\n"
@pytest.mark.asyncio
async def test_asave_to_file(self, tmpdir):
tmp_dir = tmpdir.mkdir("test_dir")
file_path = os.path.join(tmp_dir, "test_file.json")
data_to_save = {"key": "value"}
base_structure = BaseStructure()
await base_structure.asave_to_file(data_to_save, file_path)
loaded_data = base_structure.load_from_file(file_path)
assert loaded_data == data_to_save
@pytest.mark.asyncio
async def test_aload_from_file(self, tmpdir):
tmp_dir = tmpdir.mkdir("test_dir")
file_path = os.path.join(tmp_dir, "test_file.json")
data_to_save = {"key": "value"}
base_structure = BaseStructure()
base_structure.save_to_file(data_to_save, file_path)
loaded_data = await base_structure.aload_from_file(file_path)
assert loaded_data == data_to_save
def test_run_in_thread(self):
base_structure = BaseStructure()
result = base_structure.run_in_thread(lambda: "Thread Test Result")
assert result.result() == "Thread Test Result"
def test_save_and_decompress_data(self):
base_structure = BaseStructure()
data = {"key": "value"}
compressed_data = base_structure.compress_data(data)
decompressed_data = base_structure.decompres_data(compressed_data)
assert decompressed_data == data
def test_run_batched(self):
base_structure = BaseStructure()
def run_function(data):
return f"Processed {data}"
batched_data = list(range(10))
result = base_structure.run_batched(batched_data, batch_size=5, func=run_function)
expected_result = [f"Processed {data}" for data in batched_data]
assert result == expected_result
def test_load_config(self, tmpdir):
tmp_dir = tmpdir.mkdir("test_dir")
config_file = os.path.join(tmp_dir, "config.json")
config_data = {"key": "value"}
base_structure = BaseStructure()
base_structure.save_to_file(config_data, config_file)
loaded_config = base_structure.load_config(config_file)
assert loaded_config == config_data
def test_backup_data(self, tmpdir):
tmp_dir = tmpdir.mkdir("test_dir")
base_structure = BaseStructure()
data_to_backup = {"key": "value"}
base_structure.backup_data(data_to_backup, backup_path=tmp_dir)
backup_files = os.listdir(tmp_dir)
assert len(backup_files) == 1
loaded_data = base_structure.load_from_file(os.path.join(tmp_dir, backup_files[0]))
assert loaded_data == data_to_backup
def test_monitor_resources(self):
base_structure = BaseStructure()
base_structure.monitor_resources()
def test_run_with_resources(self):
base_structure = BaseStructure()
def run_function():
base_structure.monitor_resources()
return "Resource Test Result"
result = base_structure.run_with_resources(run_function)
assert result == "Resource Test Result"
def test_run_with_resources_batched(self):
base_structure = BaseStructure()
def run_function(data):
base_structure.monitor_resources()
return f"Processed {data}"
batched_data = list(range(10))
result = base_structure.run_with_resources_batched(batched_data, batch_size=5, func=run_function)
expected_result = [f"Processed {data}" for data in batched_data]
assert result == expected_result
Loading…
Cancel
Save