@ -8,6 +8,7 @@ from langchain.docstore import InMemoryDocstore
from langchain . embeddings import OpenAIEmbeddings
from langchain_experimental . autonomous_agents import AutoGPT
from langchain . vectorstores import FAISS
from swarms . agents . tools . autogpt import (
FileChatMessageHistory ,
ReadFileTool ,
@ -19,31 +20,45 @@ from swarms.agents.tools.autogpt import (
web_search ,
)
# Constants
ROOT_DIR = " ./data/ "
# Logging configurations
logging . basicConfig ( level = logging . INFO , format = ' %(asctime)s - %(levelname)s - %(message)s ' )
class WorkerNodeInitializer :
""" Class to initialize and create autonomous agent instances as worker nodes. """
def __init__ ( self , openai_api_key : str , worker_name : str = " Swarm Worker AI Assistant " , * * kwargs ) :
class WorkerNodeInitializer :
""" Useful for when you need to spawn an autonomous agent instance as a worker to accomplish complex tasks, it can search the internet or spawn child multi-modality models to process and generate images and text or audio and so on """
def __init__ ( self ,
openai_api_key : str ,
llm : Optional [ Union [ InMemoryDocstore , ChatOpenAI ] ] = None ,
tools : Optional [ List [ Tool ] ] = None ,
embedding_size : Optional [ int ] = 1926 ,
worker_name : Optional [ str ] = " Swarm Worker AI Assistant " ,
worker_role : Optional [ str ] = " Assistant " ,
human_in_the_loop : Optional [ bool ] = False ,
search_kwargs : dict = { } ,
verbose : Optional [ bool ] = False ,
chat_history_file : str = " chat_history.txt " ) :
self . openai_api_key = openai_api_key
self . llm = kwargs . get ( ' llm ' , ChatOpenAI ( ) )
self . tools = kwargs . get ( ' tools ' , [ ReadFileTool ( ) , WriteFileTool ( ) ] )
self . llm = llm if llm is not None else ChatOpenAI ( )
self . tools = tools if tools is not None else [ ReadFileTool ( ) , WriteFileTool ( ) ]
# self.vectorstore = vectorstore
self . worker_name = worker_name
self . worker_role = kwargs . get ( ' worker_role ' , " Assistant " )
self . human_in_the_loop = kwargs . get ( ' human_in_the_loop ' , False )
self . search_kwargs = kwargs . get ( ' search_kwargs ' , { } )
self . chat_history_file = kwargs . get ( ' chat_history_file ' , " chat_history.txt " )
self . worker_role = worker_role
self . embedding_size = embedding_size
self . human_in_the_loop = human_in_the_loop
self . search_kwargs = search_kwargs
self . verbose = verbose
self . chat_history_file = chat_history_file
self . create_agent ( )
def create_agent ( self ) :
logging . info ( " Creating agent in WorkerNode " )
vectorstore = self . initialize_vectorstore ( )
try :
vectorstore = self . initialize_vectorstore ( )
self . agent = AutoGPT . from_llm_and_tools (
ai_name = self . worker_name ,
ai_role = self . worker_role ,
@ -55,75 +70,137 @@ class WorkerNodeInitializer:
)
except Exception as e :
logging . error ( f " Error while creating agent: { str ( e ) } " )
raise
raise e
def add_tool ( self , tool : Optional [ Tool ] = None ) :
tool = tool or DuckDuckGoSearchRun ( )
if tool is None :
tool = DuckDuckGoSearchRun ( )
if not isinstance ( tool , Tool ) :
logging . error ( " Tool must be an instance of Tool. " )
raise TypeError ( " Tool must be an instance of Tool. " )
self . tools . append ( tool )
def initialize_vectorstore ( self ) :
embeddings_model = OpenAIEmbeddings ( openai_api_key = self . openai_api_key )
embedding_size = 8192
index = faiss . IndexFlatL2 ( embedding_size )
return FAISS ( embeddings_model . embed_query , index , InMemoryDocstore ( { } ) , { } )
try :
embeddings_model = OpenAIEmbeddings ( openai_api_key = self . openai_api_key )
embedding_size = self . embedding_size
index = faiss . IndexFlatL2 ( embedding_size = embedding_size )
return FAISS ( embeddings_model . embed_query , index , InMemoryDocstore ( { } ) , { } )
except Exception as e :
logging . error ( f " Failed to initialize vector store: { e } " )
return None
def run ( self , prompt ) - > str :
if not prompt or not isinstance ( prompt , str ) :
raise ValueError ( " Prompt must be a non-empty string. " )
if not isinstance ( prompt , str ) :
logging . error ( " Prompt must be a string. " )
raise TypeError ( " Prompt must be a string. " )
if not prompt :
logging . error ( " Prompt is empty. " )
raise ValueError ( " Prompt is empty. " )
try :
self . agent . run ( [ prompt ] )
self . agent . run ( [ f " { prompt } " ] )
return " Task completed by WorkerNode "
except Exception as e :
logging . error ( f " Error running the agent: { str ( e ) } " )
raise
logging . error ( f " While running the agent: { str ( e ) } " )
raise e
class WorkerNode :
""" Main WorkerNode class to execute and manage tasks. """
def __init__ ( self ,
openai_api_key : str ,
temperature : int ,
llm : Optional [ Union [ InMemoryDocstore , ChatOpenAI ] ] = None ,
tools : Optional [ List [ Tool ] ] = None ,
# vectorstore: Optional[FAISS] = None,
# embedding_size: Optional[int] = 4026,
worker_name : Optional [ str ] = " Swarm Worker AI Assistant " ,
worker_role : Optional [ str ] = " Assistant " ,
human_in_the_loop : Optional [ bool ] = False ,
search_kwargs : dict = { } ,
verbose : Optional [ bool ] = False ,
chat_history_file : str = " chat_history.txt " ) :
def __init__ ( self , openai_api_key : str ) :
if not openai_api_key :
raise ValueError ( " OpenAI API key is required " )
raise ValueError ( " openai_api_key cannot be None " )
self . openai_api_key = openai_api_key
self . worker_node_initializer = WorkerNodeInitializer ( openai_api_key )
self . name = " Swarm Worker AI Assistant "
self . description = " A worker node that executes tasks "
self . name = worker_name # Added a name attribute
self . description = " A worker node that executes tasks " # Added a description attribute
self . embedding_size = embedding_size
def create_worker_node ( self , * * kwargs ) :
worker_name = kwargs . get ( ' worker_name ' , " Swarm Worker AI Assistant " )
llm_class = kwargs . get ( ' llm_class ' , ChatOpenAI )
def initialize_llm ( self , llm_class , temperature ) :
if not llm_class :
raise ValueError ( " llm_class cannot be None. " )
logging . error ( " llm_class cannot be none " )
raise ValueError ( " llm_class cannot be None " )
worker_tools = self . initialize_tools ( llm_class )
vectorstore = self . worker_node_initializer . initialize_vectorstore ( )
worker_node = WorkerNodeInitializer ( openai_api_key = self . openai_api_key , tools = worker_tools , vectorstore = vectorstore , ai_name = worker_name , * * kwargs )
return worker_node
try :
return llm_class ( openai_api_key = self . openai_api_key , temperature = temperature )
except Exception as e :
logging . error ( f " Failed to initialize language model: { e } " )
raise
def initialize_llm ( self , llm_class , temperature ) :
return llm_class ( openai_api_key = self . openai_api_key , temperature = temperature )
def initialize_tools ( self , llm_class ) :
llm = self . initialize_llm ( llm_class , temperature = 1.0 ) # default value for temperature
tools = [
web_search ,
WriteFileTool ( root_dir = ROOT_DIR ) ,
ReadFileTool ( root_dir = ROOT_DIR ) ,
process_csv ,
WebpageQATool ( qa_chain = load_qa_with_sources_chain ( llm ) ) ,
]
return tools
if not llm_class :
logging . error ( " llm_class not cannot be none " )
raise ValueError ( " llm_class cannot be none " )
try :
logging . info ( ' Creating WorkerNode ' )
llm = self . initialize_llm ( llm_class )
tools = [
web_search ,
WriteFileTool ( root_dir = ROOT_DIR ) ,
ReadFileTool ( root_dir = ROOT_DIR ) ,
process_csv ,
WebpageQATool ( qa_chain = load_qa_with_sources_chain ( llm ) ) ,
]
if not tools :
logging . error ( " Tools are not initialized " )
raise ValueError ( " Tools are not initialized " )
return tools
except Exception as e :
logging . error ( f " Failed to initialize tools: { e } " )
def create_worker_node ( self , worker_name , worker_role , human_in_the_loop , llm_class = ChatOpenAI , search_kwargs = { } , * * kwargs ) :
if not llm_class :
logging . error ( " llm_class cannot be None. " )
raise ValueError ( " llm_class cannot be None. " )
try :
worker_tools = self . initialize_tools ( llm_class )
vectorstore = self . worker_node_initializer . initialize_vectorstore ( )
worker_node = WorkerNodeInitializer (
openai_api_key = self . openai_api_key , # pass the openai_api_key
llm = self . initialize_llm ( llm_class ) ,
tools = worker_tools ,
vectorstore = vectorstore ,
ai_name = worker_name ,
ai_role = worker_role ,
human_in_the_loop = human_in_the_loop ,
search_kwargs = search_kwargs ,
)
worker_node . name = worker_name # Setting the name here
return worker_node
except Exception as e :
logging . error ( f " Failed to create worker node: { e } " )
raise
def worker_node ( openai_api_key ) :
""" Factory function to create a worker node. """
def worker_node ( openai_api_key ) :
if not openai_api_key :
logging . error ( " OpenAI API key is not provided " )
raise ValueError ( " OpenAI API key is required " )
node = WorkerNode ( openai_api_key )
return node . create_worker_node ( )
try :
worker_node = WorkerNode ( openai_api_key )
return worker_node . create_worker_node ( )
except Exception as e :
logging . error ( f " An error occured in worker_node: { e } " )
raise