From e719e83912641711d7d5e8620a2a7b0e3e0b9e5d Mon Sep 17 00:00:00 2001 From: Kye Date: Mon, 11 Dec 2023 12:36:21 -0800 Subject: [PATCH] [OpenAITTS][AbstractWorker FIX] [CHORES] --- .../personal_stylist/personal_stylist.py | 30 ++++- swarms/models/__init__.py | 7 +- swarms/models/base_multimodal_model.py | 44 ++++++++ swarms/models/gpt4_vision_api.py | 99 ++++------------- swarms/models/openai_tts.py | 87 +++++++++++++++ swarms/swarms/base.py | 104 ++++++++++-------- tests/models/test_openaitts.py | 79 +++++++++++++ tts_speech.py | 10 ++ 8 files changed, 329 insertions(+), 131 deletions(-) create mode 100644 swarms/models/openai_tts.py create mode 100644 tests/models/test_openaitts.py create mode 100644 tts_speech.py diff --git a/playground/demos/personal_stylist/personal_stylist.py b/playground/demos/personal_stylist/personal_stylist.py index 838c72be..b8641aa3 100644 --- a/playground/demos/personal_stylist/personal_stylist.py +++ b/playground/demos/personal_stylist/personal_stylist.py @@ -64,23 +64,43 @@ accessories_stylist_agent = Agent( # Run agents with respective tasks haircut_suggestions = haircut_stylist_agent.run( - "Suggest suitable haircuts for this user, considering their face shape and hair type.", user_selfie + ( + "Suggest suitable haircuts for this user, considering their" + " face shape and hair type." + ), + user_selfie, ) # Run Makeup or Beard agent based on gender if user_gender == "woman": makeup_suggestions = makeup_or_beard_stylist_agent.run( - "Recommend makeup styles for this user, complementing their features.", user_selfie + ( + "Recommend makeup styles for this user, complementing" + " their features." + ), + user_selfie, ) elif user_gender == "man": beard_suggestions = makeup_or_beard_stylist_agent.run( - "Provide beard styling advice for this user, considering their face shape.", user_selfie + ( + "Provide beard styling advice for this user, considering" + " their face shape." + ), + user_selfie, ) clothing_suggestions = clothing_stylist_agent.run( - "Match clothing styles and colors for this user, using color matching principles.", clothes_image + ( + "Match clothing styles and colors for this user, using color" + " matching principles." + ), + clothes_image, ) accessories_suggestions = accessories_stylist_agent.run( - "Suggest accessories to complement this user's outfit, considering the overall style.", clothes_image + ( + "Suggest accessories to complement this user's outfit," + " considering the overall style." + ), + clothes_image, ) diff --git a/swarms/models/__init__.py b/swarms/models/__init__.py index caccc7ab..ab82b1f4 100644 --- a/swarms/models/__init__.py +++ b/swarms/models/__init__.py @@ -8,7 +8,7 @@ from swarms.models.openai_models import ( AzureOpenAI, OpenAIChat, ) # noqa: E402 -from swarms.models.vllm import vLLM # noqa: E402 +# from swarms.models.vllm import vLLM # noqa: E402 # from swarms.models.zephyr import Zephyr # noqa: E402 from swarms.models.biogpt import BioGPT # noqa: E402 @@ -29,7 +29,7 @@ from swarms.models.layoutlm_document_qa import ( LayoutLMDocumentQA, ) # noqa: E402 from swarms.models.gpt4_vision_api import GPT4VisionAPI # noqa: E402 - +from swarms.models.openai_tts import OpenAITTS # noqa: E402 # from swarms.models.gpt4v import GPT4Vision # from swarms.models.dalle3 import Dalle3 @@ -60,5 +60,6 @@ __all__ = [ # "Dalle3", # "DistilWhisperModel", "GPT4VisionAPI", - "vLLM", + # "vLLM", + "OpenAITTS", ] diff --git a/swarms/models/base_multimodal_model.py b/swarms/models/base_multimodal_model.py index 2f6110d6..cbf4f163 100644 --- a/swarms/models/base_multimodal_model.py +++ b/swarms/models/base_multimodal_model.py @@ -11,6 +11,14 @@ import requests from PIL import Image from termcolor import colored +try: + import cv2 +except ImportError: + print( + "Error importing cv2 try installing it with `pip install" + " opencv-python`" + ) + class BaseMultiModalModel: """ @@ -126,6 +134,42 @@ class BaseMultiModalModel: image_pil = Image.open(img) return image_pil + def process_video( + self, + video_path: str = None, + type_img: str = ".jpg", + *args, + **kwargs, + ): + """Process a video + + Args: + video_path (str, optional): _description_. Defaults to None. + type_img (str, optional): _description_. Defaults to ".jpg". + *args: _description_. + **kwargs: _description_. + + """ + try: + video = cv2.VideoCapture(video_path) + + base64Frames = [] + while video.isOpened(): + success, frame = video.read() + if not success: + break + _, buffer = cv2.imencode(type_img, frame) + base64Frames.append( + base64.b64encode(buffer).decode("utf-8") + ) + + video.release() + print(len(base64Frames), "frames read") + return base64Frames + except Exception as error: + print(f"Error processing video {error} try again") + raise error + def clear_chat_history(self): """Clear the chat history""" self.chat_history = [] diff --git a/swarms/models/gpt4_vision_api.py b/swarms/models/gpt4_vision_api.py index cd6e5ddb..6bab073f 100644 --- a/swarms/models/gpt4_vision_api.py +++ b/swarms/models/gpt4_vision_api.py @@ -1,16 +1,14 @@ -import asyncio import base64 -import concurrent.futures import json import logging import os -from concurrent.futures import ThreadPoolExecutor -from typing import List, Optional, Tuple +from typing import Optional import aiohttp import requests from dotenv import load_dotenv from termcolor import colored +from swarms.models.base_multimodal_model import BaseMultiModalModel try: import cv2 @@ -32,7 +30,7 @@ You are an multi-modal autonomous agent. You are given a task and an image. You """ -class GPT4VisionAPI: +class GPT4VisionAPI(BaseMultiModalModel): """ GPT-4 Vision API @@ -81,7 +79,7 @@ class GPT4VisionAPI: *args, **kwargs, ): - super().__init__() + super(GPT4VisionAPI).__init__(*args, **kwargs) self.openai_api_key = openai_api_key self.logging_enabled = logging_enabled self.model_name = model_name @@ -117,7 +115,7 @@ class GPT4VisionAPI: pass # Function to handle vision tasks - def run(self, img, task): + def run(self, img: str, task: str, *args, **kwargs): """Run the model.""" try: base64_image = self.encode_image(img) @@ -245,9 +243,17 @@ class GPT4VisionAPI: video.release() print(len(base64_frames), "frames read.") + return base64_frames - for img in base64_frames: - base64.b64decode(img.encode("utf-8")) + def run_with_video( + self, + task: str = None, + video: str = None, + *args, + **kwargs, + ): + self.video_prompt(self.process_video(video)) + pass def __call__( self, @@ -256,7 +262,15 @@ class GPT4VisionAPI: *args, **kwargs, ): - """Run the model.""" + """Call the model + + Args: + task (Optional[str], optional): _description_. Defaults to None. + img (Optional[str], optional): _description_. Defaults to None. + + Raises: + error: _description_ + """ try: base64_image = self.encode_image(img) headers = { @@ -309,35 +323,6 @@ class GPT4VisionAPI: print(f"Error with the request: {error}") raise error - def run_many( - self, - tasks: List[str], - imgs: List[str], - ): - """ - Run the model on multiple tasks and images all at once using concurrent - - Args: - tasks (List[str]): List of tasks - imgs (List[str]): List of image paths - - Returns: - List[str]: List of responses - - - """ - # Instantiate the thread pool executor - with ThreadPoolExecutor( - max_workers=self.max_workers - ) as executor: - results = executor.map(self.run, tasks, imgs) - - # Print the results for debugging - for result in results: - print(result) - - return list(results) - async def arun( self, task: Optional[str] = None, @@ -396,42 +381,6 @@ class GPT4VisionAPI: print(f"Error with the request {error}") raise error - def run_batch( - self, tasks_images: List[Tuple[str, str]] - ) -> List[str]: - """Process a batch of tasks and images""" - with concurrent.futures.ThreadPoolExecutor() as executor: - futures = [ - executor.submit(self.run, task, img) - for task, img in tasks_images - ] - results = [future.result() for future in futures] - return results - - async def run_batch_async( - self, tasks_images: List[Tuple[str, str]] - ) -> List[str]: - """Process a batch of tasks and images asynchronously""" - loop = asyncio.get_event_loop() - futures = [ - loop.run_in_executor(None, self.run, task, img) - for task, img in tasks_images - ] - return await asyncio.gather(*futures) - - async def run_batch_async_with_retries( - self, tasks_images: List[Tuple[str, str]] - ) -> List[str]: - """Process a batch of tasks and images asynchronously with retries""" - loop = asyncio.get_event_loop() - futures = [ - loop.run_in_executor( - None, self.run_with_retries, task, img - ) - for task, img in tasks_images - ] - return await asyncio.gather(*futures) - def health_check(self): """Health check for the GPT4Vision model""" try: diff --git a/swarms/models/openai_tts.py b/swarms/models/openai_tts.py new file mode 100644 index 00000000..c51e3e98 --- /dev/null +++ b/swarms/models/openai_tts.py @@ -0,0 +1,87 @@ +import os + +import openai +import requests +from dotenv import load_dotenv + +from swarms.models.base_llm import AbstractLLM + +# Load .env file +load_dotenv() + +# OpenAI API Key env +def openai_api_key_env(): + openai_api_key = os.getenv("OPENAI_API_KEY") + return openai_api_key + + +class OpenAITTS(AbstractLLM): + """OpenAI TTS model + + Attributes: + model_name (str): _description_ + proxy_url (str): _description_ + openai_api_key (str): _description_ + voice (str): _description_ + chunk_size (_type_): _description_ + + Methods: + run: _description_ + + + Examples: + >>> from swarms.models.openai_tts import OpenAITTS + >>> tts = OpenAITTS( + ... model_name = "tts-1-1106", + ... proxy_url = "https://api.openai.com/v1/audio/speech", + ... openai_api_key = openai_api_key_env, + ... voice = "onyx", + ... ) + >>> tts.run("Hello world") + + """ + def __init__( + self, + model_name: str = "tts-1-1106", + proxy_url: str = "https://api.openai.com/v1/audio/speech", + openai_api_key: str = openai_api_key_env, + voice: str = "onyx", + chunk_size = 1024 * 1024, + ): + super().__init__() + self.model_name = model_name + self.proxy_url = proxy_url + self.openai_api_key = openai_api_key + self.voice = voice + self.chunk_size = chunk_size + + def run( + self, + task: str, + *args, + **kwargs + ): + """Run the tts model + + Args: + task (str): _description_ + + Returns: + _type_: _description_ + """ + response = requests.post( + self.proxy_url, + headers = { + "Authorization": f"Bearer {self.openai_api_key}", + }, + json={ + "model": self.model_name, + "input": task, + "voice": self.voice, + }, + ) + + audio = b"" + for chunk in response.iter_content(chunk_size = 1024 * 1024): + audio += chunk + return audio \ No newline at end of file diff --git a/swarms/swarms/base.py b/swarms/swarms/base.py index d5c6eea6..afd2b9fe 100644 --- a/swarms/swarms/base.py +++ b/swarms/swarms/base.py @@ -1,14 +1,28 @@ +""" + +Paid + +# TODO: Pass in abstract LLM class that can utilize Hf or Anthropic models, Move away from OPENAI +# TODO: ADD Universal Communication Layer, a ocean vectorstore instance +# TODO: BE MORE EXPLICIT ON TOOL USE, TASK DECOMPOSITION AND TASK COMPLETETION AND ALLOCATION +# TODO: Add RLHF Data collection, ask user how the swarm is performing +# TODO: Create an onboarding process if not settings are preconfigured like `from swarms import Swarm, Swarm()` => then initiate onboarding name your swarm + provide purpose + etc + +""" + import asyncio import concurrent.futures +import logging import time from abc import ABC, abstractmethod -from typing import Any, Dict, List, Optional, Callable -from swarms.structs.agent import Agent -from swarms.agents.base import AbstractWorker from concurrent.futures import ThreadPoolExecutor, as_completed -import logging +from typing import Any, Callable, Dict, List, Optional + from termcolor import colored +from swarms.structs.agent import Agent + + class AbstractSwarm(ABC): """ @@ -23,23 +37,23 @@ class AbstractSwarm(ABC): communicate: Communicate with the swarm through the orchestrator, protocols, and the universal communication layer run: Run the swarm step: Step the swarm - add_worker: Add a worker to the swarm - remove_worker: Remove a worker from the swarm + add_agent: Add a agent to the swarm + remove_agent: Remove a agent from the swarm broadcast: Broadcast a message to all agents reset: Reset the swarm plan: agents must individually plan using a workflow or pipeline - direct_message: Send a direct message to a worker + direct_message: Send a direct message to a agent autoscaler: Autoscaler that acts like kubernetes for autonomous agents - get_worker_by_id: Locate a worker by id - get_worker_by_name: Locate a worker by name - assign_task: Assign a task to a worker + get_agent_by_id: Locate a agent by id + get_agent_by_name: Locate a agent by name + assign_task: Assign a task to a agent get_all_tasks: Get all tasks get_finished_tasks: Get all finished tasks get_pending_tasks: Get all pending tasks - pause_worker: Pause a worker - resume_worker: Resume a worker - stop_worker: Stop a worker - restart_worker: Restart worker + pause_agent: Pause a agent + resume_agent: Resume a agent + stop_agent: Stop a agent + restart_agent: Restart agent scale_up: Scale up the number of agents scale_down: Scale down the number of agents scale_to: Scale to a specific number of agents @@ -57,12 +71,6 @@ class AbstractSwarm(ABC): """ - # TODO: Pass in abstract LLM class that can utilize Hf or Anthropic models, Move away from OPENAI - # TODO: ADD Universal Communication Layer, a ocean vectorstore instance - # TODO: BE MORE EXPLICIT ON TOOL USE, TASK DECOMPOSITION AND TASK COMPLETETION AND ALLOCATION - # TODO: Add RLHF Data collection, ask user how the swarm is performing - # TODO: Create an onboarding process if not settings are preconfigured like `from swarms import Swarm, Swarm()` => then initiate onboarding name your swarm + provide purpose + etc - # @abstractmethod def __init__(self, agents: List[Agent], max_loops: int = 200): """Initialize the swarm with agents""" @@ -101,18 +109,18 @@ class AbstractSwarm(ABC): pass # @abstractmethod - def add_worker(self, worker: "AbstractWorker"): - """Add a worker to the swarm""" + def add_agent(self, agent: "Agent"): + """Add a agent to the swarm""" pass # @abstractmethod - def remove_worker(self, worker: "AbstractWorker"): - """Remove a worker from the swarm""" + def remove_agent(self, agent: "Agent"): + """Remove a agent from the swarm""" pass # @abstractmethod def broadcast( - self, message: str, sender: Optional["AbstractWorker"] = None + self, message: str, sender: Optional["Agent"] = None ): """Broadcast a message to all agents""" pass @@ -131,36 +139,36 @@ class AbstractSwarm(ABC): def direct_message( self, message: str, - sender: "AbstractWorker", - recipient: "AbstractWorker", + sender: "Agent", + recipient: "Agent", ): - """Send a direct message to a worker""" + """Send a direct message to a agent""" pass # @abstractmethod - def autoscaler(self, num_agents: int, worker: ["AbstractWorker"]): + def autoscaler(self, num_agents: int, agent: ["Agent"]): """Autoscaler that acts like kubernetes for autonomous agents""" pass # @abstractmethod - def get_worker_by_id(self, id: str) -> "AbstractWorker": - """Locate a worker by id""" + def get_agent_by_id(self, id: str) -> "Agent": + """Locate a agent by id""" pass # @abstractmethod - def get_worker_by_name(self, name: str) -> "AbstractWorker": - """Locate a worker by name""" + def get_agent_by_name(self, name: str) -> "Agent": + """Locate a agent by name""" pass # @abstractmethod def assign_task( - self, worker: "AbstractWorker", task: Any + self, agent: "Agent", task: Any ) -> Dict: - """Assign a task to a worker""" + """Assign a task to a agent""" pass # @abstractmethod - def get_all_tasks(self, worker: "AbstractWorker", task: Any): + def get_all_tasks(self, agent: "Agent", task: Any): """Get all tasks""" # @abstractmethod @@ -174,42 +182,42 @@ class AbstractSwarm(ABC): pass # @abstractmethod - def pause_worker(self, worker: "AbstractWorker", worker_id: str): - """Pause a worker""" + def pause_agent(self, agent: "Agent", agent_id: str): + """Pause a agent""" pass # @abstractmethod - def resume_worker(self, worker: "AbstractWorker", worker_id: str): - """Resume a worker""" + def resume_agent(self, agent: "Agent", agent_id: str): + """Resume a agent""" pass # @abstractmethod - def stop_worker(self, worker: "AbstractWorker", worker_id: str): - """Stop a worker""" + def stop_agent(self, agent: "Agent", agent_id: str): + """Stop a agent""" pass # @abstractmethod - def restart_worker(self, worker: "AbstractWorker"): - """Restart worker""" + def restart_agent(self, agent: "Agent"): + """Restart agent""" pass # @abstractmethod - def scale_up(self, num_worker: int): + def scale_up(self, num_agent: int): """Scale up the number of agents""" pass # @abstractmethod - def scale_down(self, num_worker: int): + def scale_down(self, num_agent: int): """Scale down the number of agents""" pass # @abstractmethod - def scale_to(self, num_worker: int): + def scale_to(self, num_agent: int): """Scale to a specific number of agents""" pass # @abstractmethod - def get_all_agents(self) -> List["AbstractWorker"]: + def get_all_agents(self) -> List["Agent"]: """Get all agents""" pass diff --git a/tests/models/test_openaitts.py b/tests/models/test_openaitts.py new file mode 100644 index 00000000..941005f8 --- /dev/null +++ b/tests/models/test_openaitts.py @@ -0,0 +1,79 @@ +import pytest +from unittest.mock import patch, MagicMock +from swarms.models.openai_tts import OpenAITTS + +def test_openaitts_initialization(): + tts = OpenAITTS() + assert isinstance(tts, OpenAITTS) + assert tts.model_name == "tts-1-1106" + assert tts.proxy_url == "https://api.openai.com/v1/audio/speech" + assert tts.voice == "onyx" + assert tts.chunk_size == 1024 * 1024 + +def test_openaitts_initialization_custom_parameters(): + tts = OpenAITTS("custom_model", "custom_url", "custom_key", "custom_voice", 2048) + assert tts.model_name == "custom_model" + assert tts.proxy_url == "custom_url" + assert tts.openai_api_key == "custom_key" + assert tts.voice == "custom_voice" + assert tts.chunk_size == 2048 + +@patch("requests.post") +def test_run(mock_post): + mock_response = MagicMock() + mock_response.iter_content.return_value = [b"chunk1", b"chunk2"] + mock_post.return_value = mock_response + tts = OpenAITTS() + audio = tts.run("Hello world") + assert audio == b"chunk1chunk2" + mock_post.assert_called_once_with( + "https://api.openai.com/v1/audio/speech", + headers={"Authorization": f"Bearer {tts.openai_api_key}"}, + json={"model": "tts-1-1106", "input": "Hello world", "voice": "onyx"}, + ) + +@patch("requests.post") +def test_run_empty_task(mock_post): + tts = OpenAITTS() + with pytest.raises(Exception): + tts.run("") + +@patch("requests.post") +def test_run_very_long_task(mock_post): + tts = OpenAITTS() + with pytest.raises(Exception): + tts.run("A" * 10000) + +@patch("requests.post") +def test_run_invalid_task(mock_post): + tts = OpenAITTS() + with pytest.raises(Exception): + tts.run(None) + +@patch("requests.post") +def test_run_custom_model(mock_post): + mock_response = MagicMock() + mock_response.iter_content.return_value = [b"chunk1", b"chunk2"] + mock_post.return_value = mock_response + tts = OpenAITTS("custom_model") + audio = tts.run("Hello world") + assert audio == b"chunk1chunk2" + mock_post.assert_called_once_with( + "https://api.openai.com/v1/audio/speech", + headers={"Authorization": f"Bearer {tts.openai_api_key}"}, + json={"model": "custom_model", "input": "Hello world", "voice": "onyx"}, + ) + +@patch("requests.post") +def test_run_custom_voice(mock_post): + mock_response = MagicMock() + mock_response.iter_content.return_value = [b"chunk1", b"chunk2"] + mock_post.return_value = mock_response + tts = OpenAITTS(voice="custom_voice") + audio = tts.run("Hello world") + assert audio == b"chunk1chunk2" + mock_post.assert_called_once_with( + "https://api.openai.com/v1/audio/speech", + headers={"Authorization": f"Bearer {tts.openai_api_key}"}, + json={"model": "tts-1-1106", "input": "Hello world", "voice": "custom_voice"}, + ) \ No newline at end of file diff --git a/tts_speech.py b/tts_speech.py new file mode 100644 index 00000000..881cca95 --- /dev/null +++ b/tts_speech.py @@ -0,0 +1,10 @@ +from swarms import OpenAITTS + +tts = OpenAITTS( + model_name = "tts-1-1106", + voice = "onyx", + openai_api_key="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" +) + +out = tts("Hello world") +print(out)