diff --git a/.gitignore b/.gitignore index ea778ce8..bc35715f 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,7 @@ venv .DS_Store .DS_STORE +Cargo.lock swarms/agents/.DS_Store _build diff --git a/multion_agent.py b/multion_agent.py new file mode 100644 index 00000000..132ccb87 --- /dev/null +++ b/multion_agent.py @@ -0,0 +1,102 @@ +import multion +from swarms.structs.concurrent_workflow import ConcurrentWorkflow +from swarms.models.base_llm import AbstractLLM +from swarms.structs.agent import Agent +from swarms.structs.task import Task + + +class MultiOnAgent(AbstractLLM): + """ + Represents a multi-on agent that performs browsing tasks. + + Args: + max_steps (int): The maximum number of steps to perform during browsing. + starting_url (str): The starting URL for browsing. + + Attributes: + max_steps (int): The maximum number of steps to perform during browsing. + starting_url (str): The starting URL for browsing. + """ + + def __init__( + self, + multion_api_key: str, + max_steps: int = 4, + starting_url: str = "https://www.google.com", + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.multion_api_key = multion_api_key + self.max_steps = max_steps + self.starting_url = starting_url + + multion.login( + use_api=True, + # multion_api_key=self.multion_api_key + *args, + **kwargs, + ) + + def run(self, task: str, *args, **kwargs): + """ + Runs a browsing task. + + Args: + task (str): The task to perform during browsing. + *args: Additional positional arguments. + **kwargs: Additional keyword arguments. + + Returns: + dict: The response from the browsing task. + """ + response = multion.browse( + { + "cmd": task, + "url": self.starting_url, + "maxSteps": self.max_steps, + }, + *args, + **kwargs, + ) + + return response.result, response.status, response.lastUrl + + +# model +model = MultiOnAgent( + multion_api_key="535ae401948b4c59bc1b2c61eec90fe6" +) + +# out = model.run("search for a recipe") +agent = Agent( + agent_name="MultiOnAgent", + description="A multi-on agent that performs browsing tasks.", + llm=model, + max_loops=1, + system_prompt=None, +) + + +# Task +task = Task( + agent=agent, + description=( + "send an email to vyom on superhuman for a partnership with" + " multion" + ), +) + +# Swarm +workflow = ConcurrentWorkflow( + max_workers=1000, + autosave=True, + print_results=True, + return_results=True, +) + +# Add task to workflow +workflow.add(task) + +# Run workflow +workflow.run() diff --git a/pyproject.toml b/pyproject.toml index 74fa9101..90ab06e6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -81,7 +81,6 @@ roboflow = "*" [tool.poetry.group.lint.dependencies] ruff = ">=0.0.249,<0.1.7" types-toml = "^0.10.8.1" -types-redis = "^4.3.21.6" types-pytz = "^2023.3.0.0" black = "^23.1.0" types-chardet = "^5.0.4.6" diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index 1beddfb6..40bd3325 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -68,6 +68,14 @@ from swarms.structs.task_queue_base import ( synchronized_queue, TaskQueueBase, ) +from swarms.structs.multi_process_workflow import ( + MultiProcessingWorkflow, +) +from swarms.structs.multi_threaded_workflow import ( + MultiThreadedWorkflow, +) +from swarms.structs.agent_base import AgentJob + __all__ = [ "Agent", @@ -131,4 +139,7 @@ __all__ = [ "StackOverflowSwarm", "synchronized_queue", "TaskQueueBase", + "MultiProcessingWorkflow", + "MultiThreadedWorkflow", + "AgentJob", ] diff --git a/swarms/structs/agent_base.py b/swarms/structs/agent_base.py new file mode 100644 index 00000000..8f1a3669 --- /dev/null +++ b/swarms/structs/agent_base.py @@ -0,0 +1,20 @@ +import threading +from typing import Callable, Tuple + + +class AgentJob(threading.Thread): + """A class that handles multithreading logic. + + Args: + function (Callable): The function to be executed in a separate thread. + args (Tuple): The arguments to be passed to the function. + """ + + def __init__(self, function: Callable, args: Tuple): + threading.Thread.__init__(self) + self.function = function + self.args = args + + def run(self) -> None: + """Runs the function in a separate thread.""" + self.function(*self.args) diff --git a/swarms/structs/base_swarm.py b/swarms/structs/base_swarm.py index fe81df4e..a961be58 100644 --- a/swarms/structs/base_swarm.py +++ b/swarms/structs/base_swarm.py @@ -1,5 +1,5 @@ import asyncio -from abc import ABC +from abc import ABC, abstractmethod from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Any, Callable, Dict, List, Optional @@ -432,3 +432,59 @@ class AbstractSwarm(ABC): self.agents, ) return list(responses) + + @abstractmethod + def add_swarm_entry(self, swarm): + """ + Add the information of a joined Swarm to the registry. + + Args: + swarm (SwarmManagerBase): Instance of SwarmManagerBase representing the joined Swarm. + + Returns: + None + """ + + @abstractmethod + def add_agent_entry(self, agent: Agent): + """ + Add the information of an Agent to the registry. + + Args: + agent (Agent): Instance of Agent representing the Agent. + + Returns: + None + """ + + @abstractmethod + def retrieve_swarm_information(self, swarm_id: str): + """ + Retrieve the information of a specific Swarm from the registry. + + Args: + swarm_id (str): Unique identifier of the Swarm. + + Returns: + SwarmManagerBase: Instance of SwarmManagerBase representing the retrieved Swarm, or None if not found. + """ + + @abstractmethod + def retrieve_joined_agents(self, agent_id: str) -> List[Agent]: + """ + Retrieve the information the Agents which have joined the registry. + + Returns: + Agent: Instance of Agent representing the retrieved Agent, or None if not found. + """ + + @abstractmethod + def join_swarm( + self, from_entity: Agent | Agent, to_entity: Agent + ): + """ + Add a relationship between a Swarm and an Agent or other Swarm to the registry. + + Args: + from (Agent | SwarmManagerBase): Instance of Agent or SwarmManagerBase representing the source of the relationship. + """ diff --git a/swarms/structs/multi_process_workflow.py b/swarms/structs/multi_process_workflow.py new file mode 100644 index 00000000..39c69eaa --- /dev/null +++ b/swarms/structs/multi_process_workflow.py @@ -0,0 +1,197 @@ +import logging +from functools import wraps +from multiprocessing import Manager, Pool, cpu_count +from time import sleep +from typing import List + +from swarms.structs.base_workflow import BaseWorkflow +from swarms.structs.task import Task + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", +) + + +# Retry on failure +def retry_on_failure(max_retries: int = 3, delay: int = 5): + """ + Decorator that retries a function a specified number of times on failure. + + Args: + max_retries (int): The maximum number of retries (default: 3). + delay (int): The delay in seconds between retries (default: 5). + + Returns: + The result of the function if it succeeds within the maximum number of retries, + otherwise None. + """ + + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + for _ in range(max_retries): + try: + return func(*args, **kwargs) + except Exception as error: + logging.error( + f"Error: {str(error)}, retrying in" + f" {delay} seconds..." + ) + sleep(delay) + return None + + return wrapper + + return decorator + + +class MultiProcessingWorkflow(BaseWorkflow): + """ + Initialize a MultiProcessWorkflow object. + + Args: + max_workers (int): The maximum number of workers to use for parallel processing. + autosave (bool): Flag indicating whether to automatically save the workflow. + tasks (List[Task]): A list of Task objects representing the workflow tasks. + *args: Additional positional arguments. + **kwargs: Additional keyword arguments. + + Example: + >>> from swarms.structs.multi_process_workflow import MultiProcessingWorkflow + >>> from swarms.structs.task import Task + >>> from datetime import datetime + >>> from time import sleep + >>> + >>> # Define a simple task + >>> def simple_task(): + >>> sleep(1) + >>> return datetime.now() + >>> + >>> # Create a task object + >>> task = Task( + >>> name="Simple Task", + >>> execute=simple_task, + >>> priority=1, + >>> ) + >>> + >>> # Create a workflow with the task + >>> workflow = MultiProcessingWorkflow(tasks=[task]) + >>> + >>> # Run the workflow + >>> results = workflow.run(task) + >>> + >>> # Print the results + >>> print(results) + """ + + def __init__( + self, + max_workers: int = 5, + autosave: bool = True, + tasks: List[Task] = None, + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.max_workers = max_workers + self.autosave = autosave + self.tasks = sorted( + tasks or [], key=lambda task: task.priority, reverse=True + ) + + self.max_workers or cpu_count() + + if tasks is None: + tasks = [] + + self.tasks = tasks + + def execute_task(self, task: Task, *args, **kwargs): + """Execute a task and handle exceptions. + + Args: + task (Task): The task to execute. + *args: Additional positional arguments for the task execution. + **kwargs: Additional keyword arguments for the task execution. + + Returns: + Any: The result of the task execution. + + """ + try: + result = task.execute(*args, **kwargs) + + logging.info( + f"Task {task} completed successfully with result" + f" {result}" + ) + + if self.autosave: + self._autosave_task_result(task, result) + + except Exception as e: + logging.error( + ( + "An error occurred during execution of task" + f" {task}: {str(e)}" + ), + exc_info=True, + ) + return None + + def run(self, task: Task, *args, **kwargs): + """Run the workflow. + + Args: + task (Task): The task to run. + *args: Additional positional arguments for the task execution. + **kwargs: Additional keyword arguments for the task execution. + + Returns: + List[Any]: The results of all executed tasks. + + """ + try: + results = [] + with Manager() as manager: + with Pool( + processes=self.max_workers, *args, **kwargs + ) as pool: + # Using manager.list() to collect results in a process safe way + results_list = manager.list() + jobs = [ + pool.apply_async( + self.execute_task, + (task,), + callback=results_list.append, + timeout=task.timeout, + *args, + **kwargs, + ) + for task in self.tasks + ] + + # Wait for all jobs to complete + for job in jobs: + job.get() + + results = list(results_list) + + return results + except Exception as error: + logging.error(f"Error in run: {error}") + return None + + def _autosave_task_result(self, task: Task, result): + """Autosave task result. This should be adapted based on how autosaving is implemented. + + Args: + task (Task): The task for which to autosave the result. + result (Any): The result of the task execution. + + """ + # Note: This method might need to be adapted to ensure it's process-safe, depending on how autosaving is implemented. + logging.info(f"Autosaving result for task {task}: {result}") + # Actual autosave logic here diff --git a/swarms/structs/multi_threaded_workflow.py b/swarms/structs/multi_threaded_workflow.py new file mode 100644 index 00000000..df17d8ce --- /dev/null +++ b/swarms/structs/multi_threaded_workflow.py @@ -0,0 +1,157 @@ +import threading +from swarms.structs.base_workflow import BaseWorkflow +import logging +from concurrent.futures import ( + FIRST_COMPLETED, + ThreadPoolExecutor, + wait, +) +from typing import List +from swarms.structs.task import Task +import queue + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", +) + + +class PriorityTask: + """ + Represents a task with a priority level. + + Attributes: + task (Task): The task to be executed. + priority (int): The priority level of the task. + """ + + def __init__(self, task: Task, priority: int = 0): + self.task = task + self.priority = priority + + def __lt__(self, other): + return self.priority < other.priority + + +class MultiThreadedWorkflow(BaseWorkflow): + """ + Represents a multi-threaded workflow that executes tasks concurrently using a thread pool. + + Args: + max_workers (int): The maximum number of worker threads in the thread pool. Default is 5. + autosave (bool): Flag indicating whether to automatically save task results. Default is True. + tasks (List[PriorityTask]): List of priority tasks to be executed. Default is an empty list. + retry_attempts (int): The maximum number of retry attempts for failed tasks. Default is 3. + *args: Variable length argument list. + **kwargs: Arbitrary keyword arguments. + + Attributes: + max_workers (int): The maximum number of worker threads in the thread pool. + autosave (bool): Flag indicating whether to automatically save task results. + retry_attempts (int): The maximum number of retry attempts for failed tasks. + tasks_queue (PriorityQueue): The queue that holds the priority tasks. + lock (Lock): The lock used for thread synchronization. + + Methods: + execute_tasks: Executes the tasks in the thread pool and returns the results. + _autosave_task_result: Autosaves the result of a task. + + """ + + def __init__( + self, + max_workers: int = 5, + autosave: bool = True, + tasks: List[PriorityTask] = None, + retry_attempts: int = 3, + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.max_workers = max_workers + self.autosave = autosave + self.retry_attempts = retry_attempts + if tasks is None: + tasks = [] + self.tasks_queue = queue.PriorityQueue() + for task in tasks: + self.tasks_queue.put(task) + self.lock = threading.Lock() + + def run(self): + """ + Executes the tasks in the thread pool and returns the results. + + Returns: + List: The list of results from the executed tasks. + + """ + results = [] + with ThreadPoolExecutor( + max_workers=self.max_workers + ) as executor: + future_to_task = {} + for _ in range(self.tasks_queue.qsize()): + priority_task = self.tasks_queue.get_nowait() + future = executor.submit(priority_task.task.execute) + future_to_task[future] = ( + priority_task.task, + 0, + ) # (Task, attempt) + + while future_to_task: + # Wait for the next future to complete + done, _ = wait( + future_to_task.keys(), return_when=FIRST_COMPLETED + ) + + for future in done: + task, attempt = future_to_task.pop(future) + try: + result = future.result() + results.append(result) + logging.info( + f"Task {task} completed successfully with" + f" result: {result}" + ) + if self.autosave: + self._autosave_task_result(task, result) + except Exception as e: + logging.error( + ( + f"Attempt {attempt+1} failed for task" + f" {task}: {str(e)}" + ), + exc_info=True, + ) + if attempt + 1 < self.retry_attempts: + # Retry the task + retry_future = executor.submit( + task.execute + ) + future_to_task[retry_future] = ( + task, + attempt + 1, + ) + else: + logging.error( + f"Task {task} failed after" + f" {self.retry_attempts} attempts." + ) + + return results + + def _autosave_task_result(self, task: Task, result): + """ + Autosaves the result of a task. + + Args: + task (Task): The task whose result needs to be autosaved. + result: The result of the task. + + """ + with self.lock: + logging.info( + f"Autosaving result for task {task}: {result}" + ) + # Actual autosave logic goes here diff --git a/swarms/structs/swarm_redis_registry.py b/swarms/structs/swarm_redis_registry.py new file mode 100644 index 00000000..a17549cd --- /dev/null +++ b/swarms/structs/swarm_redis_registry.py @@ -0,0 +1,176 @@ +from dataclasses import asdict +from typing import List + +import networkx as nx +import redis +from redis.commands.graph import Graph, Node + +from swarms.structs.agent import Agent +from swarms.structs.base_swarm import AbstractSwarm + + +class SwarmRelationship: + JOINED = "joined" + + +class RedisSwarmRegistry(AbstractSwarm): + """ + Initialize the SwarmRedisRegistry object. + + Args: + host (str): The hostname or IP address of the Redis server. Default is "localhost". + port (int): The port number of the Redis server. Default is 6379. + db: The Redis database number. Default is 0. + graph_name (str): The name of the RedisGraph graph. Default is "swarm_registry". + """ + + def __init__( + self, + host: str = "localhost", + port: int = 6379, + db=0, + graph_name: str = "swarm_registry", + ): + self.redis = redis.StrictRedis( + host=host, port=port, db=db, decode_responses=True + ) + self.redis_graph = Graph(self.redis, graph_name) + self.graph = nx.DiGraph() + + def _entity_to_node(self, entity: Agent | Agent) -> Node: + """ + Converts an Agent or Swarm object to a Node object. + + Args: + entity (Agent | Agent): The Agent or Swarm object to convert. + + Returns: + Node: The converted Node object. + """ + return Node( + node_id=entity.id, + alias=entity.agent_name, + label=entity.agent_description, + properties=asdict(entity), + ) + + def _add_node(self, node: Agent | Agent): + """ + Adds a node to the graph. + + Args: + node (Agent | Agent): The Agent or Swarm node to add. + """ + self.graph.add_node(node.id) + if isinstance(node, Agent): + self.add_swarm_entry(node) + elif isinstance(node, Agent): + self.add_agent_entry(node) + + def _add_edge(self, from_node: Node, to_node: Node, relationship): + """ + Adds an edge between two nodes in the graph. + + Args: + from_node (Node): The source node of the edge. + to_node (Node): The target node of the edge. + relationship: The relationship type between the nodes. + """ + match_query = ( + f"MATCH (a:{from_node.label}),(b:{to_node.label}) WHERE" + f" a.id = {from_node.id} AND b.id = {to_node.id}" + ) + + query = f""" + {match_query} + CREATE (a)-[r:joined]->(b) RETURN r + """.replace("\n", "") + + self.redis_graph.query(query) + + def add_swarm_entry(self, swarm: Agent): + """ + Adds a swarm entry to the graph. + + Args: + swarm (Agent): The swarm object to add. + """ + node = self._entity_to_node(swarm) + self._persist_node(node) + + def add_agent_entry(self, agent: Agent): + """ + Adds an agent entry to the graph. + + Args: + agent (Agent): The agent object to add. + """ + node = self._entity_to_node(agent) + self._persist_node(node) + + def join_swarm( + self, + from_entity: Agent | Agent, + to_entity: Agent, + ): + """ + Adds an edge between two nodes in the graph. + + Args: + from_entity (Agent | Agent): The source entity of the edge. + to_entity (Agent): The target entity of the edge. + + Returns: + Any: The result of adding the edge. + """ + from_node = self._entity_to_node(from_entity) + to_node = self._entity_to_node(to_entity) + + return self._add_edge( + from_node, to_node, SwarmRelationship.JOINED + ) + + def _persist_node(self, node: Node): + """ + Persists a node in the graph. + + Args: + node (Node): The node to persist. + """ + query = f"CREATE {node}" + self.redis_graph.query(query) + + def retrieve_swarm_information(self, swarm_id: int) -> Agent: + """ + Retrieves swarm information from the registry. + + Args: + swarm_id (int): The ID of the swarm to retrieve. + + Returns: + Agent: The retrieved swarm information as an Agent object. + """ + swarm_key = f"swarm:{swarm_id}" + swarm_data = self.redis.hgetall(swarm_key) + if swarm_data: + # Parse the swarm_data and return an instance of AgentBase + # You can use the retrieved data to populate the AgentBase attributes + + return Agent(**swarm_data) + return None + + def retrieve_joined_agents(self) -> List[Agent]: + """ + Retrieves a list of joined agents from the registry. + + Returns: + List[Agent]: The retrieved joined agents as a list of Agent objects. + """ + agent_data = self.redis_graph.query( + "MATCH (a:agent)-[:joined]->(b:manager) RETURN a" + ) + if agent_data: + # Parse the agent_data and return an instance of AgentBase + # You can use the retrieved data to populate the AgentBase attributes + return [Agent(**agent_data) for agent_data in agent_data] + return None