stable worker autobot node with prototype autoscaler need task assignment and delegation

pull/53/head
Kye 1 year ago
parent 84adda80bf
commit 01e7e9b34c

@ -47,20 +47,34 @@
################ ################
from swarms import WorkerNode # from swarms import WorkerNode
# Your OpenAI API key # # Your OpenAI API key
api_key = "example key" # api_key = ""
# # Initialize a WorkerNode with your API key
# node = WorkerNode(api_key)
# # node.create_worker_node()
# Initialize a WorkerNode with your API key # # Define an objective
node = WorkerNode(api_key) # objective = "Please make a web GUI for using HTTP API server..."
node.create_worker_node()
# # Run the task
# task = node.run(objective)
# print(task)
##########
from swarms import AutoBot
# Define an objective
objective = "Please make a web GUI for using HTTP API server..."
# Run the task auto_bot = AutoBot(
task = node.run(objective) openai_api_key="",
ai_name="Optimus Prime",
)
print(task) task = "What were the winning boston marathon times for the past 5 years (ending in 2022)? Generate a table of the year, name, country of origin, and times."
response = auto_bot.run(task)
print(response)

@ -0,0 +1,9 @@
from swarms import AutoBot, AutoScaler
auto_scaler = AutoScaler()
auto_scaler.start()
for i in range(100):
auto_scaler.add_task(f"Task {i}")

@ -35,7 +35,7 @@ google-generativeai = "*"
torch = "*" torch = "*"
langchain-experimental = "*" langchain-experimental = "*"
playwright = "*" playwright = "*"
duckduckgo_search = "*" duckduckgo-search = "*"
faiss-cpu = "*" faiss-cpu = "*"
wget = "*" wget = "*"
httpx = "*" httpx = "*"

@ -121,7 +121,6 @@ google-generativeai
oceandb oceandb
langchain-experimental langchain-experimental
playwright playwright
duckduckgo_search
wget==3.2 wget==3.2
simpleaichat simpleaichat
httpx httpx
@ -133,3 +132,5 @@ google-search-results==2.4.2
Pillow Pillow
faiss-cpu faiss-cpu
openai openai
google-generativeai
duckduckgo-search

@ -1,14 +1,18 @@
#swarms #swarms
from swarms.orchestrator.autoscaler import AutoScaler
# worker # worker
# from swarms.workers.worker_node import WorkerNode from swarms.workers.worker_node import WorkerNode
from swarms.workers.autobot import AutoBot
#boss #boss
# from swarms.boss.boss_node import BossNode from swarms.boss.boss_node import BossNode
#models #models
from swarms.agents.models.anthropic import Anthropic from swarms.agents.models.anthropic import Anthropic
from swarms.agents.models.huggingface import HuggingFaceLLM from swarms.agents.models.huggingface import HuggingFaceLLM
from swarms.agents.models.palm import GooglePalm # from swarms.agents.models.palm import GooglePalm
from swarms.agents.models.petals import Petals from swarms.agents.models.petals import Petals
# from swarms.agents.models.openai import OpenAI from swarms.agents.models.openai import OpenAI

@ -4,7 +4,7 @@
#models #models
from swarms.agents.models.anthropic import Anthropic from swarms.agents.models.anthropic import Anthropic
from swarms.agents.models.huggingface import HuggingFaceLLM from swarms.agents.models.huggingface import HuggingFaceLLM
from swarms.agents.models.palm import GooglePalm # from swarms.agents.models.palm import GooglePalm
from swarms.agents.models.petals import Petals from swarms.agents.models.petals import Petals
from swarms.agents.models.openai import OpenAI from swarms.agents.models.openai import OpenAI

@ -1,6 +1,6 @@
import torch import torch
import logging import logging
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig from transformers import AutoModelForCausalLM, AutoTokenizer #,# BitsAndBytesConfig
class HuggingFaceLLM: class HuggingFaceLLM:
def __init__(self, model_id: str, device: str = None, max_length: int = 20, quantize: bool = False, quantization_config: dict = None): def __init__(self, model_id: str, device: str = None, max_length: int = 20, quantize: bool = False, quantization_config: dict = None):
@ -10,15 +10,15 @@ class HuggingFaceLLM:
self.max_length = max_length self.max_length = max_length
bnb_config = None bnb_config = None
if quantize: # if quantize:
if not quantization_config: # if not quantization_config:
quantization_config = { # quantization_config = {
'load_in_4bit': True, # 'load_in_4bit': True,
'bnb_4bit_use_double_quant': True, # 'bnb_4bit_use_double_quant': True,
'bnb_4bit_quant_type': "nf4", # 'bnb_4bit_quant_type': "nf4",
'bnb_4bit_compute_dtype': torch.bfloat16 # 'bnb_4bit_compute_dtype': torch.bfloat16
} # }
bnb_config = BitsAndBytesConfig(**quantization_config) # bnb_config = BitsAndBytesConfig(**quantization_config)
try: try:
self.tokenizer = AutoTokenizer.from_pretrained(self.model_id) self.tokenizer = AutoTokenizer.from_pretrained(self.model_id)

@ -1,187 +1,189 @@
from __future__ import annotations # from __future__ import annotations
import logging # import logging
from swarms.utils.logger import logger # from swarms.utils.logger import logger
from typing import Any, Callable, Dict, List, Optional # from typing import Any, Callable, Dict, List, Optional
from pydantic import BaseModel, root_validator # from pydantic import BaseModel, model_validator
from tenacity import ( # from tenacity import (
before_sleep_log, # before_sleep_log,
retry, # retry,
retry_if_exception_type, # retry_if_exception_type,
stop_after_attempt, # stop_after_attempt,
wait_exponential, # wait_exponential,
) # )
import google.generativeai as genai # import google.generativeai as palm
class GooglePalmError(Exception): # class GooglePalmError(Exception):
"""Error raised when there is an issue with the Google PaLM API.""" # """Error raised when there is an issue with the Google PaLM API."""
def _truncate_at_stop_tokens( # def _truncate_at_stop_tokens(
text: str, # text: str,
stop: Optional[List[str]], # stop: Optional[List[str]],
) -> str: # ) -> str:
"""Truncates text at the earliest stop token found.""" # """Truncates text at the earliest stop token found."""
if stop is None: # if stop is None:
return text # return text
for stop_token in stop: # for stop_token in stop:
stop_token_idx = text.find(stop_token) # stop_token_idx = text.find(stop_token)
if stop_token_idx != -1: # if stop_token_idx != -1:
text = text[:stop_token_idx] # text = text[:stop_token_idx]
return text # return text
def _response_to_result(response: genai.types.ChatResponse, stop: Optional[List[str]]) -> Dict[str, Any]:
"""Convert a PaLM chat response to a result dictionary.""" # def _response_to_result(response: palm.types.ChatResponse, stop: Optional[List[str]]) -> Dict[str, Any]:
result = { # """Convert a PaLM chat response to a result dictionary."""
"id": response.id, # result = {
"created": response.created, # "id": response.id,
"model": response.model, # "created": response.created,
"usage": { # "model": response.model,
"prompt_tokens": response.usage.prompt_tokens, # "usage": {
"completion_tokens": response.usage.completion_tokens, # "prompt_tokens": response.usage.prompt_tokens,
"total_tokens": response.usage.total_tokens, # "completion_tokens": response.usage.completion_tokens,
}, # "total_tokens": response.usage.total_tokens,
"choices": [], # },
} # "choices": [],
for choice in response.choices: # }
result["choices"].append({ # for choice in response.choices:
"text": _truncate_at_stop_tokens(choice.text, stop), # result["choices"].append({
"index": choice.index, # "text": _truncate_at_stop_tokens(choice.text, stop),
"finish_reason": choice.finish_reason, # "index": choice.index,
}) # "finish_reason": choice.finish_reason,
return result # })
# return result
def _messages_to_prompt_dict(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Convert a list of message dictionaries to a prompt dictionary.""" # def _messages_to_prompt_dict(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
prompt = {"messages": []} # """Convert a list of message dictionaries to a prompt dictionary."""
for message in messages: # prompt = {"messages": []}
prompt["messages"].append({ # for message in messages:
"role": message["role"], # prompt["messages"].append({
"content": message["content"], # "role": message["role"],
}) # "content": message["content"],
return prompt # })
# return prompt
def _create_retry_decorator() -> Callable[[Any], Any]:
"""Create a retry decorator with exponential backoff."""
return retry( # def _create_retry_decorator() -> Callable[[Any], Any]:
retry=retry_if_exception_type(GooglePalmError), # """Create a retry decorator with exponential backoff."""
stop=stop_after_attempt(5), # return retry(
wait=wait_exponential(multiplier=1, min=2, max=30), # retry=retry_if_exception_type(GooglePalmError),
before_sleep=before_sleep_log(logger, logging.DEBUG), # stop=stop_after_attempt(5),
reraise=True, # wait=wait_exponential(multiplier=1, min=2, max=30),
) # before_sleep=before_sleep_log(logger, logging.DEBUG),
# reraise=True,
####################### => main class # )
class GooglePalm(BaseModel):
"""Wrapper around Google's PaLM Chat API."""
# ####################### => main class
client: Any #: :meta private: # class GooglePalm(BaseModel):
model_name: str = "models/chat-bison-001" # """Wrapper around Google's PaLM Chat API."""
google_api_key: Optional[str] = None
temperature: Optional[float] = None # client: Any #: :meta private:
top_p: Optional[float] = None # model_name: str = "models/chat-bison-001"
top_k: Optional[int] = None # google_api_key: Optional[str] = None
n: int = 1 # temperature: Optional[float] = None
# top_p: Optional[float] = None
@root_validator() # top_k: Optional[int] = None
def validate_environment(cls, values: Dict) -> Dict: # n: int = 1
# Same as before
pass # @model_validator(mode="pre")
# def validate_environment(cls, values: Dict) -> Dict:
def chat_with_retry(self, **kwargs: Any) -> Any: # # Same as before
"""Use tenacity to retry the completion call.""" # pass
retry_decorator = _create_retry_decorator()
# def chat_with_retry(self, **kwargs: Any) -> Any:
@retry_decorator # """Use tenacity to retry the completion call."""
def _chat_with_retry(**kwargs: Any) -> Any: # retry_decorator = _create_retry_decorator()
return self.client.chat(**kwargs)
# @retry_decorator
return _chat_with_retry(**kwargs) # def _chat_with_retry(**kwargs: Any) -> Any:
# return self.client.chat(**kwargs)
async def achat_with_retry(self, **kwargs: Any) -> Any:
"""Use tenacity to retry the async completion call.""" # return _chat_with_retry(**kwargs)
retry_decorator = _create_retry_decorator()
# async def achat_with_retry(self, **kwargs: Any) -> Any:
@retry_decorator # """Use tenacity to retry the async completion call."""
async def _achat_with_retry(**kwargs: Any) -> Any: # retry_decorator = _create_retry_decorator()
return await self.client.chat_async(**kwargs)
# @retry_decorator
return await _achat_with_retry(**kwargs) # async def _achat_with_retry(**kwargs: Any) -> Any:
# return await self.client.chat_async(**kwargs)
def __call__(
self, # return await _achat_with_retry(**kwargs)
messages: List[Dict[str, Any]],
stop: Optional[List[str]] = None, # def __call__(
**kwargs: Any, # self,
) -> Dict[str, Any]: # messages: List[Dict[str, Any]],
prompt = _messages_to_prompt_dict(messages) # stop: Optional[List[str]] = None,
# **kwargs: Any,
response: genai.types.ChatResponse = self.chat_with_retry( # ) -> Dict[str, Any]:
model=self.model_name, # prompt = _messages_to_prompt_dict(messages)
prompt=prompt,
temperature=self.temperature, # response: palm.types.ChatResponse = self.chat_with_retry(
top_p=self.top_p, # model=self.model_name,
top_k=self.top_k, # prompt=prompt,
candidate_count=self.n, # temperature=self.temperature,
**kwargs, # top_p=self.top_p,
) # top_k=self.top_k,
# candidate_count=self.n,
return _response_to_result(response, stop) # **kwargs,
# )
def generate(
self, # return _response_to_result(response, stop)
messages: List[Dict[str, Any]],
stop: Optional[List[str]] = None, # def generate(
**kwargs: Any, # self,
) -> Dict[str, Any]: # messages: List[Dict[str, Any]],
prompt = _messages_to_prompt_dict(messages) # stop: Optional[List[str]] = None,
# **kwargs: Any,
response: genai.types.ChatResponse = self.chat_with_retry( # ) -> Dict[str, Any]:
model=self.model_name, # prompt = _messages_to_prompt_dict(messages)
prompt=prompt,
temperature=self.temperature, # response: palm.types.ChatResponse = self.chat_with_retry(
top_p=self.top_p, # model=self.model_name,
top_k=self.top_k, # prompt=prompt,
candidate_count=self.n, # temperature=self.temperature,
**kwargs, # top_p=self.top_p,
) # top_k=self.top_k,
# candidate_count=self.n,
return _response_to_result(response, stop) # **kwargs,
# )
async def _agenerate(
self, # return _response_to_result(response, stop)
messages: List[Dict[str, Any]],
stop: Optional[List[str]] = None, # async def _agenerate(
**kwargs: Any, # self,
) -> Dict[str, Any]: # messages: List[Dict[str, Any]],
prompt = _messages_to_prompt_dict(messages) # stop: Optional[List[str]] = None,
# **kwargs: Any,
response: genai.types.ChatResponse = await self.achat_with_retry( # ) -> Dict[str, Any]:
model=self.model_name, # prompt = _messages_to_prompt_dict(messages)
prompt=prompt,
temperature=self.temperature, # response: palm.types.ChatResponse = await self.achat_with_retry(
top_p=self.top_p, # model=self.model_name,
top_k=self.top_k, # prompt=prompt,
candidate_count=self.n, # temperature=self.temperature,
) # top_p=self.top_p,
# top_k=self.top_k,
return _response_to_result(response, stop) # candidate_count=self.n,
# )
@property # return _response_to_result(response, stop)
def _identifying_params(self) -> Dict[str, Any]:
"""Get the identifying parameters.""" # @property
return { # def _identifying_params(self) -> Dict[str, Any]:
"model_name": self.model_name, # """Get the identifying parameters."""
"temperature": self.temperature, # return {
"top_p": self.top_p, # "model_name": self.model_name,
"top_k": self.top_k, # "temperature": self.temperature,
"n": self.n, # "top_p": self.top_p,
} # "top_k": self.top_k,
# "n": self.n,
@property # }
def _llm_type(self) -> str:
return "google-palm-chat" # @property
# def _llm_type(self) -> str:
# return "google-palm-chat"

@ -132,7 +132,7 @@ class WebpageQATool(BaseTool):
query_website_tool = WebpageQATool(qa_chain=load_qa_with_sources_chain(llm)) query_website_tool = WebpageQATool(qa_chain=load_qa_with_sources_chain(llm))
# !pip install duckduckgo_search # !pip install duckduckgo_search
web_search = DuckDuckGoSearchRun() # web_search = DuckDuckGoSearchRun()
@ -140,3 +140,4 @@ web_search = DuckDuckGoSearchRun()
# # @tool # # @tool
# code_intepret = CodeInterpreter() # code_intepret = CodeInterpreter()

@ -0,0 +1,62 @@
import threading
import queue
from time import sleep
from swarms.workers.autobot import AutoBot
# TODO Handle task assignment and task delegation
# TODO: User task => decomposed into very small sub tasks => sub tasks assigned to workers => workers complete and update the swarm, can ask for help from other agents.
# TODO: Missing, Task Assignment, Task delegation, Task completion, Swarm level communication with vector db
class AutoScaler:
def __init__(self,
initial_agents=10,
scale_up_factor=1,
idle_threshold=0.2,
busy_threshold=0.7
):
self.agents_pool = [AutoBot() for _ in range(initial_agents)]
self.task_queue = queue.Queue()
self.scale_up_factor = scale_up_factor
self.idle_threshold = idle_threshold
self.lock = threading.Lock()
def add_task(self, task):
self.tasks_queue.put(task)
def scale_up(self):
with self.lock:
new_agents_counts = len(self.agents_pool) * self.scale_up_factor
for _ in range(new_agents_counts):
self.agents_pool.append(AutoBot())
def scale_down(self):
with self.lock:
if len(self.agents_pool) > 10: #ensure minmum of 10 agents
del self.agents_pool[-1] #remove last agent
def monitor_and_scale(self):
while True:
sleep(60)#check minute
pending_tasks = self.task_queue.qsize()
active_agents = sum([1 for agent in self.agents_pool if agent.is_busy()])
if pending_tasks / len(self.agents_pool) > self.busy_threshold:
self.scale_up()
elif active_agents / len(self.agents_pool) < self.idle_threshold:
self.scale_down()
def start(self):
monitor_thread = threading.Thread(target=self.monitor_and_scale)
monitor_thread.start()
while True:
task = self.task_queue.get()
if task:
available_agent = next((agent for agent in self.agents_pool))
if available_agent:
available_agent.run(task)

@ -0,0 +1,75 @@
import faiss
from langchain.chat_models import ChatOpenAI
from langchain.docstore import InMemoryDocstore
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain_experimental.autonomous_agents import AutoGPT
from swarms.agents.tools.autogpt import (
DuckDuckGoSearchRun,
FileChatMessageHistory,
ReadFileTool,
WebpageQATool,
WriteFileTool,
load_qa_with_sources_chain,
process_csv,
# web_search,
query_website_tool
)
ROOT_DIR = "./data/"
class AutoBot:
def __init__(self,
model_name="gpt-4",
openai_api_key=None,
ai_name="Autobot Swarm Worker",
ai_role="Worker in a swarm",
# embedding_size=None,
# k=None,
temperature=0.5):
self.openai_api_key = openai_api_key
self.temperature = temperature
self.llm = ChatOpenAI(model_name=model_name,
openai_api_key=self.openai_api_key,
temperature=self.temperature)
self.ai_name = ai_name
self.ai_role = ai_role
# self.embedding_size = embedding_size
# # self.k = k
self.setup_tools()
self.setup_memory()
self.setup_agent()
def setup_tools(self):
self.tools = [
WriteFileTool(root_dir=ROOT_DIR),
ReadFileTool(root_dir=ROOT_DIR),
process_csv,
query_website_tool,
]
def setup_memory(self):
embeddings_model = OpenAIEmbeddings(openai_api_key=self.openai_api_key)
embedding_size = 1536
index = faiss.IndexFlatL2(embedding_size)
self.vectorstore = FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {})
def setup_agent(self):
self.agent = AutoGPT.from_llm_and_tools(
ai_name=self.ai_name,
ai_role=self.ai_role,
tools=self.tools,
llm=self.llm,
memory=self.vectorstore.as_retriever(search_kwargs={"k": 8}),
)
def run(self, task):
result = self.agent.run([task])
return result

@ -17,17 +17,17 @@ from swarms.agents.tools.autogpt import (
WriteFileTool, WriteFileTool,
load_qa_with_sources_chain, load_qa_with_sources_chain,
process_csv, process_csv,
web_search, # web_search,
)
from swarms.agents.tools.developer import (
code_editor_append,
code_editor_delete,
code_editor_patch,
code_editor_read,
code_editor_summary,
code_editor_write,
terminal_execute,
) )
# from swarms.agents.tools.developer import (
# code_editor_append,
# code_editor_delete,
# code_editor_patch,
# code_editor_read,
# code_editor_summary,
# code_editor_write,
# terminal_execute,
# )
ROOT_DIR = "./data/" ROOT_DIR = "./data/"
@ -182,8 +182,8 @@ class WorkerNode:
def initialize_vectorstore(self): def initialize_vectorstore(self):
try: try:
embeddings_model = OpenAIEmbeddings(openai_api_key=self.openai_api_key) embeddings_model = OpenAIEmbeddings(openai_api_key=self.openai_api_key)
embedding_size = self.embedding_size # embedding_size = self.embedding_size
index = faiss.IndexFlatL2(embedding_size) index = faiss.IndexFlatL2(self.embedding_size)
return FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {}) return FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {})
except Exception as e: except Exception as e:
@ -227,18 +227,11 @@ class WorkerNode:
llm = self.initialize_llm(llm_class, self.temperature) llm = self.initialize_llm(llm_class, self.temperature)
tools = [ tools = [
web_search, # web_search,
WriteFileTool(root_dir=ROOT_DIR), WriteFileTool(root_dir=ROOT_DIR),
ReadFileTool(root_dir=ROOT_DIR), ReadFileTool(root_dir=ROOT_DIR),
process_csv, process_csv,
WebpageQATool(qa_chain=load_qa_with_sources_chain(llm)), WebpageQATool(qa_chain=load_qa_with_sources_chain(llm)),
code_editor_append,
code_editor_delete,
code_editor_patch,
code_editor_read,
code_editor_summary,
code_editor_write,
terminal_execute,
] ]
if not tools: if not tools:
logging.error("Tools are not initialized") logging.error("Tools are not initialized")
@ -276,7 +269,7 @@ def worker_node(openai_api_key, objective):
try: try:
worker_node = WorkerNode(openai_api_key) worker_node = WorkerNode(openai_api_key)
worker_node.create_worker_node() # worker_node.create_worker_node()
return worker_node.run(objective) return worker_node.run(objective)
except Exception as e: except Exception as e:
logging.error(f"An error occured in worker_node: {e}") logging.error(f"An error occured in worker_node: {e}")

Loading…
Cancel
Save