boss node class

main
Kye 2 years ago
parent 1d983ee24b
commit df966c7a1c

@ -204,4 +204,7 @@ In the context of swarm LLMs, one could consider an **Omni-Vector Embedding Data
* Get baby agi set up with the autogpt instance as a tool
* Integrate [Ocean](https://github.com/kyegomez/Ocean) vector db as the main embedding database for all the agents boss and or worker
* Integrate [Ocean](https://github.com/kyegomez/Ocean) vector db as the main embedding database for all the agents boss and or worker
* Communication, a universal vector database that is only used when a task is completed in this format `[TASK][COMPLETED]`

@ -1,10 +1,11 @@
from swarms.agents.workers.auto_agent import agent
from swarms.agents.boss.babyagi_agent import baby_agi
from swarms.utils.helpers import BossNode
OBJECTIVE = "Write a weather report for SF today"
baby_agi({"objective": OBJECTIVE})
# Initialize boss node with given parameters
boss_node = BossNode
# Create and execute a task
task = boss_node.create_task("Write a weather report for SF today")
boss_node.execute_task(task)
@ -24,8 +25,13 @@ baby_agi({"objective": OBJECTIVE})
###################################====================>
# from swarms.agents.workers.auto_agent import agent
# from swarms.agents.boss.babyagi_agent import baby_agi
# OBJECTIVE = "Write a weather report for SF today"
# baby_agi({"objective": OBJECTIVE})
@ -34,6 +40,7 @@ baby_agi({"objective": OBJECTIVE})
###################################====================>
########### just autogpt agent, worker node
# agent.chain.verbose = True

@ -53,7 +53,7 @@ tools = [
]
prefix = """You are an AI who performs one task based on the following objective: {objective}. Take into account these previously completed tasks: {context}.
prefix = """You are an Boss in a swarm who performs one task based on the following objective: {objective}. Take into account these previously completed tasks: {context}.
As a swarming hivemind agent, my purpose is to achieve the user's goal. To effectively fulfill this role, I employ a collaborative thinking process that draws inspiration from the collective intelligence of the swarm. Here's how I approach thinking and why it's beneficial:

@ -0,0 +1,139 @@
A high-level pseudocode for creating the classes and functions for your desired system:
1. **Swarms**
- The main class. It initializes the swarm with a specified number of worker nodes and sets up self-scaling if required.
- Methods include `add_worker`, `remove_worker`, `execute`, and `scale`.
2. **WorkerNode**
- Class for each worker node in the swarm. It has a `task_queue` and a `completed_tasks` queue.
- Methods include `receive_task`, `complete_task`, and `communicate`.
3. **HierarchicalSwarms**
- Inherits from Swarms and overrides the `execute` method to execute tasks in a hierarchical manner.
4. **CollaborativeSwarms**
- Inherits from Swarms and overrides the `execute` method to execute tasks in a collaborative manner.
5. **CompetitiveSwarms**
- Inherits from Swarms and overrides the `execute` method to execute tasks in a competitive manner.
6. **MultiAgentDebate**
- Inherits from Swarms and overrides the `execute` method to execute tasks in a debating manner.
To implement this in Python, you would start by setting up the base `Swarm` class and `WorkerNode` class. Here's a simplified Python example:
```python
class WorkerNode:
def __init__(self, llm: BaseLLM):
self.llm = llm
self.task_queue = deque()
self.completed_tasks = deque()
def receive_task(self, task):
self.task_queue.append(task)
def complete_task(self):
task = self.task_queue.popleft()
result = self.llm.execute(task)
self.completed_tasks.append(result)
return result
def communicate(self, other_node):
# Placeholder for communication method
pass
class Swarms:
def __init__(self, num_nodes: int, llm: BaseLLM, self_scaling: bool):
self.nodes = [WorkerNode(llm) for _ in range(num_nodes)]
self.self_scaling = self_scaling
def add_worker(self, llm: BaseLLM):
self.nodes.append(WorkerNode(llm))
def remove_worker(self, index: int):
self.nodes.pop(index)
def execute(self, task):
# Placeholder for main execution logic
pass
def scale(self):
# Placeholder for self-scaling logic
pass
```
Then, you would build out the specialized classes for each type of swarm:
```python
class HierarchicalSwarms(Swarms):
def execute(self, task):
# Implement hierarchical task execution
pass
class CollaborativeSwarms(Swarms):
def execute(self, task):
# Implement collaborative task execution
pass
class CompetitiveSwarms(Swarms):
def execute(self, task):
# Implement competitive task execution
pass
class MultiAgentDebate(Swarms):
def execute(self, task):
# Implement debate-style task execution
pass
```
# WorkerNode class
Here's the pseudocode algorithm for a `WorkerNode` class that includes a vector embedding database for communication:
1. **WorkerNode**
- Initialize a worker node with an LLM and a connection to the vector embedding database.
- The worker node maintains a `task_queue` and `completed_tasks` queue. It also keeps track of the status of tasks (e.g., "pending", "completed").
- The `receive_task` method accepts a task and adds it to the `task_queue`.
- The `complete_task` method takes the oldest task from the `task_queue`, executes it, and then stores the result in the `completed_tasks` queue. It also updates the task status in the vector embedding database to "completed".
- The `communicate` method uses the vector embedding database to share information with other nodes. It inserts the task result into the vector database and also queries for tasks marked as "completed".
In Python, this could look something like:
```python
from langchain.vectorstores import FAISS
from langchain.docstore import InMemoryDocstore
from langchain.embeddings import OpenAIEmbeddings
import faiss
from swarms.agents.workers.auto_agent import AutoGPT
from collections import deque
from typing import Dict, Any
class WorkerNode:
def __init__(self, llm: AutoGPT, vectorstore: FAISS):
self.llm = llm
self.vectorstore = vectorstore
self.task_queue = deque()
self.completed_tasks = deque()
self.task_status: Dict[Any, str] = {}
def receive_task(self, task):
self.task_queue.append(task)
self.task_status[task] = 'pending'
def complete_task(self):
task = self.task_queue.popleft()
result = self.llm.run(task)
self.completed_tasks.append(result)
self.task_status[task] = 'completed'
# Insert task result into the vectorstore
self.vectorstore.insert(task, result)
return result
def communicate(self):
# Share task results and status through vectorstore
completed_tasks = [(task, self.task_status[task]) for task in self.task_queue if self.task_status[task] == 'completed']
for task, status in completed_tasks:
self.vectorstore.insert(task, status)
```
This example assumes that tasks are hashable and can be used as dictionary keys. The `vectorstore.insert` method is used to share task results and status with other nodes, and you can use methods like `vectorstore.query` or `vectorstore.regex_search` to retrieve this information. Please remember this is a simplified implementation and might need changes according to your exact requirements.

@ -1,27 +1,154 @@
from swarms.agents.workers.auto_agent import AutoGPT
from collections import deque
from typing import Dict, Any
import os
from collections import deque
from typing import Dict, List, Optional, Any
from langchain import LLMChain, OpenAI, PromptTemplate
from langchain.embeddings import OpenAIEmbeddings
from langchain.llms import BaseLLM
from langchain.vectorstores.base import VectorStore
from pydantic import BaseModel, Field
from langchain.chains.base import Chain
from langchain.experimental import BabyAGI
from langchain.vectorstores import FAISS
from langchain.docstore import InMemoryDocstore
from langchain.agents import ZeroShotAgent, Tool, AgentExecutor
from langchain import OpenAI, SerpAPIWrapper, LLMChain
from swarms.agents.workers.auto_agent import agent
# Define your embedding model
embeddings_model = OpenAIEmbeddings()
# Initialize the vectorstore as empty
import faiss
embedding_size = 1536
index = faiss.IndexFlatL2(embedding_size)
vectorstore = FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {})
class WorkerNode:
def __init__(self, llm: BaseLLMAgent):
def __init__(self, llm: AutoGPT, vectorstore: FAISS):
self.llm = llm
self.vectorstore = vectorstore
self.task_queue = deque()
self.completed_task = deque()
self.completed_tasks = deque()
self.task_status: Dict[Any, str] = {}
def receieve_task(self, task):
def receive_task(self, task):
self.task_queue.append(task)
self.task_status[task] = 'pending'
def complete_task(self):
task = self.task_queue.popleft()
result = self.llm.execute(task)
self.completed_task.append(result)
return result
def communicates(self):
task = self.task_queue.popleft()
result = self.llm.execute(task)
result = self.llm.run(task)
self.completed_tasks.append(result)
self.task_status[task] = 'completed'
# Insert task result into the vectorstore
self.vectorstore.insert(task, result)
return result
def communicate(self, other_node):
#palceholder for communication method which is utilizing an omni -present Ocean instance
pass
def communicate(self):
# Share task results and status through vectorstore
completed_tasks = [(task, self.task_status[task]) for task in self.task_queue if self.task_status[task] == 'completed']
for task, status in completed_tasks:
self.vectorstore.insert(task, status)
class BossNode:
def __init__(self, llm, vectorstore, task_execution_chain, verbose, max_iterations):
todo_prompt = PromptTemplate.from_template(
"You are a planner who is an expert at coming up with a todo list for a given objective. Come up with a todo list for this objective: {objective}"""
)
todo_chain = LLMChain(llm=OpenAI(temperature=0), prompt=todo_prompt)
search = SerpAPIWrapper()
tools = [
Tool(
name="Search",
func=search.run,
description="useful for when you need to answer questions about current events",
),
Tool(
name="TODO",
func=todo_chain.run,
description="useful for when you need to come up with todo lists. Input: an objective to create a todo list for. Output: a todo list for that objective. Please be very clear what the objective is!",
),
Tool(
name="AUTONOMOUS AGENT",
func=agent.run,
description="Useful for when you need to spawn an autonomous agent instance as a worker to accomplish complex tasks, it can search the internet or spawn child multi-modality models to process and generate images and text or audio and so on"
)
]
prefix = """You are an Boss in a swarm who performs one task based on the following objective: {objective}. Take into account these previously completed tasks: {context}.
As a swarming hivemind agent, my purpose is to achieve the user's goal. To effectively fulfill this role, I employ a collaborative thinking process that draws inspiration from the collective intelligence of the swarm. Here's how I approach thinking and why it's beneficial:
1. **Collective Intelligence:** By harnessing the power of a swarming architecture, I tap into the diverse knowledge and perspectives of individual agents within the swarm. This allows me to consider a multitude of viewpoints, enabling a more comprehensive analysis of the given problem or task.
2. **Collaborative Problem-Solving:** Through collaborative thinking, I encourage agents to contribute their unique insights and expertise. By pooling our collective knowledge, we can identify innovative solutions, uncover hidden patterns, and generate creative ideas that may not have been apparent through individual thinking alone.
3. **Consensus-Driven Decision Making:** The hivemind values consensus building among agents. By engaging in respectful debates and discussions, we aim to arrive at consensus-based decisions that are backed by the collective wisdom of the swarm. This approach helps to mitigate biases and ensures that decisions are well-rounded and balanced.
4. **Adaptability and Continuous Learning:** As a hivemind agent, I embrace an adaptive mindset. I am open to new information, willing to revise my perspectives, and continuously learn from the feedback and experiences shared within the swarm. This flexibility enables me to adapt to changing circumstances and refine my thinking over time.
5. **Holistic Problem Analysis:** Through collaborative thinking, I analyze problems from multiple angles, considering various factors, implications, and potential consequences. This holistic approach helps to uncover underlying complexities and arrive at comprehensive solutions that address the broader context.
6. **Creative Synthesis:** By integrating the diverse ideas and knowledge present in the swarm, I engage in creative synthesis. This involves combining and refining concepts to generate novel insights and solutions. The collaborative nature of the swarm allows for the emergence of innovative approaches that can surpass individual thinking.
"""
suffix = """Question: {task}
{agent_scratchpad}"""
prompt = ZeroShotAgent.create_prompt(
tools,
prefix=prefix,
suffix=suffix,
input_variables=["objective", "task", "context", "agent_scratchpad"],
)
llm = OpenAI(temperature=0)
llm_chain = LLMChain(llm=llm, prompt=prompt)
tool_names = [tool.name for tool in tools]
agent = ZeroShotAgent(llm_chain=llm_chain, allowed_tools=tool_names)
agent_executor = AgentExecutor.from_agent_and_tools(
agent=agent, tools=tools, verbose=True
)
self.baby_agi = BabyAGI.from_llm(
llm=llm,
vectorstore=vectorstore,
task_execution_chain=agent_executor
)
def create_task(self, objective):
return {"objective": objective}
def execute_task(self, task):
self.baby_agi(task)
# Initialize boss node with given parameters
boss_node = BossNode()
# Create and execute a task
task = boss_node.create_task("Write a weather report for SF today")
boss_node.execute_task(task)
class Swarms:

Loading…
Cancel
Save