parent
1e8137249a
commit
8bc642f78e
@ -1,97 +0,0 @@
|
||||
import os
|
||||
from typing import Callable, List
|
||||
|
||||
|
||||
class DialogueSimulator:
|
||||
"""
|
||||
Dialogue Simulator
|
||||
------------------
|
||||
|
||||
Args:
|
||||
------
|
||||
agents: List[Callable]
|
||||
max_iters: int
|
||||
name: str
|
||||
|
||||
Usage:
|
||||
------
|
||||
>>> from swarms import DialogueSimulator
|
||||
>>> from swarms.structs.agent import Agent
|
||||
>>> agents = Agent()
|
||||
>>> agents1 = Agent()
|
||||
>>> model = DialogueSimulator([agents, agents1], max_iters=10, name="test")
|
||||
>>> model.run("test")
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
agents: List[Callable],
|
||||
max_iters: int = 10,
|
||||
name: str = None,
|
||||
):
|
||||
self.agents = agents
|
||||
self.max_iters = max_iters
|
||||
self.name = name
|
||||
|
||||
def run(self, message: str = None):
|
||||
"""Run the dialogue simulator"""
|
||||
try:
|
||||
step = 0
|
||||
if self.name and message:
|
||||
prompt = f"Name {self.name} and message: {message}"
|
||||
for agent in self.agents:
|
||||
agent.run(prompt)
|
||||
step += 1
|
||||
|
||||
while step < self.max_iters:
|
||||
speaker_idx = step % len(self.agents)
|
||||
speaker = self.agents[speaker_idx]
|
||||
speaker_message = speaker.run(prompt)
|
||||
|
||||
for receiver in self.agents:
|
||||
message_history = (
|
||||
f"Speaker Name: {speaker.name} and message:"
|
||||
f" {speaker_message}"
|
||||
)
|
||||
receiver.run(message_history)
|
||||
|
||||
print(f"({speaker.name}): {speaker_message}")
|
||||
print("\n")
|
||||
step += 1
|
||||
except Exception as error:
|
||||
print(f"Error running dialogue simulator: {error}")
|
||||
|
||||
def __repr__(self):
|
||||
return (
|
||||
f"DialogueSimulator({self.agents}, {self.max_iters},"
|
||||
f" {self.name})"
|
||||
)
|
||||
|
||||
def save_state(self):
|
||||
"""Save the state of the dialogue simulator"""
|
||||
try:
|
||||
if self.name:
|
||||
filename = f"{self.name}.txt"
|
||||
with open(filename, "w") as file:
|
||||
file.write(str(self))
|
||||
except Exception as error:
|
||||
print(f"Error saving state: {error}")
|
||||
|
||||
def load_state(self):
|
||||
"""Load the state of the dialogue simulator"""
|
||||
try:
|
||||
if self.name:
|
||||
filename = f"{self.name}.txt"
|
||||
with open(filename, "r") as file:
|
||||
return file.read()
|
||||
except Exception as error:
|
||||
print(f"Error loading state: {error}")
|
||||
|
||||
def delete_state(self):
|
||||
"""Delete the state of the dialogue simulator"""
|
||||
try:
|
||||
if self.name:
|
||||
filename = f"{self.name}.txt"
|
||||
os.remove(filename)
|
||||
except Exception as error:
|
||||
print(f"Error deleting state: {error}")
|
@ -1,307 +0,0 @@
|
||||
import logging
|
||||
import queue
|
||||
import threading
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import chromadb
|
||||
from chromadb.utils import embedding_functions
|
||||
|
||||
|
||||
class TaskStatus(Enum):
|
||||
QUEUED = 1
|
||||
RUNNING = 2
|
||||
COMPLETED = 3
|
||||
FAILED = 4
|
||||
|
||||
|
||||
class Orchestrator:
|
||||
"""
|
||||
The Orchestrator takes in an agent, worker, or boss as input
|
||||
then handles all the logic for
|
||||
- task creation,
|
||||
- task assignment,
|
||||
- and task compeletion.
|
||||
|
||||
And, the communication for millions of agents to chat with eachother through
|
||||
a vector database that each agent has access to chat with.
|
||||
|
||||
Each LLM agent chats with the orchestrator through a dedicated
|
||||
communication layer. The orchestrator assigns tasks to each LLM agent,
|
||||
which the agents then complete and return.
|
||||
|
||||
This setup allows for a high degree of flexibility, scalability, and robustness.
|
||||
|
||||
In the context of swarm LLMs, one could consider an **Omni-Vector Embedding Database
|
||||
for communication. This database could store and manage
|
||||
the high-dimensional vectors produced by each LLM agent.
|
||||
|
||||
Strengths: This approach would allow for similarity-based lookup and matching of
|
||||
LLM-generated vectors, which can be particularly useful for tasks that involve finding similar outputs or recognizing patterns.
|
||||
|
||||
Weaknesses: An Omni-Vector Embedding Database might add complexity to the system in terms of setup and maintenance.
|
||||
It might also require significant computational resources,
|
||||
depending on the volume of data being handled and the complexity of the vectors.
|
||||
The handling and transmission of high-dimensional vectors could also pose challenges
|
||||
in terms of network load.
|
||||
|
||||
# Orchestrator
|
||||
* Takes in an agent class with vector store,
|
||||
then handles all the communication and scales
|
||||
up a swarm with number of agents and handles task assignment and task completion
|
||||
|
||||
from swarms import OpenAI, Orchestrator, Swarm
|
||||
|
||||
orchestrated = Orchestrate(OpenAI, nodes=40) #handles all the task assignment and allocation and agent communication using a vectorstore as a universal communication layer and also handlles the task completion logic
|
||||
|
||||
Objective = "Make a business website for a marketing consultancy"
|
||||
|
||||
Swarms = Swarms(orchestrated, auto=True, Objective))
|
||||
```
|
||||
|
||||
In terms of architecture, the swarm might look something like this:
|
||||
|
||||
```
|
||||
(Orchestrator)
|
||||
/ \
|
||||
Tools + Vector DB -- (LLM Agent)---(Communication Layer) (Communication Layer)---(LLM Agent)-- Tools + Vector DB
|
||||
/ | | \
|
||||
(Task Assignment) (Task Completion) (Task Assignment) (Task Completion)
|
||||
|
||||
|
||||
###Usage
|
||||
```
|
||||
from swarms import Orchestrator
|
||||
|
||||
# Instantiate the Orchestrator with 10 agents
|
||||
orchestrator = Orchestrator(llm, agent_list=[llm]*10, task_queue=[])
|
||||
|
||||
# Add tasks to the Orchestrator
|
||||
tasks = [{"content": f"Write a short story about a {animal}."} for animal in ["cat", "dog", "bird", "fish", "lion", "tiger", "elephant", "giraffe", "monkey", "zebra"]]
|
||||
orchestrator.assign_tasks(tasks)
|
||||
|
||||
# Run the Orchestrator
|
||||
orchestrator.run()
|
||||
|
||||
# Retrieve the results
|
||||
for task in tasks:
|
||||
print(orchestrator.retrieve_result(id(task)))
|
||||
```
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
agent,
|
||||
agent_list: List[Any],
|
||||
task_queue: List[Any],
|
||||
collection_name: str = "swarm",
|
||||
api_key: str = None,
|
||||
model_name: str = None,
|
||||
embed_func=None,
|
||||
worker=None,
|
||||
):
|
||||
self.agent = agent
|
||||
self.agents = queue.Queue()
|
||||
|
||||
for _ in range(agent_list):
|
||||
self.agents.put(agent())
|
||||
|
||||
self.task_queue = queue.Queue()
|
||||
|
||||
self.chroma_client = chromadb.Client()
|
||||
|
||||
self.collection = self.chroma_client.create_collection(
|
||||
name=collection_name
|
||||
)
|
||||
|
||||
self.current_tasks = {}
|
||||
|
||||
self.lock = threading.Lock()
|
||||
self.condition = threading.Condition(self.lock)
|
||||
self.executor = ThreadPoolExecutor(
|
||||
max_workers=len(agent_list)
|
||||
)
|
||||
|
||||
self.embed_func = embed_func if embed_func else self.embed
|
||||
|
||||
# @abstractmethod
|
||||
|
||||
def assign_task(
|
||||
self, agent_id: int, task: Dict[str, Any]
|
||||
) -> None:
|
||||
"""Assign a task to a specific agent"""
|
||||
|
||||
while True:
|
||||
with self.condition:
|
||||
while not self.task_queue:
|
||||
self.condition.wait()
|
||||
agent = self.agents.get()
|
||||
task = self.task_queue.get()
|
||||
|
||||
try:
|
||||
result = self.worker.run(task["content"])
|
||||
|
||||
# using the embed method to get the vector representation of the result
|
||||
vector_representation = self.embed(
|
||||
result, self.api_key, self.model_name
|
||||
)
|
||||
|
||||
self.collection.add(
|
||||
embeddings=[vector_representation],
|
||||
documents=[str(id(task))],
|
||||
ids=[str(id(task))],
|
||||
)
|
||||
|
||||
logging.info(
|
||||
f"Task {id(str)} has been processed by agent"
|
||||
f" {id(agent)} with"
|
||||
)
|
||||
|
||||
except Exception as error:
|
||||
logging.error(
|
||||
f"Failed to process task {id(task)} by agent"
|
||||
f" {id(agent)}. Error: {error}"
|
||||
)
|
||||
finally:
|
||||
with self.condition:
|
||||
self.agents.put(agent)
|
||||
self.condition.notify()
|
||||
|
||||
def embed(self, input, api_key, model_name):
|
||||
openai = embedding_functions.OpenAIEmbeddingFunction(
|
||||
api_key=api_key, model_name=model_name
|
||||
)
|
||||
embedding = openai(input)
|
||||
return embedding
|
||||
|
||||
# @abstractmethod
|
||||
|
||||
def retrieve_results(self, agent_id: int) -> Any:
|
||||
"""Retrieve results from a specific agent"""
|
||||
|
||||
try:
|
||||
# Query the vector database for documents created by the agents
|
||||
results = self.collection.query(
|
||||
query_texts=[str(agent_id)], n_results=10
|
||||
)
|
||||
|
||||
return results
|
||||
except Exception as e:
|
||||
logging.error(
|
||||
f"Failed to retrieve results from agent {agent_id}."
|
||||
f" Error {e}"
|
||||
)
|
||||
raise
|
||||
|
||||
# @abstractmethod
|
||||
def update_vector_db(self, data) -> None:
|
||||
"""Update the vector database"""
|
||||
|
||||
try:
|
||||
self.collection.add(
|
||||
embeddings=[data["vector"]],
|
||||
documents=[str(data["task_id"])],
|
||||
ids=[str(data["task_id"])],
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logging.error(
|
||||
f"Failed to update the vector database. Error: {e}"
|
||||
)
|
||||
raise
|
||||
|
||||
# @abstractmethod
|
||||
|
||||
def get_vector_db(self):
|
||||
"""Retrieve the vector database"""
|
||||
return self.collection
|
||||
|
||||
def append_to_db(self, result: str):
|
||||
"""append the result of the swarm to a specifici collection in the database"""
|
||||
|
||||
try:
|
||||
self.collection.add(
|
||||
documents=[result], ids=[str(id(result))]
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logging.error(
|
||||
"Failed to append the agent output to database."
|
||||
f" Error: {e}"
|
||||
)
|
||||
raise
|
||||
|
||||
def run(self, objective: str):
|
||||
"""Runs"""
|
||||
if not objective or not isinstance(objective, str):
|
||||
logging.error("Invalid objective")
|
||||
raise ValueError("A valid objective is required")
|
||||
|
||||
try:
|
||||
self.task_queue.append(objective)
|
||||
|
||||
results = [
|
||||
self.assign_task(agent_id, task)
|
||||
for agent_id, task in zip(
|
||||
range(len(self.agents)), self.task_queue
|
||||
)
|
||||
]
|
||||
|
||||
for result in results:
|
||||
self.append_to_db(result)
|
||||
|
||||
logging.info(
|
||||
f"Successfully ran swarms with results: {results}"
|
||||
)
|
||||
return results
|
||||
except Exception as e:
|
||||
logging.error(f"An error occured in swarm: {e}")
|
||||
return None
|
||||
|
||||
def chat(self, sender_id: int, receiver_id: int, message: str):
|
||||
"""
|
||||
|
||||
Allows the agents to chat with eachother thrught the vectordatabase
|
||||
|
||||
# Instantiate the Orchestrator with 10 agents
|
||||
orchestrator = Orchestrator(
|
||||
llm,
|
||||
agent_list=[llm]*10,
|
||||
task_queue=[]
|
||||
)
|
||||
|
||||
# Agent 1 sends a message to Agent 2
|
||||
orchestrator.chat(sender_id=1, receiver_id=2, message="Hello, Agent 2!")
|
||||
|
||||
"""
|
||||
|
||||
message_vector = self.embed(
|
||||
message, self.api_key, self.model_name
|
||||
)
|
||||
|
||||
# store the mesage in the vector database
|
||||
self.collection.add(
|
||||
embeddings=[message_vector],
|
||||
documents=[message],
|
||||
ids=[f"{sender_id}_to_{receiver_id}"],
|
||||
)
|
||||
|
||||
self.run(
|
||||
objective=f"chat with agent {receiver_id} about {message}"
|
||||
)
|
||||
|
||||
def add_agents(self, num_agents: int):
|
||||
for _ in range(num_agents):
|
||||
self.agents.put(self.agent())
|
||||
self.executor = ThreadPoolExecutor(
|
||||
max_workers=self.agents.qsize()
|
||||
)
|
||||
|
||||
def remove_agents(self, num_agents):
|
||||
for _ in range(num_agents):
|
||||
if not self.agents.empty():
|
||||
self.agents.get()
|
||||
self.executor = ThreadPoolExecutor(
|
||||
max_workers=self.agents.qsize()
|
||||
)
|
@ -1,90 +0,0 @@
|
||||
from queue import Queue, PriorityQueue
|
||||
|
||||
|
||||
class SimpleSwarm:
|
||||
def __init__(
|
||||
self,
|
||||
llm,
|
||||
num_agents: int = None,
|
||||
openai_api_key: str = None,
|
||||
ai_name: str = None,
|
||||
rounds: int = 1,
|
||||
*args,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
|
||||
Usage:
|
||||
# Initialize the swarm with 5 agents, an API key, and a name for the AI model
|
||||
swarm = SimpleSwarm(num_agents=5, openai_api_key="YOUR_OPENAI_API_KEY", ai_name="Optimus Prime")
|
||||
|
||||
# Normal task without priority
|
||||
normal_task = "Describe the process of photosynthesis in simple terms."
|
||||
swarm.distribute_task(normal_task)
|
||||
|
||||
# Priority task; lower numbers indicate higher priority (e.g., 1 is higher priority than 2)
|
||||
priority_task = "Translate the phrase 'Hello World' to French."
|
||||
swarm.distribute_task(priority_task, priority=1)
|
||||
|
||||
# Run the tasks and gather the responses
|
||||
responses = swarm.run()
|
||||
|
||||
# Print responses
|
||||
for response in responses:
|
||||
print(response)
|
||||
|
||||
# Providing feedback to the system (this is a stubbed method and won't produce a visible effect, but serves as an example)
|
||||
swarm.provide_feedback("Improve translation accuracy.")
|
||||
|
||||
# Perform a health check on the agents (this is also a stubbed method, illustrating potential usage)
|
||||
swarm.health_check()
|
||||
|
||||
"""
|
||||
self.llm = llm
|
||||
self.agents = [self.llm for _ in range(num_agents)]
|
||||
self.task_queue = Queue()
|
||||
self.priority_queue = PriorityQueue()
|
||||
|
||||
def distribute(self, task: str = None, priority=None):
|
||||
"""Distribute a task to the agents"""
|
||||
if priority:
|
||||
self.priority_queue.put((priority, task))
|
||||
else:
|
||||
self.task_queue.put(task)
|
||||
|
||||
def _process_task(self, task):
|
||||
# TODO, Implement load balancing, fallback mechanism
|
||||
for worker in self.agents:
|
||||
response = worker.run(task)
|
||||
if response:
|
||||
return response
|
||||
return "All Agents failed"
|
||||
|
||||
def run(self):
|
||||
"""Run the simple swarm"""
|
||||
|
||||
responses = []
|
||||
|
||||
# process high priority tasks first
|
||||
while not self.priority_queue.empty():
|
||||
_, task = self.priority_queue.get()
|
||||
responses.append(self._process_task(task))
|
||||
|
||||
# process normal tasks
|
||||
while not self.task_queue.empty():
|
||||
task = self.task_queue.get()
|
||||
responses.append(self._process_task(task))
|
||||
|
||||
return responses
|
||||
|
||||
def run_old(self, task):
|
||||
responses = []
|
||||
|
||||
for worker in self.agents:
|
||||
response = worker.run(task)
|
||||
responses.append(response)
|
||||
|
||||
return responses
|
||||
|
||||
def __call__(self, task):
|
||||
return self.run(task)
|
Loading…
Reference in new issue