process tasks

Former-commit-id: 5d9cee9660
group-chat
Kye 2 years ago
parent 742cc8d1dc
commit 991485fe10

@@ -1,3 +1,4 @@
import concurrent.futures
import faiss
from langchain.chat_models import ChatOpenAI
from langchain.docstore import InMemoryDocstore
@@ -24,7 +25,6 @@ class Worker:
    Useful for when you need to spawn an autonomous agent instance as a worker to accomplish complex tasks;
    it can search the internet or spawn child multi-modality models to process and generate images, text, audio, and so on.

    Parameters:
    - `model_name` (str): The name of the language model to be used (default: "gpt-4").
    - `openai_api_key` (str): The OpenAI API key (optional).
@@ -36,7 +36,6 @@ class Worker:
    - `llm` (ChatOpenAI): Pre-initialized ChatOpenAI model instance (optional).
    - `openai` (bool): If True, use the OpenAI language model; otherwise, use `llm` (default: True).

    # Usage
    ```
    from swarms import Worker
@@ -93,18 +92,62 @@ class Worker:
        self.setup_memory()
        self.setup_agent()
        self.task_queue = []
        self.executor = concurrent.futures.ThreadPoolExecutor()

    def reset(self):
        """
        Reset the message history.
        """
        self.message_history = ["Here is the conversation so far"]
    def receieve(self, name: str, message: str) -> None:
        """
        Receive a message and update the message history.

        Parameters:
        - `name` (str): The name of the sender.
        - `message` (str): The received message.
        """
        self.message_history.append(f"{name}: {message}")
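As a quick illustration (outside the diff itself), the message-history helpers above might be exercised like this; the constructor arguments are the same illustrative ones used in the docstring's usage example:

```
worker = Worker(model_name="gpt-4", openai_api_key="my_key")
worker.receieve("human", "Summarize the attached report")    # method name is spelled "receieve" in this codebase
worker.receieve("worker-2", "Here is the draft summary")
worker.reset()                                               # resets the history back to its initial prompt line
```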
    def add(self, task, priority=0):
        """
        Add a task to the queue with an optional priority.
        """
        self.task_queue.append((priority, task))

    def process_task(self, task):
        """
        Run the agent on a single task, returning the result or an error message.
        """
        try:
            result = self.agent.run([task])
            return result
        except Exception as error:
            error_message = f"Error while running task: {str(error)}"
            return error_message

    def process_tasks_parallel(self):
        """
        Process all queued tasks concurrently using the thread pool executor.
        """
        futures = [
            self.executor.submit(self.process_task, task)
            for _, task in self.task_queue
        ]
        concurrent.futures.wait(futures)
        results = [future.result() for future in futures]
        return results
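The methods above are the core of this "process tasks" commit. A minimal usage sketch (not part of the diff), with illustrative task strings:

```
worker = Worker(model_name="gpt-4", openai_api_key="my_key")
worker.add("Research the latest developments in vector databases", priority=1)
worker.add("Draft an outline for a blog post on autonomous agents")

# Each queued task is submitted to the worker's ThreadPoolExecutor and run by the agent.
results = worker.process_tasks_parallel()
for result in results:
    print(result)
```

Note that `add` records a priority alongside each task, but `process_tasks_parallel` submits tasks in insertion order; the priority value is not yet used for ordering.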
    @log_decorator
    @error_decorator
    @timing_decorator
    def setup_tools(self, external_tools):
        """
        Set up tools for the worker.

        Parameters:
        - `external_tools` (list): List of external tools (optional).

        Example:
        ```
        external_tools = [MyTool1(), MyTool2()]
        worker = Worker(model_name="gpt-4",
                        openai_api_key="my_key",
@@ -113,7 +156,7 @@ class Worker:
                        external_tools=external_tools,
                        human_in_the_loop=False,
                        temperature=0.5)
        ```
        """
        self.tools = [
            WriteFileTool(root_dir=ROOT_DIR),
@@ -128,6 +171,9 @@ class Worker:
        self.tools.extend(external_tools)

    def setup_memory(self):
        """
        Set up memory for the worker.
        """
        try:
            embeddings_model = OpenAIEmbeddings(openai_api_key=self.openai_api_key)
            embedding_size = 4096
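For context (outside the diff), `setup_memory` follows the common LangChain pattern of pairing OpenAI embeddings with a flat FAISS index and an in-memory docstore. A sketch of that pattern, assuming the pre-0.1 `langchain` package layout and FAISS constructor used at the time; the API key is illustrative:

```
import faiss
from langchain.docstore import InMemoryDocstore
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS

embeddings_model = OpenAIEmbeddings(openai_api_key="my_key")
embedding_size = 4096                         # size used in setup_memory above
index = faiss.IndexFlatL2(embedding_size)     # flat L2 index over the embedding space
vectorstore = FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {})
```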
@@ -139,6 +185,9 @@ class Worker:
    def setup_agent(self):
        """
        Set up the autonomous agent.
        """
        try:
            self.agent = AutoGPT.from_llm_and_tools(
                ai_name=self.ai_name,
@@ -156,6 +205,15 @@ class Worker:
    @error_decorator
    @timing_decorator
    def run(self, task):
        """
        Run the autonomous agent on a given task.

        Parameters:
        - `task`: The task to be processed.

        Returns:
        - `result`: The result of the agent's processing.
        """
        try:
            result = self.agent.run([task])
            return result
@@ -166,6 +224,15 @@ class Worker:
    @error_decorator
    @timing_decorator
    def __call__(self, task):
        """
        Make the worker callable to run the agent on a given task.

        Parameters:
        - `task`: The task to be processed.

        Returns:
        - `results`: The results of the agent's processing.
        """
        try:
            results = self.agent.run([task])
            return results
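With `run` and `__call__` both delegating to `self.agent.run([task])`, a worker can be invoked either way. A short sketch with illustrative arguments:

```
worker = Worker(model_name="gpt-4", openai_api_key="my_key")

result = worker.run("Find three recent papers on multi-agent systems")
# equivalent: the worker instance is directly callable
result = worker("Find three recent papers on multi-agent systems")
print(result)
```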
