@ -15,7 +15,7 @@ class Swarms:
# Initialize language model
return llm_class ( openai_api_key = self . openai_api_key , temperature = temperature )
def initialize_tools ( self , llm_class ):
def initialize_tools ( self , llm_class , worker_node ):
llm = self . initialize_llm ( llm_class )
# Initialize tools
web_search = DuckDuckGoSearchRun ( )
@ -29,6 +29,7 @@ class Swarms:
# RequestsGet()
Tool ( name = " RequestsGet " , func = RequestsGet . get , description = " A portal to the internet, Use this when you need to get specific content from a website. Input should be a url (i.e. https://www.google.com). The output will be the text response of the GET request. " ) ,
Tool ( name = " WorkerNode AI Agent " , func = worker_node . run , description = " Input: an objective with a todo list for that objective. Output: your task completed: Please be very clear what the objective and task instructions are. The Swarm worker agent is Useful for when you need to spawn an autonomous agent instance as a worker to accomplish any complex tasks, it can search the internet or write code or spawn child multi-modality models to process and generate images and text or audio and so on " )
# CodeEditor,
@ -37,6 +38,7 @@ class Swarms:
# ExitConversation
#code editor + terminal editor + visual agent
# Give the worker node itself as a tool
]
assert tools is not None , " tools is not initialized "
@ -76,19 +78,37 @@ class Swarms:
return BossNode ( llm , vectorstore , agent_executor , max_iterations = 5 )
def run_swarms ( self , objective ):
try :
# Run the swarm with the given objective
worker_tools = self . initialize_tools ( OpenAI )
assert worker_tools is not None , " worker_tools is not initialized "
def run_swarms ( self , objective , run_as = " boss " ):
# try :
# # Run the swarm with the given objective
# worker_tools = self.initialize_tools(OpenAI)
# assert worker_tools is not None, "worker_tools is not initialized "
vectorstore = self . initialize_vectorstore ( )
worker_node = self . initialize_worker_node ( worker_tools , vectorstore )
# vectorstore = self.initialize_vectorstore()
# worker_node = self.initialize_worker_node(worker_tools, vectorstore)
boss_node = self . initialize_boss_node ( vectorstore , worker_node )
# boss_node = self.initialize_boss_node(vectorstore, worker_node)
task = boss_node . create_task ( objective )
return boss_node . execute_task ( task )
# task = boss_node.create_task(objective)
# return boss_node.execute_task(task)
# except Exception as e:
# logging.error(f"An error occurred in run_swarms: {e}")
# raise
#===============> optional approach
try :
# Run the swarm with the given objective
vectorstore = self . initialize_vectorstore ( )
worker_node = self . initialize_worker_node ( [ ] , vectorstore ) # Initialize with an empty tool list
worker_tools = self . initialize_tools ( OpenAI , worker_node ) # Now the worker_node instance exists and can be passed to initialize_tools
worker_node . add_tool ( worker_tools [ - 1 ] ) # Add the self-reference tool to the actual worker_node
if run_as . lower ( ) == ' worker ' :
tool_input = { ' prompt ' : objective }
return worker_node . run ( tool_input )
else :
boss_node = self . initialize_boss_node ( vectorstore , worker_node )
task = boss_node . create_task ( objective )
return boss_node . execute_task ( task )
except Exception as e :
logging . error ( f " An error occurred in run_swarms: { e } " )
raise
@ -98,6 +118,17 @@ class Swarms:
# usage
def swarm ( api_key , objective ) :
swarms = Swarms ( api_key )
return swarms . run_swarms ( objective )
# # Use the function
# api_key = "APIKEY"
# objective = "What is the capital of the UK?"
# result = swarm(api_key, objective)
# print(result) # Prints: "The capital of the UK is London."
@ -214,20 +245,6 @@ class Swarms:
# usage
def swarm ( api_key , objective ) :
swarms = Swarms ( api_key )
return swarms . run_swarms ( objective )
# # Use the function
# api_key = "APIKEY"
# objective = "What is the capital of the UK?"
# result = swarm(api_key, objective)
# print(result) # Prints: "The capital of the UK is London."