[FEATS][AgentJob, MultiThreadedWorkflow, MultiProcessingWorkflow]

pull/386/head
Kye 11 months ago
parent 44f4cb318b
commit bd18842584

.gitignore vendored

@@ -17,6 +17,7 @@ venv
.DS_Store
.DS_STORE
Cargo.lock
swarms/agents/.DS_Store
_build

@@ -0,0 +1,102 @@
import multion
from swarms.structs.concurrent_workflow import ConcurrentWorkflow
from swarms.models.base_llm import AbstractLLM
from swarms.structs.agent import Agent
from swarms.structs.task import Task
class MultiOnAgent(AbstractLLM):
"""
Represents a multi-on agent that performs browsing tasks.
Args:
max_steps (int): The maximum number of steps to perform during browsing.
starting_url (str): The starting URL for browsing.
Attributes:
max_steps (int): The maximum number of steps to perform during browsing.
starting_url (str): The starting URL for browsing.
"""
def __init__(
self,
multion_api_key: str,
max_steps: int = 4,
starting_url: str = "https://www.google.com",
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.multion_api_key = multion_api_key
self.max_steps = max_steps
self.starting_url = starting_url
multion.login(
use_api=True,
# multion_api_key=self.multion_api_key
*args,
**kwargs,
)
def run(self, task: str, *args, **kwargs):
"""
Runs a browsing task.
Args:
task (str): The task to perform during browsing.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
            tuple: The result, status, and last URL returned by the browsing task.
"""
response = multion.browse(
{
"cmd": task,
"url": self.starting_url,
"maxSteps": self.max_steps,
},
*args,
**kwargs,
)
return response.result, response.status, response.lastUrl
# model
model = MultiOnAgent(
multion_api_key="535ae401948b4c59bc1b2c61eec90fe6"
)
# out = model.run("search for a recipe")
agent = Agent(
agent_name="MultiOnAgent",
description="A multi-on agent that performs browsing tasks.",
llm=model,
max_loops=1,
system_prompt=None,
)
# Task
task = Task(
agent=agent,
description=(
"send an email to vyom on superhuman for a partnership with"
" multion"
),
)
# Swarm
workflow = ConcurrentWorkflow(
max_workers=1000,
autosave=True,
print_results=True,
return_results=True,
)
# Add task to workflow
workflow.add(task)
# Run workflow
workflow.run()
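As a hedged variant of the example above, the MultiOn key can be read from the environment instead of being hard-coded; the MULTION_API_KEY variable name here is an assumption, not part of the commit:

import os

# Hypothetical variant: pull the MultiOn key from the environment rather than
# embedding it in source. Assumes MULTION_API_KEY is set in the shell.
model = MultiOnAgent(multion_api_key=os.environ["MULTION_API_KEY"])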

@@ -81,7 +81,6 @@ roboflow = "*"
[tool.poetry.group.lint.dependencies]
ruff = ">=0.0.249,<0.1.7"
types-toml = "^0.10.8.1"
types-redis = "^4.3.21.6"
types-pytz = "^2023.3.0.0"
black = "^23.1.0"
types-chardet = "^5.0.4.6"

@@ -68,6 +68,14 @@ from swarms.structs.task_queue_base import (
    synchronized_queue,
    TaskQueueBase,
)
from swarms.structs.multi_process_workflow import (
MultiProcessingWorkflow,
)
from swarms.structs.multi_threaded_workflow import (
MultiThreadedWorkflow,
)
from swarms.structs.agent_base import AgentJob
__all__ = [
    "Agent",
@@ -131,4 +139,7 @@ __all__ = [
    "StackOverflowSwarm",
    "synchronized_queue",
    "TaskQueueBase",
    "MultiProcessingWorkflow",
    "MultiThreadedWorkflow",
    "AgentJob",
]
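With these entries in __all__, the three new constructs can be imported straight from swarms.structs; a quick sketch:

from swarms.structs import (
    AgentJob,
    MultiProcessingWorkflow,
    MultiThreadedWorkflow,
)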

@@ -0,0 +1,20 @@
import threading
from typing import Callable, Tuple
class AgentJob(threading.Thread):
"""A class that handles multithreading logic.
Args:
function (Callable): The function to be executed in a separate thread.
args (Tuple): The arguments to be passed to the function.
"""
def __init__(self, function: Callable, args: Tuple):
threading.Thread.__init__(self)
self.function = function
self.args = args
def run(self) -> None:
"""Runs the function in a separate thread."""
self.function(*self.args)
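AgentJob is a thin wrapper over threading.Thread: start() spawns the thread and run() calls the wrapped function. A minimal usage sketch (the fetch_page helper is hypothetical and only illustrates the call pattern):

import threading

from swarms.structs.agent_base import AgentJob


def fetch_page(url: str) -> None:
    # Placeholder workload; reports which thread handled the URL.
    print(f"fetching {url} on {threading.current_thread().name}")


jobs = [
    AgentJob(fetch_page, ("https://example.com",)),
    AgentJob(fetch_page, ("https://example.org",)),
]
for job in jobs:
    job.start()  # spawns the thread, which invokes run()
for job in jobs:
    job.join()  # block until both workloads finish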

@@ -1,5 +1,5 @@
import asyncio
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, List, Optional
@@ -432,3 +432,59 @@ class AbstractSwarm(ABC):
            self.agents,
        )
        return list(responses)
@abstractmethod
def add_swarm_entry(self, swarm):
"""
Add the information of a joined Swarm to the registry.
Args:
swarm (SwarmManagerBase): Instance of SwarmManagerBase representing the joined Swarm.
Returns:
None
"""
@abstractmethod
def add_agent_entry(self, agent: Agent):
"""
Add the information of an Agent to the registry.
Args:
agent (Agent): Instance of Agent representing the Agent.
Returns:
None
"""
@abstractmethod
def retrieve_swarm_information(self, swarm_id: str):
"""
Retrieve the information of a specific Swarm from the registry.
Args:
swarm_id (str): Unique identifier of the Swarm.
Returns:
SwarmManagerBase: Instance of SwarmManagerBase representing the retrieved Swarm, or None if not found.
"""
@abstractmethod
def retrieve_joined_agents(self, agent_id: str) -> List[Agent]:
"""
Retrieve the information the Agents which have joined the registry.
Returns:
Agent: Instance of Agent representing the retrieved Agent, or None if not found.
"""
@abstractmethod
def join_swarm(
self, from_entity: Agent | Agent, to_entity: Agent
):
"""
Add a relationship between a Swarm and an Agent or other Swarm to the registry.
Args:
from (Agent | SwarmManagerBase): Instance of Agent or SwarmManagerBase representing the source of the relationship.
"""

@@ -0,0 +1,197 @@
import logging
from functools import wraps
from multiprocessing import Manager, Pool, cpu_count
from time import sleep
from typing import List
from swarms.structs.base_workflow import BaseWorkflow
from swarms.structs.task import Task
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
# Retry on failure
def retry_on_failure(max_retries: int = 3, delay: int = 5):
"""
Decorator that retries a function a specified number of times on failure.
Args:
max_retries (int): The maximum number of retries (default: 3).
delay (int): The delay in seconds between retries (default: 5).
Returns:
The result of the function if it succeeds within the maximum number of retries,
otherwise None.
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for _ in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as error:
logging.error(
f"Error: {str(error)}, retrying in"
f" {delay} seconds..."
)
sleep(delay)
return None
return wrapper
return decorator
class MultiProcessingWorkflow(BaseWorkflow):
"""
    Initialize a MultiProcessingWorkflow object.
Args:
max_workers (int): The maximum number of workers to use for parallel processing.
autosave (bool): Flag indicating whether to automatically save the workflow.
tasks (List[Task]): A list of Task objects representing the workflow tasks.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Example:
>>> from swarms.structs.multi_process_workflow import MultiProcessingWorkflow
>>> from swarms.structs.task import Task
>>> from datetime import datetime
>>> from time import sleep
>>>
>>> # Define a simple task
>>> def simple_task():
>>> sleep(1)
>>> return datetime.now()
>>>
>>> # Create a task object
>>> task = Task(
>>> name="Simple Task",
>>> execute=simple_task,
>>> priority=1,
>>> )
>>>
>>> # Create a workflow with the task
>>> workflow = MultiProcessingWorkflow(tasks=[task])
>>>
>>> # Run the workflow
>>> results = workflow.run(task)
>>>
>>> # Print the results
>>> print(results)
"""
def __init__(
self,
max_workers: int = 5,
autosave: bool = True,
tasks: List[Task] = None,
*args,
**kwargs,
):
        super().__init__(*args, **kwargs)
        self.max_workers = max_workers or cpu_count()
        self.autosave = autosave
        # Keep tasks sorted by priority, highest first; default to an empty list.
        self.tasks = sorted(
            tasks or [], key=lambda task: task.priority, reverse=True
        )
def execute_task(self, task: Task, *args, **kwargs):
"""Execute a task and handle exceptions.
Args:
task (Task): The task to execute.
*args: Additional positional arguments for the task execution.
**kwargs: Additional keyword arguments for the task execution.
Returns:
Any: The result of the task execution.
"""
        try:
            result = task.execute(*args, **kwargs)
            logging.info(
                f"Task {task} completed successfully with result"
                f" {result}"
            )
            if self.autosave:
                self._autosave_task_result(task, result)
            return result
        except Exception as e:
            logging.error(
                (
                    "An error occurred during execution of task"
                    f" {task}: {str(e)}"
                ),
                exc_info=True,
            )
            return None
def run(self, task: Task, *args, **kwargs):
"""Run the workflow.
Args:
task (Task): The task to run.
*args: Additional positional arguments for the task execution.
**kwargs: Additional keyword arguments for the task execution.
Returns:
List[Any]: The results of all executed tasks.
"""
        try:
            results = []
            with Manager() as manager:
                with Pool(processes=self.max_workers) as pool:
                    # Use manager.list() to collect results in a process-safe way
                    results_list = manager.list()
                    jobs = [
                        (
                            pool.apply_async(
                                self.execute_task,
                                (task, *args),
                                kwargs,
                                callback=results_list.append,
                            ),
                            task,
                        )
                        for task in self.tasks
                    ]
                    # Wait for all jobs to complete; apply each task's timeout on get()
                    for job, task in jobs:
                        job.get(timeout=task.timeout)
                    results = list(results_list)
            return results
except Exception as error:
logging.error(f"Error in run: {error}")
return None
def _autosave_task_result(self, task: Task, result):
"""Autosave task result. This should be adapted based on how autosaving is implemented.
Args:
task (Task): The task for which to autosave the result.
result (Any): The result of the task execution.
"""
# Note: This method might need to be adapted to ensure it's process-safe, depending on how autosaving is implemented.
logging.info(f"Autosaving result for task {task}: {result}")
# Actual autosave logic here
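run() funnels worker results through a Manager().list() callback so the parent process can collect them safely; a stripped-down sketch of that pattern, with a stand-in square() function in place of Task.execute (names are illustrative):

from multiprocessing import Manager, Pool, cpu_count


def square(x: int) -> int:
    # Stand-in for Task.execute()
    return x * x


if __name__ == "__main__":
    with Manager() as manager:
        results_list = manager.list()  # process-safe shared list
        with Pool(processes=cpu_count()) as pool:
            jobs = [
                pool.apply_async(square, (n,), callback=results_list.append)
                for n in range(5)
            ]
            for job in jobs:
                job.get(timeout=30)  # re-raises worker exceptions in the parent
        print(list(results_list))  # e.g. [0, 1, 4, 9, 16] (order may vary)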

@@ -0,0 +1,157 @@
import threading
from swarms.structs.base_workflow import BaseWorkflow
import logging
from concurrent.futures import (
FIRST_COMPLETED,
ThreadPoolExecutor,
wait,
)
from typing import List
from swarms.structs.task import Task
import queue
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
class PriorityTask:
"""
Represents a task with a priority level.
Attributes:
task (Task): The task to be executed.
priority (int): The priority level of the task.
"""
def __init__(self, task: Task, priority: int = 0):
self.task = task
self.priority = priority
def __lt__(self, other):
return self.priority < other.priority
class MultiThreadedWorkflow(BaseWorkflow):
"""
Represents a multi-threaded workflow that executes tasks concurrently using a thread pool.
Args:
max_workers (int): The maximum number of worker threads in the thread pool. Default is 5.
autosave (bool): Flag indicating whether to automatically save task results. Default is True.
tasks (List[PriorityTask]): List of priority tasks to be executed. Default is an empty list.
retry_attempts (int): The maximum number of retry attempts for failed tasks. Default is 3.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Attributes:
max_workers (int): The maximum number of worker threads in the thread pool.
autosave (bool): Flag indicating whether to automatically save task results.
retry_attempts (int): The maximum number of retry attempts for failed tasks.
tasks_queue (PriorityQueue): The queue that holds the priority tasks.
lock (Lock): The lock used for thread synchronization.
Methods:
execute_tasks: Executes the tasks in the thread pool and returns the results.
_autosave_task_result: Autosaves the result of a task.
"""
def __init__(
self,
max_workers: int = 5,
autosave: bool = True,
tasks: List[PriorityTask] = None,
retry_attempts: int = 3,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.max_workers = max_workers
self.autosave = autosave
self.retry_attempts = retry_attempts
if tasks is None:
tasks = []
self.tasks_queue = queue.PriorityQueue()
for task in tasks:
self.tasks_queue.put(task)
self.lock = threading.Lock()
def run(self):
"""
Executes the tasks in the thread pool and returns the results.
Returns:
List: The list of results from the executed tasks.
"""
results = []
with ThreadPoolExecutor(
max_workers=self.max_workers
) as executor:
future_to_task = {}
for _ in range(self.tasks_queue.qsize()):
priority_task = self.tasks_queue.get_nowait()
future = executor.submit(priority_task.task.execute)
future_to_task[future] = (
priority_task.task,
0,
) # (Task, attempt)
while future_to_task:
# Wait for the next future to complete
done, _ = wait(
future_to_task.keys(), return_when=FIRST_COMPLETED
)
for future in done:
task, attempt = future_to_task.pop(future)
try:
result = future.result()
results.append(result)
logging.info(
f"Task {task} completed successfully with"
f" result: {result}"
)
if self.autosave:
self._autosave_task_result(task, result)
except Exception as e:
logging.error(
(
f"Attempt {attempt+1} failed for task"
f" {task}: {str(e)}"
),
exc_info=True,
)
if attempt + 1 < self.retry_attempts:
# Retry the task
retry_future = executor.submit(
task.execute
)
future_to_task[retry_future] = (
task,
attempt + 1,
)
else:
logging.error(
f"Task {task} failed after"
f" {self.retry_attempts} attempts."
)
return results
def _autosave_task_result(self, task: Task, result):
"""
Autosaves the result of a task.
Args:
task (Task): The task whose result needs to be autosaved.
result: The result of the task.
"""
with self.lock:
logging.info(
f"Autosaving result for task {task}: {result}"
)
# Actual autosave logic goes here
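A hedged usage sketch for MultiThreadedWorkflow; the Task keyword arguments mirror the docstring example in the multiprocessing workflow above and are an assumption about the swarms Task API rather than a confirmed signature:

from swarms.structs.multi_threaded_workflow import (
    MultiThreadedWorkflow,
    PriorityTask,
)
from swarms.structs.task import Task


def say_hello() -> str:
    return "hello"


# Lower priority values come out of the PriorityQueue first because
# PriorityTask.__lt__ compares on the priority field.
tasks = [
    PriorityTask(Task(name="greet", execute=say_hello), priority=0),
    PriorityTask(Task(name="greet again", execute=say_hello), priority=5),
]

workflow = MultiThreadedWorkflow(
    max_workers=2,
    tasks=tasks,
    retry_attempts=2,
)
print(workflow.run())  # list of results from both tasks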

@@ -0,0 +1,176 @@
from dataclasses import asdict
from typing import List
import networkx as nx
import redis
from redis.commands.graph import Graph, Node
from swarms.structs.agent import Agent
from swarms.structs.base_swarm import AbstractSwarm
class SwarmRelationship:
JOINED = "joined"
class RedisSwarmRegistry(AbstractSwarm):
"""
Initialize the SwarmRedisRegistry object.
Args:
host (str): The hostname or IP address of the Redis server. Default is "localhost".
port (int): The port number of the Redis server. Default is 6379.
db: The Redis database number. Default is 0.
graph_name (str): The name of the RedisGraph graph. Default is "swarm_registry".
"""
def __init__(
self,
host: str = "localhost",
port: int = 6379,
db=0,
graph_name: str = "swarm_registry",
):
self.redis = redis.StrictRedis(
host=host, port=port, db=db, decode_responses=True
)
self.redis_graph = Graph(self.redis, graph_name)
self.graph = nx.DiGraph()
def _entity_to_node(self, entity: Agent | Agent) -> Node:
"""
Converts an Agent or Swarm object to a Node object.
Args:
entity (Agent | Agent): The Agent or Swarm object to convert.
Returns:
Node: The converted Node object.
"""
return Node(
node_id=entity.id,
alias=entity.agent_name,
label=entity.agent_description,
properties=asdict(entity),
)
def _add_node(self, node: Agent | Agent):
"""
Adds a node to the graph.
Args:
node (Agent | Agent): The Agent or Swarm node to add.
"""
self.graph.add_node(node.id)
        # Agents and swarms are both represented by Agent instances in this
        # registry, so a single entry call covers both cases.
        self.add_swarm_entry(node)
def _add_edge(self, from_node: Node, to_node: Node, relationship):
"""
Adds an edge between two nodes in the graph.
Args:
from_node (Node): The source node of the edge.
to_node (Node): The target node of the edge.
relationship: The relationship type between the nodes.
"""
match_query = (
f"MATCH (a:{from_node.label}),(b:{to_node.label}) WHERE"
f" a.id = {from_node.id} AND b.id = {to_node.id}"
)
query = f"""
{match_query}
CREATE (a)-[r:joined]->(b) RETURN r
""".replace("\n", "")
self.redis_graph.query(query)
def add_swarm_entry(self, swarm: Agent):
"""
Adds a swarm entry to the graph.
Args:
swarm (Agent): The swarm object to add.
"""
node = self._entity_to_node(swarm)
self._persist_node(node)
def add_agent_entry(self, agent: Agent):
"""
Adds an agent entry to the graph.
Args:
agent (Agent): The agent object to add.
"""
node = self._entity_to_node(agent)
self._persist_node(node)
def join_swarm(
self,
from_entity: Agent | Agent,
to_entity: Agent,
):
"""
Adds an edge between two nodes in the graph.
Args:
from_entity (Agent | Agent): The source entity of the edge.
to_entity (Agent): The target entity of the edge.
Returns:
Any: The result of adding the edge.
"""
from_node = self._entity_to_node(from_entity)
to_node = self._entity_to_node(to_entity)
return self._add_edge(
from_node, to_node, SwarmRelationship.JOINED
)
def _persist_node(self, node: Node):
"""
Persists a node in the graph.
Args:
node (Node): The node to persist.
"""
query = f"CREATE {node}"
self.redis_graph.query(query)
def retrieve_swarm_information(self, swarm_id: int) -> Agent:
"""
Retrieves swarm information from the registry.
Args:
swarm_id (int): The ID of the swarm to retrieve.
Returns:
Agent: The retrieved swarm information as an Agent object.
"""
swarm_key = f"swarm:{swarm_id}"
swarm_data = self.redis.hgetall(swarm_key)
if swarm_data:
# Parse the swarm_data and return an instance of AgentBase
# You can use the retrieved data to populate the AgentBase attributes
return Agent(**swarm_data)
return None
def retrieve_joined_agents(self) -> List[Agent]:
"""
Retrieves a list of joined agents from the registry.
Returns:
List[Agent]: The retrieved joined agents as a list of Agent objects.
"""
agent_data = self.redis_graph.query(
"MATCH (a:agent)-[:joined]->(b:manager) RETURN a"
)
        if agent_data:
            # Build an Agent instance from each returned record
            return [Agent(**record) for record in agent_data]
return None
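RedisSwarmRegistry drives everything through redis-py's Graph helper and Cypher-style query strings; a minimal sketch of those queries (assumes a local Redis server with the graph module available; labels and properties are illustrative only):

import redis
from redis.commands.graph import Graph

r = redis.StrictRedis(host="localhost", port=6379, db=0, decode_responses=True)
g = Graph(r, "swarm_registry_demo")

# Persist two nodes, then relate them the same way _add_edge()/join_swarm() do.
g.query("CREATE (:agent {id: 1, name: 'worker-1'})")
g.query("CREATE (:manager {id: 2, name: 'coordinator'})")
g.query(
    "MATCH (a:agent), (b:manager) "
    "WHERE a.id = 1 AND b.id = 2 "
    "CREATE (a)-[r:joined]->(b) RETURN r"
)

# Same query shape as retrieve_joined_agents() above.
result = g.query("MATCH (a:agent)-[:joined]->(b:manager) RETURN a")
print(result.result_set)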