From 01e7e9b34c56d860a19673efcb07c4c298aa1a1e Mon Sep 17 00:00:00 2001 From: Kye Date: Mon, 21 Aug 2023 20:10:18 -0400 Subject: [PATCH] stable worker autobot node with prototype autoscaler need task assignment and delegation --- chat_history.txt | 1 + example.py | 36 ++- playground/autoscaler.py | 9 + pyproject.toml | 2 +- requirements.txt | 5 +- swarms/__init__.py | 12 +- swarms/agents/__init__.py | 2 +- swarms/agents/models/huggingface.py | 20 +- swarms/agents/models/palm.py | 374 ++++++++++++++-------------- swarms/agents/tools/autogpt.py | 5 +- swarms/orchestrator/autoscaler.py | 62 +++++ swarms/workers/autobot.py | 75 ++++++ swarms/workers/worker_node.py | 35 ++- 13 files changed, 400 insertions(+), 238 deletions(-) create mode 100644 chat_history.txt create mode 100644 playground/autoscaler.py create mode 100644 swarms/orchestrator/autoscaler.py create mode 100644 swarms/workers/autobot.py diff --git a/chat_history.txt b/chat_history.txt new file mode 100644 index 00000000..0637a088 --- /dev/null +++ b/chat_history.txt @@ -0,0 +1 @@ +[] \ No newline at end of file diff --git a/example.py b/example.py index 124b5b18..69be9e3d 100644 --- a/example.py +++ b/example.py @@ -47,20 +47,34 @@ ################ -from swarms import WorkerNode +# from swarms import WorkerNode -# Your OpenAI API key -api_key = "example key" +# # Your OpenAI API key +# api_key = "" +# # Initialize a WorkerNode with your API key +# node = WorkerNode(api_key) +# # node.create_worker_node() -# Initialize a WorkerNode with your API key -node = WorkerNode(api_key) -node.create_worker_node() +# # Define an objective +# objective = "Please make a web GUI for using HTTP API server..." + +# # Run the task +# task = node.run(objective) + +# print(task) + + + +########## +from swarms import AutoBot -# Define an objective -objective = "Please make a web GUI for using HTTP API server..." -# Run the task -task = node.run(objective) +auto_bot = AutoBot( + openai_api_key="", + ai_name="Optimus Prime", +) -print(task) +task = "What were the winning boston marathon times for the past 5 years (ending in 2022)? Generate a table of the year, name, country of origin, and times." +response = auto_bot.run(task) +print(response) \ No newline at end of file diff --git a/playground/autoscaler.py b/playground/autoscaler.py new file mode 100644 index 00000000..3c7cc525 --- /dev/null +++ b/playground/autoscaler.py @@ -0,0 +1,9 @@ +from swarms import AutoBot, AutoScaler + +auto_scaler = AutoScaler() +auto_scaler.start() + +for i in range(100): + auto_scaler.add_task(f"Task {i}") + + \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index a3ccbeec..4d441009 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,7 @@ google-generativeai = "*" torch = "*" langchain-experimental = "*" playwright = "*" -duckduckgo_search = "*" +duckduckgo-search = "*" faiss-cpu = "*" wget = "*" httpx = "*" diff --git a/requirements.txt b/requirements.txt index c433a1bb..87cb9ab8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -121,7 +121,6 @@ google-generativeai oceandb langchain-experimental playwright -duckduckgo_search wget==3.2 simpleaichat httpx @@ -132,4 +131,6 @@ redis google-search-results==2.4.2 Pillow faiss-cpu -openai \ No newline at end of file +openai +google-generativeai +duckduckgo-search \ No newline at end of file diff --git a/swarms/__init__.py b/swarms/__init__.py index 2163acf5..3e9e626e 100644 --- a/swarms/__init__.py +++ b/swarms/__init__.py @@ -1,14 +1,18 @@ #swarms +from swarms.orchestrator.autoscaler import AutoScaler # worker -# from swarms.workers.worker_node import WorkerNode +from swarms.workers.worker_node import WorkerNode +from swarms.workers.autobot import AutoBot #boss -# from swarms.boss.boss_node import BossNode +from swarms.boss.boss_node import BossNode #models from swarms.agents.models.anthropic import Anthropic from swarms.agents.models.huggingface import HuggingFaceLLM -from swarms.agents.models.palm import GooglePalm +# from swarms.agents.models.palm import GooglePalm from swarms.agents.models.petals import Petals -# from swarms.agents.models.openai import OpenAI +from swarms.agents.models.openai import OpenAI + + diff --git a/swarms/agents/__init__.py b/swarms/agents/__init__.py index 3400e7f1..32d65d8e 100644 --- a/swarms/agents/__init__.py +++ b/swarms/agents/__init__.py @@ -4,7 +4,7 @@ #models from swarms.agents.models.anthropic import Anthropic from swarms.agents.models.huggingface import HuggingFaceLLM -from swarms.agents.models.palm import GooglePalm +# from swarms.agents.models.palm import GooglePalm from swarms.agents.models.petals import Petals from swarms.agents.models.openai import OpenAI diff --git a/swarms/agents/models/huggingface.py b/swarms/agents/models/huggingface.py index f43c76e5..1eb36d79 100644 --- a/swarms/agents/models/huggingface.py +++ b/swarms/agents/models/huggingface.py @@ -1,6 +1,6 @@ import torch import logging -from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig +from transformers import AutoModelForCausalLM, AutoTokenizer #,# BitsAndBytesConfig class HuggingFaceLLM: def __init__(self, model_id: str, device: str = None, max_length: int = 20, quantize: bool = False, quantization_config: dict = None): @@ -10,15 +10,15 @@ class HuggingFaceLLM: self.max_length = max_length bnb_config = None - if quantize: - if not quantization_config: - quantization_config = { - 'load_in_4bit': True, - 'bnb_4bit_use_double_quant': True, - 'bnb_4bit_quant_type': "nf4", - 'bnb_4bit_compute_dtype': torch.bfloat16 - } - bnb_config = BitsAndBytesConfig(**quantization_config) + # if quantize: + # if not quantization_config: + # quantization_config = { + # 'load_in_4bit': True, + # 'bnb_4bit_use_double_quant': True, + # 'bnb_4bit_quant_type': "nf4", + # 'bnb_4bit_compute_dtype': torch.bfloat16 + # } + # bnb_config = BitsAndBytesConfig(**quantization_config) try: self.tokenizer = AutoTokenizer.from_pretrained(self.model_id) diff --git a/swarms/agents/models/palm.py b/swarms/agents/models/palm.py index 3f4856b0..20eafd61 100644 --- a/swarms/agents/models/palm.py +++ b/swarms/agents/models/palm.py @@ -1,187 +1,189 @@ -from __future__ import annotations - -import logging -from swarms.utils.logger import logger -from typing import Any, Callable, Dict, List, Optional - -from pydantic import BaseModel, root_validator -from tenacity import ( - before_sleep_log, - retry, - retry_if_exception_type, - stop_after_attempt, - wait_exponential, -) - -import google.generativeai as genai - - -class GooglePalmError(Exception): - """Error raised when there is an issue with the Google PaLM API.""" - -def _truncate_at_stop_tokens( - text: str, - stop: Optional[List[str]], -) -> str: - """Truncates text at the earliest stop token found.""" - if stop is None: - return text - - for stop_token in stop: - stop_token_idx = text.find(stop_token) - if stop_token_idx != -1: - text = text[:stop_token_idx] - return text - -def _response_to_result(response: genai.types.ChatResponse, stop: Optional[List[str]]) -> Dict[str, Any]: - """Convert a PaLM chat response to a result dictionary.""" - result = { - "id": response.id, - "created": response.created, - "model": response.model, - "usage": { - "prompt_tokens": response.usage.prompt_tokens, - "completion_tokens": response.usage.completion_tokens, - "total_tokens": response.usage.total_tokens, - }, - "choices": [], - } - for choice in response.choices: - result["choices"].append({ - "text": _truncate_at_stop_tokens(choice.text, stop), - "index": choice.index, - "finish_reason": choice.finish_reason, - }) - return result - -def _messages_to_prompt_dict(messages: List[Dict[str, Any]]) -> Dict[str, Any]: - """Convert a list of message dictionaries to a prompt dictionary.""" - prompt = {"messages": []} - for message in messages: - prompt["messages"].append({ - "role": message["role"], - "content": message["content"], - }) - return prompt - -def _create_retry_decorator() -> Callable[[Any], Any]: - """Create a retry decorator with exponential backoff.""" - return retry( - retry=retry_if_exception_type(GooglePalmError), - stop=stop_after_attempt(5), - wait=wait_exponential(multiplier=1, min=2, max=30), - before_sleep=before_sleep_log(logger, logging.DEBUG), - reraise=True, - ) - -####################### => main class -class GooglePalm(BaseModel): - """Wrapper around Google's PaLM Chat API.""" - - client: Any #: :meta private: - model_name: str = "models/chat-bison-001" - google_api_key: Optional[str] = None - temperature: Optional[float] = None - top_p: Optional[float] = None - top_k: Optional[int] = None - n: int = 1 - - @root_validator() - def validate_environment(cls, values: Dict) -> Dict: - # Same as before - pass - - def chat_with_retry(self, **kwargs: Any) -> Any: - """Use tenacity to retry the completion call.""" - retry_decorator = _create_retry_decorator() - - @retry_decorator - def _chat_with_retry(**kwargs: Any) -> Any: - return self.client.chat(**kwargs) - - return _chat_with_retry(**kwargs) - - async def achat_with_retry(self, **kwargs: Any) -> Any: - """Use tenacity to retry the async completion call.""" - retry_decorator = _create_retry_decorator() - - @retry_decorator - async def _achat_with_retry(**kwargs: Any) -> Any: - return await self.client.chat_async(**kwargs) - - return await _achat_with_retry(**kwargs) +# from __future__ import annotations + +# import logging +# from swarms.utils.logger import logger +# from typing import Any, Callable, Dict, List, Optional + +# from pydantic import BaseModel, model_validator +# from tenacity import ( +# before_sleep_log, +# retry, +# retry_if_exception_type, +# stop_after_attempt, +# wait_exponential, +# ) + +# import google.generativeai as palm + + +# class GooglePalmError(Exception): +# """Error raised when there is an issue with the Google PaLM API.""" + +# def _truncate_at_stop_tokens( +# text: str, +# stop: Optional[List[str]], +# ) -> str: +# """Truncates text at the earliest stop token found.""" +# if stop is None: +# return text + +# for stop_token in stop: +# stop_token_idx = text.find(stop_token) +# if stop_token_idx != -1: +# text = text[:stop_token_idx] +# return text + + +# def _response_to_result(response: palm.types.ChatResponse, stop: Optional[List[str]]) -> Dict[str, Any]: +# """Convert a PaLM chat response to a result dictionary.""" +# result = { +# "id": response.id, +# "created": response.created, +# "model": response.model, +# "usage": { +# "prompt_tokens": response.usage.prompt_tokens, +# "completion_tokens": response.usage.completion_tokens, +# "total_tokens": response.usage.total_tokens, +# }, +# "choices": [], +# } +# for choice in response.choices: +# result["choices"].append({ +# "text": _truncate_at_stop_tokens(choice.text, stop), +# "index": choice.index, +# "finish_reason": choice.finish_reason, +# }) +# return result + +# def _messages_to_prompt_dict(messages: List[Dict[str, Any]]) -> Dict[str, Any]: +# """Convert a list of message dictionaries to a prompt dictionary.""" +# prompt = {"messages": []} +# for message in messages: +# prompt["messages"].append({ +# "role": message["role"], +# "content": message["content"], +# }) +# return prompt + + +# def _create_retry_decorator() -> Callable[[Any], Any]: +# """Create a retry decorator with exponential backoff.""" +# return retry( +# retry=retry_if_exception_type(GooglePalmError), +# stop=stop_after_attempt(5), +# wait=wait_exponential(multiplier=1, min=2, max=30), +# before_sleep=before_sleep_log(logger, logging.DEBUG), +# reraise=True, +# ) + + +# ####################### => main class +# class GooglePalm(BaseModel): +# """Wrapper around Google's PaLM Chat API.""" + +# client: Any #: :meta private: +# model_name: str = "models/chat-bison-001" +# google_api_key: Optional[str] = None +# temperature: Optional[float] = None +# top_p: Optional[float] = None +# top_k: Optional[int] = None +# n: int = 1 + +# @model_validator(mode="pre") +# def validate_environment(cls, values: Dict) -> Dict: +# # Same as before +# pass + +# def chat_with_retry(self, **kwargs: Any) -> Any: +# """Use tenacity to retry the completion call.""" +# retry_decorator = _create_retry_decorator() + +# @retry_decorator +# def _chat_with_retry(**kwargs: Any) -> Any: +# return self.client.chat(**kwargs) + +# return _chat_with_retry(**kwargs) + +# async def achat_with_retry(self, **kwargs: Any) -> Any: +# """Use tenacity to retry the async completion call.""" +# retry_decorator = _create_retry_decorator() + +# @retry_decorator +# async def _achat_with_retry(**kwargs: Any) -> Any: +# return await self.client.chat_async(**kwargs) + +# return await _achat_with_retry(**kwargs) - def __call__( - self, - messages: List[Dict[str, Any]], - stop: Optional[List[str]] = None, - **kwargs: Any, - ) -> Dict[str, Any]: - prompt = _messages_to_prompt_dict(messages) - - response: genai.types.ChatResponse = self.chat_with_retry( - model=self.model_name, - prompt=prompt, - temperature=self.temperature, - top_p=self.top_p, - top_k=self.top_k, - candidate_count=self.n, - **kwargs, - ) - - return _response_to_result(response, stop) - - def generate( - self, - messages: List[Dict[str, Any]], - stop: Optional[List[str]] = None, - **kwargs: Any, - ) -> Dict[str, Any]: - prompt = _messages_to_prompt_dict(messages) - - response: genai.types.ChatResponse = self.chat_with_retry( - model=self.model_name, - prompt=prompt, - temperature=self.temperature, - top_p=self.top_p, - top_k=self.top_k, - candidate_count=self.n, - **kwargs, - ) - - return _response_to_result(response, stop) - - async def _agenerate( - self, - messages: List[Dict[str, Any]], - stop: Optional[List[str]] = None, - **kwargs: Any, - ) -> Dict[str, Any]: - prompt = _messages_to_prompt_dict(messages) - - response: genai.types.ChatResponse = await self.achat_with_retry( - model=self.model_name, - prompt=prompt, - temperature=self.temperature, - top_p=self.top_p, - top_k=self.top_k, - candidate_count=self.n, - ) - - return _response_to_result(response, stop) - - - @property - def _identifying_params(self) -> Dict[str, Any]: - """Get the identifying parameters.""" - return { - "model_name": self.model_name, - "temperature": self.temperature, - "top_p": self.top_p, - "top_k": self.top_k, - "n": self.n, - } - - @property - def _llm_type(self) -> str: - return "google-palm-chat" \ No newline at end of file +# def __call__( +# self, +# messages: List[Dict[str, Any]], +# stop: Optional[List[str]] = None, +# **kwargs: Any, +# ) -> Dict[str, Any]: +# prompt = _messages_to_prompt_dict(messages) + +# response: palm.types.ChatResponse = self.chat_with_retry( +# model=self.model_name, +# prompt=prompt, +# temperature=self.temperature, +# top_p=self.top_p, +# top_k=self.top_k, +# candidate_count=self.n, +# **kwargs, +# ) + +# return _response_to_result(response, stop) + +# def generate( +# self, +# messages: List[Dict[str, Any]], +# stop: Optional[List[str]] = None, +# **kwargs: Any, +# ) -> Dict[str, Any]: +# prompt = _messages_to_prompt_dict(messages) + +# response: palm.types.ChatResponse = self.chat_with_retry( +# model=self.model_name, +# prompt=prompt, +# temperature=self.temperature, +# top_p=self.top_p, +# top_k=self.top_k, +# candidate_count=self.n, +# **kwargs, +# ) + +# return _response_to_result(response, stop) + +# async def _agenerate( +# self, +# messages: List[Dict[str, Any]], +# stop: Optional[List[str]] = None, +# **kwargs: Any, +# ) -> Dict[str, Any]: +# prompt = _messages_to_prompt_dict(messages) + +# response: palm.types.ChatResponse = await self.achat_with_retry( +# model=self.model_name, +# prompt=prompt, +# temperature=self.temperature, +# top_p=self.top_p, +# top_k=self.top_k, +# candidate_count=self.n, +# ) + +# return _response_to_result(response, stop) + +# @property +# def _identifying_params(self) -> Dict[str, Any]: +# """Get the identifying parameters.""" +# return { +# "model_name": self.model_name, +# "temperature": self.temperature, +# "top_p": self.top_p, +# "top_k": self.top_k, +# "n": self.n, +# } + +# @property +# def _llm_type(self) -> str: +# return "google-palm-chat" \ No newline at end of file diff --git a/swarms/agents/tools/autogpt.py b/swarms/agents/tools/autogpt.py index c989b655..af471449 100644 --- a/swarms/agents/tools/autogpt.py +++ b/swarms/agents/tools/autogpt.py @@ -132,11 +132,12 @@ class WebpageQATool(BaseTool): query_website_tool = WebpageQATool(qa_chain=load_qa_with_sources_chain(llm)) # !pip install duckduckgo_search -web_search = DuckDuckGoSearchRun() +# web_search = DuckDuckGoSearchRun() # from swarms.agents.tools.code_intepretor import CodeInterpreter # # @tool -# code_intepret = CodeInterpreter() \ No newline at end of file +# code_intepret = CodeInterpreter() + diff --git a/swarms/orchestrator/autoscaler.py b/swarms/orchestrator/autoscaler.py new file mode 100644 index 00000000..eff3fc37 --- /dev/null +++ b/swarms/orchestrator/autoscaler.py @@ -0,0 +1,62 @@ +import threading +import queue +from time import sleep +from swarms.workers.autobot import AutoBot + +# TODO Handle task assignment and task delegation +# TODO: User task => decomposed into very small sub tasks => sub tasks assigned to workers => workers complete and update the swarm, can ask for help from other agents. +# TODO: Missing, Task Assignment, Task delegation, Task completion, Swarm level communication with vector db +class AutoScaler: + def __init__(self, + initial_agents=10, + scale_up_factor=1, + idle_threshold=0.2, + busy_threshold=0.7 + ): + self.agents_pool = [AutoBot() for _ in range(initial_agents)] + self.task_queue = queue.Queue() + self.scale_up_factor = scale_up_factor + self.idle_threshold = idle_threshold + self.lock = threading.Lock() + + def add_task(self, task): + self.tasks_queue.put(task) + + def scale_up(self): + with self.lock: + new_agents_counts = len(self.agents_pool) * self.scale_up_factor + for _ in range(new_agents_counts): + self.agents_pool.append(AutoBot()) + + def scale_down(self): + with self.lock: + if len(self.agents_pool) > 10: #ensure minmum of 10 agents + del self.agents_pool[-1] #remove last agent + + def monitor_and_scale(self): + while True: + sleep(60)#check minute + pending_tasks = self.task_queue.qsize() + active_agents = sum([1 for agent in self.agents_pool if agent.is_busy()]) + + if pending_tasks / len(self.agents_pool) > self.busy_threshold: + self.scale_up() + elif active_agents / len(self.agents_pool) < self.idle_threshold: + self.scale_down() + + def start(self): + monitor_thread = threading.Thread(target=self.monitor_and_scale) + monitor_thread.start() + + while True: + task = self.task_queue.get() + if task: + available_agent = next((agent for agent in self.agents_pool)) + if available_agent: + available_agent.run(task) + + + + + + \ No newline at end of file diff --git a/swarms/workers/autobot.py b/swarms/workers/autobot.py new file mode 100644 index 00000000..07221130 --- /dev/null +++ b/swarms/workers/autobot.py @@ -0,0 +1,75 @@ +import faiss +from langchain.chat_models import ChatOpenAI +from langchain.docstore import InMemoryDocstore +from langchain.embeddings import OpenAIEmbeddings +from langchain.vectorstores import FAISS +from langchain_experimental.autonomous_agents import AutoGPT + +from swarms.agents.tools.autogpt import ( + DuckDuckGoSearchRun, + FileChatMessageHistory, + ReadFileTool, + WebpageQATool, + WriteFileTool, + load_qa_with_sources_chain, + process_csv, + # web_search, + query_website_tool +) + + +ROOT_DIR = "./data/" + + +class AutoBot: + def __init__(self, + model_name="gpt-4", + openai_api_key=None, + ai_name="Autobot Swarm Worker", + ai_role="Worker in a swarm", + # embedding_size=None, + # k=None, + temperature=0.5): + self.openai_api_key = openai_api_key + self.temperature = temperature + self.llm = ChatOpenAI(model_name=model_name, + openai_api_key=self.openai_api_key, + temperature=self.temperature) + + self.ai_name = ai_name + self.ai_role = ai_role + + # self.embedding_size = embedding_size + # # self.k = k + + self.setup_tools() + self.setup_memory() + self.setup_agent() + + def setup_tools(self): + self.tools = [ + WriteFileTool(root_dir=ROOT_DIR), + ReadFileTool(root_dir=ROOT_DIR), + process_csv, + query_website_tool, + ] + + def setup_memory(self): + embeddings_model = OpenAIEmbeddings(openai_api_key=self.openai_api_key) + embedding_size = 1536 + index = faiss.IndexFlatL2(embedding_size) + self.vectorstore = FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {}) + + def setup_agent(self): + self.agent = AutoGPT.from_llm_and_tools( + ai_name=self.ai_name, + ai_role=self.ai_role, + tools=self.tools, + llm=self.llm, + memory=self.vectorstore.as_retriever(search_kwargs={"k": 8}), + ) + + def run(self, task): + result = self.agent.run([task]) + return result + diff --git a/swarms/workers/worker_node.py b/swarms/workers/worker_node.py index 0860630b..28077552 100644 --- a/swarms/workers/worker_node.py +++ b/swarms/workers/worker_node.py @@ -17,17 +17,17 @@ from swarms.agents.tools.autogpt import ( WriteFileTool, load_qa_with_sources_chain, process_csv, - web_search, -) -from swarms.agents.tools.developer import ( - code_editor_append, - code_editor_delete, - code_editor_patch, - code_editor_read, - code_editor_summary, - code_editor_write, - terminal_execute, + # web_search, ) +# from swarms.agents.tools.developer import ( +# code_editor_append, +# code_editor_delete, +# code_editor_patch, +# code_editor_read, +# code_editor_summary, +# code_editor_write, +# terminal_execute, +# ) ROOT_DIR = "./data/" @@ -182,8 +182,8 @@ class WorkerNode: def initialize_vectorstore(self): try: embeddings_model = OpenAIEmbeddings(openai_api_key=self.openai_api_key) - embedding_size = self.embedding_size - index = faiss.IndexFlatL2(embedding_size) + # embedding_size = self.embedding_size + index = faiss.IndexFlatL2(self.embedding_size) return FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {}) except Exception as e: @@ -227,18 +227,11 @@ class WorkerNode: llm = self.initialize_llm(llm_class, self.temperature) tools = [ - web_search, + # web_search, WriteFileTool(root_dir=ROOT_DIR), ReadFileTool(root_dir=ROOT_DIR), process_csv, WebpageQATool(qa_chain=load_qa_with_sources_chain(llm)), - code_editor_append, - code_editor_delete, - code_editor_patch, - code_editor_read, - code_editor_summary, - code_editor_write, - terminal_execute, ] if not tools: logging.error("Tools are not initialized") @@ -276,7 +269,7 @@ def worker_node(openai_api_key, objective): try: worker_node = WorkerNode(openai_api_key) - worker_node.create_worker_node() + # worker_node.create_worker_node() return worker_node.run(objective) except Exception as e: logging.error(f"An error occured in worker_node: {e}")