"""Useful for when you need to spawn an autonomous agent instance as a worker to accomplish complex tasks, it can search the internet or spawn child multi-modality models to process and generate images and text or audio and so on"""
def__init__(self,llm,tools,vectorstore):
ifnotllmornottoolsornotvectorstore:
raiseValueError("llm, tools, and vectorstore cannot be None")
logging.error("llm, tools, and vectorstore cannot be None.")
raiseValueError("llm, tools, and vectorstore cannot be None.")
logging.error(f"Error while creating agent: {str(e)}")
raisee
defadd_tool(self,tool:Tool):
ifnotisinstance(tool,Tool):
raiseTypeError("Tool must be an instance of Tool")
logging.error("Tool must be an instance of Tool.")
raiseTypeError("Tool must be an instance of Tool.")
self.tools.append(tool)
defrun(self,prompt:str)->str:
ifnotisinstance(prompt,str):
raiseTypeError("Prompt must be a string")
logging.error("Prompt must be a string.")
raiseTypeError("Prompt must be a string.")
ifnotprompt:
raiseValueError("Prompt is empty")
logging.error("Prompt is empty.")
raiseValueError("Prompt is empty.")
try:
self.agent.run([f"{prompt}"])
return"Task completed by WorkerNode"
exceptExceptionase:
logging.error(f"While running the agent: {str(e)}")
raisee
# worker_tool = Tool(
# name="WorkerNode AI Agent",
# func=WorkerNode.run,
# description="Useful for when you need to spawn an autonomous agent instance as a worker to accomplish complex tasks, it can search the internet or spawn child multi-modality models to process and generate images and text or audio and so on"
logging.error(f"Failed to initialize vector store: {e}")
raise
defcreate_worker_node(self,llm_class=ChatOpenAI):
defcreate_worker_node(self,llm_class=ChatOpenAI, ai_name="Swarm Worker AI Assistant",ai_role="Assistant",human_in_the_loop=False,search_kwargs={},verbose=False):