diff --git a/swarms/memory/qdrant.py b/swarms/memory/qdrant.py index 40f9979c..0a553a16 100644 --- a/swarms/memory/qdrant.py +++ b/swarms/memory/qdrant.py @@ -1,4 +1,3 @@ -import subprocess from typing import List from httpx import RequestError @@ -8,9 +7,6 @@ try: except ImportError: print("Please install the sentence-transformers package") print("pip install sentence-transformers") - print("pip install qdrant-client") - subprocess.run(["pip", "install", "sentence-transformers"]) - try: from qdrant_client import QdrantClient @@ -22,7 +18,6 @@ try: except ImportError: print("Please install the qdrant-client package") print("pip install qdrant-client") - subprocess.run(["pip", "install", "qdrant-client"]) class Qdrant: diff --git a/swarms/models/__init__.py b/swarms/models/__init__.py index 9ab84e5e..58701f64 100644 --- a/swarms/models/__init__.py +++ b/swarms/models/__init__.py @@ -41,6 +41,14 @@ from swarms.models.zeroscope import ZeroscopeTTV # noqa: E402 # from swarms.models.whisperx_model import WhisperX # noqa: E402 # from swarms.models.kosmos_two import Kosmos # noqa: E402 +from swarms.models.types import ( + TextModality, + ImageModality, + AudioModality, + VideoModality, + MultimodalData, +) # noqa: E402 + __all__ = [ "AbstractLLM", "Anthropic", @@ -49,7 +57,7 @@ __all__ = [ "OpenAI", "AzureOpenAI", "OpenAIChat", - # "Zephyr", + "Zephyr", "BaseMultiModalModel", "Idefics", # "Kosmos", @@ -70,4 +78,9 @@ __all__ = [ "Gigabind", "Mixtral", "ZeroscopeTTV", + "TextModality", + "ImageModality", + "AudioModality", + "VideoModality", + "MultimodalData", ] diff --git a/swarms/models/diffusers_general.py b/swarms/models/diffusers_general.py new file mode 100644 index 00000000..9d7ea250 --- /dev/null +++ b/swarms/models/diffusers_general.py @@ -0,0 +1 @@ +# Base implementation for the diffusers library diff --git a/swarms/models/types.py b/swarms/models/types.py new file mode 100644 index 00000000..460d0ef7 --- /dev/null +++ b/swarms/models/types.py @@ -0,0 +1,28 @@ +from pydantic import BaseModel +from typing import List, Optional + + +class TextModality(BaseModel): + content: str + + +class ImageModality(BaseModel): + url: str + alt_text: Optional[str] + + +class AudioModality(BaseModel): + url: str + transcript: Optional[str] + + +class VideoModality(BaseModel): + url: str + transcript: Optional[str] + + +class MultimodalData(BaseModel): + text: Optional[List[TextModality]] + images: Optional[List[ImageModality]] + audio: Optional[List[AudioModality]] + video: Optional[List[VideoModality]] diff --git a/swarms/structs/README.md b/swarms/structs/SWARMS.md similarity index 100% rename from swarms/structs/README.md rename to swarms/structs/SWARMS.md diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index 5de3b948..f0388493 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -21,6 +21,7 @@ from swarms.structs.utils import ( find_token_in_text, parse_tasks, ) +from swarms.structs.concurrent_workflow import ConcurrentWorkflow __all__ = [ "Agent", @@ -43,4 +44,5 @@ __all__ = [ "find_token_in_text", "extract_key_from_json", "extract_tokens_from_text", + "ConcurrentWorkflow", ] diff --git a/swarms/structs/concurrent_workflow.py b/swarms/structs/concurrent_workflow.py new file mode 100644 index 00000000..05a595e6 --- /dev/null +++ b/swarms/structs/concurrent_workflow.py @@ -0,0 +1,96 @@ +import concurrent.futures +from dataclasses import dataclass, field +from typing import Dict, List, Optional + +from swarms.structs.base import BaseStruct +from swarms.structs.task import Task + + +@dataclass +class ConcurrentWorkflow(BaseStruct): + """ + ConcurrentWorkflow class for running a set of tasks concurrently using N number of autonomous agents. + + Args: + max_workers (int): The maximum number of workers to use for concurrent execution. + autosave (bool): Whether to autosave the workflow state. + saved_state_filepath (Optional[str]): The file path to save the workflow state. + + Attributes: + tasks (List[Task]): The list of tasks to execute. + max_workers (int): The maximum number of workers to use for concurrent execution. + autosave (bool): Whether to autosave the workflow state. + saved_state_filepath (Optional[str]): The file path to save the workflow state. + + Examples: + >>> from swarms.models import OpenAIChat + >>> from swarms.structs import ConcurrentWorkflow + >>> llm = OpenAIChat(openai_api_key="") + >>> workflow = ConcurrentWorkflow(max_workers=5) + >>> workflow.add("What's the weather in miami", llm) + >>> workflow.add("Create a report on these metrics", llm) + >>> workflow.run() + >>> workflow.tasks + """ + + tasks: List[Dict] = field(default_factory=list) + max_workers: int = 5 + autosave: bool = False + saved_state_filepath: Optional[str] = ( + "runs/concurrent_workflow.json" + ) + print_results: bool = False + return_results: bool = False + use_processes: bool = False + + def add(self, task: Task): + """Adds a task to the workflow. + + Args: + task (Task): _description_ + """ + self.tasks.append(task) + + def run(self): + """ + Executes the tasks in parallel using a ThreadPoolExecutor. + + Args: + print_results (bool): Whether to print the results of each task. Default is False. + return_results (bool): Whether to return the results of each task. Default is False. + + Returns: + List[Any]: A list of the results of each task, if return_results is True. Otherwise, returns None. + """ + with concurrent.futures.ThreadPoolExecutor( + max_workers=self.max_workers + ) as executor: + futures = { + executor.submit(task.execute): task + for task in self.tasks + } + results = [] + + for future in concurrent.futures.as_completed(futures): + task = futures[future] + try: + result = future.result() + if self.print_results: + print(f"Task {task}: {result}") + if self.return_results: + results.append(result) + except Exception as e: + print(f"Task {task} generated an exception: {e}") + + return results if self.return_results else None + + def _execute_task(self, task: Task): + """Executes a task. + + Args: + task (Task): _description_ + + Returns: + _type_: _description_ + """ + return task.run() diff --git a/swarms/structs/groupchat.py b/swarms/structs/groupchat.py index f3677023..21fff944 100644 --- a/swarms/structs/groupchat.py +++ b/swarms/structs/groupchat.py @@ -140,7 +140,6 @@ class GroupChatManager: >>> from swarms import GroupChatManager >>> from swarms.structs.agent import Agent >>> agents = Agent() - >>> output = GroupChatManager(agents, lambda x: x) """ diff --git a/tests/structs/test_concurrent_workflow.py b/tests/structs/test_concurrent_workflow.py new file mode 100644 index 00000000..206e8e2a --- /dev/null +++ b/tests/structs/test_concurrent_workflow.py @@ -0,0 +1,56 @@ +from unittest.mock import Mock, create_autospec, patch +from concurrent.futures import Future +from swarms.structs import ConcurrentWorkflow, Task, Agent + + +def test_add(): + workflow = ConcurrentWorkflow(max_workers=2) + task = Mock(spec=Task) + workflow.add(task) + assert task in workflow.tasks + + +def test_run(): + workflow = ConcurrentWorkflow(max_workers=2) + task1 = create_autospec(Task) + task2 = create_autospec(Task) + workflow.add(task1) + workflow.add(task2) + + with patch( + "concurrent.futures.ThreadPoolExecutor" + ) as mock_executor: + future1 = Future() + future1.set_result(None) + future2 = Future() + future2.set_result(None) + + mock_executor.return_value.__enter__.return_value.submit.side_effect = [ + future1, + future2, + ] + mock_executor.return_value.__enter__.return_value.as_completed.return_value = [ + future1, + future2, + ] + + workflow.run() + + task1.execute.assert_called_once() + task2.execute.assert_called_once() + + +def test_execute_task(): + workflow = ConcurrentWorkflow(max_workers=2) + task = create_autospec(Task) + workflow._execute_task(task) + task.execute.assert_called_once() + + +def test_agent_execution(): + workflow = ConcurrentWorkflow(max_workers=2) + agent = create_autospec(Agent) + task = Task(agent) + workflow.add(task) + workflow._execute_task(task) + agent.execute.assert_called_once()