Former-commit-id: 4197920802
grit/923f7c6f-0958-480b-8748-ea6bbf1c2084
parent
7d375b72e3
commit
bab2835472
@ -0,0 +1,335 @@
|
||||
from typing import Any, AsyncIterator, Dict, Iterator, List, Optional
|
||||
|
||||
from langchain.callbacks.manager import (
|
||||
AsyncCallbackManagerForLLMRun,
|
||||
CallbackManagerForLLMRun,
|
||||
)
|
||||
from langchain.chat_models.base import (
|
||||
BaseChatModel,
|
||||
_agenerate_from_stream,
|
||||
_generate_from_stream,
|
||||
)
|
||||
from langchain.llms.cohere import BaseCohere
|
||||
from langchain.schema.messages import (
|
||||
AIMessage,
|
||||
AIMessageChunk,
|
||||
BaseMessage,
|
||||
ChatMessage,
|
||||
HumanMessage,
|
||||
SystemMessage,
|
||||
)
|
||||
from langchain.schema.output import ChatGeneration, ChatGenerationChunk, ChatResult
|
||||
|
||||
|
||||
def get_role(message: BaseMessage) -> str:
|
||||
"""Get the role of the message.
|
||||
|
||||
Args:
|
||||
message: The message.
|
||||
|
||||
Returns:
|
||||
The role of the message.
|
||||
|
||||
Raises:
|
||||
ValueError: If the message is of an unknown type.
|
||||
"""
|
||||
if isinstance(message, ChatMessage) or isinstance(message, HumanMessage):
|
||||
return "User"
|
||||
elif isinstance(message, AIMessage):
|
||||
return "Chatbot"
|
||||
elif isinstance(message, SystemMessage):
|
||||
return "System"
|
||||
else:
|
||||
raise ValueError(f"Got unknown type {message}")
|
||||
|
||||
|
||||
def get_cohere_chat_request(
|
||||
messages: List[BaseMessage],
|
||||
*,
|
||||
connectors: Optional[List[Dict[str, str]]] = None,
|
||||
**kwargs: Any,
|
||||
) -> Dict[str, Any]:
|
||||
"""Get the request for the Cohere chat API.
|
||||
|
||||
Args:
|
||||
messages: The messages.
|
||||
connectors: The connectors.
|
||||
**kwargs: The keyword arguments.
|
||||
|
||||
Returns:
|
||||
The request for the Cohere chat API.
|
||||
"""
|
||||
documents = (
|
||||
None
|
||||
if "source_documents" not in kwargs
|
||||
else [
|
||||
{
|
||||
"snippet": doc.page_content,
|
||||
"id": doc.metadata.get("id") or f"doc-{str(i)}",
|
||||
}
|
||||
for i, doc in enumerate(kwargs["source_documents"])
|
||||
]
|
||||
)
|
||||
kwargs.pop("source_documents", None)
|
||||
maybe_connectors = connectors if documents is None else None
|
||||
|
||||
# by enabling automatic prompt truncation, the probability of request failure is
|
||||
# reduced with minimal impact on response quality
|
||||
prompt_truncation = (
|
||||
"AUTO" if documents is not None or connectors is not None else None
|
||||
)
|
||||
|
||||
return {
|
||||
"message": messages[0].content,
|
||||
"chat_history": [
|
||||
{"role": get_role(x), "message": x.content} for x in messages[1:]
|
||||
],
|
||||
"documents": documents,
|
||||
"connectors": maybe_connectors,
|
||||
"prompt_truncation": prompt_truncation,
|
||||
**kwargs,
|
||||
}
|
||||
|
||||
|
||||
class CohereChat(BaseChatModel, BaseCohere):
|
||||
"""`Cohere` chat large language models.
|
||||
|
||||
To use, you should have the ``cohere`` python package installed, and the
|
||||
environment variable ``COHERE_API_KEY`` set with your API key, or pass
|
||||
it as a named parameter to the constructor.
|
||||
|
||||
Example:
|
||||
.. code-block:: python
|
||||
|
||||
from swarms.models.cohere import CohereChat, HumanMessage
|
||||
|
||||
chat = CohereChat(model="foo")
|
||||
result = chat([HumanMessage(content="Hello")])
|
||||
print(result.content)
|
||||
"""
|
||||
|
||||
class Config:
|
||||
"""Configuration for this pydantic object."""
|
||||
|
||||
allow_population_by_field_name = True
|
||||
arbitrary_types_allowed = True
|
||||
|
||||
@property
|
||||
def _llm_type(self) -> str:
|
||||
"""Return type of chat model."""
|
||||
return "cohere-chat"
|
||||
|
||||
@property
|
||||
def _default_params(self) -> Dict[str, Any]:
|
||||
"""Get the default parameters for calling Cohere API."""
|
||||
return {
|
||||
"temperature": self.temperature,
|
||||
}
|
||||
|
||||
@property
|
||||
def _identifying_params(self) -> Dict[str, Any]:
|
||||
"""Get the identifying parameters."""
|
||||
return {**{"model": self.model}, **self._default_params}
|
||||
|
||||
def _stream(
|
||||
self,
|
||||
messages: List[BaseMessage],
|
||||
stop: Optional[List[str]] = None,
|
||||
run_manager: Optional[CallbackManagerForLLMRun] = None,
|
||||
**kwargs: Any,
|
||||
) -> Iterator[ChatGenerationChunk]:
|
||||
"""
|
||||
Stream the output
|
||||
|
||||
Args:
|
||||
messages: The messages.
|
||||
stop: The stop tokens.
|
||||
run_manager: The callback manager.
|
||||
**kwargs: The keyword arguments.
|
||||
|
||||
"""
|
||||
request = get_cohere_chat_request(messages, **self._default_params, **kwargs)
|
||||
stream = self.client.chat(**request, stream=True)
|
||||
|
||||
for data in stream:
|
||||
if data.event_type == "text-generation":
|
||||
delta = data.text
|
||||
yield ChatGenerationChunk(message=AIMessageChunk(content=delta))
|
||||
if run_manager:
|
||||
run_manager.on_llm_new_token(delta)
|
||||
|
||||
async def _astream(
|
||||
self,
|
||||
messages: List[BaseMessage],
|
||||
stop: Optional[List[str]] = None,
|
||||
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
|
||||
**kwargs: Any,
|
||||
) -> AsyncIterator[ChatGenerationChunk]:
|
||||
"""
|
||||
Stream generations from the model.
|
||||
|
||||
Args:
|
||||
messages: The messages.
|
||||
stop: The stop tokens.
|
||||
run_manager: The callback manager.
|
||||
**kwargs: The keyword arguments.
|
||||
|
||||
Yields:
|
||||
The generations.
|
||||
|
||||
Examples:
|
||||
.. code-block:: python
|
||||
|
||||
async for generation in model._astream(messages):
|
||||
print(generation.message.content)
|
||||
"""
|
||||
request = get_cohere_chat_request(messages, **self._default_params, **kwargs)
|
||||
stream = await self.async_client.chat(**request, stream=True)
|
||||
|
||||
async for data in stream:
|
||||
if data.event_type == "text-generation":
|
||||
delta = data.text
|
||||
yield ChatGenerationChunk(message=AIMessageChunk(content=delta))
|
||||
if run_manager:
|
||||
await run_manager.on_llm_new_token(delta)
|
||||
|
||||
def _get_generation_info(self, response: Any) -> Dict[str, Any]:
|
||||
"""Get the generation info from cohere API response."""
|
||||
return {
|
||||
"documents": response.documents,
|
||||
"citations": response.citations,
|
||||
"search_results": response.search_results,
|
||||
"search_queries": response.search_queries,
|
||||
"token_count": response.token_count,
|
||||
}
|
||||
|
||||
def _run(
|
||||
self,
|
||||
messages: List[BaseMessage],
|
||||
stop: Optional[List[str]] = None,
|
||||
run_manager: Optional[CallbackManagerForLLMRun] = None,
|
||||
**kwargs: Any,
|
||||
) -> ChatResult:
|
||||
"""
|
||||
Run the model.
|
||||
|
||||
Args:
|
||||
messages: The messages.
|
||||
stop: The stop tokens.
|
||||
run_manager: The callback manager.
|
||||
**kwargs: The keyword arguments.
|
||||
|
||||
Returns:
|
||||
The result.
|
||||
|
||||
Examples:
|
||||
.. code-block:: python
|
||||
|
||||
result = model._run(messages)
|
||||
print(result.content)
|
||||
"""
|
||||
if self.streaming:
|
||||
stream_iter = self._stream(
|
||||
messages, stop=stop, run_manager=run_manager, **kwargs
|
||||
)
|
||||
return _generate_from_stream(stream_iter)
|
||||
|
||||
request = get_cohere_chat_request(messages, **self._default_params, **kwargs)
|
||||
response = self.client.chat(**request)
|
||||
|
||||
message = AIMessage(content=response.text)
|
||||
generation_info = None
|
||||
if hasattr(response, "documents"):
|
||||
generation_info = self._get_generation_info(response)
|
||||
return ChatResult(
|
||||
generations=[
|
||||
ChatGeneration(message=message, generation_info=generation_info)
|
||||
]
|
||||
)
|
||||
|
||||
def __call__(
|
||||
self,
|
||||
messages: List[BaseMessage],
|
||||
stop: Optional[List[str]] = None,
|
||||
run_manager: Optional[CallbackManagerForLLMRun] = None,
|
||||
**kwargs: Any,
|
||||
) -> ChatResult:
|
||||
"""
|
||||
__Call__ the model.
|
||||
|
||||
Args:
|
||||
messages: The messages.
|
||||
stop: The stop tokens.
|
||||
run_manager: The callback manager.
|
||||
**kwargs: The keyword arguments.
|
||||
|
||||
Returns:
|
||||
The result.
|
||||
"""
|
||||
if self.streaming:
|
||||
stream_iter = self._stream(
|
||||
messages, stop=stop, run_manager=run_manager, **kwargs
|
||||
)
|
||||
return _generate_from_stream(stream_iter)
|
||||
|
||||
request = get_cohere_chat_request(messages, **self._default_params, **kwargs)
|
||||
response = self.client.chat(**request)
|
||||
|
||||
message = AIMessage(content=response.text)
|
||||
generation_info = None
|
||||
if hasattr(response, "documents"):
|
||||
generation_info = self._get_generation_info(response)
|
||||
return ChatResult(
|
||||
generations=[
|
||||
ChatGeneration(message=message, generation_info=generation_info)
|
||||
]
|
||||
)
|
||||
|
||||
async def _arun(
|
||||
self,
|
||||
messages: List[BaseMessage],
|
||||
stop: Optional[List[str]] = None,
|
||||
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
|
||||
**kwargs: Any,
|
||||
) -> ChatResult:
|
||||
"""
|
||||
Asynchronously run the model.
|
||||
|
||||
Args:
|
||||
messages: The messages.
|
||||
stop: The stop tokens.
|
||||
run_manager: The callback manager.
|
||||
**kwargs: The keyword arguments.
|
||||
|
||||
Returns:
|
||||
The result.
|
||||
|
||||
Examples:
|
||||
.. code-block:: python
|
||||
|
||||
result = await model._arun(messages)
|
||||
print(result.content)
|
||||
|
||||
"""
|
||||
if self.streaming:
|
||||
stream_iter = self._astream(
|
||||
messages, stop=stop, run_manager=run_manager, **kwargs
|
||||
)
|
||||
return await _agenerate_from_stream(stream_iter)
|
||||
|
||||
request = get_cohere_chat_request(messages, **self._default_params, **kwargs)
|
||||
response = self.client.chat(**request, stream=False)
|
||||
|
||||
message = AIMessage(content=response.text)
|
||||
generation_info = None
|
||||
if hasattr(response, "documents"):
|
||||
generation_info = self._get_generation_info(response)
|
||||
return ChatResult(
|
||||
generations=[
|
||||
ChatGeneration(message=message, generation_info=generation_info)
|
||||
]
|
||||
)
|
||||
|
||||
def get_num_tokens(self, text: str) -> int:
|
||||
"""Calculate number of tokens."""
|
||||
return len(self.client.tokenize(text).tokens)
|
@ -1,144 +0,0 @@
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch
|
||||
from swarms.agents.agents import (
|
||||
AgentNodeInitializer,
|
||||
AgentNode,
|
||||
agent,
|
||||
) # replace with actual import
|
||||
|
||||
|
||||
# For initializing AgentNodeInitializer in multiple tests
|
||||
@pytest.fixture
|
||||
def mock_agent_node_initializer():
|
||||
with patch("swarms.agents.agents.ChatOpenAI") as mock_llm, patch(
|
||||
"swarms.agents.agents.AutoGPT"
|
||||
) as mock_agent:
|
||||
initializer = AgentNodeInitializer(
|
||||
model_type="openai",
|
||||
model_id="test",
|
||||
openai_api_key="test_key",
|
||||
temperature=0.5,
|
||||
)
|
||||
initializer.llm = mock_llm
|
||||
initializer.tools = [Mock(spec=BaseTool)]
|
||||
initializer.vectorstore = Mock()
|
||||
initializer.agent = mock_agent
|
||||
|
||||
return initializer
|
||||
|
||||
|
||||
# Test initialize_llm method of AgentNodeInitializer class
|
||||
@pytest.mark.parametrize("model_type", ["openai", "huggingface", "invalid"])
|
||||
def test_agent_node_initializer_initialize_llm(model_type, mock_agent_node_initializer):
|
||||
with patch("swarms.agents.agents.ChatOpenAI") as mock_openai, patch(
|
||||
"swarms.agents.agents.HuggingFaceLLM"
|
||||
) as mock_huggingface:
|
||||
if model_type == "invalid":
|
||||
with pytest.raises(ValueError):
|
||||
mock_agent_node_initializer.initialize_llm(
|
||||
model_type, "model_id", "openai_api_key", 0.5
|
||||
)
|
||||
else:
|
||||
mock_agent_node_initializer.initialize_llm(
|
||||
model_type, "model_id", "openai_api_key", 0.5
|
||||
)
|
||||
if model_type == "openai":
|
||||
mock_openai.assert_called_once()
|
||||
elif model_type == "huggingface":
|
||||
mock_huggingface.assert_called_once()
|
||||
|
||||
|
||||
# Test add_tool method of AgentNodeInitializer class
|
||||
def test_agent_node_initializer_add_tool(mock_agent_node_initializer):
|
||||
with patch("swarms.agents.agents.BaseTool") as mock_base_tool:
|
||||
mock_agent_node_initializer.add_tool(mock_base_tool)
|
||||
assert mock_base_tool in mock_agent_node_initializer.tools
|
||||
|
||||
|
||||
# Test run method of AgentNodeInitializer class
|
||||
@pytest.mark.parametrize("prompt", ["valid prompt", ""])
|
||||
def test_agent_node_initializer_run(prompt, mock_agent_node_initializer):
|
||||
if prompt == "":
|
||||
with pytest.raises(ValueError):
|
||||
mock_agent_node_initializer.run(prompt)
|
||||
else:
|
||||
assert mock_agent_node_initializer.run(prompt) == "Task completed by AgentNode"
|
||||
|
||||
|
||||
# For initializing AgentNode in multiple tests
|
||||
@pytest.fixture
|
||||
def mock_agent_node():
|
||||
with patch("swarms.agents.agents.ChatOpenAI") as mock_llm, patch(
|
||||
"swarms.agents.agents.AgentNodeInitializer"
|
||||
) as mock_agent_node_initializer:
|
||||
mock_agent_node = AgentNode("test_key")
|
||||
mock_agent_node.llm_class = mock_llm
|
||||
mock_agent_node.vectorstore = Mock()
|
||||
mock_agent_node_initializer.llm = mock_llm
|
||||
|
||||
return mock_agent_node
|
||||
|
||||
|
||||
# Test initialize_llm method of AgentNode class
|
||||
@pytest.mark.parametrize("llm_class", ["openai", "huggingface"])
|
||||
def test_agent_node_initialize_llm(llm_class, mock_agent_node):
|
||||
with patch("swarms.agents.agents.ChatOpenAI") as mock_openai, patch(
|
||||
"swarms.agents.agents.HuggingFaceLLM"
|
||||
) as mock_huggingface:
|
||||
mock_agent_node.initialize_llm(llm_class)
|
||||
if llm_class == "openai":
|
||||
mock_openai.assert_called_once()
|
||||
elif llm_class == "huggingface":
|
||||
mock_huggingface.assert_called_once()
|
||||
|
||||
|
||||
# Test initialize_tools method of AgentNode class
|
||||
def test_agent_node_initialize_tools(mock_agent_node):
|
||||
with patch("swarms.agents.agents.DuckDuckGoSearchRun") as mock_ddg, patch(
|
||||
"swarms.agents.agents.WriteFileTool"
|
||||
) as mock_write_file, patch(
|
||||
"swarms.agents.agents.ReadFileTool"
|
||||
) as mock_read_file, patch(
|
||||
"swarms.agents.agents.process_csv"
|
||||
) as mock_process_csv, patch(
|
||||
"swarms.agents.agents.WebpageQATool"
|
||||
) as mock_webpage_qa:
|
||||
mock_agent_node.initialize_tools("openai")
|
||||
assert mock_ddg.called
|
||||
assert mock_write_file.called
|
||||
assert mock_read_file.called
|
||||
assert mock_process_csv.called
|
||||
assert mock_webpage_qa.called
|
||||
|
||||
|
||||
# Test create_agent method of AgentNode class
|
||||
def test_agent_node_create_agent(mock_agent_node):
|
||||
with patch.object(mock_agent_node, "initialize_llm"), patch.object(
|
||||
mock_agent_node, "initialize_tools"
|
||||
), patch.object(mock_agent_node, "initialize_vectorstore"), patch(
|
||||
"swarms.agents.agents.AgentNodeInitializer"
|
||||
) as mock_agent_node_initializer:
|
||||
mock_agent_node.create_agent()
|
||||
mock_agent_node_initializer.assert_called_once()
|
||||
mock_agent_node_initializer.return_value.create_agent.assert_called_once()
|
||||
|
||||
|
||||
# Test agent function
|
||||
@pytest.mark.parametrize(
|
||||
"openai_api_key,objective",
|
||||
[("valid_key", "valid_objective"), ("", "valid_objective"), ("valid_key", "")],
|
||||
)
|
||||
def test_agent(openai_api_key, objective):
|
||||
if openai_api_key == "" or objective == "":
|
||||
with pytest.raises(ValueError):
|
||||
agent(openai_api_key, objective)
|
||||
else:
|
||||
with patch(
|
||||
"swarms.agents.agents.AgentNodeInitializer"
|
||||
) as mock_agent_node_initializer:
|
||||
mock_agent_node = (
|
||||
mock_agent_node_initializer.return_value.create_agent.return_value
|
||||
)
|
||||
mock_agent_node.run.return_value = "Agent output"
|
||||
result = agent(openai_api_key, objective)
|
||||
assert result == "Agent output"
|
@ -1,59 +0,0 @@
|
||||
import unittest
|
||||
from unittest.mock import patch, Mock
|
||||
from apps.discord import (
|
||||
Bot,
|
||||
) # Replace 'Bot' with the name of the file containing your bot's code.
|
||||
|
||||
|
||||
class TestBot(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.llm_mock = Mock()
|
||||
self.agent_mock = Mock()
|
||||
self.bot = Bot(agent=self.agent_mock, llm=self.llm_mock)
|
||||
|
||||
@patch("Bot.load_dotenv") # Mocking the `load_dotenv` function call.
|
||||
def test_initialization(self, mock_load_dotenv):
|
||||
self.assertIsNotNone(self.bot.bot)
|
||||
self.assertEqual(self.bot.agent, self.agent_mock)
|
||||
self.assertEqual(self.bot.llm, self.llm_mock)
|
||||
mock_load_dotenv.assert_called_once()
|
||||
|
||||
@patch("Bot.commands.bot")
|
||||
def test_greet(self, mock_bot):
|
||||
ctx_mock = Mock()
|
||||
ctx_mock.author.name = "TestUser"
|
||||
self.bot.bot.clear()
|
||||
self.bot.bot.greet(ctx_mock)
|
||||
ctx_mock.send.assert_called_with("hello, TestUser!")
|
||||
|
||||
# Similarly, you can add tests for other commands.
|
||||
|
||||
@patch("Bot.commands.bot")
|
||||
def test_help_me(self, mock_bot):
|
||||
ctx_mock = Mock()
|
||||
self.bot.bot.clear()
|
||||
self.bot.bot.help_me(ctx_mock)
|
||||
# Verify the help text was sent. You can check for a substring to make it shorter.
|
||||
ctx_mock.send.assert_called()
|
||||
|
||||
@patch("Bot.commands.bot")
|
||||
def test_on_command_error(self, mock_bot):
|
||||
ctx_mock = Mock()
|
||||
error_mock = Mock()
|
||||
error_mock.__class__.__name__ = "CommandNotFound"
|
||||
self.bot.bot.clear()
|
||||
self.bot.bot.on_command_error(ctx_mock, error_mock)
|
||||
ctx_mock.send.assert_called_with("that command does not exist!")
|
||||
|
||||
def test_add_command(self):
|
||||
def sample_function(*args):
|
||||
return "Test Response"
|
||||
|
||||
self.bot.add_command("test_command", sample_function)
|
||||
# Here, you can further test by triggering the command and checking the response.
|
||||
|
||||
# You can add more tests for other commands and functionalities.
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
Loading…
Reference in new issue