diff --git a/playground/structs/concurrent_workflow.py b/playground/structs/concurrent_workflow.py new file mode 100644 index 00000000..f152e4bb --- /dev/null +++ b/playground/structs/concurrent_workflow.py @@ -0,0 +1,26 @@ +import os +from dotenv import load_dotenv +from swarms import OpenAIChat, Task, ConcurrentWorkflow, Agent + +# Load environment variables from .env file +load_dotenv() + +# Load environment variables +llm = OpenAIChat(openai_api_key=os.getenv("OPENAI_API_KEY")) +agent = Agent(llm=llm, max_loops=1) + +# Create a workflow +workflow = ConcurrentWorkflow(max_workers=5) + +# Create tasks +task1 = Task(agent, "What's the weather in miami") +task2 = Task(agent, "What's the weather in new york") +task3 = Task(agent, "What's the weather in london") + +# Add tasks to the workflow +workflow.add(task1) +workflow.add(task2) +workflow.add(task3) + +# Run the workflow +workflow.run() diff --git a/scripts/auto_tests_docs/auto_docs_omni.py b/scripts/auto_tests_docs/auto_docs_omni.py index fbd80b6a..3ae647a7 100644 --- a/scripts/auto_tests_docs/auto_docs_omni.py +++ b/scripts/auto_tests_docs/auto_docs_omni.py @@ -5,36 +5,13 @@ import threading from dotenv import load_dotenv from scripts.auto_tests_docs.docs import DOCUMENTATION_WRITER_SOP from swarms import OpenAIChat -from swarms.structs.agent import Agent -from swarms.structs.autoscaler import AutoScaler -from swarms.structs.base import BaseStructure -from swarms.structs.base_swarm import AbstractSwarm -from swarms.structs.base_workflow import BaseWorkflow -from swarms.structs.concurrent_workflow import ConcurrentWorkflow -from swarms.structs.conversation import Conversation -from swarms.structs.groupchat import GroupChat, GroupChatManager -from swarms.structs.model_parallizer import ModelParallelizer -from swarms.structs.multi_agent_collab import MultiAgentCollaboration -from swarms.structs.nonlinear_workflow import NonlinearWorkflow -from swarms.structs.recursive_workflow import RecursiveWorkflow -from swarms.structs.schemas import ( - Artifact, - ArtifactUpload, - StepInput, - TaskInput, -) -from swarms.structs.sequential_workflow import SequentialWorkflow -from swarms.structs.swarm_net import SwarmNetwork -from swarms.structs.utils import ( - distribute_tasks, - extract_key_from_json, - extract_tokens_from_text, - find_agent_by_id, - find_token_in_text, - parse_tasks, -) +########### + + +############### + load_dotenv() api_key = os.getenv("OPENAI_API_KEY") @@ -86,33 +63,7 @@ def process_documentation( def main(module: str = "docs/swarms/structs"): - items = [ - Agent, - SequentialWorkflow, - AutoScaler, - Conversation, - TaskInput, - Artifact, - ArtifactUpload, - StepInput, - SwarmNetwork, - ModelParallelizer, - MultiAgentCollaboration, - AbstractSwarm, - GroupChat, - GroupChatManager, - parse_tasks, - find_agent_by_id, - distribute_tasks, - find_token_in_text, - extract_key_from_json, - extract_tokens_from_text, - ConcurrentWorkflow, - RecursiveWorkflow, - NonlinearWorkflow, - BaseWorkflow, - BaseStructure, - ] + items = [] threads = [] for item in items: diff --git a/swarms/memory/__init__.py b/swarms/memory/__init__.py index a63a9553..d2eed0d5 100644 --- a/swarms/memory/__init__.py +++ b/swarms/memory/__init__.py @@ -2,10 +2,12 @@ from swarms.memory.base_vectordb import VectorDatabase from swarms.memory.short_term_memory import ShortTermMemory from swarms.memory.sqlite import SQLiteDB from swarms.memory.weaviate_db import WeaviateDB +from swarms.memory.visual_memory import VisualShortTermMemory __all__ = [ "VectorDatabase", "ShortTermMemory", "SQLiteDB", "WeaviateDB", + "VisualShortTermMemory", ] diff --git a/swarms/memory/visual_memory.py b/swarms/memory/visual_memory.py new file mode 100644 index 00000000..46a59509 --- /dev/null +++ b/swarms/memory/visual_memory.py @@ -0,0 +1,118 @@ +from typing import List +from datetime import datetime + + +class VisualShortTermMemory: + """ + A class representing visual short-term memory. + + Attributes: + memory (list): A list to store images and their descriptions. + + Examples: + example = VisualShortTermMemory() + example.add( + images=["image1.jpg", "image2.jpg"], + description=["description1", "description2"], + timestamps=[1.0, 2.0], + locations=["location1", "location2"], + ) + print(example.return_as_string()) + # print(example.get_images()) + """ + + def __init__(self): + self.memory = [] + + def add( + self, + images: List[str] = None, + description: List[str] = None, + timestamps: List[float] = None, + locations: List[str] = None, + ): + """ + Add images and their descriptions to the memory. + + Args: + images (list): A list of image paths. + description (list): A list of corresponding descriptions. + timestamps (list): A list of timestamps for each image. + locations (list): A list of locations where the images were captured. + """ + current_time = datetime.now() + + # Create a dictionary of each image and description + # and append it to the memory + for image, description, timestamp, location in zip( + images, description, timestamps, locations + ): + self.memory.append( + { + "image": image, + "description": description, + "timestamp": timestamp, + "location": location, + "added_at": current_time, + } + ) + + def get_images(self): + """ + Get a list of all images in the memory. + + Returns: + list: A list of image paths. + """ + return [item["image"] for item in self.memory] + + def get_descriptions(self): + """ + Get a list of all descriptions in the memory. + + Returns: + list: A list of descriptions. + """ + return [item["description"] for item in self.memory] + + def search_by_location(self, location: str): + """ + Search for images captured at a specific location. + + Args: + location (str): The location to search for. + + Returns: + list: A list of images captured at the specified location. + """ + return [ + item["image"] + for item in self.memory + if item["location"] == location + ] + + def search_by_timestamp(self, start_time: float, end_time: float): + """ + Search for images captured within a specific time range. + + Args: + start_time (float): The start time of the range. + end_time (float): The end time of the range. + + Returns: + list: A list of images captured within the specified time range. + """ + return [ + item["image"] + for item in self.memory + if start_time <= item["timestamp"] <= end_time + ] + + def return_as_string(self): + """ + Return the memory as a string. + + Returns: + str: A string representation of the memory. + """ + return str(self.memory) diff --git a/swarms/structs/concurrent_workflow.py b/swarms/structs/concurrent_workflow.py index 7e1a6439..dd3fc518 100644 --- a/swarms/structs/concurrent_workflow.py +++ b/swarms/structs/concurrent_workflow.py @@ -91,11 +91,13 @@ class ConcurrentWorkflow(BaseStructure): try: result = future.result() if self.print_results: - print(f"Task {task}: {result}") + logger.info(f"Task {task}: {result}") if self.return_results: results.append(result) except Exception as e: - print(f"Task {task} generated an exception: {e}") + logger.error( + f"Task {task} generated an exception: {e}" + ) return results if self.return_results else None diff --git a/swarms/structs/recursive_workflow.py b/swarms/structs/recursive_workflow.py index 17912563..7c1429d3 100644 --- a/swarms/structs/recursive_workflow.py +++ b/swarms/structs/recursive_workflow.py @@ -33,13 +33,13 @@ class RecursiveWorkflow(BaseStructure): def __init__(self, stop_token: str = ""): self.stop_token = stop_token - self.task_pool = List[Task] + self.task_pool = [] assert ( self.stop_token is not None ), "stop_token cannot be None" - def add(self, task: Task, tasks: List[Task] = None): + def add(self, task: Task = None, tasks: List[Task] = None): """Adds a task to the workflow. Args: @@ -49,12 +49,13 @@ class RecursiveWorkflow(BaseStructure): try: if tasks: for task in tasks: - self.task_pool.append(task) - logger.info( - "[INFO][RecursiveWorkflow] Added task" - f" {task} to workflow" - ) - else: + if isinstance(task, Task): + self.task_pool.append(task) + logger.info( + "[INFO][RecursiveWorkflow] Added task" + f" {task} to workflow" + ) + elif isinstance(task, Task): self.task_pool.append(task) logger.info( f"[INFO][RecursiveWorkflow] Added task {task} to" @@ -74,8 +75,8 @@ class RecursiveWorkflow(BaseStructure): try: for task in self.task_pool: while True: - result = task.execute() - if self.stop_token in result: + result = task.run() + if result is not None and self.stop_token in result: break logger.info(f"{result}") except Exception as error: diff --git a/swarms/structs/task.py b/swarms/structs/task.py index 9c0f8dac..c7ad12ab 100644 --- a/swarms/structs/task.py +++ b/swarms/structs/task.py @@ -54,9 +54,8 @@ class Task: >>> task.result """ - - description: str agent: Union[Callable, Agent] + description: str args: List[Any] = field(default_factory=list) kwargs: Dict[str, Any] = field(default_factory=dict) result: Any = None