parent
d307b358d0
commit
dc9ff909be
@ -1,357 +0,0 @@
|
||||
"""OpenAI chat wrapper."""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple
|
||||
|
||||
import openai
|
||||
from langchain.chat_models.base import BaseChatModel
|
||||
from langchain.schema import (
|
||||
AIMessage,
|
||||
BaseMessage,
|
||||
ChatGeneration,
|
||||
ChatMessage,
|
||||
ChatResult,
|
||||
HumanMessage,
|
||||
SystemMessage,
|
||||
)
|
||||
from langchain.utils import get_from_dict_or_env
|
||||
from pydantic import BaseModel, Extra, Field, root_validator
|
||||
from tenacity import (
|
||||
before_sleep_log,
|
||||
retry,
|
||||
retry_if_exception_type,
|
||||
stop_after_attempt,
|
||||
wait_exponential,
|
||||
)
|
||||
|
||||
from swarms.utils.logger import logger
|
||||
|
||||
# from ansi import ANSI, Color, Style
|
||||
from swarms.utils.main import ANSI, Color, Style
|
||||
|
||||
|
||||
def _create_retry_decorator(llm: ChatOpenAI) -> Callable[[Any], Any]:
|
||||
import openai
|
||||
|
||||
min_seconds = 4
|
||||
max_seconds = 10
|
||||
# Wait 2^x * 1 second between each retry starting with
|
||||
# 4 seconds, then up to 10 seconds, then 10 seconds afterwards
|
||||
return retry(
|
||||
reraise=True,
|
||||
stop=stop_after_attempt(llm.max_retries),
|
||||
wait=wait_exponential(multiplier=1, min=min_seconds, max=max_seconds),
|
||||
retry=(
|
||||
retry_if_exception_type(openai.error.Timeout)
|
||||
| retry_if_exception_type(openai.error.APIError)
|
||||
| retry_if_exception_type(openai.error.APIConnectionError)
|
||||
| retry_if_exception_type(openai.error.RateLimitError)
|
||||
| retry_if_exception_type(openai.error.ServiceUnavailableError)
|
||||
),
|
||||
before_sleep=before_sleep_log(logger, logging.WARNING),
|
||||
)
|
||||
|
||||
|
||||
async def acompletion_with_retry(llm: ChatOpenAI, **kwargs: Any) -> Any:
|
||||
"""Use tenacity to retry the async completion call."""
|
||||
retry_decorator = _create_retry_decorator(llm)
|
||||
|
||||
@retry_decorator
|
||||
async def _completion_with_retry(**kwargs: Any) -> Any:
|
||||
# Use OpenAI's async api https://github.com/openai/openai-python#async-api
|
||||
return await llm.client.acreate(**kwargs)
|
||||
|
||||
return await _completion_with_retry(**kwargs)
|
||||
|
||||
|
||||
def _convert_dict_to_message(_dict: dict) -> BaseMessage:
|
||||
role = _dict["role"]
|
||||
if role == "user":
|
||||
return HumanMessage(content=_dict["content"])
|
||||
elif role == "assistant":
|
||||
return AIMessage(content=_dict["content"])
|
||||
elif role == "system":
|
||||
return SystemMessage(content=_dict["content"])
|
||||
else:
|
||||
return ChatMessage(content=_dict["content"], role=role)
|
||||
|
||||
|
||||
def _convert_message_to_dict(message: BaseMessage) -> dict:
|
||||
if isinstance(message, ChatMessage):
|
||||
message_dict = {"role": message.role, "content": message.content}
|
||||
elif isinstance(message, HumanMessage):
|
||||
message_dict = {"role": "user", "content": message.content}
|
||||
elif isinstance(message, AIMessage):
|
||||
message_dict = {"role": "assistant", "content": message.content}
|
||||
elif isinstance(message, SystemMessage):
|
||||
message_dict = {"role": "system", "content": message.content}
|
||||
else:
|
||||
raise ValueError(f"Got unknown type {message}")
|
||||
if "name" in message.additional_kwargs:
|
||||
message_dict["name"] = message.additional_kwargs["name"]
|
||||
return message_dict
|
||||
|
||||
|
||||
def _create_chat_result(response: Mapping[str, Any]) -> ChatResult:
|
||||
generations = []
|
||||
for res in response["choices"]:
|
||||
message = _convert_dict_to_message(res["message"])
|
||||
gen = ChatGeneration(message=message)
|
||||
generations.append(gen)
|
||||
return ChatResult(generations=generations)
|
||||
|
||||
|
||||
class ModelNotFoundException(Exception):
|
||||
"""Exception raised when the model is not found."""
|
||||
|
||||
def __init__(self, model_name: str):
|
||||
self.model_name = model_name
|
||||
super().__init__(
|
||||
f"\n\nModel {ANSI(self.model_name).to(Color.red())} does not exist.\nMake sure if you have access to the model.\n"
|
||||
+ f"You can set the model name with the environment variable {ANSI('MODEL_NAME').to(Style.bold())} on {ANSI('.env').to(Style.bold())}.\n"
|
||||
+ "\nex) MODEL_NAME=gpt-4\n"
|
||||
+ ANSI(
|
||||
"\nLooks like you don't have access to gpt-4 yet. Try using `gpt-3.5-turbo`."
|
||||
if self.model_name == "gpt-4"
|
||||
else ""
|
||||
).to(Style.italic())
|
||||
)
|
||||
|
||||
|
||||
class ChatOpenAI(BaseChatModel, BaseModel):
|
||||
"""Wrapper around OpenAI Chat large language models.
|
||||
|
||||
To use, you should have the ``openai`` python package installed, and the
|
||||
environment variable ``OPENAI_API_KEY`` set with your API key.
|
||||
|
||||
Any parameters that are valid to be passed to the openai.create call can be passed
|
||||
in, even if not explicitly saved on this class.
|
||||
|
||||
Example:
|
||||
.. code-block:: python
|
||||
|
||||
from langchain.chat_models import ChatOpenAI
|
||||
openai = ChatOpenAI(model_name="gpt-3.5-turbo")
|
||||
"""
|
||||
|
||||
client: Any #: :meta private:
|
||||
model_name: str = os.environ.get("MODEL_NAME", "gpt-4")
|
||||
"""Model name to use."""
|
||||
model_kwargs: Dict[str, Any] = Field(default_factory=dict)
|
||||
"""Holds any model parameters valid for `create` call not explicitly specified."""
|
||||
openai_api_key: Optional[str] = None
|
||||
max_retries: int = 6
|
||||
"""Maximum number of retries to make when generating."""
|
||||
streaming: bool = False
|
||||
"""Whether to stream the results or not."""
|
||||
n: int = 1
|
||||
"""Number of chat completions to generate for each prompt."""
|
||||
max_tokens: int = 2048
|
||||
"""Maximum number of tokens to generate."""
|
||||
|
||||
class Config:
|
||||
"""Configuration for this pydantic object."""
|
||||
|
||||
extra = Extra.ignore
|
||||
|
||||
def check_access(self) -> None:
|
||||
"""Check that the user has access to the model."""
|
||||
|
||||
try:
|
||||
openai.Engine.retrieve(self.model_name)
|
||||
except openai.error.InvalidRequestError:
|
||||
raise ModelNotFoundException(self.model_name)
|
||||
|
||||
@root_validator(pre=True)
|
||||
def build_extra(cls, values: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Build extra kwargs from additional params that were passed in."""
|
||||
all_required_field_names = {field.alias for field in cls.__fields__.values()}
|
||||
|
||||
extra = values.get("model_kwargs", {})
|
||||
for field_name in list(values):
|
||||
if field_name not in all_required_field_names:
|
||||
if field_name in extra:
|
||||
raise ValueError(f"Found {field_name} supplied twice.")
|
||||
extra[field_name] = values.pop(field_name)
|
||||
values["model_kwargs"] = extra
|
||||
return values
|
||||
|
||||
@root_validator()
|
||||
def validate_environment(cls, values: Dict) -> Dict:
|
||||
"""Validate that api key and python package exists in environment."""
|
||||
openai_api_key = get_from_dict_or_env(
|
||||
values, "openai_api_key", "OPENAI_API_KEY"
|
||||
)
|
||||
try:
|
||||
import openai
|
||||
|
||||
openai.api_key = openai_api_key
|
||||
except ImportError:
|
||||
raise ValueError(
|
||||
"Could not import openai python package. "
|
||||
"Please it install it with `pip install openai`."
|
||||
)
|
||||
try:
|
||||
values["client"] = openai.ChatCompletion
|
||||
except AttributeError:
|
||||
raise ValueError(
|
||||
"`openai` has no `ChatCompletion` attribute, this is likely "
|
||||
"due to an old version of the openai package. Try upgrading it "
|
||||
"with `pip install --upgrade openai`."
|
||||
)
|
||||
if values["n"] < 1:
|
||||
raise ValueError("n must be at least 1.")
|
||||
if values["n"] > 1 and values["streaming"]:
|
||||
raise ValueError("n must be 1 when streaming.")
|
||||
return values
|
||||
|
||||
@property
|
||||
def _default_params(self) -> Dict[str, Any]:
|
||||
"""Get the default parameters for calling OpenAI API."""
|
||||
return {
|
||||
"model": self.model_name,
|
||||
"max_tokens": self.max_tokens,
|
||||
"stream": self.streaming,
|
||||
"n": self.n,
|
||||
**self.model_kwargs,
|
||||
}
|
||||
|
||||
def _create_retry_decorator(self) -> Callable[[Any], Any]:
|
||||
import openai
|
||||
|
||||
min_seconds = 4
|
||||
max_seconds = 10
|
||||
# Wait 2^x * 1 second between each retry starting with
|
||||
# 4 seconds, then up to 10 seconds, then 10 seconds afterwards
|
||||
return retry(
|
||||
reraise=True,
|
||||
stop=stop_after_attempt(self.max_retries),
|
||||
wait=wait_exponential(multiplier=1, min=min_seconds, max=max_seconds),
|
||||
retry=(
|
||||
retry_if_exception_type(openai.error.Timeout)
|
||||
| retry_if_exception_type(openai.error.APIError)
|
||||
| retry_if_exception_type(openai.error.APIConnectionError)
|
||||
| retry_if_exception_type(openai.error.RateLimitError)
|
||||
| retry_if_exception_type(openai.error.ServiceUnavailableError)
|
||||
),
|
||||
before_sleep=before_sleep_log(logger, logging.WARNING),
|
||||
)
|
||||
|
||||
def completion_with_retry(self, **kwargs: Any) -> Any:
|
||||
"""Use tenacity to retry the completion call."""
|
||||
retry_decorator = self._create_retry_decorator()
|
||||
|
||||
@retry_decorator
|
||||
def _completion_with_retry(**kwargs: Any) -> Any:
|
||||
response = self.client.create(**kwargs)
|
||||
logger.debug("Response:\n\t%s", response)
|
||||
return response
|
||||
|
||||
return _completion_with_retry(**kwargs)
|
||||
|
||||
def _generate(
|
||||
self, messages: List[BaseMessage], stop: Optional[List[str]] = None
|
||||
) -> ChatResult:
|
||||
message_dicts, params = self._create_message_dicts(messages, stop)
|
||||
logger.debug("Messages:\n")
|
||||
for item in message_dicts:
|
||||
for k, v in item.items():
|
||||
logger.debug(f"\t\t{k}: {v}")
|
||||
logger.debug("\t-------")
|
||||
logger.debug("===========")
|
||||
|
||||
if self.streaming:
|
||||
inner_completion = ""
|
||||
role = "assistant"
|
||||
params["stream"] = True
|
||||
for stream_resp in self.completion_with_retry(
|
||||
messages=message_dicts, **params
|
||||
):
|
||||
role = stream_resp["choices"][0]["delta"].get("role", role)
|
||||
token = stream_resp["choices"][0]["delta"].get("content", "")
|
||||
inner_completion += token
|
||||
self.callback_manager.on_llm_new_token(
|
||||
token,
|
||||
verbose=self.verbose,
|
||||
)
|
||||
message = _convert_dict_to_message(
|
||||
{"content": inner_completion, "role": role}
|
||||
)
|
||||
return ChatResult(generations=[ChatGeneration(message=message)])
|
||||
response = self.completion_with_retry(messages=message_dicts, **params)
|
||||
return _create_chat_result(response)
|
||||
|
||||
def _create_message_dicts(
|
||||
self, messages: List[BaseMessage], stop: Optional[List[str]]
|
||||
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
|
||||
params: Dict[str, Any] = {**{"model": self.model_name}, **self._default_params}
|
||||
if stop is not None:
|
||||
if "stop" in params:
|
||||
raise ValueError("`stop` found in both the input and default params.")
|
||||
params["stop"] = stop
|
||||
message_dicts = [_convert_message_to_dict(m) for m in messages]
|
||||
return message_dicts, params
|
||||
|
||||
async def _agenerate(
|
||||
self, messages: List[BaseMessage], stop: Optional[List[str]] = None
|
||||
) -> ChatResult:
|
||||
message_dicts, params = self._create_message_dicts(messages, stop)
|
||||
if self.streaming:
|
||||
inner_completion = ""
|
||||
role = "assistant"
|
||||
params["stream"] = True
|
||||
async for stream_resp in await acompletion_with_retry(
|
||||
self, messages=message_dicts, **params
|
||||
):
|
||||
role = stream_resp["choices"][0]["delta"].get("role", role)
|
||||
token = stream_resp["choices"][0]["delta"].get("content", "")
|
||||
inner_completion += token
|
||||
if self.callback_manager.is_async:
|
||||
await self.callback_manager.on_llm_new_token(
|
||||
token,
|
||||
verbose=self.verbose,
|
||||
)
|
||||
else:
|
||||
self.callback_manager.on_llm_new_token(
|
||||
token,
|
||||
verbose=self.verbose,
|
||||
)
|
||||
message = _convert_dict_to_message(
|
||||
{"content": inner_completion, "role": role}
|
||||
)
|
||||
return ChatResult(generations=[ChatGeneration(message=message)])
|
||||
else:
|
||||
response = await acompletion_with_retry(
|
||||
self, messages=message_dicts, **params
|
||||
)
|
||||
return _create_chat_result(response)
|
||||
|
||||
@property
|
||||
def _identifying_params(self) -> Mapping[str, Any]:
|
||||
"""Get the identifying parameters."""
|
||||
return {**{"model_name": self.model_name}, **self._default_params}
|
||||
|
||||
def get_num_tokens(self, text: str) -> int:
|
||||
"""Calculate num tokens with tiktoken package."""
|
||||
# tiktoken NOT supported for Python 3.8 or below
|
||||
if sys.version_info[1] <= 8:
|
||||
return super().get_num_tokens(text)
|
||||
try:
|
||||
import tiktoken
|
||||
except ImportError:
|
||||
raise ValueError(
|
||||
"Could not import tiktoken python package. "
|
||||
"This is needed in order to calculate get_num_tokens. "
|
||||
"Please it install it with `pip install tiktoken`."
|
||||
)
|
||||
# create a GPT-3.5-Turbo encoder instance
|
||||
enc = tiktoken.encoding_for_model(self.model_name)
|
||||
|
||||
# encode the text using the GPT-3.5-Turbo encoder
|
||||
tokenized_text = enc.encode(text)
|
||||
|
||||
# calculate the number of tokens in the encoded text
|
||||
return len(tokenized_text)
|
@ -1,150 +0,0 @@
|
||||
import logging
|
||||
from typing import Any, List, Optional, Sequence, Tuple
|
||||
|
||||
from langchain.agents.agent import AgentOutputParser
|
||||
from langchain.base_language import BaseLanguageModel
|
||||
from langchain.callbacks.base import BaseCallbackManager
|
||||
from langchain.chains import LLMChain
|
||||
from langchain.prompts.base import BasePromptTemplate
|
||||
from langchain.prompts.chat import (
|
||||
ChatPromptTemplate,
|
||||
HumanMessagePromptTemplate,
|
||||
MessagesPlaceholder,
|
||||
SystemMessagePromptTemplate,
|
||||
)
|
||||
from langchain.schema import (
|
||||
AgentAction,
|
||||
AIMessage,
|
||||
BaseMessage,
|
||||
BaseOutputParser,
|
||||
HumanMessage,
|
||||
)
|
||||
from langchain.tools.base import BaseTool
|
||||
|
||||
from swarms.models.prompts.prebuild.multi_modal_prompts import EVAL_TOOL_RESPONSE
|
||||
from swarms.agents.utils.Agent import Agent
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
class ConversationalChatAgent(Agent):
|
||||
"""An agent designed to hold a conversation in addition to using tools."""
|
||||
|
||||
output_parser: BaseOutputParser
|
||||
|
||||
@property
|
||||
def _agent_type(self) -> str:
|
||||
raise NotImplementedError
|
||||
|
||||
def _get_default_output_parser(cls, **kwargs: Any) -> AgentOutputParser:
|
||||
"""Get default output parser for this class."""
|
||||
|
||||
|
||||
@property
|
||||
def observation_prefix(self) -> str:
|
||||
"""Prefix to append the observation with."""
|
||||
return "Observation: "
|
||||
|
||||
@property
|
||||
def llm_prefix(self) -> str:
|
||||
"""Prefix to append the llm call with."""
|
||||
return "Thought: "
|
||||
|
||||
@classmethod
|
||||
def create_prompt(
|
||||
cls,
|
||||
tools: Sequence[BaseTool],
|
||||
system_message: str,
|
||||
human_message: str,
|
||||
output_parser: BaseOutputParser,
|
||||
input_variables: Optional[List[str]] = None,
|
||||
) -> BasePromptTemplate:
|
||||
if not isinstance(tools, Sequence):
|
||||
raise TypeError("Tools must be a sequence")
|
||||
if not isinstance(system_message, str):
|
||||
raise TypeError("System message must be a string")
|
||||
if not isinstance(human_message, str):
|
||||
raise TypeError("Human message must be a string")
|
||||
if not isinstance(output_parser, BaseOutputParser):
|
||||
raise TypeError("Output parser must be an instance of BaseOutputParser")
|
||||
if input_variables and not isinstance(input_variables, list):
|
||||
raise TypeError("Input variables must be a list")
|
||||
|
||||
tool_strings = "\n".join(
|
||||
[f"> {tool.name}: {tool.description}" for tool in tools]
|
||||
)
|
||||
tool_names = ", ".join([tool.name for tool in tools])
|
||||
format_instructions = human_message.format(
|
||||
format_instructions=output_parser.get_format_instructions()
|
||||
)
|
||||
final_prompt = format_instructions.format(
|
||||
tool_names=tool_names, tools=tool_strings
|
||||
)
|
||||
if input_variables is None:
|
||||
input_variables = ["input", "chat_history", "agent_scratchpad"]
|
||||
messages = [
|
||||
SystemMessagePromptTemplate.from_template(system_message),
|
||||
MessagesPlaceholder(variable_name="chat_history"),
|
||||
HumanMessagePromptTemplate.from_template(final_prompt),
|
||||
MessagesPlaceholder(variable_name="agent_scratchpad"),
|
||||
]
|
||||
return ChatPromptTemplate(input_variables=input_variables, messages=messages)
|
||||
|
||||
def _extract_tool_and_input(self, llm_output: str) -> Optional[Tuple[str, str]]:
|
||||
try:
|
||||
response = self.output_parser.parse(llm_output)
|
||||
return response["action"], response["action_input"]
|
||||
except Exception as e:
|
||||
logging.error(f"Error while extracting tool and input: {str(e)}")
|
||||
raise ValueError(f"Could not parse LLM output: {llm_output}")
|
||||
|
||||
def _construct_scratchpad(
|
||||
self, intermediate_steps: List[Tuple[AgentAction, str]]
|
||||
) -> List[BaseMessage]:
|
||||
"""Construct the scratchpad that lets the agent continue its thought process."""
|
||||
thoughts: List[BaseMessage] = []
|
||||
for action, observation in intermediate_steps:
|
||||
thoughts.append(AIMessage(content=action.log))
|
||||
human_message = HumanMessage(
|
||||
content=EVAL_TOOL_RESPONSE.format(observation=observation)
|
||||
)
|
||||
thoughts.append(human_message)
|
||||
return thoughts
|
||||
|
||||
@classmethod
|
||||
def from_llm_and_tools(
|
||||
cls,
|
||||
llm: BaseLanguageModel,
|
||||
tools: Sequence[BaseTool],
|
||||
system_message: str,
|
||||
human_message: str,
|
||||
output_parser: BaseOutputParser,
|
||||
callback_manager: Optional[BaseCallbackManager] = None,
|
||||
input_variables: Optional[List[str]] = None,
|
||||
**kwargs: Any,
|
||||
) -> Agent:
|
||||
"""Construct an agent from an LLM and tools."""
|
||||
cls._validate_tools(tools)
|
||||
prompt = cls.create_prompt(
|
||||
tools,
|
||||
system_message=system_message,
|
||||
human_message=human_message,
|
||||
input_variables=input_variables,
|
||||
output_parser=output_parser,
|
||||
)
|
||||
llm_chain = LLMChain(
|
||||
llm=llm,
|
||||
prompt=prompt,
|
||||
callback_manager=callback_manager,
|
||||
)
|
||||
tool_names = [tool.name for tool in tools]
|
||||
try:
|
||||
return cls(
|
||||
llm_chain=llm_chain,
|
||||
allowed_tools=tool_names,
|
||||
output_parser=output_parser,
|
||||
**kwargs,
|
||||
)
|
||||
except Exception as e:
|
||||
logging.error(f"Error while creating agent from LLM and tools: {str(e)}")
|
||||
raise e
|
||||
|
@ -1,24 +0,0 @@
|
||||
"""Tool for asking human input."""
|
||||
|
||||
|
||||
class HumanInputRun:
|
||||
"""Tool that asks user for input."""
|
||||
|
||||
def __init__(self, prompt_func=None, input_func=None):
|
||||
self.name = "human"
|
||||
self.description = (
|
||||
"You can ask a human for guidance when you think you "
|
||||
"got stuck or you are not sure what to do next. "
|
||||
"The input should be a question for the human."
|
||||
)
|
||||
self.prompt_func = prompt_func if prompt_func else self._print_func
|
||||
self.input_func = input_func if input_func else input
|
||||
|
||||
def _print_func(self, text: str) -> None:
|
||||
print("\n")
|
||||
print(text)
|
||||
|
||||
def run(self, query: str) -> str:
|
||||
"""Use the Human input tool."""
|
||||
self.prompt_func(query)
|
||||
return self.input_func()
|
@ -1,506 +0,0 @@
|
||||
"""Interface for vector stores."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import math
|
||||
import warnings
|
||||
from abc import ABC, abstractmethod
|
||||
from functools import partial
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
ClassVar,
|
||||
Collection,
|
||||
Dict,
|
||||
Iterable,
|
||||
List,
|
||||
Optional,
|
||||
Tuple,
|
||||
Type,
|
||||
TypeVar,
|
||||
)
|
||||
|
||||
from langchain.callbacks.manager import (
|
||||
AsyncCallbackManagerForRetrieverRun,
|
||||
CallbackManagerForRetrieverRun,
|
||||
)
|
||||
|
||||
from swarms.memory.document import Document
|
||||
from swarms.embeddings.base import Embeddings
|
||||
|
||||
from langchain.schema import BaseRetriever
|
||||
|
||||
from pydantic import Field, root_validator
|
||||
|
||||
VST = TypeVar("VST", bound="VectorStore")
|
||||
|
||||
|
||||
class VectorStore(ABC):
|
||||
"""Interface for vector stores."""
|
||||
|
||||
@abstractmethod
|
||||
def add_texts(
|
||||
self,
|
||||
texts: Iterable[str],
|
||||
metadatas: Optional[List[dict]] = None,
|
||||
**kwargs: Any,
|
||||
) -> List[str]:
|
||||
"""Run more texts through the embeddings and add to the vectorstore.
|
||||
|
||||
Args:
|
||||
texts: Iterable of strings to add to the vectorstore.
|
||||
metadatas: Optional list of metadatas associated with the texts.
|
||||
kwargs: vectorstore specific parameters
|
||||
|
||||
Returns:
|
||||
List of ids from adding the texts into the vectorstore.
|
||||
"""
|
||||
|
||||
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
|
||||
"""Delete by vector ID or other criteria.
|
||||
|
||||
Args:
|
||||
ids: List of ids to delete.
|
||||
**kwargs: Other keyword arguments that subclasses might use.
|
||||
|
||||
Returns:
|
||||
Optional[bool]: True if deletion is successful,
|
||||
False otherwise, None if not implemented.
|
||||
"""
|
||||
|
||||
raise NotImplementedError("delete method must be implemented by subclass.")
|
||||
|
||||
async def aadd_texts(
|
||||
self,
|
||||
texts: Iterable[str],
|
||||
metadatas: Optional[List[dict]] = None,
|
||||
**kwargs: Any,
|
||||
) -> List[str]:
|
||||
"""Run more texts through the embeddings and add to the vectorstore."""
|
||||
raise NotImplementedError
|
||||
|
||||
def add_documents(self, documents: List[Document], **kwargs: Any) -> List[str]:
|
||||
"""Run more documents through the embeddings and add to the vectorstore.
|
||||
|
||||
Args:
|
||||
documents (List[Document]: Documents to add to the vectorstore.
|
||||
|
||||
Returns:
|
||||
List[str]: List of IDs of the added texts.
|
||||
"""
|
||||
# TODO: Handle the case where the user doesn't provide ids on the Collection
|
||||
texts = [doc.page_content for doc in documents]
|
||||
metadatas = [doc.metadata for doc in documents]
|
||||
return self.add_texts(texts, metadatas, **kwargs)
|
||||
|
||||
async def aadd_documents(
|
||||
self, documents: List[Document], **kwargs: Any
|
||||
) -> List[str]:
|
||||
"""Run more documents through the embeddings and add to the vectorstore.
|
||||
|
||||
Args:
|
||||
documents (List[Document]: Documents to add to the vectorstore.
|
||||
|
||||
Returns:
|
||||
List[str]: List of IDs of the added texts.
|
||||
"""
|
||||
texts = [doc.page_content for doc in documents]
|
||||
metadatas = [doc.metadata for doc in documents]
|
||||
return await self.aadd_texts(texts, metadatas, **kwargs)
|
||||
|
||||
def search(self, query: str, search_type: str, **kwargs: Any) -> List[Document]:
|
||||
"""Return docs most similar to query using specified search type."""
|
||||
if search_type == "similarity":
|
||||
return self.similarity_search(query, **kwargs)
|
||||
elif search_type == "mmr":
|
||||
return self.max_marginal_relevance_search(query, **kwargs)
|
||||
else:
|
||||
raise ValueError(
|
||||
f"search_type of {search_type} not allowed. Expected "
|
||||
"search_type to be 'similarity' or 'mmr'."
|
||||
)
|
||||
|
||||
async def asearch(
|
||||
self, query: str, search_type: str, **kwargs: Any
|
||||
) -> List[Document]:
|
||||
"""Return docs most similar to query using specified search type."""
|
||||
if search_type == "similarity":
|
||||
return await self.asimilarity_search(query, **kwargs)
|
||||
elif search_type == "mmr":
|
||||
return await self.amax_marginal_relevance_search(query, **kwargs)
|
||||
else:
|
||||
raise ValueError(
|
||||
f"search_type of {search_type} not allowed. Expected "
|
||||
"search_type to be 'similarity' or 'mmr'."
|
||||
)
|
||||
|
||||
@abstractmethod
|
||||
def similarity_search(
|
||||
self, query: str, k: int = 4, **kwargs: Any
|
||||
) -> List[Document]:
|
||||
"""Return docs most similar to query."""
|
||||
|
||||
@staticmethod
|
||||
def _euclidean_relevance_score_fn(distance: float) -> float:
|
||||
"""Return a similarity score on a scale [0, 1]."""
|
||||
return 1.0 - distance / math.sqrt(2)
|
||||
|
||||
@staticmethod
|
||||
def _cosine_relevance_score_fn(distance: float) -> float:
|
||||
"""Normalize the distance to a score on a scale [0, 1]."""
|
||||
|
||||
return 1.0 - distance
|
||||
|
||||
@staticmethod
|
||||
def _max_inner_product_relevance_score_fn(distance: float) -> float:
|
||||
"""Normalize the distance to a score on a scale [0, 1]."""
|
||||
if distance > 0:
|
||||
return 1.0 - distance
|
||||
|
||||
return -1.0 * distance
|
||||
|
||||
def _select_relevance_score_fn(self) -> Callable[[float], float]:
|
||||
"""
|
||||
The 'correct' relevance function
|
||||
may differ depending on a few things, including:
|
||||
- the distance / similarity metric used by the VectorStore
|
||||
- the scale of your embeddings (OpenAI's are unit normed. Many others are not!)
|
||||
- embedding dimensionality
|
||||
- etc.
|
||||
|
||||
Vectorstores should define their own selection based method of relevance.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def similarity_search_with_score(
|
||||
self, *args: Any, **kwargs: Any
|
||||
) -> List[Tuple[Document, float]]:
|
||||
"""Run similarity search with distance."""
|
||||
raise NotImplementedError
|
||||
|
||||
def _similarity_search_with_relevance_scores(
|
||||
self,
|
||||
query: str,
|
||||
k: int = 4,
|
||||
**kwargs: Any,
|
||||
) -> List[Tuple[Document, float]]:
|
||||
"""
|
||||
Default similarity search with relevance scores. Modify if necessary
|
||||
in subclass.
|
||||
Return docs and relevance scores in the range [0, 1].
|
||||
|
||||
0 is dissimilar, 1 is most similar.
|
||||
|
||||
Args:
|
||||
query: input text
|
||||
k: Number of Documents to return. Defaults to 4.
|
||||
**kwargs: kwargs to be passed to similarity search. Should include:
|
||||
score_threshold: Optional, a floating point value between 0 to 1 to
|
||||
filter the resulting set of retrieved docs
|
||||
|
||||
Returns:
|
||||
List of Tuples of (doc, similarity_score)
|
||||
"""
|
||||
relevance_score_fn = self._select_relevance_score_fn()
|
||||
docs_and_scores = self.similarity_search_with_score(query, k, **kwargs)
|
||||
return [(doc, relevance_score_fn(score)) for doc, score in docs_and_scores]
|
||||
|
||||
def similarity_search_with_relevance_scores(
|
||||
self,
|
||||
query: str,
|
||||
k: int = 4,
|
||||
**kwargs: Any,
|
||||
) -> List[Tuple[Document, float]]:
|
||||
"""Return docs and relevance scores in the range [0, 1].
|
||||
|
||||
0 is dissimilar, 1 is most similar.
|
||||
|
||||
Args:
|
||||
query: input text
|
||||
k: Number of Documents to return. Defaults to 4.
|
||||
**kwargs: kwargs to be passed to similarity search. Should include:
|
||||
score_threshold: Optional, a floating point value between 0 to 1 to
|
||||
filter the resulting set of retrieved docs
|
||||
|
||||
Returns:
|
||||
List of Tuples of (doc, similarity_score)
|
||||
"""
|
||||
score_threshold = kwargs.pop("score_threshold", None)
|
||||
|
||||
docs_and_similarities = self._similarity_search_with_relevance_scores(
|
||||
query, k=k, **kwargs
|
||||
)
|
||||
if any(
|
||||
similarity < 0.0 or similarity > 1.0
|
||||
for _, similarity in docs_and_similarities
|
||||
):
|
||||
warnings.warn(
|
||||
"Relevance scores must be between"
|
||||
f" 0 and 1, got {docs_and_similarities}"
|
||||
)
|
||||
|
||||
if score_threshold is not None:
|
||||
docs_and_similarities = [
|
||||
(doc, similarity)
|
||||
for doc, similarity in docs_and_similarities
|
||||
if similarity >= score_threshold
|
||||
]
|
||||
if len(docs_and_similarities) == 0:
|
||||
warnings.warn(
|
||||
"No relevant docs were retrieved using the relevance score"
|
||||
f" threshold {score_threshold}"
|
||||
)
|
||||
return docs_and_similarities
|
||||
|
||||
async def asimilarity_search_with_relevance_scores(
|
||||
self, query: str, k: int = 4, **kwargs: Any
|
||||
) -> List[Tuple[Document, float]]:
|
||||
"""Return docs most similar to query."""
|
||||
|
||||
func = partial(self.similarity_search_with_relevance_scores, query, k, **kwargs)
|
||||
return await asyncio.get_event_loop().run_in_executor(None, func)
|
||||
|
||||
async def asimilarity_search(
|
||||
self, query: str, k: int = 4, **kwargs: Any
|
||||
) -> List[Document]:
|
||||
"""Return docs most similar to query."""
|
||||
func = partial(self.similarity_search, query, k, **kwargs)
|
||||
return await asyncio.get_event_loop().run_in_executor(None, func)
|
||||
|
||||
def similarity_search_by_vector(
|
||||
self, embedding: List[float], k: int = 4, **kwargs: Any
|
||||
) -> List[Document]:
|
||||
"""Return docs most similar to embedding vector.
|
||||
|
||||
Args:
|
||||
embedding: Embedding to look up documents similar to.
|
||||
k: Number of Documents to return. Defaults to 4.
|
||||
|
||||
Returns:
|
||||
List of Documents most similar to the query vector.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
async def asimilarity_search_by_vector(
|
||||
self, embedding: List[float], k: int = 4, **kwargs: Any
|
||||
) -> List[Document]:
|
||||
"""Return docs most similar to embedding vector."""
|
||||
|
||||
# This is a temporary workaround to make the similarity search
|
||||
# asynchronous. The proper solution is to make the similarity search
|
||||
# asynchronous in the vector store implementations.
|
||||
func = partial(self.similarity_search_by_vector, embedding, k, **kwargs)
|
||||
return await asyncio.get_event_loop().run_in_executor(None, func)
|
||||
|
||||
def max_marginal_relevance_search(
|
||||
self,
|
||||
query: str,
|
||||
k: int = 4,
|
||||
fetch_k: int = 20,
|
||||
lambda_mult: float = 0.5,
|
||||
**kwargs: Any,
|
||||
) -> List[Document]:
|
||||
"""Return docs selected using the maximal marginal relevance.
|
||||
|
||||
Maximal marginal relevance optimizes for similarity to query AND diversity
|
||||
among selected documents.
|
||||
|
||||
Args:
|
||||
query: Text to look up documents similar to.
|
||||
k: Number of Documents to return. Defaults to 4.
|
||||
fetch_k: Number of Documents to fetch to pass to MMR algorithm.
|
||||
lambda_mult: Number between 0 and 1 that determines the degree
|
||||
of diversity among the results with 0 corresponding
|
||||
to maximum diversity and 1 to minimum diversity.
|
||||
Defaults to 0.5.
|
||||
Returns:
|
||||
List of Documents selected by maximal marginal relevance.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
async def amax_marginal_relevance_search(
|
||||
self,
|
||||
query: str,
|
||||
k: int = 4,
|
||||
fetch_k: int = 20,
|
||||
lambda_mult: float = 0.5,
|
||||
**kwargs: Any,
|
||||
) -> List[Document]:
|
||||
"""Return docs selected using the maximal marginal relevance."""
|
||||
func = partial(
|
||||
self.max_marginal_relevance_search, query, k, fetch_k, lambda_mult, **kwargs
|
||||
)
|
||||
return await asyncio.get_event_loop().run_in_executor(None, func)
|
||||
|
||||
def max_marginal_relevance_search_by_vector(
|
||||
self,
|
||||
embedding: List[float],
|
||||
k: int = 4,
|
||||
fetch_k: int = 20,
|
||||
lambda_mult: float = 0.5,
|
||||
**kwargs: Any,
|
||||
) -> List[Document]:
|
||||
"""Return docs selected using the maximal marginal relevance.
|
||||
|
||||
Maximal marginal relevance optimizes for similarity to query AND diversity
|
||||
among selected documents.
|
||||
|
||||
Args:
|
||||
embedding: Embedding to look up documents similar to.
|
||||
k: Number of Documents to return. Defaults to 4.
|
||||
fetch_k: Number of Documents to fetch to pass to MMR algorithm.
|
||||
lambda_mult: Number between 0 and 1 that determines the degree
|
||||
of diversity among the results with 0 corresponding
|
||||
to maximum diversity and 1 to minimum diversity.
|
||||
Defaults to 0.5.
|
||||
Returns:
|
||||
List of Documents selected by maximal marginal relevance.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
async def amax_marginal_relevance_search_by_vector(
|
||||
self,
|
||||
embedding: List[float],
|
||||
k: int = 4,
|
||||
fetch_k: int = 20,
|
||||
lambda_mult: float = 0.5,
|
||||
**kwargs: Any,
|
||||
) -> List[Document]:
|
||||
"""Return docs selected using the maximal marginal relevance."""
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
def from_documents(
|
||||
cls: Type[VST],
|
||||
documents: List[Document],
|
||||
embedding: Embeddings,
|
||||
**kwargs: Any,
|
||||
) -> VST:
|
||||
"""Return VectorStore initialized from documents and embeddings."""
|
||||
texts = [d.page_content for d in documents]
|
||||
metadatas = [d.metadata for d in documents]
|
||||
return cls.from_texts(texts, embedding, metadatas=metadatas, **kwargs)
|
||||
|
||||
@classmethod
|
||||
async def afrom_documents(
|
||||
cls: Type[VST],
|
||||
documents: List[Document],
|
||||
embedding: Embeddings,
|
||||
**kwargs: Any,
|
||||
) -> VST:
|
||||
"""Return VectorStore initialized from documents and embeddings."""
|
||||
texts = [d.page_content for d in documents]
|
||||
metadatas = [d.metadata for d in documents]
|
||||
return await cls.afrom_texts(texts, embedding, metadatas=metadatas, **kwargs)
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def from_texts(
|
||||
cls: Type[VST],
|
||||
texts: List[str],
|
||||
embedding: Embeddings,
|
||||
metadatas: Optional[List[dict]] = None,
|
||||
**kwargs: Any,
|
||||
) -> VST:
|
||||
"""Return VectorStore initialized from texts and embeddings."""
|
||||
|
||||
@classmethod
|
||||
async def afrom_texts(
|
||||
cls: Type[VST],
|
||||
texts: List[str],
|
||||
embedding: Embeddings,
|
||||
metadatas: Optional[List[dict]] = None,
|
||||
**kwargs: Any,
|
||||
) -> VST:
|
||||
"""Return VectorStore initialized from texts and embeddings."""
|
||||
raise NotImplementedError
|
||||
|
||||
def as_retriever(self, **kwargs: Any) -> VectorStoreRetriever:
|
||||
return VectorStoreRetriever(vectorstore=self, **kwargs)
|
||||
|
||||
|
||||
class VectorStoreRetriever(BaseRetriever):
|
||||
vectorstore: VectorStore
|
||||
search_type: str = "similarity"
|
||||
search_kwargs: dict = Field(default_factory=dict)
|
||||
allowed_search_types: ClassVar[Collection[str]] = (
|
||||
"similarity",
|
||||
"similarityatscore_threshold",
|
||||
"mmr",
|
||||
)
|
||||
|
||||
class Config:
|
||||
"""Configuration for this pydantic object."""
|
||||
|
||||
arbitrary_types_allowed = True
|
||||
|
||||
@root_validator()
|
||||
def validate_search_type(cls, values: Dict) -> Dict:
|
||||
"""Validate search type."""
|
||||
search_type = values["search_type"]
|
||||
if search_type not in cls.allowed_search_types:
|
||||
raise ValueError(
|
||||
f"search_type of {search_type} not allowed. Valid values are: "
|
||||
f"{cls.allowed_search_types}"
|
||||
)
|
||||
if search_type == "similarity_score_threshold":
|
||||
score_threshold = values["search_kwargs"].get("score_threshold")
|
||||
if (score_threshold is None) or (not isinstance(score_threshold, float)):
|
||||
raise ValueError(
|
||||
"`score_threshold` is not specified with a float value(0~1) "
|
||||
"in `search_kwargs`."
|
||||
)
|
||||
return values
|
||||
|
||||
def _get_relevant_documents(
|
||||
self, query: str, *, run_manager: CallbackManagerForRetrieverRun
|
||||
) -> List[Document]:
|
||||
if self.search_type == "similarity":
|
||||
docs = self.vectorstore.similarity_search(query, **self.search_kwargs)
|
||||
elif self.search_type == "similarity_score_threshold":
|
||||
docs_and_similarities = (
|
||||
self.vectorstore.similarity_search_with_relevance_scores(
|
||||
query, **self.search_kwargs
|
||||
)
|
||||
)
|
||||
docs = [doc for doc, _ in docs_and_similarities]
|
||||
elif self.search_type == "mmr":
|
||||
docs = self.vectorstore.max_marginal_relevance_search(
|
||||
query, **self.search_kwargs
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"search_type of {self.search_type} not allowed.")
|
||||
return docs
|
||||
|
||||
async def _aget_relevant_documents(
|
||||
self, query: str, *, run_manager: AsyncCallbackManagerForRetrieverRun
|
||||
) -> List[Document]:
|
||||
if self.search_type == "similarity":
|
||||
docs = await self.vectorstore.asimilarity_search(
|
||||
query, **self.search_kwargs
|
||||
)
|
||||
elif self.search_type == "similarity_score_threshold":
|
||||
docs_and_similarities = (
|
||||
await self.vectorstore.asimilarity_search_with_relevance_scores(
|
||||
query, **self.search_kwargs
|
||||
)
|
||||
)
|
||||
docs = [doc for doc, _ in docs_and_similarities]
|
||||
elif self.search_type == "mmr":
|
||||
docs = await self.vectorstore.amax_marginal_relevance_search(
|
||||
query, **self.search_kwargs
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"search_type of {self.search_type} not allowed.")
|
||||
return docs
|
||||
|
||||
def add_documents(self, documents: List[Document], **kwargs: Any) -> List[str]:
|
||||
"""Add documents to vectorstore."""
|
||||
return self.vectorstore.add_documents(documents, **kwargs)
|
||||
|
||||
async def aadd_documents(
|
||||
self, documents: List[Document], **kwargs: Any
|
||||
) -> List[str]:
|
||||
"""Add documents to vectorstore."""
|
||||
return await self.vectorstore.aadd_documents(documents, **kwargs)
|
@ -1,176 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from langchain.memory.utils import get_prompt_input_key
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from swarms.models.prompts.base import AIMessage, BaseMessage, HumanMessage
|
||||
from swarms.utils.serializable import Serializable
|
||||
|
||||
|
||||
class BaseMemory(Serializable, ABC):
|
||||
"""Abstract base class for memory in Chains.
|
||||
|
||||
Memory refers to state in Chains. Memory can be used to store information about
|
||||
past executions of a Chain and inject that information into the inputs of
|
||||
future executions of the Chain. For example, for conversational Chains Memory
|
||||
can be used to store conversations and automatically add them to future model
|
||||
prompts so that the model has the necessary context to respond coherently to
|
||||
the latest input.
|
||||
|
||||
Example:
|
||||
.. code-block:: python
|
||||
|
||||
class SimpleMemory(BaseMemory):
|
||||
memories: Dict[str, Any] = dict()
|
||||
|
||||
@property
|
||||
def memory_variables(self) -> List[str]:
|
||||
return list(self.memories.keys())
|
||||
|
||||
def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, str]:
|
||||
return self.memories
|
||||
|
||||
def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
|
||||
pass
|
||||
|
||||
def clear(self) -> None:
|
||||
pass
|
||||
""" # noqa: E501
|
||||
|
||||
class Config:
|
||||
"""Configuration for this pydantic object."""
|
||||
|
||||
arbitrary_types_allowed = True
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def memory_variables(self) -> List[str]:
|
||||
"""The string keys this memory class will add to chain inputs."""
|
||||
|
||||
@abstractmethod
|
||||
def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Return key-value pairs given the text input to the chain."""
|
||||
|
||||
@abstractmethod
|
||||
def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
|
||||
"""Save the context of this chain run to memory."""
|
||||
|
||||
@abstractmethod
|
||||
def clear(self) -> None:
|
||||
"""Clear memory contents."""
|
||||
|
||||
|
||||
|
||||
|
||||
class BaseChatMessageHistory(ABC):
|
||||
"""Abstract base class for storing chat message history.
|
||||
|
||||
See `ChatMessageHistory` for default implementation.
|
||||
|
||||
Example:
|
||||
.. code-block:: python
|
||||
|
||||
class FileChatMessageHistory(BaseChatMessageHistory):
|
||||
storage_path: str
|
||||
session_id: str
|
||||
|
||||
@property
|
||||
def messages(self):
|
||||
with open(os.path.join(storage_path, session_id), 'r:utf-8') as f:
|
||||
messages = json.loads(f.read())
|
||||
return messages_from_dict(messages)
|
||||
|
||||
def add_message(self, message: BaseMessage) -> None:
|
||||
messages = self.messages.append(_message_to_dict(message))
|
||||
with open(os.path.join(storage_path, session_id), 'w') as f:
|
||||
json.dump(f, messages)
|
||||
|
||||
def clear(self):
|
||||
with open(os.path.join(storage_path, session_id), 'w') as f:
|
||||
f.write("[]")
|
||||
"""
|
||||
|
||||
messages: List[BaseMessage]
|
||||
"""A list of Messages stored in-memory."""
|
||||
|
||||
def add_user_message(self, message: str) -> None:
|
||||
"""Convenience method for adding a human message string to the store.
|
||||
|
||||
Args:
|
||||
message: The string contents of a human message.
|
||||
"""
|
||||
self.add_message(HumanMessage(content=message))
|
||||
|
||||
def add_ai_message(self, message: str) -> None:
|
||||
"""Convenience method for adding an AI message string to the store.
|
||||
|
||||
Args:
|
||||
message: The string contents of an AI message.
|
||||
"""
|
||||
self.add_message(AIMessage(content=message))
|
||||
|
||||
# TODO: Make this an abstractmethod.
|
||||
def add_message(self, message: BaseMessage) -> None:
|
||||
"""Add a Message object to the store.
|
||||
|
||||
Args:
|
||||
message: A BaseMessage object to store.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def clear(self) -> None:
|
||||
"""Remove all messages from the store"""
|
||||
|
||||
|
||||
class ChatMessageHistory(BaseChatMessageHistory, BaseModel):
|
||||
"""In memory implementation of chat message history.
|
||||
|
||||
Stores messages in an in memory list.
|
||||
"""
|
||||
|
||||
messages: List[BaseMessage] = []
|
||||
|
||||
def add_message(self, message: BaseMessage) -> None:
|
||||
"""Add a self-created message to the store"""
|
||||
self.messages.append(message)
|
||||
|
||||
def clear(self) -> None:
|
||||
self.messages = []
|
||||
|
||||
class BaseChatMemory(BaseMemory, ABC):
|
||||
"""Abstract base class for chat memory."""
|
||||
|
||||
chat_memory: BaseChatMessageHistory = Field(default_factory=ChatMessageHistory)
|
||||
output_key: Optional[str] = None
|
||||
input_key: Optional[str] = None
|
||||
return_messages: bool = False
|
||||
|
||||
def _get_input_output(
|
||||
self, inputs: Dict[str, Any], outputs: Dict[str, str]
|
||||
) -> Tuple[str, str]:
|
||||
if self.input_key is None:
|
||||
prompt_input_key = get_prompt_input_key(inputs, self.memory_variables)
|
||||
else:
|
||||
prompt_input_key = self.input_key
|
||||
if self.output_key is None:
|
||||
if len(outputs) != 1:
|
||||
raise ValueError(f"One output key expected, got {outputs.keys()}")
|
||||
output_key = list(outputs.keys())[0]
|
||||
else:
|
||||
output_key = self.output_key
|
||||
return inputs[prompt_input_key], outputs[output_key]
|
||||
|
||||
def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
|
||||
"""Save context from this conversation to buffer."""
|
||||
input_str, output_str = self._get_input_output(inputs, outputs)
|
||||
self.chat_memory.add_user_message(input_str)
|
||||
self.chat_memory.add_ai_message(output_str)
|
||||
|
||||
def clear(self) -> None:
|
||||
"""Clear memory contents."""
|
||||
self.chat_memory.clear()
|
||||
|
@ -1,81 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any, Sequence
|
||||
|
||||
from pydantic import Field
|
||||
|
||||
from swarms.utils.serializable import Serializable
|
||||
|
||||
class Document(Serializable):
|
||||
"""Class for storing a piece of text and associated metadata."""
|
||||
|
||||
page_content: str
|
||||
"""String text."""
|
||||
metadata: dict = Field(default_factory=dict)
|
||||
"""Arbitrary metadata about the page content (e.g., source, relationships to other
|
||||
documents, etc.).
|
||||
"""
|
||||
|
||||
|
||||
class BaseDocumentTransformer(ABC):
|
||||
"""Abstract base class for document transformation systems.
|
||||
|
||||
A document transformation system takes a sequence of Documents and returns a
|
||||
sequence of transformed Documents.
|
||||
|
||||
Example:
|
||||
.. code-block:: python
|
||||
|
||||
class EmbeddingsRedundantFilter(BaseDocumentTransformer, BaseModel):
|
||||
embeddings: Embeddings
|
||||
similarity_fn: Callable = cosine_similarity
|
||||
similarity_threshold: float = 0.95
|
||||
|
||||
class Config:
|
||||
arbitrary_types_allowed = True
|
||||
|
||||
def transform_documents(
|
||||
self, documents: Sequence[Document], **kwargs: Any
|
||||
) -> Sequence[Document]:
|
||||
stateful_documents = get_stateful_documents(documents)
|
||||
embedded_documents = _get_embeddings_from_stateful_docs(
|
||||
self.embeddings, stateful_documents
|
||||
)
|
||||
included_idxs = _filter_similar_embeddings(
|
||||
embedded_documents, self.similarity_fn, self.similarity_threshold
|
||||
)
|
||||
return [stateful_documents[i] for i in sorted(included_idxs)]
|
||||
|
||||
async def atransform_documents(
|
||||
self, documents: Sequence[Document], **kwargs: Any
|
||||
) -> Sequence[Document]:
|
||||
raise NotImplementedError
|
||||
|
||||
""" # noqa: E501
|
||||
|
||||
@abstractmethod
|
||||
def transform_documents(
|
||||
self, documents: Sequence[Document], **kwargs: Any
|
||||
) -> Sequence[Document]:
|
||||
"""Transform a list of documents.
|
||||
|
||||
Args:
|
||||
documents: A sequence of Documents to be transformed.
|
||||
|
||||
Returns:
|
||||
A list of transformed Documents.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
async def atransform_documents(
|
||||
self, documents: Sequence[Document], **kwargs: Any
|
||||
) -> Sequence[Document]:
|
||||
"""Asynchronously transform a list of documents.
|
||||
|
||||
Args:
|
||||
documents: A sequence of Documents to be transformed.
|
||||
|
||||
Returns:
|
||||
A list of transformed Documents.
|
||||
"""
|
@ -1,23 +0,0 @@
|
||||
from typing import Any, Dict, List
|
||||
|
||||
from swarms.memory.base import get_buffer_string
|
||||
|
||||
|
||||
|
||||
def get_prompt_input_key(inputs: Dict[str, Any], memory_variables: List[str]) -> str:
|
||||
"""
|
||||
Get the prompt input key.
|
||||
|
||||
Args:
|
||||
inputs: Dict[str, Any]
|
||||
memory_variables: List[str]
|
||||
|
||||
Returns:
|
||||
A prompt input key.
|
||||
"""
|
||||
# "stop" is a special key that can be passed as input but is not used to
|
||||
# format the prompt.
|
||||
prompt_input_keys = list(set(inputs).difference(memory_variables + ["stop"]))
|
||||
if len(prompt_input_keys) != 1:
|
||||
raise ValueError(f"One input key expected got {prompt_input_keys}")
|
||||
return prompt_input_keys[0]
|
@ -1,128 +0,0 @@
|
||||
#aug 10
|
||||
#Vortex is the name of my Duck pet, ILY Vortex
|
||||
#Kye
|
||||
|
||||
import logging
|
||||
import os
|
||||
from typing import Any, List, Optional, Union
|
||||
|
||||
import faiss
|
||||
from langchain.agents import Tool
|
||||
from langchain.chat_models import ChatOpenAI
|
||||
from langchain.docstore import InMemoryDocstore
|
||||
from langchain.embeddings import OpenAIEmbeddings
|
||||
from langchain.vectorstores import FAISS
|
||||
from langchain_experimental.autonomous_agents import AutoGPT
|
||||
|
||||
from swarms.tools.autogpt import (
|
||||
FileChatMessageHistory,
|
||||
ReadFileTool,
|
||||
WebpageQATool,
|
||||
WriteFileTool,
|
||||
load_qa_with_sources_chain,
|
||||
process_csv,
|
||||
web_search,
|
||||
)
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
ROOT_DIR = "./data/"
|
||||
|
||||
class VortexWorkerAgent:
|
||||
"""An autonomous agent instance that accomplishes various language tasks like summarization, text generation of any kind, data analysis, websearch and much more"""
|
||||
|
||||
def __init__(self,
|
||||
openai_api_key: str,
|
||||
llm: Optional[Union[InMemoryDocstore, ChatOpenAI]] = None,
|
||||
tools: Optional[Any] = None,
|
||||
embedding_size: Optional[int] = 8192,
|
||||
worker_name: Optional[str] = "Vortex Worker Agent",
|
||||
worker_role: Optional[str] = "Assistant",
|
||||
human_in_the_loop: Optional[bool] = False,
|
||||
search_kwargs: dict = {},
|
||||
verbose: Optional[bool] = False,
|
||||
chat_history_file: str = "chat_history.text"):
|
||||
if not openai_api_key:
|
||||
raise ValueError("openai_api_key cannot be None, try placing in ENV")
|
||||
|
||||
self.openai_api_key = openai_api_key
|
||||
self.worker_name = worker_name
|
||||
self.worker_role = worker_role
|
||||
|
||||
self.embedding_size = embedding_size
|
||||
self.human_in_the_loop = human_in_the_loop
|
||||
self.search_kwargs = search_kwargs
|
||||
|
||||
self.verbose = verbose
|
||||
self.chat_history_file = chat_history_file
|
||||
self.llm = llm or self.init_llm(ChatOpenAI)
|
||||
|
||||
self.tools = tools or self.init_tools()
|
||||
self.vectorstore = self.init_vectorstore()
|
||||
self.agent = self.create_agent()
|
||||
|
||||
def init_llm(self, llm_class, temperature=1.0):
|
||||
try:
|
||||
return llm_class(openai_api_key=self.openai_api_key, temperature=temperature)
|
||||
except Exception:
|
||||
logging.error("Failed to init the language model, make sure the llm function matches the llm abstract type")
|
||||
raise
|
||||
|
||||
def init_tools(self):
|
||||
try:
|
||||
logging.info("Initializing tools for VortexWorkerAgent")
|
||||
tools = [
|
||||
web_search,
|
||||
WriteFileTool,
|
||||
ReadFileTool,
|
||||
process_csv,
|
||||
WebpageQATool(qa_chain=load_qa_with_sources_chain(self.llm))
|
||||
]
|
||||
return tools
|
||||
except Exception as error:
|
||||
logging.error(f"Failed to initialize tools: {error}")
|
||||
raise
|
||||
|
||||
def init_vectorstore(self):
|
||||
try:
|
||||
openai_api_key = self.openai_api_key or os.getenv("OPENAI_API_KEY")
|
||||
embeddings_model = OpenAIEmbeddings(openai_api_key=openai_api_key)
|
||||
index = faiss.IndexFlatL2(8192)
|
||||
return FAISS(embeddings_model, index, InMemoryDocstore({}), {})
|
||||
except Exception as error:
|
||||
logging.error(f"Failed to initialize vector store: {error}")
|
||||
raise
|
||||
|
||||
def create_agent(self):
|
||||
logging.info("Creating agent in VortexWorkerAgent")
|
||||
try:
|
||||
AutoGPT.from_llm_and_tools(
|
||||
ai_name=self.worker_name,
|
||||
ai_role=self.worker_role,
|
||||
tools=self.tools,
|
||||
llm=self.llm,
|
||||
memory=self.vectorstore,
|
||||
human_in_the_loop=self.human_in_the_loop,
|
||||
chat_history_memory=FileChatMessageHistory(self.chat_history_file)
|
||||
)
|
||||
except Exception as error:
|
||||
logging.error(f"Failed while creating agent {str(error)}")
|
||||
raise error
|
||||
|
||||
def add_tool(self, tool: Tool):
|
||||
if not isinstance(tool, Tool):
|
||||
logging.error("Tools must be an instant of Tool")
|
||||
raise TypeError("Tool must be an instance of Tool, try wrapping your tool with the Tool decorator and fill in the requirements")
|
||||
self.tools.append(tool)
|
||||
|
||||
def run(self, prompt) -> str:
|
||||
if not isinstance(prompt, str) or not prompt:
|
||||
raise ValueError("Prompt must be a non empty string")
|
||||
try:
|
||||
self.agent.run([prompt])
|
||||
return "Task completed by VortexWorkerAgent"
|
||||
except Exception as error:
|
||||
logging.error(f"While running the agent: {str(error)}")
|
||||
raise error
|
||||
|
||||
|
@ -1,172 +0,0 @@
|
||||
import logging
|
||||
import os
|
||||
from typing import Dict, List
|
||||
|
||||
from langchain.memory.chat_message_histories import FileChatMessageHistory
|
||||
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
|
||||
|
||||
|
||||
from typing import Dict, List
|
||||
|
||||
from langchain.memory.chat_message_histories import FileChatMessageHistory
|
||||
|
||||
from swarms.tools.main import (
|
||||
BaseToolSet,
|
||||
CodeEditor,
|
||||
ExitConversation,
|
||||
RequestsGet,
|
||||
Terminal,
|
||||
)
|
||||
from swarms.utils.main import BaseHandler, CsvToDataframe, FileType
|
||||
|
||||
|
||||
class WorkerUltraNode:
|
||||
"""Useful for when you need to spawn an autonomous agent instance as a worker to accomplish complex tasks, it can search the internet or spawn child multi-modality models to process and generate images and text or audio and so on"""
|
||||
|
||||
def __init__(self, llm, toolsets, vectorstore):
|
||||
if not llm or not toolsets or not vectorstore:
|
||||
logging.error("llm, toolsets, and vectorstore cannot be None.")
|
||||
raise ValueError("llm, toolsets, and vectorstore cannot be None.")
|
||||
|
||||
self.llm = llm
|
||||
self.toolsets = toolsets
|
||||
self.vectorstore = vectorstore
|
||||
self.agent = None
|
||||
|
||||
def create_agent(self, ai_name="Swarm Worker AI Assistant", ai_role="Assistant", human_in_the_loop=False, search_kwargs={}, verbose=False):
|
||||
logging.info("Creating agent in WorkerNode")
|
||||
try:
|
||||
tools_list = list(self.toolsets.values())
|
||||
self.agent = AutoGPT.from_llm_and_tools(
|
||||
ai_name=ai_name,
|
||||
ai_role=ai_role,
|
||||
tools=tools_list, # Pass the dictionary instead of the list
|
||||
llm=self.llm,
|
||||
memory=self.vectorstore.as_retriever(search_kwargs=search_kwargs),
|
||||
human_in_the_loop=human_in_the_loop,
|
||||
chat_history_memory=FileChatMessageHistory("chat_history.txt"),
|
||||
)
|
||||
self.agent.chain.verbose = verbose
|
||||
except Exception as e:
|
||||
logging.error(f"Error while creating agent: {str(e)}")
|
||||
raise e
|
||||
|
||||
def add_toolset(self, toolset: BaseToolSet):
|
||||
if not isinstance(toolset, BaseToolSet):
|
||||
logging.error("Toolset must be an instance of BaseToolSet.")
|
||||
raise TypeError("Toolset must be an instance of BaseToolSet.")
|
||||
|
||||
self.toolsets.append(toolset)
|
||||
|
||||
def run(self, prompt: str) -> str:
|
||||
if not isinstance(prompt, str):
|
||||
logging.error("Prompt must be a string.")
|
||||
raise TypeError("Prompt must be a string.")
|
||||
|
||||
if not prompt:
|
||||
logging.error("Prompt is empty.")
|
||||
raise ValueError("Prompt is empty.")
|
||||
|
||||
try:
|
||||
self.agent.run([f"{prompt}"])
|
||||
return "Task completed by WorkerNode"
|
||||
except Exception as e:
|
||||
logging.error(f"While running the agent: {str(e)}")
|
||||
raise e
|
||||
|
||||
class WorkerUltraNodeInitializer:
|
||||
def __init__(self, openai_api_key):
|
||||
if not openai_api_key:
|
||||
logging.error("OpenAI API key is not provided")
|
||||
raise ValueError("openai_api_key cannot be None")
|
||||
|
||||
self.openai_api_key = openai_api_key
|
||||
|
||||
def initialize_llm(self, llm_class, temperature=0.5):
|
||||
if not llm_class:
|
||||
logging.error("llm_class cannot be none")
|
||||
raise ValueError("llm_class cannot be None")
|
||||
|
||||
try:
|
||||
return llm_class(openai_api_key=self.openai_api_key, temperature=temperature)
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to initialize language model: {e}")
|
||||
raise
|
||||
|
||||
def initialize_toolsets(self):
|
||||
try:
|
||||
toolsets: List[BaseToolSet] = [
|
||||
Terminal(),
|
||||
CodeEditor(),
|
||||
RequestsGet(),
|
||||
ExitConversation(),
|
||||
]
|
||||
handlers: Dict[FileType, BaseHandler] = {FileType.DATAFRAME: CsvToDataframe()}
|
||||
|
||||
if os.environ.get("USE_GPU", False):
|
||||
import torch
|
||||
|
||||
from swarms.tools.main import (
|
||||
ImageCaptioning,
|
||||
ImageEditing,
|
||||
InstructPix2Pix,
|
||||
Text2Image,
|
||||
VisualQuestionAnswering,
|
||||
)
|
||||
|
||||
if torch.cuda.is_available():
|
||||
toolsets.extend(
|
||||
[
|
||||
Text2Image("cuda"),
|
||||
ImageEditing("cuda"),
|
||||
InstructPix2Pix("cuda"),
|
||||
VisualQuestionAnswering("cuda"),
|
||||
]
|
||||
)
|
||||
handlers[FileType.IMAGE] = ImageCaptioning("cuda")
|
||||
|
||||
return toolsets
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to initialize toolsets: {e}")
|
||||
|
||||
def initialize_vectorstore(self):
|
||||
try:
|
||||
|
||||
embeddings_model = OpenAIEmbeddings(openai_api_key=self.openai_api_key)
|
||||
embedding_size = 1536
|
||||
index = faiss.IndexFlatL2(embedding_size)
|
||||
return FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {})
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to initialize vector store: {e}")
|
||||
raise
|
||||
|
||||
def create_worker_node(self, llm_class=ChatOpenAI, ai_name="Swarm Worker AI Assistant", ai_role="Assistant", human_in_the_loop=False, search_kwargs={}, verbose=False):
|
||||
if not llm_class:
|
||||
logging.error("llm_class cannot be None.")
|
||||
raise ValueError("llm_class cannot be None.")
|
||||
try:
|
||||
worker_toolsets = self.initialize_toolsets()
|
||||
vectorstore = self.initialize_vectorstore()
|
||||
worker_node = WorkerUltraNode(llm=self.initialize_llm(llm_class), toolsets=worker_toolsets, vectorstore=vectorstore)
|
||||
worker_node.create_agent(ai_name=ai_name, ai_role=ai_role, human_in_the_loop=human_in_the_loop, search_kwargs=search_kwargs, verbose=verbose)
|
||||
return worker_node
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to create worker node: {e}")
|
||||
raise
|
||||
|
||||
def worker_ultra_node(openai_api_key):
|
||||
if not openai_api_key:
|
||||
logging.error("OpenAI API key is not provided")
|
||||
raise ValueError("OpenAI API key is required")
|
||||
|
||||
try:
|
||||
initializer = WorkerUltraNodeInitializer(openai_api_key)
|
||||
worker_node = initializer.create_worker_node()
|
||||
return worker_node
|
||||
except Exception as e:
|
||||
logging.error(f"An error occurred in worker_node: {e}")
|
||||
raise
|
@ -1,276 +0,0 @@
|
||||
import logging
|
||||
from typing import Any, List, Optional, Union
|
||||
|
||||
import faiss
|
||||
from langchain.agents import Tool
|
||||
from langchain.chat_models import ChatOpenAI
|
||||
from langchain.docstore import InMemoryDocstore
|
||||
from langchain.embeddings import OpenAIEmbeddings
|
||||
from langchain.vectorstores import FAISS
|
||||
from langchain_experimental.autonomous_agents import AutoGPT
|
||||
|
||||
from swarms.tools.autogpt import (
|
||||
DuckDuckGoSearchRun,
|
||||
FileChatMessageHistory,
|
||||
ReadFileTool,
|
||||
WebpageQATool,
|
||||
WriteFileTool,
|
||||
load_qa_with_sources_chain,
|
||||
process_csv,
|
||||
# web_search,
|
||||
)
|
||||
# from swarms.tools.developer import (
|
||||
# code_editor_append,
|
||||
# code_editor_delete,
|
||||
# code_editor_patch,
|
||||
# code_editor_read,
|
||||
# code_editor_summary,
|
||||
# code_editor_write,
|
||||
# terminal_execute,
|
||||
# )
|
||||
|
||||
ROOT_DIR = "./data/"
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
|
||||
class WorkerNodeInitializer:
|
||||
"""Useful for when you need to spawn an autonomous agent instance as a worker to accomplish complex tasks, it can search the internet or spawn child multi-modality models to process and generate images and text or audio and so on"""
|
||||
def __init__(self,
|
||||
openai_api_key: str,
|
||||
llm: Optional[Union[InMemoryDocstore, ChatOpenAI]] = None,
|
||||
tools: Optional[List[Any]] = None,
|
||||
embedding_size: Optional[int] = 8192,
|
||||
worker_name: Optional[str] = "Swarm Worker AI Assistant",
|
||||
worker_role: Optional[str] = "Assistant",
|
||||
human_in_the_loop: Optional[bool] = False,
|
||||
search_kwargs: dict = {},
|
||||
verbose: Optional[bool] = False,
|
||||
chat_history_file: str = "chat_history.txt"):
|
||||
self.openai_api_key = openai_api_key
|
||||
self.llm = llm if llm is not None else ChatOpenAI()
|
||||
self.tools = tools if tools is not None else [ReadFileTool(), WriteFileTool()]
|
||||
# self.vectorstore = vectorstore
|
||||
|
||||
self.worker_name = worker_name
|
||||
self.worker_role = worker_role
|
||||
self.embedding_size = 8192
|
||||
self.human_in_the_loop = human_in_the_loop
|
||||
self.search_kwargs = search_kwargs
|
||||
self.verbose = verbose
|
||||
self.chat_history_file = chat_history_file
|
||||
|
||||
self.create_agent()
|
||||
|
||||
def create_agent(self):
|
||||
|
||||
logging.info("Creating agent in WorkerNode")
|
||||
try:
|
||||
vectorstore = self.initialize_vectorstore()
|
||||
|
||||
self.agent = AutoGPT.from_llm_and_tools(
|
||||
ai_name=self.worker_name,
|
||||
ai_role=self.worker_role,
|
||||
tools=self.tools,
|
||||
llm=self.llm,
|
||||
memory=vectorstore,
|
||||
human_in_the_loop=self.human_in_the_loop,
|
||||
chat_history_memory=FileChatMessageHistory(self.chat_history_file),
|
||||
)
|
||||
except Exception as e:
|
||||
logging.error(f"Error while creating agent: {str(e)}")
|
||||
raise e
|
||||
|
||||
def add_tool(self, tool: Optional[Tool] = None):
|
||||
if tool is None:
|
||||
tool = DuckDuckGoSearchRun()
|
||||
|
||||
if not isinstance(tool, Tool):
|
||||
logging.error("Tool must be an instance of Tool.")
|
||||
raise TypeError("Tool must be an instance of Tool.")
|
||||
|
||||
self.tools.append(tool)
|
||||
|
||||
def initialize_vectorstore(self):
|
||||
try:
|
||||
embeddings_model = OpenAIEmbeddings(openai_api_key=self.openai_api_key)
|
||||
embedding_size = self.embedding_size
|
||||
index = faiss.IndexFlatL2(embedding_size=embedding_size)
|
||||
return FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {})
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to initialize vector store: {e}")
|
||||
return None
|
||||
|
||||
def run(self, prompt) -> str:
|
||||
if not isinstance(prompt, str):
|
||||
logging.error("Prompt must be a string.")
|
||||
raise TypeError("Prompt must be a string.")
|
||||
|
||||
if not prompt:
|
||||
logging.error("Prompt is empty.")
|
||||
raise ValueError("Prompt is empty.")
|
||||
|
||||
try:
|
||||
self.agent.run([f"{prompt}"])
|
||||
return "Task completed by WorkerNode"
|
||||
except Exception as e:
|
||||
logging.error(f"While running the agent: {str(e)}")
|
||||
raise e
|
||||
|
||||
|
||||
|
||||
class WorkerNode:
|
||||
def __init__(self,
|
||||
openai_api_key: str,
|
||||
temperature: Optional[int] = None,
|
||||
llm: Optional[Union[InMemoryDocstore, ChatOpenAI]] = None,
|
||||
tools: Optional[List[Any]] = None,
|
||||
embedding_size: Optional[int] = 8192,
|
||||
worker_name: Optional[str] = "Swarm Worker AI Assistant",
|
||||
worker_role: Optional[str] = "Assistant",
|
||||
human_in_the_loop: Optional[bool] = False,
|
||||
search_kwargs: dict = {},
|
||||
verbose: Optional[bool] = False,
|
||||
chat_history_file: str = "chat_history.txt"):
|
||||
|
||||
if not openai_api_key:
|
||||
raise ValueError("openai_api_key cannot be None")
|
||||
|
||||
self.openai_api_key = openai_api_key
|
||||
self.llm = llm if llm is not None else ChatOpenAI()
|
||||
self.tools = tools if tools is not None else [ReadFileTool(), WriteFileTool()]
|
||||
self.embedding_size = embedding_size
|
||||
self.worker_name = worker_name
|
||||
self.worker_role = worker_role
|
||||
self.human_in_the_loop = human_in_the_loop
|
||||
self.search_kwargs = search_kwargs
|
||||
self.verbose = verbose
|
||||
self.chat_history_file = chat_history_file
|
||||
self.temperature = temperature
|
||||
self.description = "A worker node that executes tasks"
|
||||
self.create_agent()
|
||||
|
||||
def create_agent(self):
|
||||
logging.info("Creating agent in WorkerNode")
|
||||
try:
|
||||
vectorstore = self.initialize_vectorstore()
|
||||
|
||||
self.agent = AutoGPT.from_llm_and_tools(
|
||||
ai_name=self.worker_name,
|
||||
ai_role=self.worker_role,
|
||||
tools=self.tools,
|
||||
llm=self.llm,
|
||||
memory=vectorstore,
|
||||
human_in_the_loop=self.human_in_the_loop,
|
||||
chat_history_memory=FileChatMessageHistory(self.chat_history_file),
|
||||
)
|
||||
except Exception as e:
|
||||
logging.error(f"Error while creating agent: {str(e)}")
|
||||
raise e
|
||||
|
||||
def add_tool(self, tool: Optional[Any] = None):
|
||||
if tool is None:
|
||||
tool = DuckDuckGoSearchRun()
|
||||
|
||||
if not isinstance(tool, Tool):
|
||||
logging.error("Tool must be an instance of Tool.")
|
||||
raise TypeError("Tool must be an instance of Tool.")
|
||||
|
||||
self.tools.append(tool)
|
||||
|
||||
def initialize_vectorstore(self):
|
||||
try:
|
||||
embeddings_model = OpenAIEmbeddings(openai_api_key=self.openai_api_key)
|
||||
# embedding_size = self.embedding_size
|
||||
index = faiss.IndexFlatL2(self.embedding_size)
|
||||
return FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {})
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to initialize vector store: {e}")
|
||||
return None
|
||||
|
||||
def run(self, prompt) -> str:
|
||||
if not isinstance(prompt, str):
|
||||
logging.error("Prompt must be a string.")
|
||||
raise TypeError("Prompt must be a string.")
|
||||
|
||||
if not prompt:
|
||||
logging.error("Prompt is empty.")
|
||||
raise ValueError("Prompt is empty.")
|
||||
|
||||
try:
|
||||
self.agent.run([f"{prompt}"])
|
||||
return "Task completed by WorkerNode"
|
||||
except Exception as e:
|
||||
logging.error(f"While running the agent: {str(e)}")
|
||||
raise e
|
||||
|
||||
# Functions from WorkerNode
|
||||
def initialize_llm(self, llm_class, temperature):
|
||||
if not llm_class:
|
||||
logging.error("llm_class cannot be none")
|
||||
raise ValueError("llm_class cannot be None")
|
||||
|
||||
try:
|
||||
return llm_class(openai_api_key=self.openai_api_key, temperature=temperature)
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to initialize language model: {e}")
|
||||
raise
|
||||
|
||||
def initialize_tools(self, llm_class):
|
||||
if not llm_class:
|
||||
logging.error("llm_class not cannot be none")
|
||||
raise ValueError("llm_class cannot be none")
|
||||
try:
|
||||
logging.info('Creating WorkerNode')
|
||||
llm = self.initialize_llm(llm_class, self.temperature)
|
||||
|
||||
tools = [
|
||||
# web_search,
|
||||
WriteFileTool(root_dir=ROOT_DIR),
|
||||
ReadFileTool(root_dir=ROOT_DIR),
|
||||
process_csv,
|
||||
WebpageQATool(qa_chain=load_qa_with_sources_chain(llm)),
|
||||
]
|
||||
if not tools:
|
||||
logging.error("Tools are not initialized")
|
||||
raise ValueError("Tools are not initialized")
|
||||
return tools
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to initialize tools: {e}")
|
||||
|
||||
def create_worker_node(self, worker_name, worker_role, human_in_the_loop, llm_class=ChatOpenAI, search_kwargs={}, **kwargs):
|
||||
if not llm_class:
|
||||
logging.error("llm_class cannot be None.")
|
||||
raise ValueError("llm_class cannot be None.")
|
||||
try:
|
||||
worker_tools = self.initialize_tools(llm_class)
|
||||
vectorstore = self.initialize_vectorstore()
|
||||
worker_node = WorkerNode(
|
||||
openai_api_key=self.openai_api_key,
|
||||
llm=self.initialize_llm(llm_class, self.temperature),
|
||||
tools=worker_tools,
|
||||
vectorstore=vectorstore,
|
||||
ai_name=worker_name,
|
||||
ai_role=worker_role,
|
||||
human_in_the_loop=human_in_the_loop,
|
||||
search_kwargs=search_kwargs,
|
||||
)
|
||||
return worker_node
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to create worker node: {e}")
|
||||
raise
|
||||
|
||||
def worker_node(openai_api_key, objective):
|
||||
if not openai_api_key:
|
||||
logging.error("OpenAI API key is not provided")
|
||||
raise ValueError("OpenAI API key is required")
|
||||
|
||||
try:
|
||||
worker_node = WorkerNode(openai_api_key)
|
||||
# worker_node.create_worker_node()
|
||||
return worker_node.run(objective)
|
||||
except Exception as e:
|
||||
logging.error(f"An error occured in worker_node: {e}")
|
||||
raise
|
Loading…
Reference in new issue