From f58509e7ac6c5da8bb52661059c0291417b367dc Mon Sep 17 00:00:00 2001 From: Kye Date: Wed, 9 Aug 2023 18:26:30 -0400 Subject: [PATCH] cleaned up worker node Former-commit-id: 45f9080b4fbdb5f4cf58cc94dd7743e4974dcc8d --- swarms/workers/worker_node.py | 187 ++++++++++------------------------ 1 file changed, 52 insertions(+), 135 deletions(-) diff --git a/swarms/workers/worker_node.py b/swarms/workers/worker_node.py index e066ef6c..fdf110af 100644 --- a/swarms/workers/worker_node.py +++ b/swarms/workers/worker_node.py @@ -6,10 +6,8 @@ from langchain.agents import Tool from langchain.chat_models import ChatOpenAI from langchain.docstore import InMemoryDocstore from langchain.embeddings import OpenAIEmbeddings -# TODO: refactor to Agent class from langchain_experimental.autonomous_agents import AutoGPT from langchain.vectorstores import FAISS - from swarms.agents.tools.autogpt import ( FileChatMessageHistory, ReadFileTool, @@ -21,49 +19,31 @@ from swarms.agents.tools.autogpt import ( web_search, ) +# Constants ROOT_DIR = "./data/" +# Logging configurations logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - class WorkerNodeInitializer: - """Useful for when you need to spawn an autonomous agent instance as a worker to accomplish complex tasks, it can search the internet or spawn child multi-modality models to process and generate images and text or audio and so on""" - def __init__(self, - openai_api_key: str, - llm: Optional[Union[InMemoryDocstore, ChatOpenAI]] = None, - tools: Optional[List[Tool]] = None, - # vectorstore: Optional[FAISS] = None, - # embedding_size: Optional[int] = 1926, - worker_name: Optional[str] = "Swarm Worker AI Assistant", - worker_role: Optional[str] = "Assistant", - human_in_the_loop: Optional[bool] = False, - search_kwargs: dict = {}, - verbose: Optional[bool] = False, - chat_history_file: str = "chat_history.txt"): + """Class to initialize and create autonomous agent instances as worker nodes.""" + + def __init__(self, openai_api_key: str, worker_name: str = "Swarm Worker AI Assistant", **kwargs): self.openai_api_key = openai_api_key - self.llm = llm if llm is not None else ChatOpenAI() - self.tools = tools if tools is not None else [ReadFileTool(), WriteFileTool()] - # self.vectorstore = vectorstore - - # Initializing agent in the constructor + self.llm = kwargs.get('llm', ChatOpenAI()) + self.tools = kwargs.get('tools', [ReadFileTool(), WriteFileTool()]) self.worker_name = worker_name - self.worker_role = worker_role - # self.embedding_size = embedding_size - self.human_in_the_loop = human_in_the_loop - self.search_kwargs = search_kwargs - self.verbose = verbose - self.chat_history_file = chat_history_file + self.worker_role = kwargs.get('worker_role', "Assistant") + self.human_in_the_loop = kwargs.get('human_in_the_loop', False) + self.search_kwargs = kwargs.get('search_kwargs', {}) + self.chat_history_file = kwargs.get('chat_history_file', "chat_history.txt") self.create_agent() def create_agent(self): - logging.info("Creating agent in WorkerNode") + vectorstore = self.initialize_vectorstore() try: - # if self.vectorstore is None: - # raise ValueError("Vectorstore is not initialized in WorkerNodeInitializer") - vectorstore = self.initialize_vectorstore() - self.agent = AutoGPT.from_llm_and_tools( ai_name=self.worker_name, ai_role=self.worker_role, @@ -73,140 +53,77 @@ class WorkerNodeInitializer: human_in_the_loop=self.human_in_the_loop, chat_history_memory=FileChatMessageHistory(self.chat_history_file), ) - # self.agent.chain.verbose = verbose except Exception as e: logging.error(f"Error while creating agent: {str(e)}") - raise e + raise def add_tool(self, tool: Optional[Tool] = None): - if tool is None: - tool = DuckDuckGoSearchRun() + tool = tool or DuckDuckGoSearchRun() if not isinstance(tool, Tool): - logging.error("Tool must be an instance of Tool.") raise TypeError("Tool must be an instance of Tool.") self.tools.append(tool) def initialize_vectorstore(self): - try: - embeddings_model = OpenAIEmbeddings(openai_api_key=self.openai_api_key) - embedding_size = 8192 - index = faiss.IndexFlatL2(embedding_size=embedding_size) - return FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {}) - - except Exception as e: - logging.error(f"Failed to initialize vector store: {e}") - return None + embeddings_model = OpenAIEmbeddings(openai_api_key=self.openai_api_key) + embedding_size = 8192 + index = faiss.IndexFlatL2(embedding_size) + return FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {}) def run(self, prompt) -> str: - if not isinstance(prompt, str): - logging.error("Prompt must be a string.") - raise TypeError("Prompt must be a string.") - - if not prompt: - logging.error("Prompt is empty.") - raise ValueError("Prompt is empty.") + if not prompt or not isinstance(prompt, str): + raise ValueError("Prompt must be a non-empty string.") try: - self.agent.run([f"{prompt}"]) + self.agent.run([prompt]) return "Task completed by WorkerNode" except Exception as e: - logging.error(f"While running the agent: {str(e)}") - raise e + logging.error(f"Error running the agent: {str(e)}") + raise class WorkerNode: - def __init__(self, - openai_api_key: str, - temperature: int, - llm: Optional[Union[InMemoryDocstore, ChatOpenAI]] = None, - tools: Optional[List[Tool]] = None, - # vectorstore: Optional[FAISS] = None, - # embedding_size: Optional[int] = 4026, - worker_name: Optional[str] = "Swarm Worker AI Assistant", - worker_role: Optional[str] = "Assistant", - human_in_the_loop: Optional[bool] = False, - search_kwargs: dict = {}, - verbose: Optional[bool] = False, - chat_history_file: str = "chat_history.txt"): + """Main WorkerNode class to execute and manage tasks.""" + def __init__(self, openai_api_key: str): if not openai_api_key: - raise ValueError("openai_api_key cannot be None") - + raise ValueError("OpenAI API key is required") self.openai_api_key = openai_api_key self.worker_node_initializer = WorkerNodeInitializer(openai_api_key) - self.name = worker_name # Added a name attribute - self.description = "A worker node that executes tasks" # Added a description attribute - # self.embedding_size = embedding_size - + self.name = "Swarm Worker AI Assistant" + self.description = "A worker node that executes tasks" - def initialize_llm(self, llm_class, temperature): - if not llm_class: - logging.error("llm_class cannot be none") - raise ValueError("llm_class cannot be None") + def create_worker_node(self, **kwargs): + worker_name = kwargs.get('worker_name', "Swarm Worker AI Assistant") + llm_class = kwargs.get('llm_class', ChatOpenAI) - try: - return llm_class(openai_api_key=self.openai_api_key, temperature=temperature) - except Exception as e: - logging.error(f"Failed to initialize language model: {e}") - raise - - - def initialize_tools(self, llm_class): - if not llm_class: - logging.error("llm_class not cannot be none") - raise ValueError("llm_class cannot be none") - try: - logging.info('Creating WorkerNode') - llm = self.initialize_llm(llm_class) - - tools = [ - web_search, - WriteFileTool(root_dir=ROOT_DIR), - ReadFileTool(root_dir=ROOT_DIR), - process_csv, - WebpageQATool(qa_chain=load_qa_with_sources_chain(llm)), - ] - if not tools: - logging.error("Tools are not initialized") - raise ValueError("Tools are not initialized") - return tools - except Exception as e: - logging.error(f"Failed to initialize tools: {e}") - - def create_worker_node(self, worker_name, worker_role, human_in_the_loop, llm_class=ChatOpenAI, search_kwargs={}, **kwargs): if not llm_class: - logging.error("llm_class cannot be None.") raise ValueError("llm_class cannot be None.") - try: - worker_tools = self.initialize_tools(llm_class) - vectorstore = self.worker_node_initializer.initialize_vectorstore() - worker_node = WorkerNodeInitializer( - openai_api_key=self.openai_api_key, # pass the openai_api_key - llm=self.initialize_llm(llm_class), - tools=worker_tools, - vectorstore=vectorstore, - ai_name=worker_name, - ai_role=worker_role, - human_in_the_loop=human_in_the_loop, - search_kwargs=search_kwargs, - ) - worker_node.name = worker_name # Setting the name here - return worker_node - except Exception as e: - logging.error(f"Failed to create worker node: {e}") - raise + + worker_tools = self.initialize_tools(llm_class) + vectorstore = self.worker_node_initializer.initialize_vectorstore() + worker_node = WorkerNodeInitializer(openai_api_key=self.openai_api_key, tools=worker_tools, vectorstore=vectorstore, ai_name=worker_name, **kwargs) + return worker_node + def initialize_llm(self, llm_class, temperature): + return llm_class(openai_api_key=self.openai_api_key, temperature=temperature) + def initialize_tools(self, llm_class): + llm = self.initialize_llm(llm_class, temperature=1.0) # default value for temperature + tools = [ + web_search, + WriteFileTool(root_dir=ROOT_DIR), + ReadFileTool(root_dir=ROOT_DIR), + process_csv, + WebpageQATool(qa_chain=load_qa_with_sources_chain(llm)), + ] + return tools def worker_node(openai_api_key): + """Factory function to create a worker node.""" + if not openai_api_key: - logging.error("OpenAI API key is not provided") raise ValueError("OpenAI API key is required") - try: - worker_node = WorkerNode(openai_api_key) - return worker_node.create_worker_node() - except Exception as e: - logging.error(f"An error occured in worker_node: {e}") - raise \ No newline at end of file + node = WorkerNode(openai_api_key) + return node.create_worker_node()