worker clean up

Former-commit-id: be16da4952
group-chat
Kye 1 year ago
parent 410c707046
commit ea4cf03f88

@ -10,7 +10,6 @@ The `OmniModalAgent` class is at the core of an architecture designed to facilit
4. **Tools**: A collection of tools and utilities used to process different types of tasks. They span across areas like image captioning, translation, and more. 4. **Tools**: A collection of tools and utilities used to process different types of tasks. They span across areas like image captioning, translation, and more.
## Structure & Organization ## Structure & Organization
### Table of Contents: ### Table of Contents:

@ -8,8 +8,6 @@ from swarms.agents.omni_modal_agent import OmniModalAgent
#utils #utils
from swarms.agents.message import Message from swarms.agents.message import Message
from swarms.agents.stream_response import stream from swarms.agents.stream_response import stream

@ -1,4 +1,5 @@
from swarms.workers.worker import Worker from swarms.workers.worker import Worker
from queue import Queue, PriorityQueue
class SimpleSwarm: class SimpleSwarm:
def __init__( def __init__(
@ -9,20 +10,76 @@ class SimpleSwarm:
): ):
""" """
# Usage Usage:
swarm = Swarm(num_workers=5, openai_api_key="", ai_name="Optimus Prime") # Initialize the swarm with 5 workers, an API key, and a name for the AI model
task = "What were the winning Boston Marathon times for the past 5 years (ending in 2022)? Generate a table of the year, name, country of origin, and times." swarm = SimpleSwarm(num_workers=5, openai_api_key="YOUR_OPENAI_API_KEY", ai_name="Optimus Prime")
responses = swarm.distribute_task(task)
# Normal task without priority
normal_task = "Describe the process of photosynthesis in simple terms."
swarm.distribute_task(normal_task)
# Priority task; lower numbers indicate higher priority (e.g., 1 is higher priority than 2)
priority_task = "Translate the phrase 'Hello World' to French."
swarm.distribute_task(priority_task, priority=1)
# Run the tasks and gather the responses
responses = swarm.run()
# Print responses
for response in responses: for response in responses:
print(response) print(response)
# Providing feedback to the system (this is a stubbed method and won't produce a visible effect, but serves as an example)
swarm.provide_feedback("Improve translation accuracy.")
# Perform a health check on the agents (this is also a stubbed method, illustrating potential usage)
swarm.health_check()
""" """
self.workers = [ self.workers = [
Worker(openai_api_key, ai_name) for _ in range(num_workers) Worker(openai_api_key, ai_name) for _ in range(num_workers)
] ]
self.task_queue = Queue()
self.priority_queue = PriorityQueue()
def distribute(
self,
task,
priority=None
):
"""Distribute a task to the workers"""
if priority:
self.priority_queue.put((priority, task))
else:
self.task_queue.put(task)
def _process_task(self, task):
#TODO, Implement load balancing, fallback mechanism
for worker in self.workers:
response = worker.run(task)
if response:
return response
return "All Agents failed"
def run(self):
"""Run the simple swarm"""
responses = []
#process high priority tasks first
while not self.priority_queue.empty():
_, task = self.priority_queue.get()
responses.append(self._process_task(task))
#process normal tasks
while not self.task_queue.empty():
task = self.task_queue.get()
responses.append(self._process_task(task))
return responses
def run(self, task): def run_old(self, task):
responses = [] responses = []
for worker in self.workers: for worker in self.workers:

@ -1,4 +1,4 @@
# import concurrent.futures
import faiss import faiss
from langchain.chat_models import ChatOpenAI from langchain.chat_models import ChatOpenAI
from langchain.docstore import InMemoryDocstore from langchain.docstore import InMemoryDocstore
@ -7,6 +7,7 @@ from langchain.tools.human.tool import HumanInputRun
from langchain.vectorstores import FAISS from langchain.vectorstores import FAISS
from langchain_experimental.autonomous_agents import AutoGPT from langchain_experimental.autonomous_agents import AutoGPT
from swarms.agents.message import Message
from swarms.tools.autogpt import ( from swarms.tools.autogpt import (
ReadFileTool, ReadFileTool,
VQAinference, VQAinference,
@ -17,6 +18,13 @@ from swarms.tools.autogpt import (
) )
from swarms.utils.decorators import error_decorator, log_decorator, timing_decorator from swarms.utils.decorators import error_decorator, log_decorator, timing_decorator
# self.llm = ChatOpenAI(
# model_name=model_name,
# openai_api_key=self.openai_api_key,
# temperature=self.temperature
# )
#cache
ROOT_DIR = "./data/" ROOT_DIR = "./data/"
@ -51,42 +59,20 @@ class Worker:
``` ```
""" """
# @log_decorator
# @error_decorator
# @timing_decorator
def __init__( def __init__(
self, self,
model_name="gpt-4",
openai_api_key = None, openai_api_key = None,
ai_name = "Autobot Swarm Worker", ai_name = "Autobot Swarm Worker",
ai_role = "Worker in a swarm", ai_role = "Worker in a swarm",
external_tools = None, external_tools = None,
human_in_the_loop = False, human_in_the_loop = False,
temperature = 0.5, temperature = 0.5,
# llm=None, llm = None,
# openai: bool = True,
): ):
self.openai_api_key = openai_api_key self.openai_api_key = openai_api_key
self.temperature = temperature self.temperature = temperature
self.human_in_the_loop = human_in_the_loop self.human_in_the_loop = human_in_the_loop
# self.openai = openai self.llm = llm
# if self.openai is True:
# try:
self.llm = ChatOpenAI(
model_name=model_name,
openai_api_key=self.openai_api_key,
temperature=self.temperature
)
# except Exception as error:
# raise RuntimeError(f"Error Initializing ChatOpenAI: {error}")
# else:
# self.llm = llm(
# model_name=model_name,
# temperature=self.temperature
# )
self.ai_name = ai_name self.ai_name = ai_name
self.ai_role = ai_role self.ai_role = ai_role
self.setup_tools(external_tools) self.setup_tools(external_tools)
@ -96,8 +82,6 @@ class Worker:
# self.task_queue = [] # self.task_queue = []
# self.executor = concurrent.futures.ThreadPoolExecutor() # self.executor = concurrent.futures.ThreadPoolExecutor()
def reset(self): def reset(self):
""" """
Reset the message history. Reset the message history.
@ -108,7 +92,6 @@ class Worker:
def name(self): def name(self):
return self.ai_name return self.ai_name
def receieve(self, name: str, message: str) -> None: def receieve(self, name: str, message: str) -> None:
""" """
Receive a message and update the message history. Receive a message and update the message history.
@ -125,29 +108,8 @@ class Worker:
def add(self, task, priority=0): def add(self, task, priority=0):
self.task_queue.append((priority, task)) self.task_queue.append((priority, task))
# def process_task(self, task):
# try:
# result = self.agent.run([task])
# return result
# except Exception as error:
# error_message = f"Error while running task: {str(error)}"
# return error_message
# def process_tasks_parallel(self):
# futures = [
# self.executor.submit(
# self.process_task,
# task
# ) for _, task in self.task_queue
# ]
# concurrent.futures.wait(futures)
# results = [future.result() for future in futures]
# return results
# @log_decorator
# @error_decorator
# @timing_decorator
def setup_tools(self, external_tools): def setup_tools(self, external_tools):
""" """
Set up tools for the worker. Set up tools for the worker.
@ -179,6 +141,7 @@ class Worker:
if external_tools is not None: if external_tools is not None:
self.tools.extend(external_tools) self.tools.extend(external_tools)
def setup_memory(self): def setup_memory(self):
""" """
Set up memory for the worker. Set up memory for the worker.
@ -210,9 +173,9 @@ class Worker:
except Exception as error: except Exception as error:
raise RuntimeError(f"Error setting up agent: {error}") raise RuntimeError(f"Error setting up agent: {error}")
# @log_decorator @log_decorator
# @error_decorator @error_decorator
# @timing_decorator @timing_decorator
def run(self, task): def run(self, task):
""" """
Run the autonomous agent on a given task. Run the autonomous agent on a given task.
@ -229,9 +192,9 @@ class Worker:
except Exception as error: except Exception as error:
raise RuntimeError(f"Error while running agent: {error}") raise RuntimeError(f"Error while running agent: {error}")
# @log_decorator @log_decorator
# @error_decorator @error_decorator
# @timing_decorator @timing_decorator
def __call__(self, task): def __call__(self, task):
""" """
Make the worker callable to run the agent on a given task. Make the worker callable to run the agent on a given task.
@ -248,4 +211,89 @@ class Worker:
except Exception as error: except Exception as error:
raise RuntimeError(f"Error while running agent: {error}") raise RuntimeError(f"Error while running agent: {error}")
def health_check(self):
pass
@log_decorator
@error_decorator
@timing_decorator
def chat(
self,
msg: str = None,
streaming: bool = False
):
"""
Run chat
Args:
msg (str, optional): Message to send to the agent. Defaults to None.
language (str, optional): Language to use. Defaults to None.
streaming (bool, optional): Whether to stream the response. Defaults to False.
Returns:
str: Response from the agent
Usage:
--------------
agent = MultiModalAgent()
agent.chat("Hello")
"""
#add users message to the history
self.history.append(
Message(
"User",
msg
)
)
#process msg
try:
response = self.agent.run(msg)
#add agent's response to the history
self.history.append(
Message(
"Agent",
response
)
)
#if streaming is = True
if streaming:
return self._stream_response(response)
else:
response
except Exception as error:
error_message = f"Error processing message: {str(error)}"
#add error to history
self.history.append(
Message(
"Agent",
error_message
)
)
return error_message
def _stream_response(
self,
response: str = None
):
"""
Yield the response token by token (word by word)
Usage:
--------------
for token in _stream_response(response):
print(token)
"""
for token in response.split():
yield token

Loading…
Cancel
Save