swarms banner

Former-commit-id: 339f086b76
pull/47/head
Kye 1 year ago
parent ca55d478ae
commit e41c707fa9

@ -1,7 +1,7 @@
Introduction to Agents in Swarms Introduction to Agents in Swarms
================================ ================================
Welcome to the revolutionary world of Agents in Swarms. If you're familiar with my philosophy from the Linux world, you'll know that I'm a big believer in simplicity, modularity, and the power of open collaboration. The same principles apply here. Welcome to the revolutionary world of Agents in Swarms. I'm a big believer in simplicity, modularity, and the power of open collaboration. The same principles apply here.
Agents are the individual building blocks in a swarm. They are the worker bees, each with a specific task, but all working together towards a common goal. In our case, an agent is a combination of a Language Model (LLM), Long Term Memory, and Tools. Agents are the individual building blocks in a swarm. They are the worker bees, each with a specific task, but all working together towards a common goal. In our case, an agent is a combination of a Language Model (LLM), Long Term Memory, and Tools.

@ -1,9 +1,9 @@
<div align="center"> <div align="center">
# Swarms of Autonomous AI Agents 🤖 🤖 🤖 <!-- # Swarms 🤖 🤖 🤖 -->
<!-- ![Swarming banner icon](images/swarmsbannernew.png) --> ![Swarming banner icon](images/github-banner-swarms.png)
Introducing Swarms, automating all digital activities with multi-agent collaboration, get started in 30 seconds in a seamless onboarding experience. Introducing Swarms, automating all digital activities with multi-agent collaboration, get started in 30 seconds in a seamless onboarding experience.

Binary file not shown.

After

Width:  |  Height:  |  Size: 14 KiB

@ -1,7 +1,7 @@
Introduction to Agents in Swarms Introduction to Agents in Swarms
================================ ================================
Welcome to the revolutionary world of Agents in Swarms. If you're familiar with my philosophy from the Linux world, you'll know that I'm a big believer in simplicity, modularity, and the power of open collaboration. The same principles apply here. Welcome to the revolutionary world of Agents in Swarms. I'm a big believer in simplicity, modularity, and the power of open collaboration. The same principles apply here.
Agents are the individual building blocks in a swarm. They are the worker bees, each with a specific task, but all working together towards a common goal. In our case, an agent is a combination of a Language Model (LLM), Long Term Memory, and Tools. Agents are the individual building blocks in a swarm. They are the worker bees, each with a specific task, but all working together towards a common goal. In our case, an agent is a combination of a Language Model (LLM), Long Term Memory, and Tools.

@ -3,15 +3,23 @@ from __future__ import annotations
from typing import List, Optional from typing import List, Optional
from langchain.chains.llm import LLMChain from langchain.chains.llm import LLMChain
from langchain.memory import ChatMessageHistory
from langchain.schema import BaseChatMessageHistory, Document
from langchain.vectorstores.base import VectorStoreRetriever
from pydantic import ValidationError from pydantic import ValidationError
from swarms.agents.memory.base import VectorStoreRetriever
from swarms.agents.memory.base_memory import BaseChatMessageHistory
from swarms.agents.memory.chat_message_history import ChatMessageHistory
from swarms.agents.memory.document import Document
from swarms.agents.models.base import AbstractModel from swarms.agents.models.base import AbstractModel
from swarms.agents.models.prompts.agent_output_parser import AgentOutputParser from swarms.agents.models.prompts.agent_output_parser import AgentOutputParser
from swarms.agents.models.prompts.agent_prompt import AIMessage, HumanMessage, SystemMessage from swarms.agents.models.prompts.agent_prompt import (
from swarms.agents.models.prompts.agent_prompt_auto import MessageFormatter, PromptConstructor AIMessage,
HumanMessage,
SystemMessage,
)
from swarms.agents.models.prompts.agent_prompt_auto import (
MessageFormatter,
PromptConstructor,
)
from swarms.agents.models.prompts.prompt_generator import FINISH_NAME from swarms.agents.models.prompts.prompt_generator import FINISH_NAME
from swarms.agents.tools.base import BaseTool from swarms.agents.tools.base import BaseTool
from swarms.agents.utils.Agent import AgentOutputParser from swarms.agents.utils.Agent import AgentOutputParser

@ -1,12 +1,10 @@
from typing import Any, Dict, List from typing import Any, Dict, List
from pydantic import Field from pydantic import Field
from langchain.memory.chat_memory import BaseChatMemory, get_prompt_input_key from swarms.agents.memory.base_memory import BaseChatMemory, get_prompt_input_key
from langchain.vectorstores.base import VectorStoreRetriever from swarms.agents.memory.base import VectorStoreRetriever
class AgentMemory(BaseChatMemory):
class AutoGPTMemory(BaseChatMemory):
retriever: VectorStoreRetriever = Field(exclude=True) retriever: VectorStoreRetriever = Field(exclude=True)
"""VectorStoreRetriever object to connect to.""" """VectorStoreRetriever object to connect to."""

@ -25,9 +25,12 @@ from langchain.callbacks.manager import (
AsyncCallbackManagerForRetrieverRun, AsyncCallbackManagerForRetrieverRun,
CallbackManagerForRetrieverRun, CallbackManagerForRetrieverRun,
) )
from langchain.docstore.document import Document
from langchain.embeddings.base import Embeddings from swarms.agents.memory.document import Document
from swarms.utils.embeddings.base import Embeddings
from langchain.schema import BaseRetriever from langchain.schema import BaseRetriever
from pydantic import Field, root_validator from pydantic import Field, root_validator
VST = TypeVar("VST", bound="VectorStore") VST = TypeVar("VST", bound="VectorStore")

@ -0,0 +1,167 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any, Dict, List
from abc import ABC
from typing import Any, Dict, Optional, Tuple
from pydantic import Field
from swarms.agents.models.prompts.base import AIMessage, BaseMessage, HumanMessage
from swarms.utils.serializable import Serializable
from swarms.agents.memory.chat_message_history import ChatMessageHistory
from langchain.memory.utils import get_prompt_input_key
class BaseMemory(Serializable, ABC):
"""Abstract base class for memory in Chains.
Memory refers to state in Chains. Memory can be used to store information about
past executions of a Chain and inject that information into the inputs of
future executions of the Chain. For example, for conversational Chains Memory
can be used to store conversations and automatically add them to future model
prompts so that the model has the necessary context to respond coherently to
the latest input.
Example:
.. code-block:: python
class SimpleMemory(BaseMemory):
memories: Dict[str, Any] = dict()
@property
def memory_variables(self) -> List[str]:
return list(self.memories.keys())
def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, str]:
return self.memories
def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
pass
def clear(self) -> None:
pass
""" # noqa: E501
class Config:
"""Configuration for this pydantic object."""
arbitrary_types_allowed = True
@property
@abstractmethod
def memory_variables(self) -> List[str]:
"""The string keys this memory class will add to chain inputs."""
@abstractmethod
def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""Return key-value pairs given the text input to the chain."""
@abstractmethod
def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
"""Save the context of this chain run to memory."""
@abstractmethod
def clear(self) -> None:
"""Clear memory contents."""
class BaseChatMessageHistory(ABC):
"""Abstract base class for storing chat message history.
See `ChatMessageHistory` for default implementation.
Example:
.. code-block:: python
class FileChatMessageHistory(BaseChatMessageHistory):
storage_path: str
session_id: str
@property
def messages(self):
with open(os.path.join(storage_path, session_id), 'r:utf-8') as f:
messages = json.loads(f.read())
return messages_from_dict(messages)
def add_message(self, message: BaseMessage) -> None:
messages = self.messages.append(_message_to_dict(message))
with open(os.path.join(storage_path, session_id), 'w') as f:
json.dump(f, messages)
def clear(self):
with open(os.path.join(storage_path, session_id), 'w') as f:
f.write("[]")
"""
messages: List[BaseMessage]
"""A list of Messages stored in-memory."""
def add_user_message(self, message: str) -> None:
"""Convenience method for adding a human message string to the store.
Args:
message: The string contents of a human message.
"""
self.add_message(HumanMessage(content=message))
def add_ai_message(self, message: str) -> None:
"""Convenience method for adding an AI message string to the store.
Args:
message: The string contents of an AI message.
"""
self.add_message(AIMessage(content=message))
# TODO: Make this an abstractmethod.
def add_message(self, message: BaseMessage) -> None:
"""Add a Message object to the store.
Args:
message: A BaseMessage object to store.
"""
raise NotImplementedError
@abstractmethod
def clear(self) -> None:
"""Remove all messages from the store"""
class BaseChatMemory(BaseMemory, ABC):
"""Abstract base class for chat memory."""
chat_memory: BaseChatMessageHistory = Field(default_factory=ChatMessageHistory)
output_key: Optional[str] = None
input_key: Optional[str] = None
return_messages: bool = False
def _get_input_output(
self, inputs: Dict[str, Any], outputs: Dict[str, str]
) -> Tuple[str, str]:
if self.input_key is None:
prompt_input_key = get_prompt_input_key(inputs, self.memory_variables)
else:
prompt_input_key = self.input_key
if self.output_key is None:
if len(outputs) != 1:
raise ValueError(f"One output key expected, got {outputs.keys()}")
output_key = list(outputs.keys())[0]
else:
output_key = self.output_key
return inputs[prompt_input_key], outputs[output_key]
def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
"""Save context from this conversation to buffer."""
input_str, output_str = self._get_input_output(inputs, outputs)
self.chat_memory.add_user_message(input_str)
self.chat_memory.add_ai_message(output_str)
def clear(self) -> None:
"""Clear memory contents."""
self.chat_memory.clear()

@ -0,0 +1,21 @@
from typing import List
from pydantic import BaseModel
from swarms.agents.memory.base_memory import BaseChatMessageHistory, BaseMessage
class ChatMessageHistory(BaseChatMessageHistory, BaseModel):
"""In memory implementation of chat message history.
Stores messages in an in memory list.
"""
messages: List[BaseMessage] = []
def add_message(self, message: BaseMessage) -> None:
"""Add a self-created message to the store"""
self.messages.append(message)
def clear(self) -> None:
self.messages = []

@ -0,0 +1,81 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any, Sequence
from pydantic import Field
from swarms.utils.serializable import Serializable
class Document(Serializable):
"""Class for storing a piece of text and associated metadata."""
page_content: str
"""String text."""
metadata: dict = Field(default_factory=dict)
"""Arbitrary metadata about the page content (e.g., source, relationships to other
documents, etc.).
"""
class BaseDocumentTransformer(ABC):
"""Abstract base class for document transformation systems.
A document transformation system takes a sequence of Documents and returns a
sequence of transformed Documents.
Example:
.. code-block:: python
class EmbeddingsRedundantFilter(BaseDocumentTransformer, BaseModel):
embeddings: Embeddings
similarity_fn: Callable = cosine_similarity
similarity_threshold: float = 0.95
class Config:
arbitrary_types_allowed = True
def transform_documents(
self, documents: Sequence[Document], **kwargs: Any
) -> Sequence[Document]:
stateful_documents = get_stateful_documents(documents)
embedded_documents = _get_embeddings_from_stateful_docs(
self.embeddings, stateful_documents
)
included_idxs = _filter_similar_embeddings(
embedded_documents, self.similarity_fn, self.similarity_threshold
)
return [stateful_documents[i] for i in sorted(included_idxs)]
async def atransform_documents(
self, documents: Sequence[Document], **kwargs: Any
) -> Sequence[Document]:
raise NotImplementedError
""" # noqa: E501
@abstractmethod
def transform_documents(
self, documents: Sequence[Document], **kwargs: Any
) -> Sequence[Document]:
"""Transform a list of documents.
Args:
documents: A sequence of Documents to be transformed.
Returns:
A list of transformed Documents.
"""
@abstractmethod
async def atransform_documents(
self, documents: Sequence[Document], **kwargs: Any
) -> Sequence[Document]:
"""Asynchronously transform a list of documents.
Args:
documents: A sequence of Documents to be transformed.
Returns:
A list of transformed Documents.
"""

@ -0,0 +1,23 @@
from typing import Any, Dict, List
from swarms.agents.memory.base import get_buffer_string
def get_prompt_input_key(inputs: Dict[str, Any], memory_variables: List[str]) -> str:
"""
Get the prompt input key.
Args:
inputs: Dict[str, Any]
memory_variables: List[str]
Returns:
A prompt input key.
"""
# "stop" is a special key that can be passed as input but is not used to
# format the prompt.
prompt_input_keys = list(set(inputs).difference(memory_variables + ["stop"]))
if len(prompt_input_keys) != 1:
raise ValueError(f"One input key expected got {prompt_input_keys}")
return prompt_input_keys[0]

@ -557,221 +557,3 @@ class ChatOpenAI(BaseChatModel):
#=================================
# from typing import (
# Any,
# Dict,
# List,
# Mapping,
# Optional,
# )
# import openai
# class ChatResult:
# """Wrapper for the result of the chat generation process."""
# def __init__(
# self,
# generations: List[ChatGeneration],
# llm_output: Optional[Mapping[str, Any]] = None,
# ):
# self.generations = generations
# self.llm_output = llm_output or {}
# class BaseMessage:
# """Base class for different types of messages."""
# def __init__(self, content: str):
# self.content = content
# class AIMessage(BaseMessage):
# """Message from the AI Assistant."""
# def __init__(self, content: str, additional_kwargs: Optional[Dict[str, Any]] = None):
# super().__init__(content)
# self.additional_kwargs = additional_kwargs or {}
# class HumanMessage(BaseMessage):
# """Message from the User."""
# pass
# class SystemMessage(BaseMessage):
# """System message."""
# pass
# class FunctionMessage(BaseMessage):
# """Function message."""
# def __init__(self, content: str, name: str):
# super().__init__(content)
# self.name = name
# class ChatGeneration:
# """Wrapper for the chat generation information."""
# def __init__(
# self, message: BaseMessage, generation_info: Optional[Mapping[str, Any]] = None
# ):
# self.message = message
# self.generation_info = generation_info or {}
# class ChatGenerationChunk:
# """Wrapper for a chunk of chat generation."""
# def __init__(self, message: BaseMessage):
# self.message = message
# def get_from_env_or_raise(var_name: str) -> str:
# value = os.getenv(var_name)
# if value is None:
# raise ValueError(f"Environment variable {var_name} is not set.")
# return value
# class OpenAI:
# """Wrapper around OpenAI Chat large language models.
# To use, you should have the ``openai`` python package installed, and the
# environment variable ``OPENAI_API_KEY`` set with your API key.
# Example:
# .. code-block:: python
# from langchain.chat_models import OpenAI
# openai = OpenAI(model_name="gpt-3.5-turbo")
# """
# def __init__(
# self,
# model_name: str = "gpt-3.5-turbo",
# temperature: float = 0.7,
# openai_api_key: Optional[str] = None,
# request_timeout: Optional[float] = None,
# max_retries: int = 6,
# ):
# self.model_name = model_name
# self.temperature = temperature
# self.openai_api_key = openai_api_key
# self.request_timeout = request_timeout
# self.max_retries = max_retries
# self._initialize_openai()
# def _initialize_openai(self):
# """Initialize the OpenAI client."""
# if self.openai_api_key is None:
# raise ValueError("OPENAI_API_KEY environment variable is not set.")
# openai.api_key = self.openai_api_key
# def _create_retry_decorator(self):
# """Create a decorator to handle API call retries."""
# errors = [
# openai.error.Timeout,
# openai.error.APIError,
# openai.error.APIConnectionError,
# openai.error.RateLimitError,
# openai.error.ServiceUnavailableError,
# ]
# def retry_decorator(func):
# @wraps(func)
# def wrapper(*args, **kwargs):
# for _ in range(self.max_retries):
# try:
# return func(*args, **kwargs)
# except tuple(errors):
# continue
# raise ValueError("Max retries reached. Unable to complete the API call.")
# return wrapper
# return retry_decorator
# def _create_message_dict(self, message: BaseMessage) -> Dict[str, Any]:
# """Convert a LangChain message to an OpenAI message dictionary."""
# role = message.role
# content = message.content
# message_dict = {"role": role, "content": content}
# if role == "assistant" and isinstance(message, AIMessage):
# message_dict["function_call"] = message.additional_kwargs.get("function_call", {})
# if role == "function" and isinstance(message, FunctionMessage):
# message_dict["name"] = message.name
# return message_dict
# def _create_message_dicts(self, messages: List[BaseMessage]) -> List[Dict[str, Any]]:
# """Convert a list of LangChain messages to a list of OpenAI message dictionaries."""
# return [self._create_message_dict(message) for message in messages]
# @retry_decorator
# def _openai_completion(self, messages: List[Dict[str, Any]], params: Dict[str, Any]) -> Any:
# """Call the OpenAI Chat Completion API."""
# response = openai.ChatCompletion.create(messages=messages, **params)
# return response
# def generate(
# self,
# messages: List[BaseMessage],
# stop: Optional[List[str]] = None,
# **kwargs: Any,
# ) -> ChatResult:
# """Generate a response using the OpenAI Chat model.
# Args:
# messages (List[BaseMessage]): List of messages in the conversation.
# stop (Optional[List[str]]): List of stop sequences to stop generation.
# Returns:
# ChatResult: The generated response wrapped in ChatResult object.
# """
# params = {
# "model": self.model_name,
# "temperature": self.temperature,
# "max_tokens": kwargs.get("max_tokens"),
# "stream": kwargs.get("streaming", False),
# "n": kwargs.get("n", 1),
# "request_timeout": kwargs.get("request_timeout", self.request_timeout),
# }
# messages_dicts = self._create_message_dicts(messages)
# response = self._openai_completion(messages_dicts, params)
# # Process the response and create ChatResult
# generations = []
# for choice in response["choices"]:
# message = self._convert_message(choice["message"])
# generation_info = {"finish_reason": choice.get("finish_reason")}
# generation = ChatGeneration(message=message, generation_info=generation_info)
# generations.append(generation)
# return ChatResult(generations=generations)
# def _convert_message(self, message_dict: Dict[str, Any]) -> BaseMessage:
# """Convert an OpenAI message dictionary to a LangChain message."""
# role = message_dict["role"]
# content = message_dict["content"]
# if role == "user":
# return HumanMessage(content=content)
# elif role == "assistant":
# additional_kwargs = message_dict.get("function_call", {})
# return AIMessage(content=content, additional_kwargs=additional_kwargs)
# elif role == "system":
# return SystemMessage(content=content)
# elif role == "function":
# name = message_dict.get("name", "")
# return FunctionMessage(content=content, name=name)
# else:
# raise ValueError(f"Invalid role found in the message: {role}")

@ -0,0 +1,256 @@
from __future__ import annotations
from abc import abstractmethod
from typing import TYPE_CHECKING, Any, Dict, List, Sequence
from pydantic import Field
from swarms.utils.serializable import Serializable
if TYPE_CHECKING:
from langchain.prompts.chat import ChatPromptTemplate
def get_buffer_string(
messages: Sequence[BaseMessage], human_prefix: str = "Human", ai_prefix: str = "AI"
) -> str:
"""Convert sequence of Messages to strings and concatenate them into one string.
Args:
messages: Messages to be converted to strings.
human_prefix: The prefix to prepend to contents of HumanMessages.
ai_prefix: THe prefix to prepend to contents of AIMessages.
Returns:
A single string concatenation of all input messages.
Example:
.. code-block:: python
from langchain.schema import AIMessage, HumanMessage
messages = [
HumanMessage(content="Hi, how are you?"),
AIMessage(content="Good, how are you?"),
]
get_buffer_string(messages)
# -> "Human: Hi, how are you?\nAI: Good, how are you?"
"""
string_messages = []
for m in messages:
if isinstance(m, HumanMessage):
role = human_prefix
elif isinstance(m, AIMessage):
role = ai_prefix
elif isinstance(m, SystemMessage):
role = "System"
elif isinstance(m, FunctionMessage):
role = "Function"
elif isinstance(m, ChatMessage):
role = m.role
else:
raise ValueError(f"Got unsupported message type: {m}")
message = f"{role}: {m.content}"
if isinstance(m, AIMessage) and "function_call" in m.additional_kwargs:
message += f"{m.additional_kwargs['function_call']}"
string_messages.append(message)
return "\n".join(string_messages)
class BaseMessage(Serializable):
"""The base abstract Message class.
Messages are the inputs and outputs of ChatModels.
"""
content: str
"""The string contents of the message."""
additional_kwargs: dict = Field(default_factory=dict)
"""Any additional information."""
@property
@abstractmethod
def type(self) -> str:
"""Type of the Message, used for serialization."""
@property
def lc_serializable(self) -> bool:
"""Whether this class is LangChain serializable."""
return True
def __add__(self, other: Any) -> ChatPromptTemplate:
from langchain.prompts.chat import ChatPromptTemplate
prompt = ChatPromptTemplate(messages=[self])
return prompt + other
class BaseMessageChunk(BaseMessage):
def _merge_kwargs_dict(
self, left: Dict[str, Any], right: Dict[str, Any]
) -> Dict[str, Any]:
"""Merge additional_kwargs from another BaseMessageChunk into this one."""
merged = left.copy()
for k, v in right.items():
if k not in merged:
merged[k] = v
elif type(merged[k]) != type(v):
raise ValueError(
f'additional_kwargs["{k}"] already exists in this message,'
" but with a different type."
)
elif isinstance(merged[k], str):
merged[k] += v
elif isinstance(merged[k], dict):
merged[k] = self._merge_kwargs_dict(merged[k], v)
else:
raise ValueError(
f"Additional kwargs key {k} already exists in this message."
)
return merged
def __add__(self, other: Any) -> BaseMessageChunk: # type: ignore
if isinstance(other, BaseMessageChunk):
# If both are (subclasses of) BaseMessageChunk,
# concat into a single BaseMessageChunk
return self.__class__(
content=self.content + other.content,
additional_kwargs=self._merge_kwargs_dict(
self.additional_kwargs, other.additional_kwargs
),
)
else:
raise TypeError(
'unsupported operand type(s) for +: "'
f"{self.__class__.__name__}"
f'" and "{other.__class__.__name__}"'
)
class HumanMessage(BaseMessage):
"""A Message from a human."""
example: bool = False
"""Whether this Message is being passed in to the model as part of an example
conversation.
"""
@property
def type(self) -> str:
"""Type of the message, used for serialization."""
return "human"
class HumanMessageChunk(HumanMessage, BaseMessageChunk):
pass
class AIMessage(BaseMessage):
"""A Message from an AI."""
example: bool = False
"""Whether this Message is being passed in to the model as part of an example
conversation.
"""
@property
def type(self) -> str:
"""Type of the message, used for serialization."""
return "ai"
class AIMessageChunk(AIMessage, BaseMessageChunk):
pass
class SystemMessage(BaseMessage):
"""A Message for priming AI behavior, usually passed in as the first of a sequence
of input messages.
"""
@property
def type(self) -> str:
"""Type of the message, used for serialization."""
return "system"
class SystemMessageChunk(SystemMessage, BaseMessageChunk):
pass
class FunctionMessage(BaseMessage):
"""A Message for passing the result of executing a function back to a model."""
name: str
"""The name of the function that was executed."""
@property
def type(self) -> str:
"""Type of the message, used for serialization."""
return "function"
class FunctionMessageChunk(FunctionMessage, BaseMessageChunk):
pass
class ChatMessage(BaseMessage):
"""A Message that can be assigned an arbitrary speaker (i.e. role)."""
role: str
"""The speaker / role of the Message."""
@property
def type(self) -> str:
"""Type of the message, used for serialization."""
return "chat"
class ChatMessageChunk(ChatMessage, BaseMessageChunk):
pass
def _message_to_dict(message: BaseMessage) -> dict:
return {"type": message.type, "data": message.dict()}
def messages_to_dict(messages: Sequence[BaseMessage]) -> List[dict]:
"""Convert a sequence of Messages to a list of dictionaries.
Args:
messages: Sequence of messages (as BaseMessages) to convert.
Returns:
List of messages as dicts.
"""
return [_message_to_dict(m) for m in messages]
def _message_from_dict(message: dict) -> BaseMessage:
_type = message["type"]
if _type == "human":
return HumanMessage(**message["data"])
elif _type == "ai":
return AIMessage(**message["data"])
elif _type == "system":
return SystemMessage(**message["data"])
elif _type == "chat":
return ChatMessage(**message["data"])
elif _type == "function":
return FunctionMessage(**message["data"])
else:
raise ValueError(f"Got unexpected message type: {_type}")
def messages_from_dict(messages: List[dict]) -> List[BaseMessage]:
"""Convert a sequence of messages from dicts to Message objects.
Args:
messages: Sequence of messages (as dicts) to convert.
Returns:
List of messages (BaseMessages).
"""
return [_message_from_dict(m) for m in messages]

@ -1,3 +0,0 @@
.env
__pycache__
.venv

@ -0,0 +1,163 @@
from abc import ABC
from typing import Any, Dict, List, Literal, TypedDict, Union, cast
from pydantic import BaseModel, PrivateAttr
class BaseSerialized(TypedDict):
"""Base class for serialized objects."""
lc: int
id: List[str]
class SerializedConstructor(BaseSerialized):
"""Serialized constructor."""
type: Literal["constructor"]
kwargs: Dict[str, Any]
class SerializedSecret(BaseSerialized):
"""Serialized secret."""
type: Literal["secret"]
class SerializedNotImplemented(BaseSerialized):
"""Serialized not implemented."""
type: Literal["not_implemented"]
class Serializable(BaseModel, ABC):
"""Serializable base class."""
@property
def lc_serializable(self) -> bool:
"""
Return whether or not the class is serializable.
"""
return False
@property
def lc_namespace(self) -> List[str]:
"""
Return the namespace of the langchain object.
eg. ["langchain", "llms", "openai"]
"""
return self.__class__.__module__.split(".")
@property
def lc_secrets(self) -> Dict[str, str]:
"""
Return a map of constructor argument names to secret ids.
eg. {"openai_api_key": "OPENAI_API_KEY"}
"""
return dict()
@property
def lc_attributes(self) -> Dict:
"""
Return a list of attribute names that should be included in the
serialized kwargs. These attributes must be accepted by the
constructor.
"""
return {}
class Config:
extra = "ignore"
_lc_kwargs = PrivateAttr(default_factory=dict)
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._lc_kwargs = kwargs
def to_json(self) -> Union[SerializedConstructor, SerializedNotImplemented]:
if not self.lc_serializable:
return self.to_json_not_implemented()
secrets = dict()
# Get latest values for kwargs if there is an attribute with same name
lc_kwargs = {
k: getattr(self, k, v)
for k, v in self._lc_kwargs.items()
if not (self.__exclude_fields__ or {}).get(k, False) # type: ignore
}
# Merge the lc_secrets and lc_attributes from every class in the MRO
for cls in [None, *self.__class__.mro()]:
# Once we get to Serializable, we're done
if cls is Serializable:
break
# Get a reference to self bound to each class in the MRO
this = cast(Serializable, self if cls is None else super(cls, self))
secrets.update(this.lc_secrets)
lc_kwargs.update(this.lc_attributes)
# include all secrets, even if not specified in kwargs
# as these secrets may be passed as an environment variable instead
for key in secrets.keys():
secret_value = getattr(self, key, None) or lc_kwargs.get(key)
if secret_value is not None:
lc_kwargs.update({key: secret_value})
return {
"lc": 1,
"type": "constructor",
"id": [*self.lc_namespace, self.__class__.__name__],
"kwargs": lc_kwargs
if not secrets
else _replace_secrets(lc_kwargs, secrets),
}
def to_json_not_implemented(self) -> SerializedNotImplemented:
return to_json_not_implemented(self)
def _replace_secrets(
root: Dict[Any, Any], secrets_map: Dict[str, str]
) -> Dict[Any, Any]:
result = root.copy()
for path, secret_id in secrets_map.items():
[*parts, last] = path.split(".")
current = result
for part in parts:
if part not in current:
break
current[part] = current[part].copy()
current = current[part]
if last in current:
current[last] = {
"lc": 1,
"type": "secret",
"id": [secret_id],
}
return result
def to_json_not_implemented(obj: object) -> SerializedNotImplemented:
"""Serialize a "not implemented" object.
Args:
obj: object to serialize
Returns:
SerializedNotImplemented
"""
_id: List[str] = []
try:
if hasattr(obj, "__name__"):
_id = [*obj.__module__.split("."), obj.__name__]
elif hasattr(obj, "__class__"):
_id = [*obj.__class__.__module__.split("."), obj.__class__.__name__]
except Exception:
pass
return {
"lc": 1,
"type": "not_implemented",
"id": _id,
}
Loading…
Cancel
Save