Stable worker AutoBot node with prototype AutoScaler; task assignment and delegation are still needed

Former-commit-id: 01e7e9b34c
group-chat
Kye 1 year ago
parent 9adf7b6084
commit d0d51a5975

@@ -47,20 +47,34 @@
################
from swarms import WorkerNode
# from swarms import WorkerNode
# Your OpenAI API key
api_key = "example key"
# # Your OpenAI API key
# api_key = ""
# # Initialize a WorkerNode with your API key
# node = WorkerNode(api_key)
# # node.create_worker_node()
# Initialize a WorkerNode with your API key
node = WorkerNode(api_key)
node.create_worker_node()
# # Define an objective
# objective = "Please make a web GUI for using HTTP API server..."
# # Run the task
# task = node.run(objective)
# print(task)
##########
from swarms import AutoBot
# Define an objective
objective = "Please make a web GUI for using HTTP API server..."
# Run the task
task = node.run(objective)
auto_bot = AutoBot(
    openai_api_key="",
    ai_name="Optimus Prime",
)
print(task)
task = "What were the winning boston marathon times for the past 5 years (ending in 2022)? Generate a table of the year, name, country of origin, and times."
response = auto_bot.run(task)
print(response)

@@ -0,0 +1,9 @@
from swarms import AutoBot, AutoScaler

auto_scaler = AutoScaler()

# Queue tasks before calling start(): start() blocks while it
# dispatches queued tasks to agents, so code after it never runs.
for i in range(100):
    auto_scaler.add_task(f"Task {i}")

auto_scaler.start()
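A minimal non-blocking variant of the snippet above, assuming it is acceptable to run the blocking dispatch loop in a daemon thread; the threading usage here is an illustration, not part of the committed API:

import threading

from swarms import AutoScaler

auto_scaler = AutoScaler()
for i in range(100):
    auto_scaler.add_task(f"Task {i}")

# start() blocks, so run it off the main thread
dispatcher = threading.Thread(target=auto_scaler.start, daemon=True)
dispatcher.start()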

@@ -35,7 +35,7 @@ google-generativeai = "*"
torch = "*"
langchain-experimental = "*"
playwright = "*"
duckduckgo_search = "*"
duckduckgo-search = "*"
faiss-cpu = "*"
wget = "*"
httpx = "*"

@@ -121,7 +121,6 @@ google-generativeai
oceandb
langchain-experimental
playwright
duckduckgo_search
wget==3.2
simpleaichat
httpx
@@ -132,4 +131,6 @@ redis
google-search-results==2.4.2
Pillow
faiss-cpu
openai
openai
google-generativeai
duckduckgo-search

@@ -1,14 +1,18 @@
#swarms
from swarms.orchestrator.autoscaler import AutoScaler
# worker
# from swarms.workers.worker_node import WorkerNode
from swarms.workers.worker_node import WorkerNode
from swarms.workers.autobot import AutoBot
#boss
# from swarms.boss.boss_node import BossNode
from swarms.boss.boss_node import BossNode
#models
from swarms.agents.models.anthropic import Anthropic
from swarms.agents.models.huggingface import HuggingFaceLLM
from swarms.agents.models.palm import GooglePalm
# from swarms.agents.models.palm import GooglePalm
from swarms.agents.models.petals import Petals
# from swarms.agents.models.openai import OpenAI
from swarms.agents.models.openai import OpenAI

@@ -4,7 +4,7 @@
#models
from swarms.agents.models.anthropic import Anthropic
from swarms.agents.models.huggingface import HuggingFaceLLM
from swarms.agents.models.palm import GooglePalm
# from swarms.agents.models.palm import GooglePalm
from swarms.agents.models.petals import Petals
from swarms.agents.models.openai import OpenAI

@@ -1,6 +1,6 @@
import torch
import logging
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from transformers import AutoModelForCausalLM, AutoTokenizer  # , BitsAndBytesConfig
class HuggingFaceLLM:
    def __init__(self, model_id: str, device: str = None, max_length: int = 20, quantize: bool = False, quantization_config: dict = None):
@@ -10,15 +10,15 @@ class HuggingFaceLLM:
        self.max_length = max_length

        bnb_config = None
        if quantize:
            if not quantization_config:
                quantization_config = {
                    'load_in_4bit': True,
                    'bnb_4bit_use_double_quant': True,
                    'bnb_4bit_quant_type': "nf4",
                    'bnb_4bit_compute_dtype': torch.bfloat16
                }
            bnb_config = BitsAndBytesConfig(**quantization_config)
# if quantize:
# if not quantization_config:
# quantization_config = {
# 'load_in_4bit': True,
# 'bnb_4bit_use_double_quant': True,
# 'bnb_4bit_quant_type': "nf4",
# 'bnb_4bit_compute_dtype': torch.bfloat16
# }
# bnb_config = BitsAndBytesConfig(**quantization_config)
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_id)

@@ -1,187 +1,189 @@
from __future__ import annotations
import logging
from swarms.utils.logger import logger
from typing import Any, Callable, Dict, List, Optional
from pydantic import BaseModel, root_validator
from tenacity import (
    before_sleep_log,
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)
import google.generativeai as genai
class GooglePalmError(Exception):
    """Error raised when there is an issue with the Google PaLM API."""


def _truncate_at_stop_tokens(
    text: str,
    stop: Optional[List[str]],
) -> str:
    """Truncates text at the earliest stop token found."""
    if stop is None:
        return text
    for stop_token in stop:
        stop_token_idx = text.find(stop_token)
        if stop_token_idx != -1:
            text = text[:stop_token_idx]
    return text


def _response_to_result(response: genai.types.ChatResponse, stop: Optional[List[str]]) -> Dict[str, Any]:
    """Convert a PaLM chat response to a result dictionary."""
    result = {
        "id": response.id,
        "created": response.created,
        "model": response.model,
        "usage": {
            "prompt_tokens": response.usage.prompt_tokens,
            "completion_tokens": response.usage.completion_tokens,
            "total_tokens": response.usage.total_tokens,
        },
        "choices": [],
    }
    for choice in response.choices:
        result["choices"].append({
            "text": _truncate_at_stop_tokens(choice.text, stop),
            "index": choice.index,
            "finish_reason": choice.finish_reason,
        })
    return result


def _messages_to_prompt_dict(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Convert a list of message dictionaries to a prompt dictionary."""
    prompt = {"messages": []}
    for message in messages:
        prompt["messages"].append({
            "role": message["role"],
            "content": message["content"],
        })
    return prompt


def _create_retry_decorator() -> Callable[[Any], Any]:
    """Create a retry decorator with exponential backoff."""
    return retry(
        retry=retry_if_exception_type(GooglePalmError),
        stop=stop_after_attempt(5),
        wait=wait_exponential(multiplier=1, min=2, max=30),
        before_sleep=before_sleep_log(logger, logging.DEBUG),
        reraise=True,
    )


####################### => main class
class GooglePalm(BaseModel):
    """Wrapper around Google's PaLM Chat API."""

    client: Any  #: :meta private:
    model_name: str = "models/chat-bison-001"
    google_api_key: Optional[str] = None
    temperature: Optional[float] = None
    top_p: Optional[float] = None
    top_k: Optional[int] = None
    n: int = 1

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        # Same as before
        pass

    def chat_with_retry(self, **kwargs: Any) -> Any:
        """Use tenacity to retry the completion call."""
        retry_decorator = _create_retry_decorator()

        @retry_decorator
        def _chat_with_retry(**kwargs: Any) -> Any:
            return self.client.chat(**kwargs)

        return _chat_with_retry(**kwargs)

    async def achat_with_retry(self, **kwargs: Any) -> Any:
        """Use tenacity to retry the async completion call."""
        retry_decorator = _create_retry_decorator()

        @retry_decorator
        async def _achat_with_retry(**kwargs: Any) -> Any:
            return await self.client.chat_async(**kwargs)

        return await _achat_with_retry(**kwargs)
# from __future__ import annotations
# import logging
# from swarms.utils.logger import logger
# from typing import Any, Callable, Dict, List, Optional
# from pydantic import BaseModel, model_validator
# from tenacity import (
# before_sleep_log,
# retry,
# retry_if_exception_type,
# stop_after_attempt,
# wait_exponential,
# )
# import google.generativeai as palm
# class GooglePalmError(Exception):
# """Error raised when there is an issue with the Google PaLM API."""
# def _truncate_at_stop_tokens(
# text: str,
# stop: Optional[List[str]],
# ) -> str:
# """Truncates text at the earliest stop token found."""
# if stop is None:
# return text
# for stop_token in stop:
# stop_token_idx = text.find(stop_token)
# if stop_token_idx != -1:
# text = text[:stop_token_idx]
# return text
# def _response_to_result(response: palm.types.ChatResponse, stop: Optional[List[str]]) -> Dict[str, Any]:
# """Convert a PaLM chat response to a result dictionary."""
# result = {
# "id": response.id,
# "created": response.created,
# "model": response.model,
# "usage": {
# "prompt_tokens": response.usage.prompt_tokens,
# "completion_tokens": response.usage.completion_tokens,
# "total_tokens": response.usage.total_tokens,
# },
# "choices": [],
# }
# for choice in response.choices:
# result["choices"].append({
# "text": _truncate_at_stop_tokens(choice.text, stop),
# "index": choice.index,
# "finish_reason": choice.finish_reason,
# })
# return result
# def _messages_to_prompt_dict(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
# """Convert a list of message dictionaries to a prompt dictionary."""
# prompt = {"messages": []}
# for message in messages:
# prompt["messages"].append({
# "role": message["role"],
# "content": message["content"],
# })
# return prompt
# def _create_retry_decorator() -> Callable[[Any], Any]:
# """Create a retry decorator with exponential backoff."""
# return retry(
# retry=retry_if_exception_type(GooglePalmError),
# stop=stop_after_attempt(5),
# wait=wait_exponential(multiplier=1, min=2, max=30),
# before_sleep=before_sleep_log(logger, logging.DEBUG),
# reraise=True,
# )
# ####################### => main class
# class GooglePalm(BaseModel):
# """Wrapper around Google's PaLM Chat API."""
# client: Any #: :meta private:
# model_name: str = "models/chat-bison-001"
# google_api_key: Optional[str] = None
# temperature: Optional[float] = None
# top_p: Optional[float] = None
# top_k: Optional[int] = None
# n: int = 1
# @model_validator(mode="pre")
# def validate_environment(cls, values: Dict) -> Dict:
# # Same as before
# pass
# def chat_with_retry(self, **kwargs: Any) -> Any:
# """Use tenacity to retry the completion call."""
# retry_decorator = _create_retry_decorator()
# @retry_decorator
# def _chat_with_retry(**kwargs: Any) -> Any:
# return self.client.chat(**kwargs)
# return _chat_with_retry(**kwargs)
# async def achat_with_retry(self, **kwargs: Any) -> Any:
# """Use tenacity to retry the async completion call."""
# retry_decorator = _create_retry_decorator()
# @retry_decorator
# async def _achat_with_retry(**kwargs: Any) -> Any:
# return await self.client.chat_async(**kwargs)
# return await _achat_with_retry(**kwargs)
    def __call__(
        self,
        messages: List[Dict[str, Any]],
        stop: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        prompt = _messages_to_prompt_dict(messages)
        response: genai.types.ChatResponse = self.chat_with_retry(
            model=self.model_name,
            prompt=prompt,
            temperature=self.temperature,
            top_p=self.top_p,
            top_k=self.top_k,
            candidate_count=self.n,
            **kwargs,
        )
        return _response_to_result(response, stop)

    def generate(
        self,
        messages: List[Dict[str, Any]],
        stop: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        prompt = _messages_to_prompt_dict(messages)
        response: genai.types.ChatResponse = self.chat_with_retry(
            model=self.model_name,
            prompt=prompt,
            temperature=self.temperature,
            top_p=self.top_p,
            top_k=self.top_k,
            candidate_count=self.n,
            **kwargs,
        )
        return _response_to_result(response, stop)

    async def _agenerate(
        self,
        messages: List[Dict[str, Any]],
        stop: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        prompt = _messages_to_prompt_dict(messages)
        response: genai.types.ChatResponse = await self.achat_with_retry(
            model=self.model_name,
            prompt=prompt,
            temperature=self.temperature,
            top_p=self.top_p,
            top_k=self.top_k,
            candidate_count=self.n,
        )
        return _response_to_result(response, stop)

    @property
    def _identifying_params(self) -> Dict[str, Any]:
        """Get the identifying parameters."""
        return {
            "model_name": self.model_name,
            "temperature": self.temperature,
            "top_p": self.top_p,
            "top_k": self.top_k,
            "n": self.n,
        }

    @property
    def _llm_type(self) -> str:
        return "google-palm-chat"
# def __call__(
# self,
# messages: List[Dict[str, Any]],
# stop: Optional[List[str]] = None,
# **kwargs: Any,
# ) -> Dict[str, Any]:
# prompt = _messages_to_prompt_dict(messages)
# response: palm.types.ChatResponse = self.chat_with_retry(
# model=self.model_name,
# prompt=prompt,
# temperature=self.temperature,
# top_p=self.top_p,
# top_k=self.top_k,
# candidate_count=self.n,
# **kwargs,
# )
# return _response_to_result(response, stop)
# def generate(
# self,
# messages: List[Dict[str, Any]],
# stop: Optional[List[str]] = None,
# **kwargs: Any,
# ) -> Dict[str, Any]:
# prompt = _messages_to_prompt_dict(messages)
# response: palm.types.ChatResponse = self.chat_with_retry(
# model=self.model_name,
# prompt=prompt,
# temperature=self.temperature,
# top_p=self.top_p,
# top_k=self.top_k,
# candidate_count=self.n,
# **kwargs,
# )
# return _response_to_result(response, stop)
# async def _agenerate(
# self,
# messages: List[Dict[str, Any]],
# stop: Optional[List[str]] = None,
# **kwargs: Any,
# ) -> Dict[str, Any]:
# prompt = _messages_to_prompt_dict(messages)
# response: palm.types.ChatResponse = await self.achat_with_retry(
# model=self.model_name,
# prompt=prompt,
# temperature=self.temperature,
# top_p=self.top_p,
# top_k=self.top_k,
# candidate_count=self.n,
# )
# return _response_to_result(response, stop)
# @property
# def _identifying_params(self) -> Dict[str, Any]:
# """Get the identifying parameters."""
# return {
# "model_name": self.model_name,
# "temperature": self.temperature,
# "top_p": self.top_p,
# "top_k": self.top_k,
# "n": self.n,
# }
# @property
# def _llm_type(self) -> str:
# return "google-palm-chat"

@@ -132,11 +132,12 @@ class WebpageQATool(BaseTool):
query_website_tool = WebpageQATool(qa_chain=load_qa_with_sources_chain(llm))
# !pip install duckduckgo_search
web_search = DuckDuckGoSearchRun()
# web_search = DuckDuckGoSearchRun()
# from swarms.agents.tools.code_intepretor import CodeInterpreter
# # @tool
# code_intepret = CodeInterpreter()
# code_intepret = CodeInterpreter()

@@ -0,0 +1,62 @@
import threading
import queue
from time import sleep
from swarms.workers.autobot import AutoBot
# TODO: Handle task assignment and task delegation.
# TODO: User task => decomposed into very small subtasks => subtasks assigned to workers => workers complete them, update the swarm, and can ask other agents for help.
# TODO: Still missing: task assignment, task delegation, task completion, and swarm-level communication with a vector db. A hedged sketch follows the class below.
class AutoScaler:
    def __init__(self,
                 initial_agents=10,
                 scale_up_factor=1,
                 idle_threshold=0.2,
                 busy_threshold=0.7):
        self.agents_pool = [AutoBot() for _ in range(initial_agents)]
        self.task_queue = queue.Queue()
        self.scale_up_factor = scale_up_factor
        self.idle_threshold = idle_threshold
        self.busy_threshold = busy_threshold
        self.lock = threading.Lock()

    def add_task(self, task):
        self.task_queue.put(task)

    def scale_up(self):
        with self.lock:
            new_agents_count = len(self.agents_pool) * self.scale_up_factor
            for _ in range(new_agents_count):
                self.agents_pool.append(AutoBot())

    def scale_down(self):
        with self.lock:
            if len(self.agents_pool) > 10:  # ensure a minimum of 10 agents
                del self.agents_pool[-1]  # remove the last agent

    def monitor_and_scale(self):
        while True:
            sleep(60)  # check once a minute
            pending_tasks = self.task_queue.qsize()
            active_agents = sum([1 for agent in self.agents_pool if agent.is_busy()])

            if pending_tasks / len(self.agents_pool) > self.busy_threshold:
                self.scale_up()
            elif active_agents / len(self.agents_pool) < self.idle_threshold:
                self.scale_down()

    def start(self):
        monitor_thread = threading.Thread(target=self.monitor_and_scale)
        monitor_thread.start()

        while True:
            task = self.task_queue.get()
            if task:
                # hand the task to the first idle agent, if any
                available_agent = next((agent for agent in self.agents_pool if not agent.is_busy()), None)
                if available_agent:
                    available_agent.run(task)
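A hedged sketch of the task-assignment TODO above: split a user task into subtasks and push each onto the shared queue. decompose and assign_task are hypothetical helpers for illustration, not part of this commit:

def decompose(task: str) -> list:
    # naive decomposition: one subtask per sentence
    return [part.strip() for part in task.split(".") if part.strip()]

def assign_task(scaler: AutoScaler, task: str):
    for subtask in decompose(task):
        scaler.add_task(subtask)  # idle agents pull subtasks from the queue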

@@ -0,0 +1,75 @@
import faiss
from langchain.chat_models import ChatOpenAI
from langchain.docstore import InMemoryDocstore
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain_experimental.autonomous_agents import AutoGPT
from swarms.agents.tools.autogpt import (
    DuckDuckGoSearchRun,
    FileChatMessageHistory,
    ReadFileTool,
    WebpageQATool,
    WriteFileTool,
    load_qa_with_sources_chain,
    process_csv,
    # web_search,
    query_website_tool
)
ROOT_DIR = "./data/"
class AutoBot:
    def __init__(self,
                 model_name="gpt-4",
                 openai_api_key=None,
                 ai_name="Autobot Swarm Worker",
                 ai_role="Worker in a swarm",
                 # embedding_size=None,
                 # k=None,
                 temperature=0.5):
        self.openai_api_key = openai_api_key
        self.temperature = temperature
        self.llm = ChatOpenAI(model_name=model_name,
                              openai_api_key=self.openai_api_key,
                              temperature=self.temperature)
        self.ai_name = ai_name
        self.ai_role = ai_role
        # self.embedding_size = embedding_size
        # # self.k = k
        self.setup_tools()
        self.setup_memory()
        self.setup_agent()

    def setup_tools(self):
        self.tools = [
            WriteFileTool(root_dir=ROOT_DIR),
            ReadFileTool(root_dir=ROOT_DIR),
            process_csv,
            query_website_tool,
        ]

    def setup_memory(self):
        embeddings_model = OpenAIEmbeddings(openai_api_key=self.openai_api_key)
        embedding_size = 1536
        index = faiss.IndexFlatL2(embedding_size)
        self.vectorstore = FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {})

    def setup_agent(self):
        self.agent = AutoGPT.from_llm_and_tools(
            ai_name=self.ai_name,
            ai_role=self.ai_role,
            tools=self.tools,
            llm=self.llm,
            memory=self.vectorstore.as_retriever(search_kwargs={"k": 8}),
        )

    def run(self, task):
        result = self.agent.run([task])
        return result
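A minimal usage sketch of the class above, mirroring the earlier Optimus Prime example; the key and task strings are placeholders:

from swarms.workers.autobot import AutoBot

worker = AutoBot(openai_api_key="sk-...", ai_name="Worker-1")
print(worker.run("List three uses of a task queue"))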

@@ -17,17 +17,17 @@ from swarms.agents.tools.autogpt import (
WriteFileTool,
load_qa_with_sources_chain,
process_csv,
web_search,
)
from swarms.agents.tools.developer import (
code_editor_append,
code_editor_delete,
code_editor_patch,
code_editor_read,
code_editor_summary,
code_editor_write,
terminal_execute,
# web_search,
)
# from swarms.agents.tools.developer import (
# code_editor_append,
# code_editor_delete,
# code_editor_patch,
# code_editor_read,
# code_editor_summary,
# code_editor_write,
# terminal_execute,
# )
ROOT_DIR = "./data/"
@@ -182,8 +182,8 @@ class WorkerNode:
def initialize_vectorstore(self):
try:
embeddings_model = OpenAIEmbeddings(openai_api_key=self.openai_api_key)
embedding_size = self.embedding_size
index = faiss.IndexFlatL2(embedding_size)
# embedding_size = self.embedding_size
index = faiss.IndexFlatL2(self.embedding_size)
return FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {})
except Exception as e:
@@ -227,18 +227,11 @@ class WorkerNode:
llm = self.initialize_llm(llm_class, self.temperature)
tools = [
web_search,
# web_search,
WriteFileTool(root_dir=ROOT_DIR),
ReadFileTool(root_dir=ROOT_DIR),
process_csv,
WebpageQATool(qa_chain=load_qa_with_sources_chain(llm)),
code_editor_append,
code_editor_delete,
code_editor_patch,
code_editor_read,
code_editor_summary,
code_editor_write,
terminal_execute,
]
if not tools:
logging.error("Tools are not initialized")
@@ -276,7 +269,7 @@ def worker_node(openai_api_key, objective):
    try:
        worker_node = WorkerNode(openai_api_key)
        worker_node.create_worker_node()
        # worker_node.create_worker_node()
        return worker_node.run(objective)
    except Exception as e:
        logging.error(f"An error occurred in worker_node: {e}")
