diff --git a/swarms/workers/worker.py b/swarms/workers/worker.py
index 95499638..0d2686b0 100644
--- a/swarms/workers/worker.py
+++ b/swarms/workers/worker.py
@@ -1,3 +1,4 @@
+import concurrent.futures
 import faiss
 from langchain.chat_models import ChatOpenAI
 from langchain.docstore import InMemoryDocstore
@@ -24,7 +25,6 @@ class Worker:
     Useful for when you need to spawn an autonomous agent instance as a worker to accomplish complex tasks,
     it can search the internet or spawn child multi-modality models to process and generate images and text or audio and so on
-
     Parameters:
     - `model_name` (str): The name of the language model to be used (default: "gpt-4").
     - `openai_api_key` (str): The OpenAI API key (optional).
@@ -35,7 +35,6 @@ class Worker:
     - `temperature` (float): The temperature parameter for response generation (default: 0.5).
     - `llm` (ChatOpenAI): Pre-initialized ChatOpenAI model instance (optional).
     - `openai` (bool): If True, use the OpenAI language model; otherwise, use `llm` (default: True).
-
     #Usage
     ```
@@ -92,19 +91,63 @@ class Worker:
         self.setup_tools(external_tools)
         self.setup_memory()
         self.setup_agent()
+
+        self.task_queue = []
+        self.executor = concurrent.futures.ThreadPoolExecutor()
+
+    def reset(self):
+        """
+        Reset the message history.
+        """
         self.message_history = ["Here is the conversation so far"]
 
     def receieve(self, name: str, message: str) -> None:
+        """
+        Receive a message and update the message history.
+
+        Parameters:
+        - `name` (str): The name of the sender.
+        - `message` (str): The received message.
+        """
         self.message_history.append(f"{name}: {message}")
 
+    def add(self, task, priority=0):
+        self.task_queue.append((priority, task))
+
+    def process_task(self, task):
+        try:
+            result = self.agent.run([task])
+            return result
+        except Exception as error:
+            error_message = f"Error while running task: {str(error)}"
+            return error_message
+
+    def process_tasks_parallel(self):
+        futures = [
+            self.executor.submit(
+                self.process_task,
+                task
+            ) for _, task in self.task_queue
+        ]
+        concurrent.futures.wait(futures)
+        results = [future.result() for future in futures]
+        return results
+
     @log_decorator
     @error_decorator
     @timing_decorator
     def setup_tools(self, external_tools):
         """
+        Set up tools for the worker.
+
+        Parameters:
+        - `external_tools` (list): List of external tools (optional).
+
+        Example:
+        ```
         external_tools = [MyTool1(), MyTool2()]
         worker = Worker(model_name="gpt-4",
                 openai_api_key="my_key",
@@ -113,7 +156,7 @@
                 external_tools=external_tools,
                 human_in_the_loop=False,
                 temperature=0.5)
-
+        ```
         """
         self.tools = [
             WriteFileTool(root_dir=ROOT_DIR),
@@ -128,6 +171,9 @@
             self.tools.extend(external_tools)
 
     def setup_memory(self):
+        """
+        Set up memory for the worker.
+        """
         try:
             embeddings_model = OpenAIEmbeddings(openai_api_key=self.openai_api_key)
             embedding_size = 4096
@@ -139,6 +185,9 @@
 
     def setup_agent(self):
+        """
+        Set up the autonomous agent.
+        """
         try:
             self.agent = AutoGPT.from_llm_and_tools(
                 ai_name=self.ai_name,
@@ -156,6 +205,15 @@
     @error_decorator
     @timing_decorator
     def run(self, task):
+        """
+        Run the autonomous agent on a given task.
+
+        Parameters:
+        - `task`: The task to be processed.
+
+        Returns:
+        - `result`: The result of the agent's processing.
+        """
         try:
             result = self.agent.run([task])
             return result
@@ -166,8 +224,17 @@
     @error_decorator
     @timing_decorator
     def __call__(self, task):
+        """
+        Make the worker callable to run the agent on a given task.
+
+        Parameters:
+        - `task`: The task to be processed.
+
+        Returns:
+        - `results`: The results of the agent's processing.
+        """
         try:
             results = self.agent.run([task])
             return results
         except Exception as error:
-            raise RuntimeError(f"Error while running agent: {error}")
\ No newline at end of file
+            raise RuntimeError(f"Error while running agent: {error}")
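For reviewer context, a minimal usage sketch of the task-queue API added above, assuming a fully configured `Worker` (the `__init__` shown in the diff wires up tools, memory, and the agent); the API key and the task strings are placeholders:

```python
# Hypothetical usage of the new queueing/parallel API; "my_key" and the
# task strings are placeholders, and a real run needs valid credentials.
from swarms.workers.worker import Worker

worker = Worker(model_name="gpt-4", openai_api_key="my_key")

# add() pushes (priority, task) tuples onto the queue. The priority is
# stored but not yet used for ordering: process_tasks_parallel()
# submits tasks in insertion order.
worker.add("Summarize the latest AI research", priority=1)
worker.add("Draft a project status update", priority=0)

# Each task runs through self.agent.run([task]) on the shared
# ThreadPoolExecutor; wait() blocks until every future completes.
results = worker.process_tasks_parallel()
for result in results:
    print(result)
```

Two behaviors worth noting: `process_task` catches exceptions and returns the error message as that task's result, so one failing task cannot abort the batch, and the queue is not cleared after processing, so a second call to `process_tasks_parallel()` re-runs the same tasks.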