From a26acb875c4fd3e0f0189fd34d5decbf221599d5 Mon Sep 17 00:00:00 2001 From: Kye Date: Tue, 26 Sep 2023 12:41:41 -0400 Subject: [PATCH] groupchat --- pyproject.toml | 2 +- swarms/__init__.py | 5 +- swarms/models/openai.py | 1269 +++++++++++++++++++++++++++--------- swarms/swarms/groupchat.py | 156 +++++ 4 files changed, 1111 insertions(+), 321 deletions(-) create mode 100644 swarms/swarms/groupchat.py diff --git a/pyproject.toml b/pyproject.toml index 2fe82862..683990e0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "1.6.7" +version = "1.6.9" description = "Swarms - Pytorch" license = "MIT" authors = ["Kye Gomez "] diff --git a/swarms/__init__.py b/swarms/__init__.py index 8b24e4c6..d4992455 100644 --- a/swarms/__init__.py +++ b/swarms/__init__.py @@ -6,7 +6,7 @@ print(logo2) # worker # from swarms.workers.worker_node import WorkerNode -# from swarms.workers.worker import Worker +from swarms.workers.worker import Worker #boss # from swarms.boss.boss_node import Boss @@ -15,10 +15,11 @@ print(logo2) from swarms.models.anthropic import Anthropic from swarms.models.palm import GooglePalm from swarms.models.petals import Petals -#from swarms.models.openai import OpenAIChat +from swarms.models.openai import OpenAIChat #structs from swarms.structs.workflow import Workflow +from swarms.structs.task import Task # swarms from swarms.swarms.dialogue_simulator import DialogueSimulator diff --git a/swarms/models/openai.py b/swarms/models/openai.py index acffb366..32ffa1db 100644 --- a/swarms/models/openai.py +++ b/swarms/models/openai.py @@ -1,318 +1,951 @@ -# from __future__ import annotations - -# import logging -# import sys -# import warnings -# from typing import ( -# AbstractSet, -# Any, -# AsyncIterator, -# Collection, -# Dict, -# Iterator, -# List, -# Literal, -# Mapping, -# Optional, -# Tuple, -# Union, -# ) - -# from langchain.callbacks.manager import ( -# AsyncCallbackManagerForLLMRun, -# CallbackManagerForLLMRun, -# ) -# from langchain.pydantic_v1 import Field, root_validator -# from langchain.schema import Generation, LLMResult -# from langchain.schema.output import GenerationChunk -# from langchain.utils import get_from_dict_or_env - -# logger = logging.getLogger(__name__) - - -# import os -# def get_from_dict_or_env( -# data: Dict[str, Any], -# key: str, -# env_key: str, -# default: Optional[str] = None -# ) -> str: -# """Get a value from a dictionary or an environment variable.""" -# if key in data and data[key]: -# return data[key] -# else: -# return get_from_env(key, env_key, default=default) - - -# def get_from_env(key: str, env_key: str, default: Optional[str] = None) -> str: -# """Get a value from a dictionary or an environment variable.""" -# if env_key in os.environ and os.environ[env_key]: -# return os.environ[env_key] -# elif default is not None: -# return default -# else: -# raise ValueError( -# f"Did not find {key}, please add an environment variable" -# f" `{env_key}` which contains it, or pass" -# f" `{key}` as a named parameter." -# ) - - - - -# class OpenAIChat: -# """OpenAI Chat large language models. - -# To use, you should have the ``openai`` python package installed, and the -# environment variable ``OPENAI_API_KEY`` set with your API key. - -# Any parameters that are valid to be passed to the openai.create call can be passed -# in, even if not explicitly saved on this class. - -# Example: -# .. code-block:: python - -# from langchain.llms import OpenAIChat -# openaichat = OpenAIChat(model_name="gpt-3.5-turbo") -# """ - -# client: Any #: :meta private: -# model_name: str = "gpt-3.5-turbo" -# """Model name to use.""" -# model_kwargs: Dict[str, Any] = Field(default_factory=dict) -# """Holds any model parameters valid for `create` call not explicitly specified.""" -# openai_api_key: Optional[str] = None -# openai_api_base: Optional[str] = None -# # to support explicit proxy for OpenAI -# openai_proxy: Optional[str] = None -# max_retries: int = 6 -# """Maximum number of retries to make when generating.""" -# prefix_messages: List = Field(default_factory=list) -# """Series of messages for Chat input.""" -# streaming: bool = False -# """Whether to stream the results or not.""" -# allowed_special: Union[Literal["all"], AbstractSet[str]] = set() -# """Set of special tokens that are allowed。""" -# disallowed_special: Union[Literal["all"], Collection[str]] = "all" -# """Set of special tokens that are not allowed。""" - -# @root_validator(pre=True) -# def build_extra(cls, values: Dict[str, Any]) -> Dict[str, Any]: -# """Build extra kwargs from additional params that were passed in.""" -# all_required_field_names = {field.alias for field in cls.__fields__.values()} - -# extra = values.get("model_kwargs", {}) -# for field_name in list(values): -# if field_name not in all_required_field_names: -# if field_name in extra: -# raise ValueError(f"Found {field_name} supplied twice.") -# extra[field_name] = values.pop(field_name) -# values["model_kwargs"] = extra -# return values - -# @root_validator() -# def validate_environment(cls, values: Dict) -> Dict: -# """Validate that api key and python package exists in environment.""" -# openai_api_key = get_from_dict_or_env( -# values, "openai_api_key", "OPENAI_API_KEY" -# ) -# openai_api_base = get_from_dict_or_env( -# values, -# "openai_api_base", -# "OPENAI_API_BASE", -# default="", -# ) -# openai_proxy = get_from_dict_or_env( -# values, -# "openai_proxy", -# "OPENAI_PROXY", -# default="", -# ) -# openai_organization = get_from_dict_or_env( -# values, "openai_organization", "OPENAI_ORGANIZATION", default="" -# ) -# try: -# import openai - -# openai.api_key = openai_api_key -# if openai_api_base: -# openai.api_base = openai_api_base -# if openai_organization: -# openai.organization = openai_organization -# if openai_proxy: -# openai.proxy = {"http": openai_proxy, "https": openai_proxy} # type: ignore[assignment] # noqa: E501 -# except ImportError: -# raise ImportError( -# "Could not import openai python package. " -# "Please install it with `pip install openai`." -# ) -# try: -# values["client"] = openai.ChatCompletion -# except AttributeError: -# raise ValueError( -# "`openai` has no `ChatCompletion` attribute, this is likely " -# "due to an old version of the openai package. Try upgrading it " -# "with `pip install --upgrade openai`." -# ) -# warnings.warn( -# "You are trying to use a chat model. This way of initializing it is " -# "no longer supported. Instead, please use: " -# "`from langchain.chat_models import ChatOpenAI`" -# ) -# return values - -# @property -# def _default_params(self) -> Dict[str, Any]: -# """Get the default parameters for calling OpenAI API.""" -# return self.model_kwargs - -# def _get_chat_params( -# self, prompts: List[str], stop: Optional[List[str]] = None -# ) -> Tuple: -# if len(prompts) > 1: -# raise ValueError( -# f"OpenAIChat currently only supports single prompt, got {prompts}" -# ) -# messages = self.prefix_messages + [{"role": "user", "content": prompts[0]}] -# params: Dict[str, Any] = {**{"model": self.model_name}, **self._default_params} -# if stop is not None: -# if "stop" in params: -# raise ValueError("`stop` found in both the input and default params.") -# params["stop"] = stop -# if params.get("max_tokens") == -1: -# # for ChatGPT api, omitting max_tokens is equivalent to having no limit -# del params["max_tokens"] -# return messages, params - -# def _stream( -# self, -# prompt: str, -# stop: Optional[List[str]] = None, -# run_manager: Optional[CallbackManagerForLLMRun] = None, -# **kwargs: Any, -# ) -> Iterator[GenerationChunk]: -# messages, params = self._get_chat_params([prompt], stop) -# params = {**params, **kwargs, "stream": True} -# for stream_resp in completion_with_retry( -# self, messages=messages, run_manager=run_manager, **params -# ): -# token = stream_resp["choices"][0]["delta"].get("content", "") -# chunk = GenerationChunk(text=token) -# yield chunk -# if run_manager: -# run_manager.on_llm_new_token(token, chunk=chunk) - -# async def _astream( -# self, -# prompt: str, -# stop: Optional[List[str]] = None, -# run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, -# **kwargs: Any, -# ) -> AsyncIterator[GenerationChunk]: -# messages, params = self._get_chat_params([prompt], stop) -# params = {**params, **kwargs, "stream": True} -# async for stream_resp in await acompletion_with_retry( -# self, messages=messages, run_manager=run_manager, **params -# ): -# token = stream_resp["choices"][0]["delta"].get("content", "") -# chunk = GenerationChunk(text=token) -# yield chunk -# if run_manager: -# await run_manager.on_llm_new_token(token, chunk=chunk) - -# def _generate( -# self, -# prompts: List[str], -# stop: Optional[List[str]] = None, -# run_manager: Optional[CallbackManagerForLLMRun] = None, -# **kwargs: Any, -# ) -> LLMResult: -# if self.streaming: -# generation: Optional[GenerationChunk] = None -# for chunk in self._stream(prompts[0], stop, run_manager, **kwargs): -# if generation is None: -# generation = chunk -# else: -# generation += chunk -# assert generation is not None -# return LLMResult(generations=[[generation]]) - -# messages, params = self._get_chat_params(prompts, stop) -# params = {**params, **kwargs} -# full_response = completion_with_retry( -# self, messages=messages, run_manager=run_manager, **params -# ) -# llm_output = { -# "token_usage": full_response["usage"], -# "model_name": self.model_name, -# } -# return LLMResult( -# generations=[ -# [Generation(text=full_response["choices"][0]["message"]["content"])] -# ], -# llm_output=llm_output, -# ) - -# async def _agenerate( -# self, -# prompts: List[str], -# stop: Optional[List[str]] = None, -# run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, -# **kwargs: Any, -# ) -> LLMResult: -# if self.streaming: -# generation: Optional[GenerationChunk] = None -# async for chunk in self._astream(prompts[0], stop, run_manager, **kwargs): -# if generation is None: -# generation = chunk -# else: -# generation += chunk -# assert generation is not None -# return LLMResult(generations=[[generation]]) - -# messages, params = self._get_chat_params(prompts, stop) -# params = {**params, **kwargs} -# full_response = await acompletion_with_retry( -# self, messages=messages, run_manager=run_manager, **params -# ) -# llm_output = { -# "token_usage": full_response["usage"], -# "model_name": self.model_name, -# } -# return LLMResult( -# generations=[ -# [Generation(text=full_response["choices"][0]["message"]["content"])] -# ], -# llm_output=llm_output, -# ) - -# @property -# def _identifying_params(self) -> Mapping[str, Any]: -# """Get the identifying parameters.""" -# return {**{"model_name": self.model_name}, **self._default_params} - -# @property -# def _llm_type(self) -> str: -# """Return type of llm.""" -# return "openai-chat" - -# def get_token_ids(self, text: str) -> List[int]: -# """Get the token IDs using the tiktoken package.""" -# # tiktoken NOT supported for Python < 3.8 -# if sys.version_info[1] < 8: -# return super().get_token_ids(text) -# try: -# import tiktoken -# except ImportError: -# raise ImportError( -# "Could not import tiktoken python package. " -# "This is needed in order to calculate get_num_tokens. " -# "Please install it with `pip install tiktoken`." -# ) - -# enc = tiktoken.encoding_for_model(self.model_name) -# return enc.encode( -# text, -# allowed_special=self.allowed_special, -# disallowed_special=self.disallowed_special, -# ) \ No newline at end of file +from __future__ import annotations + +import logging +import sys +import warnings +from typing import ( + AbstractSet, + Any, + AsyncIterator, + Callable, + Collection, + Dict, + Iterator, + List, + Literal, + Mapping, + Optional, + Set, + Tuple, + Union, +) + +from langchain.callbacks.manager import ( + AsyncCallbackManagerForLLMRun, + CallbackManagerForLLMRun, +) +from langchain.llms.base import BaseLLM, create_base_retry_decorator +from langchain.pydantic_v1 import Field, root_validator +from langchain.schema import Generation, LLMResult +from langchain.schema.output import GenerationChunk +from langchain.utils import get_from_dict_or_env, get_pydantic_field_names +from langchain.utils.utils import build_extra_kwargs + +logger = logging.getLogger(__name__) + + +def update_token_usage( + keys: Set[str], response: Dict[str, Any], token_usage: Dict[str, Any] +) -> None: + """Update token usage.""" + _keys_to_use = keys.intersection(response["usage"]) + for _key in _keys_to_use: + if _key not in token_usage: + token_usage[_key] = response["usage"][_key] + else: + token_usage[_key] += response["usage"][_key] + + +def _stream_response_to_generation_chunk( + stream_response: Dict[str, Any], +) -> GenerationChunk: + """Convert a stream response to a generation chunk.""" + return GenerationChunk( + text=stream_response["choices"][0]["text"], + generation_info=dict( + finish_reason=stream_response["choices"][0].get("finish_reason", None), + logprobs=stream_response["choices"][0].get("logprobs", None), + ), + ) + + +def _update_response(response: Dict[str, Any], stream_response: Dict[str, Any]) -> None: + """Update response from the stream response.""" + response["choices"][0]["text"] += stream_response["choices"][0]["text"] + response["choices"][0]["finish_reason"] = stream_response["choices"][0].get( + "finish_reason", None + ) + response["choices"][0]["logprobs"] = stream_response["choices"][0]["logprobs"] + + +def _streaming_response_template() -> Dict[str, Any]: + return { + "choices": [ + { + "text": "", + "finish_reason": None, + "logprobs": None, + } + ] + } + + +def _create_retry_decorator( + llm: Union[BaseOpenAI, OpenAIChat], + run_manager: Optional[ + Union[AsyncCallbackManagerForLLMRun, CallbackManagerForLLMRun] + ] = None, +) -> Callable[[Any], Any]: + import openai + + errors = [ + openai.error.Timeout, + openai.error.APIError, + openai.error.APIConnectionError, + openai.error.RateLimitError, + openai.error.ServiceUnavailableError, + ] + return create_base_retry_decorator( + error_types=errors, max_retries=llm.max_retries, run_manager=run_manager + ) + + +def completion_with_retry( + llm: Union[BaseOpenAI, OpenAIChat], + run_manager: Optional[CallbackManagerForLLMRun] = None, + **kwargs: Any, +) -> Any: + """Use tenacity to retry the completion call.""" + retry_decorator = _create_retry_decorator(llm, run_manager=run_manager) + + @retry_decorator + def _completion_with_retry(**kwargs: Any) -> Any: + return llm.client.create(**kwargs) + + return _completion_with_retry(**kwargs) + + +async def acompletion_with_retry( + llm: Union[BaseOpenAI, OpenAIChat], + run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, + **kwargs: Any, +) -> Any: + """Use tenacity to retry the async completion call.""" + retry_decorator = _create_retry_decorator(llm, run_manager=run_manager) + + @retry_decorator + async def _completion_with_retry(**kwargs: Any) -> Any: + # Use OpenAI's async api https://github.com/openai/openai-python#async-api + return await llm.client.acreate(**kwargs) + + return await _completion_with_retry(**kwargs) + + +class BaseOpenAI(BaseLLM): + """Base OpenAI large language model class.""" + + @property + def lc_secrets(self) -> Dict[str, str]: + return {"openai_api_key": "OPENAI_API_KEY"} + + @classmethod + def is_lc_serializable(cls) -> bool: + return True + + client: Any = None #: :meta private: + model_name: str = Field(default="text-davinci-003", alias="model") + """Model name to use.""" + temperature: float = 0.7 + """What sampling temperature to use.""" + max_tokens: int = 256 + """The maximum number of tokens to generate in the completion. + -1 returns as many tokens as possible given the prompt and + the models maximal context size.""" + top_p: float = 1 + """Total probability mass of tokens to consider at each step.""" + frequency_penalty: float = 0 + """Penalizes repeated tokens according to frequency.""" + presence_penalty: float = 0 + """Penalizes repeated tokens.""" + n: int = 1 + """How many completions to generate for each prompt.""" + best_of: int = 1 + """Generates best_of completions server-side and returns the "best".""" + model_kwargs: Dict[str, Any] = Field(default_factory=dict) + """Holds any model parameters valid for `create` call not explicitly specified.""" + openai_api_key: Optional[str] = None + openai_api_base: Optional[str] = None + openai_organization: Optional[str] = None + # to support explicit proxy for OpenAI + openai_proxy: Optional[str] = None + batch_size: int = 20 + """Batch size to use when passing multiple documents to generate.""" + request_timeout: Optional[Union[float, Tuple[float, float]]] = None + """Timeout for requests to OpenAI completion API. Default is 600 seconds.""" + logit_bias: Optional[Dict[str, float]] = Field(default_factory=dict) + """Adjust the probability of specific tokens being generated.""" + max_retries: int = 6 + """Maximum number of retries to make when generating.""" + streaming: bool = False + """Whether to stream the results or not.""" + allowed_special: Union[Literal["all"], AbstractSet[str]] = set() + """Set of special tokens that are allowed。""" + disallowed_special: Union[Literal["all"], Collection[str]] = "all" + """Set of special tokens that are not allowed。""" + tiktoken_model_name: Optional[str] = None + """The model name to pass to tiktoken when using this class. + Tiktoken is used to count the number of tokens in documents to constrain + them to be under a certain limit. By default, when set to None, this will + be the same as the embedding model name. However, there are some cases + where you may want to use this Embedding class with a model name not + supported by tiktoken. This can include when using Azure embeddings or + when using one of the many model providers that expose an OpenAI-like + API but with different models. In those cases, in order to avoid erroring + when tiktoken is called, you can specify a model name to use here.""" + + def __new__(cls, **data: Any) -> Union[OpenAIChat, BaseOpenAI]: # type: ignore + """Initialize the OpenAI object.""" + model_name = data.get("model_name", "") + if ( + model_name.startswith("gpt-3.5-turbo") or model_name.startswith("gpt-4") + ) and "-instruct" not in model_name: + warnings.warn( + "You are trying to use a chat model. This way of initializing it is " + "no longer supported. Instead, please use: " + "`from langchain.chat_models import ChatOpenAI`" + ) + return OpenAIChat(**data) + return super().__new__(cls) + + class Config: + """Configuration for this pydantic object.""" + + allow_population_by_field_name = True + + @root_validator(pre=True) + def build_extra(cls, values: Dict[str, Any]) -> Dict[str, Any]: + """Build extra kwargs from additional params that were passed in.""" + all_required_field_names = get_pydantic_field_names(cls) + extra = values.get("model_kwargs", {}) + values["model_kwargs"] = build_extra_kwargs( + extra, values, all_required_field_names + ) + return values + + @root_validator() + def validate_environment(cls, values: Dict) -> Dict: + """Validate that api key and python package exists in environment.""" + values["openai_api_key"] = get_from_dict_or_env( + values, "openai_api_key", "OPENAI_API_KEY" + ) + values["openai_api_base"] = get_from_dict_or_env( + values, + "openai_api_base", + "OPENAI_API_BASE", + default="", + ) + values["openai_proxy"] = get_from_dict_or_env( + values, + "openai_proxy", + "OPENAI_PROXY", + default="", + ) + values["openai_organization"] = get_from_dict_or_env( + values, + "openai_organization", + "OPENAI_ORGANIZATION", + default="", + ) + try: + import openai + + values["client"] = openai.Completion + except ImportError: + raise ImportError( + "Could not import openai python package. " + "Please install it with `pip install openai`." + ) + if values["streaming"] and values["n"] > 1: + raise ValueError("Cannot stream results when n > 1.") + if values["streaming"] and values["best_of"] > 1: + raise ValueError("Cannot stream results when best_of > 1.") + return values + + @property + def _default_params(self) -> Dict[str, Any]: + """Get the default parameters for calling OpenAI API.""" + normal_params = { + "temperature": self.temperature, + "max_tokens": self.max_tokens, + "top_p": self.top_p, + "frequency_penalty": self.frequency_penalty, + "presence_penalty": self.presence_penalty, + "n": self.n, + "request_timeout": self.request_timeout, + "logit_bias": self.logit_bias, + } + + # Azure gpt-35-turbo doesn't support best_of + # don't specify best_of if it is 1 + if self.best_of > 1: + normal_params["best_of"] = self.best_of + + return {**normal_params, **self.model_kwargs} + + def _stream( + self, + prompt: str, + stop: Optional[List[str]] = None, + run_manager: Optional[CallbackManagerForLLMRun] = None, + **kwargs: Any, + ) -> Iterator[GenerationChunk]: + params = {**self._invocation_params, **kwargs, "stream": True} + self.get_sub_prompts(params, [prompt], stop) # this mutates params + for stream_resp in completion_with_retry( + self, prompt=prompt, run_manager=run_manager, **params + ): + chunk = _stream_response_to_generation_chunk(stream_resp) + yield chunk + if run_manager: + run_manager.on_llm_new_token( + chunk.text, + chunk=chunk, + verbose=self.verbose, + logprobs=chunk.generation_info["logprobs"] + if chunk.generation_info + else None, + ) + + async def _astream( + self, + prompt: str, + stop: Optional[List[str]] = None, + run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, + **kwargs: Any, + ) -> AsyncIterator[GenerationChunk]: + params = {**self._invocation_params, **kwargs, "stream": True} + self.get_sub_prompts(params, [prompt], stop) # this mutate params + async for stream_resp in await acompletion_with_retry( + self, prompt=prompt, run_manager=run_manager, **params + ): + chunk = _stream_response_to_generation_chunk(stream_resp) + yield chunk + if run_manager: + await run_manager.on_llm_new_token( + chunk.text, + chunk=chunk, + verbose=self.verbose, + logprobs=chunk.generation_info["logprobs"] + if chunk.generation_info + else None, + ) + + def _generate( + self, + prompts: List[str], + stop: Optional[List[str]] = None, + run_manager: Optional[CallbackManagerForLLMRun] = None, + **kwargs: Any, + ) -> LLMResult: + """Call out to OpenAI's endpoint with k unique prompts. + + Args: + prompts: The prompts to pass into the model. + stop: Optional list of stop words to use when generating. + + Returns: + The full LLM output. + + Example: + .. code-block:: python + + response = openai.generate(["Tell me a joke."]) + """ + # TODO: write a unit test for this + params = self._invocation_params + params = {**params, **kwargs} + sub_prompts = self.get_sub_prompts(params, prompts, stop) + choices = [] + token_usage: Dict[str, int] = {} + # Get the token usage from the response. + # Includes prompt, completion, and total tokens used. + _keys = {"completion_tokens", "prompt_tokens", "total_tokens"} + for _prompts in sub_prompts: + if self.streaming: + if len(_prompts) > 1: + raise ValueError("Cannot stream results with multiple prompts.") + + generation: Optional[GenerationChunk] = None + for chunk in self._stream(_prompts[0], stop, run_manager, **kwargs): + if generation is None: + generation = chunk + else: + generation += chunk + assert generation is not None + choices.append( + { + "text": generation.text, + "finish_reason": generation.generation_info.get("finish_reason") + if generation.generation_info + else None, + "logprobs": generation.generation_info.get("logprobs") + if generation.generation_info + else None, + } + ) + else: + response = completion_with_retry( + self, prompt=_prompts, run_manager=run_manager, **params + ) + choices.extend(response["choices"]) + update_token_usage(_keys, response, token_usage) + return self.create_llm_result(choices, prompts, token_usage) + + async def _agenerate( + self, + prompts: List[str], + stop: Optional[List[str]] = None, + run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, + **kwargs: Any, + ) -> LLMResult: + """Call out to OpenAI's endpoint async with k unique prompts.""" + params = self._invocation_params + params = {**params, **kwargs} + sub_prompts = self.get_sub_prompts(params, prompts, stop) + choices = [] + token_usage: Dict[str, int] = {} + # Get the token usage from the response. + # Includes prompt, completion, and total tokens used. + _keys = {"completion_tokens", "prompt_tokens", "total_tokens"} + for _prompts in sub_prompts: + if self.streaming: + if len(_prompts) > 1: + raise ValueError("Cannot stream results with multiple prompts.") + + generation: Optional[GenerationChunk] = None + async for chunk in self._astream( + _prompts[0], stop, run_manager, **kwargs + ): + if generation is None: + generation = chunk + else: + generation += chunk + assert generation is not None + choices.append( + { + "text": generation.text, + "finish_reason": generation.generation_info.get("finish_reason") + if generation.generation_info + else None, + "logprobs": generation.generation_info.get("logprobs") + if generation.generation_info + else None, + } + ) + else: + response = await acompletion_with_retry( + self, prompt=_prompts, run_manager=run_manager, **params + ) + choices.extend(response["choices"]) + update_token_usage(_keys, response, token_usage) + return self.create_llm_result(choices, prompts, token_usage) + + def get_sub_prompts( + self, + params: Dict[str, Any], + prompts: List[str], + stop: Optional[List[str]] = None, + ) -> List[List[str]]: + """Get the sub prompts for llm call.""" + if stop is not None: + if "stop" in params: + raise ValueError("`stop` found in both the input and default params.") + params["stop"] = stop + if params["max_tokens"] == -1: + if len(prompts) != 1: + raise ValueError( + "max_tokens set to -1 not supported for multiple inputs." + ) + params["max_tokens"] = self.max_tokens_for_prompt(prompts[0]) + sub_prompts = [ + prompts[i : i + self.batch_size] + for i in range(0, len(prompts), self.batch_size) + ] + return sub_prompts + + def create_llm_result( + self, choices: Any, prompts: List[str], token_usage: Dict[str, int] + ) -> LLMResult: + """Create the LLMResult from the choices and prompts.""" + generations = [] + for i, _ in enumerate(prompts): + sub_choices = choices[i * self.n : (i + 1) * self.n] + generations.append( + [ + Generation( + text=choice["text"], + generation_info=dict( + finish_reason=choice.get("finish_reason"), + logprobs=choice.get("logprobs"), + ), + ) + for choice in sub_choices + ] + ) + llm_output = {"token_usage": token_usage, "model_name": self.model_name} + return LLMResult(generations=generations, llm_output=llm_output) + + @property + def _invocation_params(self) -> Dict[str, Any]: + """Get the parameters used to invoke the model.""" + openai_creds: Dict[str, Any] = { + "api_key": self.openai_api_key, + "api_base": self.openai_api_base, + "organization": self.openai_organization, + } + if self.openai_proxy: + import openai + + openai.proxy = {"http": self.openai_proxy, "https": self.openai_proxy} # type: ignore[assignment] # noqa: E501 + return {**openai_creds, **self._default_params} + + @property + def _identifying_params(self) -> Mapping[str, Any]: + """Get the identifying parameters.""" + return {**{"model_name": self.model_name}, **self._default_params} + + @property + def _llm_type(self) -> str: + """Return type of llm.""" + return "openai" + + def get_token_ids(self, text: str) -> List[int]: + """Get the token IDs using the tiktoken package.""" + # tiktoken NOT supported for Python < 3.8 + if sys.version_info[1] < 8: + return super().get_num_tokens(text) + try: + import tiktoken + except ImportError: + raise ImportError( + "Could not import tiktoken python package. " + "This is needed in order to calculate get_num_tokens. " + "Please install it with `pip install tiktoken`." + ) + + model_name = self.tiktoken_model_name or self.model_name + try: + enc = tiktoken.encoding_for_model(model_name) + except KeyError: + logger.warning("Warning: model not found. Using cl100k_base encoding.") + model = "cl100k_base" + enc = tiktoken.get_encoding(model) + + return enc.encode( + text, + allowed_special=self.allowed_special, + disallowed_special=self.disallowed_special, + ) + + @staticmethod + def modelname_to_contextsize(modelname: str) -> int: + """Calculate the maximum number of tokens possible to generate for a model. + + Args: + modelname: The modelname we want to know the context size for. + + Returns: + The maximum context size + + Example: + .. code-block:: python + + max_tokens = openai.modelname_to_contextsize("text-davinci-003") + """ + model_token_mapping = { + "gpt-4": 8192, + "gpt-4-0314": 8192, + "gpt-4-0613": 8192, + "gpt-4-32k": 32768, + "gpt-4-32k-0314": 32768, + "gpt-4-32k-0613": 32768, + "gpt-3.5-turbo": 4096, + "gpt-3.5-turbo-0301": 4096, + "gpt-3.5-turbo-0613": 4096, + "gpt-3.5-turbo-16k": 16385, + "gpt-3.5-turbo-16k-0613": 16385, + "gpt-3.5-turbo-instruct": 4096, + "text-ada-001": 2049, + "ada": 2049, + "text-babbage-001": 2040, + "babbage": 2049, + "text-curie-001": 2049, + "curie": 2049, + "davinci": 2049, + "text-davinci-003": 4097, + "text-davinci-002": 4097, + "code-davinci-002": 8001, + "code-davinci-001": 8001, + "code-cushman-002": 2048, + "code-cushman-001": 2048, + } + + # handling finetuned models + if "ft-" in modelname: + modelname = modelname.split(":")[0] + + context_size = model_token_mapping.get(modelname, None) + + if context_size is None: + raise ValueError( + f"Unknown model: {modelname}. Please provide a valid OpenAI model name." + "Known models are: " + ", ".join(model_token_mapping.keys()) + ) + + return context_size + + @property + def max_context_size(self) -> int: + """Get max context size for this model.""" + return self.modelname_to_contextsize(self.model_name) + + def max_tokens_for_prompt(self, prompt: str) -> int: + """Calculate the maximum number of tokens possible to generate for a prompt. + + Args: + prompt: The prompt to pass into the model. + + Returns: + The maximum number of tokens to generate for a prompt. + + Example: + .. code-block:: python + + max_tokens = openai.max_token_for_prompt("Tell me a joke.") + """ + num_tokens = self.get_num_tokens(prompt) + return self.max_context_size - num_tokens + + +class OpenAI(BaseOpenAI): + """OpenAI large language models. + + To use, you should have the ``openai`` python package installed, and the + environment variable ``OPENAI_API_KEY`` set with your API key. + + Any parameters that are valid to be passed to the openai.create call can be passed + in, even if not explicitly saved on this class. + + Example: + .. code-block:: python + + from langchain.llms import OpenAI + openai = OpenAI(model_name="text-davinci-003") + """ + + @property + def _invocation_params(self) -> Dict[str, Any]: + return {**{"model": self.model_name}, **super()._invocation_params} + + +class AzureOpenAI(BaseOpenAI): + """Azure-specific OpenAI large language models. + + To use, you should have the ``openai`` python package installed, and the + environment variable ``OPENAI_API_KEY`` set with your API key. + + Any parameters that are valid to be passed to the openai.create call can be passed + in, even if not explicitly saved on this class. + + Example: + .. code-block:: python + + from langchain.llms import AzureOpenAI + openai = AzureOpenAI(model_name="text-davinci-003") + """ + + deployment_name: str = "" + """Deployment name to use.""" + openai_api_type: str = "" + openai_api_version: str = "" + + @root_validator() + def validate_azure_settings(cls, values: Dict) -> Dict: + values["openai_api_version"] = get_from_dict_or_env( + values, + "openai_api_version", + "OPENAI_API_VERSION", + ) + values["openai_api_type"] = get_from_dict_or_env( + values, "openai_api_type", "OPENAI_API_TYPE", "azure" + ) + return values + + @property + def _identifying_params(self) -> Mapping[str, Any]: + return { + **{"deployment_name": self.deployment_name}, + **super()._identifying_params, + } + + @property + def _invocation_params(self) -> Dict[str, Any]: + openai_params = { + "engine": self.deployment_name, + "api_type": self.openai_api_type, + "api_version": self.openai_api_version, + } + return {**openai_params, **super()._invocation_params} + + @property + def _llm_type(self) -> str: + """Return type of llm.""" + return "azure" + + +class OpenAIChat(BaseLLM): + """OpenAI Chat large language models. + + To use, you should have the ``openai`` python package installed, and the + environment variable ``OPENAI_API_KEY`` set with your API key. + + Any parameters that are valid to be passed to the openai.create call can be passed + in, even if not explicitly saved on this class. + + Example: + .. code-block:: python + + from langchain.llms import OpenAIChat + openaichat = OpenAIChat(model_name="gpt-3.5-turbo") + """ + + client: Any #: :meta private: + model_name: str = "gpt-3.5-turbo" + """Model name to use.""" + model_kwargs: Dict[str, Any] = Field(default_factory=dict) + """Holds any model parameters valid for `create` call not explicitly specified.""" + openai_api_key: Optional[str] = None + openai_api_base: Optional[str] = None + # to support explicit proxy for OpenAI + openai_proxy: Optional[str] = None + max_retries: int = 6 + """Maximum number of retries to make when generating.""" + prefix_messages: List = Field(default_factory=list) + """Series of messages for Chat input.""" + streaming: bool = False + """Whether to stream the results or not.""" + allowed_special: Union[Literal["all"], AbstractSet[str]] = set() + """Set of special tokens that are allowed。""" + disallowed_special: Union[Literal["all"], Collection[str]] = "all" + """Set of special tokens that are not allowed。""" + + @root_validator(pre=True) + def build_extra(cls, values: Dict[str, Any]) -> Dict[str, Any]: + """Build extra kwargs from additional params that were passed in.""" + all_required_field_names = {field.alias for field in cls.__fields__.values()} + + extra = values.get("model_kwargs", {}) + for field_name in list(values): + if field_name not in all_required_field_names: + if field_name in extra: + raise ValueError(f"Found {field_name} supplied twice.") + extra[field_name] = values.pop(field_name) + values["model_kwargs"] = extra + return values + + @root_validator() + def validate_environment(cls, values: Dict) -> Dict: + """Validate that api key and python package exists in environment.""" + openai_api_key = get_from_dict_or_env( + values, "openai_api_key", "OPENAI_API_KEY" + ) + openai_api_base = get_from_dict_or_env( + values, + "openai_api_base", + "OPENAI_API_BASE", + default="", + ) + openai_proxy = get_from_dict_or_env( + values, + "openai_proxy", + "OPENAI_PROXY", + default="", + ) + openai_organization = get_from_dict_or_env( + values, "openai_organization", "OPENAI_ORGANIZATION", default="" + ) + try: + import openai + + openai.api_key = openai_api_key + if openai_api_base: + openai.api_base = openai_api_base + if openai_organization: + openai.organization = openai_organization + if openai_proxy: + openai.proxy = {"http": openai_proxy, "https": openai_proxy} # type: ignore[assignment] # noqa: E501 + except ImportError: + raise ImportError( + "Could not import openai python package. " + "Please install it with `pip install openai`." + ) + try: + values["client"] = openai.ChatCompletion + except AttributeError: + raise ValueError( + "`openai` has no `ChatCompletion` attribute, this is likely " + "due to an old version of the openai package. Try upgrading it " + "with `pip install --upgrade openai`." + ) + warnings.warn( + "You are trying to use a chat model. This way of initializing it is " + "no longer supported. Instead, please use: " + "`from langchain.chat_models import ChatOpenAI`" + ) + return values + + @property + def _default_params(self) -> Dict[str, Any]: + """Get the default parameters for calling OpenAI API.""" + return self.model_kwargs + + def _get_chat_params( + self, prompts: List[str], stop: Optional[List[str]] = None + ) -> Tuple: + if len(prompts) > 1: + raise ValueError( + f"OpenAIChat currently only supports single prompt, got {prompts}" + ) + messages = self.prefix_messages + [{"role": "user", "content": prompts[0]}] + params: Dict[str, Any] = {**{"model": self.model_name}, **self._default_params} + if stop is not None: + if "stop" in params: + raise ValueError("`stop` found in both the input and default params.") + params["stop"] = stop + if params.get("max_tokens") == -1: + # for ChatGPT api, omitting max_tokens is equivalent to having no limit + del params["max_tokens"] + return messages, params + + def _stream( + self, + prompt: str, + stop: Optional[List[str]] = None, + run_manager: Optional[CallbackManagerForLLMRun] = None, + **kwargs: Any, + ) -> Iterator[GenerationChunk]: + messages, params = self._get_chat_params([prompt], stop) + params = {**params, **kwargs, "stream": True} + for stream_resp in completion_with_retry( + self, messages=messages, run_manager=run_manager, **params + ): + token = stream_resp["choices"][0]["delta"].get("content", "") + chunk = GenerationChunk(text=token) + yield chunk + if run_manager: + run_manager.on_llm_new_token(token, chunk=chunk) + + async def _astream( + self, + prompt: str, + stop: Optional[List[str]] = None, + run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, + **kwargs: Any, + ) -> AsyncIterator[GenerationChunk]: + messages, params = self._get_chat_params([prompt], stop) + params = {**params, **kwargs, "stream": True} + async for stream_resp in await acompletion_with_retry( + self, messages=messages, run_manager=run_manager, **params + ): + token = stream_resp["choices"][0]["delta"].get("content", "") + chunk = GenerationChunk(text=token) + yield chunk + if run_manager: + await run_manager.on_llm_new_token(token, chunk=chunk) + + def _generate( + self, + prompts: List[str], + stop: Optional[List[str]] = None, + run_manager: Optional[CallbackManagerForLLMRun] = None, + **kwargs: Any, + ) -> LLMResult: + if self.streaming: + generation: Optional[GenerationChunk] = None + for chunk in self._stream(prompts[0], stop, run_manager, **kwargs): + if generation is None: + generation = chunk + else: + generation += chunk + assert generation is not None + return LLMResult(generations=[[generation]]) + + messages, params = self._get_chat_params(prompts, stop) + params = {**params, **kwargs} + full_response = completion_with_retry( + self, messages=messages, run_manager=run_manager, **params + ) + llm_output = { + "token_usage": full_response["usage"], + "model_name": self.model_name, + } + return LLMResult( + generations=[ + [Generation(text=full_response["choices"][0]["message"]["content"])] + ], + llm_output=llm_output, + ) + + async def _agenerate( + self, + prompts: List[str], + stop: Optional[List[str]] = None, + run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, + **kwargs: Any, + ) -> LLMResult: + if self.streaming: + generation: Optional[GenerationChunk] = None + async for chunk in self._astream(prompts[0], stop, run_manager, **kwargs): + if generation is None: + generation = chunk + else: + generation += chunk + assert generation is not None + return LLMResult(generations=[[generation]]) + + messages, params = self._get_chat_params(prompts, stop) + params = {**params, **kwargs} + full_response = await acompletion_with_retry( + self, messages=messages, run_manager=run_manager, **params + ) + llm_output = { + "token_usage": full_response["usage"], + "model_name": self.model_name, + } + return LLMResult( + generations=[ + [Generation(text=full_response["choices"][0]["message"]["content"])] + ], + llm_output=llm_output, + ) + + @property + def _identifying_params(self) -> Mapping[str, Any]: + """Get the identifying parameters.""" + return {**{"model_name": self.model_name}, **self._default_params} + + @property + def _llm_type(self) -> str: + """Return type of llm.""" + return "openai-chat" + + def get_token_ids(self, text: str) -> List[int]: + """Get the token IDs using the tiktoken package.""" + # tiktoken NOT supported for Python < 3.8 + if sys.version_info[1] < 8: + return super().get_token_ids(text) + try: + import tiktoken + except ImportError: + raise ImportError( + "Could not import tiktoken python package. " + "This is needed in order to calculate get_num_tokens. " + "Please install it with `pip install tiktoken`." + ) + + enc = tiktoken.encoding_for_model(self.model_name) + return enc.encode( + text, + allowed_special=self.allowed_special, + disallowed_special=self.disallowed_special, + ) \ No newline at end of file diff --git a/swarms/swarms/groupchat.py b/swarms/swarms/groupchat.py new file mode 100644 index 00000000..3da4d7a1 --- /dev/null +++ b/swarms/swarms/groupchat.py @@ -0,0 +1,156 @@ +from dataclasses import dataclass +import sys +from typing import Dict, List, Optional, Union +from swarms.workers.worker import Worker + +@dataclass +class GroupChat: + """A group chat with multiple participants with a list of agents and a max number of rounds""" + + agents: List[Worker] + messages: List[Dict] + max_rounds: int = 10 + admin_name: str = "Admin" #admin agent + + @property + def agent_names(self) -> List[str]: + """returns the names of the agents in the group chat""" + return [agent.ai_name for agent in self.agents] + + def reset(self): + self.messages.clear() + + def agent_by_name(self, name: str) -> Worker: + """Find the next speaker baed on the message""" + return self.agents[self.agent_names.index(name)] + + def next_agent(self, agent: Worker) -> Worker: + """Returns the next agent in the list""" + return self.agents[ + (self.agents_names.index(agent.ai_name) + 1) % len(self.agents) + ] + + def select_speaker_msg(self): + """Return the message to select the next speaker""" + + return f""" + You are in a role play game the following rules are available: + {self.__participant_roles()}. + + Read the following conversation then select the next role from {self.agent_names} + to play and only return the role + """ + + def select_speaker( + self, + last_speaker: Worker, + selector: Worker, + ): + """Selects the next speaker""" + selector.update_system_message(self.select_speaker_msg()) + + final, name = selector.run( + self.messages + [ + { + "role": "system", + "context": f"Read the above conversation. Then select the next role from {self.agent_names} to play. Only return the role.", + } + ] + ) + if not final: + return self.next_agent(last_speaker) + try: + return self.agent_by_name(name) + except ValueError: + return self.next_agent(last_speaker) + + def _participant_roles(self): + return "\n".join( + [f"{agent.ai_name}: {agent.system_message}" for agent in self.agents] + ) + + +class GroupChatManager(Worker): + def __init__( + self, + groupchat: GroupChat, + name: Optional[str] = "chat_manager", + #unlimited auto reply + max_consecutive_auto_reply: Optional[int] = sys.maxsize, + human_input_mode: Optional[str] = "NEVER", + system_message: Optional[str] = "Group chat manager", + **kwargs + ): + super().__init__( + name=name, + max_consecutive_auto_reply=max_consecutive_auto_reply, + human_input_mode=human_input_mode, + system_message=system_message, + **kwargs + ) + self.register_reply( + Worker, + GroupChatManager.run_chat, + config=groupchat, + reset_config=GroupChat.reset + ) + + def run( + self, + messages: Optional[List[Dict]] = None, + sender: Optional[Worker] = None, + config: Optional[GroupChat] = None, + ) -> Union[str, Dict, None]: + #run + if messages is None: + messages = [] + + message = messages[-1] + speaker = sender + groupchat = config + + for i in range(groupchat.max_rounds): + if message["role"] != "function": + message["name"]= speaker.ai_name + + groupchat.messages.append(message) + + #broadcast the message to all agents except the speaker + for agent in groupchat.agents: + if agent != speaker: + self.send( + message, + agent, + request_reply=False, + silent=True, + ) + if i == groupchat.max_rounds - 1: + break + + try: + #select next speaker + speaker = groupchat.select_speaker(speaker, self) + #let the speaker speak + reply = speaker.generate_reply(sender=self) + + except KeyboardInterrupt: + #let the admin speak if interrupted + if groupchat.admin_name in groupchat.agent_names: + #admin agent is a particpant + speaker = groupchat.agent_by_name(groupchat.admin_name) + reply = speaker.generate_reply(sender=self) + else: + #admin agent is not found in particpants + raise + if reply is None: + break + + #speaker sends message without requesting a reply + speaker.send( + reply, + self, + request_reply=False + ) + message = self.last_message(speaker) + message = self.last_messge(speaker) + return True, None \ No newline at end of file