You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
457 lines
15 KiB
457 lines
15 KiB
6 months ago
|
import logging
|
||
|
from unittest.mock import patch
|
||
|
|
||
|
import pytest
|
||
|
import torch
|
||
|
|
||
|
from swarms.models.huggingface import HuggingfaceLLM
|
||
|
|
||
|
|
||
|
# Fixture for the class instance
|
||
|
@pytest.fixture
def llm_instance():
    """Provide a HuggingfaceLLM constructed with only a model id."""
    return HuggingfaceLLM(model_id="NousResearch/Nous-Hermes-2-Vision-Alpha")
|
||
|
|
||
|
|
||
|
# Test for instantiation and attributes
|
||
|
def test_llm_initialization(llm_instance):
    """Check `model_id` and `max_length` on the fixture-built instance."""
    expected_model_id = "NousResearch/Nous-Hermes-2-Vision-Alpha"
    expected_max_length = 500

    assert llm_instance.model_id == expected_model_id
    assert llm_instance.max_length == expected_max_length
    # ... add more assertions for all default attributes
|
||
|
|
||
|
|
||
|
# Parameterized test for setting devices
|
||
|
@pytest.mark.parametrize("device", ["cpu", "cuda"])
def test_llm_set_device(llm_instance, device):
    """After `set_device(device)`, the `device` attribute should equal it."""
    llm_instance.set_device(device)
    current_device = llm_instance.device
    assert current_device == device
|
||
|
|
||
|
|
||
|
# Test exception during initialization with a bad model_id
|
||
|
def test_llm_bad_model_initialization():
    """Constructing with an unknown model id is expected to raise."""
    bad_model_id = "unknown-model"
    with pytest.raises(Exception):
        HuggingfaceLLM(model_id=bad_model_id)
|
||
|
|
||
|
|
||
|
# # Mocking the tokenizer and model to test run method
|
||
|
# @patch("swarms.models.huggingface.AutoTokenizer.from_pretrained")
|
||
|
# @patch(
|
||
|
# "swarms.models.huggingface.AutoModelForCausalLM.from_pretrained"
|
||
|
# )
|
||
|
# def test_llm_run(mock_model, mock_tokenizer, llm_instance):
|
||
|
# mock_model.return_value.generate.return_value = "mocked output"
|
||
|
# mock_tokenizer.return_value.encode.return_value = "mocked input"
|
||
|
# result = llm_instance.run("test task")
|
||
|
# assert result == "mocked output"
|
||
|
|
||
|
|
||
|
# Async test (requires pytest-asyncio plugin)
|
||
|
@pytest.mark.asyncio
async def test_llm_run_async(llm_instance):
    """Await `run_async` and check that it yields a string."""
    output = await llm_instance.run_async("test task")
    assert isinstance(output, str)
|
||
|
|
||
|
|
||
|
# Test for checking GPU availability
|
||
|
def test_llm_gpu_availability(llm_instance):
    """`gpu_available()` should agree with `torch.cuda.is_available()`."""
    # Whatever torch reports on this machine is treated as the ground truth.
    cuda_present = torch.cuda.is_available()
    reported = llm_instance.gpu_available()
    assert reported == cuda_present
|
||
|
|
||
|
|
||
|
# Test for memory consumption reporting
|
||
|
def test_llm_memory_consumption(llm_instance):
    """`memory_consumption()` should report the patched CUDA memory figures."""
    # Pin both torch.cuda counters so the expected dict is deterministic.
    allocated_patch = patch("torch.cuda.memory_allocated", return_value=1024)
    reserved_patch = patch("torch.cuda.memory_reserved", return_value=2048)

    with allocated_patch, reserved_patch:
        memory = llm_instance.memory_consumption()

    assert memory == {"allocated": 1024, "reserved": 2048}
|
||
|
|
||
|
|
||
|
# Test different initialization parameters
|
||
|
@pytest.mark.parametrize(
    "model_id, max_length",
    [
        ("NousResearch/Nous-Hermes-2-Vision-Alpha", 100),
        ("microsoft/Orca-2-13b", 200),
        # None means "do not pass max_length", exercising the default.
        ("berkeley-nest/Starling-LM-7B-alpha", None),
    ],
)
def test_llm_initialization_params(model_id, max_length):
    """Check `max_length` after construction, with and without an override.

    Args:
        model_id: Model identifier passed to the constructor.
        max_length: Explicit maximum length, or None to rely on the default.
    """
    # Compare against None explicitly: None is the sentinel for "use the
    # default", and a truthiness test would also treat 0 as "not provided".
    if max_length is not None:
        instance = HuggingfaceLLM(model_id=model_id, max_length=max_length)
        assert instance.max_length == max_length
    else:
        instance = HuggingfaceLLM(model_id=model_id)
        # Assuming 500 is the default max_length
        assert instance.max_length == 500
|
||
|
|
||
|
|
||
|
# Test for setting an invalid device
|
||
|
def test_llm_set_invalid_device(llm_instance):
    """`set_device` with an unrecognised name is expected to raise ValueError."""
    bogus_device = "quantum_processor"
    with pytest.raises(ValueError):
        llm_instance.set_device(bogus_device)
|
||
|
|
||
|
|
||
|
# Mocking external API call to test run method without network
|
||
|
@patch("swarms.models.huggingface.HuggingfaceLLM.run")
def test_llm_run_without_network(mock_run, llm_instance):
    """With `run` patched out, calling it returns the canned value."""
    mock_run.return_value = "mocked output"
    output = llm_instance.run("test task without network")
    assert output == "mocked output"
|
||
|
|
||
|
|
||
|
# Test handling of empty input for the run method
|
||
|
def test_llm_run_empty_input(llm_instance):
    """`run` with an empty string is expected to raise ValueError."""
    empty_task = ""
    with pytest.raises(ValueError):
        llm_instance.run(empty_task)
|
||
|
|
||
|
|
||
|
# Test the generation with a provided seed for reproducibility
|
||
|
@patch("swarms.models.huggingface.HuggingfaceLLM.run")
def test_llm_run_with_seed(mock_run, llm_instance):
    """Set a seed, then call the patched `run` with that seed."""
    seed = 42
    expected_output = "mocked deterministic output"

    # Assuming set_seed method affects the randomness in the model.
    # A real reproducibility check would compare two unmocked runs.
    llm_instance.set_seed(seed)
    mock_run.return_value = expected_output

    output = llm_instance.run("test task", seed=seed)
    assert output == expected_output
|
||
|
|
||
|
|
||
|
# Test the output length is as expected
|
||
|
@patch("swarms.models.huggingface.HuggingfaceLLM.run")
def test_llm_run_output_length(mock_run, llm_instance):
    """The word count of the patched `run` output must not exceed `max_length`."""
    llm_instance.max_length = 50  # set a max_length for the output
    mock_run.return_value = "mocked output" * 10  # some long text

    output = llm_instance.run("test task")
    word_count = len(output.split())
    assert word_count <= llm_instance.max_length
|
||
|
|
||
|
|
||
|
# Test the tokenizer handling special tokens correctly
|
||
|
@patch("swarms.models.huggingface.HuggingfaceLLM._tokenizer.encode")
@patch("swarms.models.huggingface.HuggingfaceLLM._tokenizer.decode")
def test_llm_tokenizer_special_tokens(mock_decode, mock_encode, llm_instance):
    """Run with patched tokenizer encode/decode and check each is used once.

    Decorators apply bottom-up, so the `decode` mock is the first argument
    and the `encode` mock is the second.
    """
    mock_encode.return_value = "encoded input with special tokens"
    mock_decode.return_value = "decoded output with special tokens"

    output = llm_instance.run("test task with special tokens")

    mock_encode.assert_called_once()
    mock_decode.assert_called_once()
    assert "special tokens" in output
|
||
|
|
||
|
|
||
|
# Test for correct handling of timeouts
|
||
|
@patch("swarms.models.huggingface.HuggingfaceLLM.run")
def test_llm_timeout_handling(mock_run, llm_instance):
    """A TimeoutError raised by the patched `run` should reach the caller."""
    mock_run.side_effect = TimeoutError
    task = "test task with timeout"
    with pytest.raises(TimeoutError):
        llm_instance.run(task)
|
||
|
|
||
|
|
||
|
# Test for response time within a threshold (performance test)
|
||
|
@patch("swarms.models.huggingface.HuggingfaceLLM.run")
def test_llm_response_time(mock_run, llm_instance):
    """Check that a call to the patched `run` completes in under one second."""
    import time

    mock_run.return_value = "mocked output"

    # perf_counter() is monotonic and high resolution; time.time() follows the
    # wall clock, which can jump and give a bogus (even negative) duration.
    start_time = time.perf_counter()
    llm_instance.run("test task for response time")
    elapsed = time.perf_counter() - start_time

    # Assuming the response should be faster than 1 second
    assert elapsed < 1
|
||
|
|
||
|
|
||
|
# Test the logging of a warning for long inputs
|
||
|
@patch("swarms.models.huggingface.logging.warning")
def test_llm_long_input_warning(mock_warning, llm_instance):
    """Running a 10000-character input should log exactly one warning."""
    oversized_input = "x" * 10000  # input longer than the typical limit
    llm_instance.run(oversized_input)
    mock_warning.assert_called_once()
|
||
|
|
||
|
|
||
|
# Test for run method behavior when model raises an exception
|
||
|
@patch(
    "swarms.models.huggingface.HuggingfaceLLM._model.generate",
    side_effect=RuntimeError,
)
def test_llm_run_model_exception(mock_generate, llm_instance):
    """A RuntimeError from the patched `generate` should surface from `run`."""
    task = "test task when model fails"
    with pytest.raises(RuntimeError):
        llm_instance.run(task)
|
||
|
|
||
|
|
||
|
# Test the behavior when GPU is forced but not available
|
||
|
@patch("torch.cuda.is_available", return_value=False)
def test_llm_force_gpu_when_unavailable(mock_is_available, llm_instance):
    """Selecting CUDA while torch reports no GPU is expected to raise."""
    requested_device = "cuda"
    with pytest.raises(EnvironmentError):
        # Attempt to set CUDA when it's not available
        llm_instance.set_device(requested_device)
|
||
|
|
||
|
|
||
|
# Test for proper cleanup after model use (releasing resources)
|
||
|
@patch("swarms.models.huggingface.HuggingfaceLLM._tokenizer")
@patch("swarms.models.huggingface.HuggingfaceLLM._model")
def test_llm_cleanup(mock_model, mock_tokenizer, llm_instance):
    """Check that `cleanup()` releases both the model and the tokenizer.

    The signature takes two mocks, so two patches are required; with only the
    `_model` patch, pytest would look for a `mock_tokenizer` fixture that does
    not exist. Decorators apply bottom-up: the `_model` mock arrives first and
    the `_tokenizer` mock second.
    """
    llm_instance.cleanup()

    # Assuming cleanup method is meant to free resources
    mock_model.delete.assert_called_once()
    mock_tokenizer.delete.assert_called_once()
|
||
|
|
||
|
|
||
|
# Test model's ability to handle multilingual input
|
||
|
@patch("swarms.models.huggingface.HuggingfaceLLM.run")
def test_llm_multilingual_input(mock_run, llm_instance):
    """Pass a French sentence to the patched `run` and expect a string back."""
    mock_run.return_value = "mocked multilingual output"
    french_text = "Bonjour, ceci est un test multilingue."

    output = llm_instance.run(french_text)

    # Simple check to ensure output is string type
    assert isinstance(output, str)
|
||
|
|
||
|
|
||
|
# Test caching mechanism to prevent re-running the same inputs
|
||
|
@patch("swarms.models.huggingface.HuggingfaceLLM.run")
def test_llm_caching_mechanism(mock_run, llm_instance):
    """Run the same input twice and check that both results agree.

    `run` itself is replaced by the mock, so each call below is recorded on
    it. The previous `assert_called_once()` could therefore never pass after
    two calls.
    """
    input_text = "test caching mechanism"
    mock_run.return_value = "cached output"

    # Run the input twice
    first_run_result = llm_instance.run(input_text)
    second_run_result = llm_instance.run(input_text)

    # Both invocations reach the mock, because any caching inside the real
    # `run` is bypassed by the patch.
    # TODO(review): to verify real caching, patch a layer below `run` and
    # assert that layer is hit only once.
    assert mock_run.call_count == 2
    assert first_run_result == second_run_result
|
||
|
|
||
|
|
||
|
# These tests are provided as examples. In real-world scenarios, you will need to adapt these tests to the actual logic of your `HuggingfaceLLM` class.
|
||
|
# For instance, "mock_model.delete.assert_called_once()" and similar lines are based on hypothetical methods and behaviors that you need to replace with actual implementations.
|
||
|
|
||
|
|
||
|
# Mock some functions and objects for testing
|
||
|
@pytest.fixture
def mock_huggingface_llm(monkeypatch):
    """Replace `__init__`, `load_model` and `run` on HuggingfaceLLM with no-ops.

    The replacements are installed through `monkeypatch.setattr`. None of them
    stores any attribute on the instance or returns a value.
    """

    def _noop_init(
        self,
        model_id,
        device="cpu",
        max_length=500,
        quantize=False,
        quantization_config=None,
        verbose=False,
        distributed=False,
        decoding=False,
        max_workers=5,
        repitition_penalty=1.3,
        no_repeat_ngram_size=5,
        temperature=0.7,
        top_k=40,
        top_p=0.8,
    ):
        # Stand-in for model and tokenizer creation: accepts the arguments
        # and does nothing with them.
        return None

    def _noop_load_model(self):
        # Stand-in for model loading.
        return None

    def _noop_run(self, task):
        # Stand-in for model generation.
        return None

    replacements = (
        ("__init__", _noop_init),
        ("load_model", _noop_load_model),
        ("run", _noop_run),
    )
    for attribute_name, replacement in replacements:
        monkeypatch.setattr(HuggingfaceLLM, attribute_name, replacement)
|
||
|
|
||
|
|
||
|
# Basic tests for initialization and attribute settings
|
||
|
def test_init_huggingface_llm():
    """Construct with every keyword overridden and check each attribute.

    NOTE(review): this test does not request the `mock_huggingface_llm`
    fixture, so the unpatched constructor is the one exercised here.
    """
    overrides = {
        "model_id": "test_model",
        "device": "cuda",
        "max_length": 1000,
        "quantize": True,
        "quantization_config": {"config_key": "config_value"},
        "verbose": True,
        "distributed": True,
        "decoding": True,
        "max_workers": 3,
        "repitition_penalty": 1.5,
        "no_repeat_ngram_size": 4,
        "temperature": 0.8,
        "top_k": 50,
        "top_p": 0.7,
    }

    llm = HuggingfaceLLM(**overrides)

    for attribute_name, expected in overrides.items():
        actual = getattr(llm, attribute_name)
        if isinstance(expected, bool):
            # Boolean flags are compared by identity.
            assert actual is expected
        else:
            assert actual == expected
|
||
|
|
||
|
|
||
|
# Test loading the model
|
||
|
def test_load_model(mock_huggingface_llm):
    """Smoke test: `load_model()` can be called on a patched instance."""
    patched_llm = HuggingfaceLLM(model_id="test_model")
    patched_llm.load_model()
|
||
|
|
||
|
|
||
|
# Test running the model
|
||
|
def test_run(mock_huggingface_llm):
    """Smoke test: `run()` can be called on a patched instance."""
    patched_llm = HuggingfaceLLM(model_id="test_model")
    patched_llm.run("Test prompt")
|
||
|
|
||
|
|
||
|
# Test for setting max_length
|
||
|
def test_llm_set_max_length(llm_instance):
    """`set_max_length` should update the `max_length` attribute."""
    llm_instance.set_max_length(1000)
    assert llm_instance.max_length == 1000
|
||
|
|
||
|
|
||
|
# Test for setting verbose
|
||
|
def test_llm_set_verbose(llm_instance):
    """`set_verbose(True)` should leave `verbose` as True."""
    enabled = True
    llm_instance.set_verbose(enabled)
    assert llm_instance.verbose is enabled
|
||
|
|
||
|
|
||
|
# Test for setting distributed
|
||
|
def test_llm_set_distributed(llm_instance):
    """`set_distributed(True)` should leave `distributed` as True."""
    enabled = True
    llm_instance.set_distributed(enabled)
    assert llm_instance.distributed is enabled
|
||
|
|
||
|
|
||
|
# Test for setting decoding
|
||
|
def test_llm_set_decoding(llm_instance):
    """`set_decoding(True)` should leave `decoding` as True."""
    enabled = True
    llm_instance.set_decoding(enabled)
    assert llm_instance.decoding is enabled
|
||
|
|
||
|
|
||
|
# Test for setting max_workers
|
||
|
def test_llm_set_max_workers(llm_instance):
    """`set_max_workers` should update the `max_workers` attribute."""
    llm_instance.set_max_workers(10)
    assert llm_instance.max_workers == 10
|
||
|
|
||
|
|
||
|
# Test for setting repitition_penalty
|
||
|
def test_llm_set_repitition_penalty(llm_instance):
    """`set_repitition_penalty` should update `repitition_penalty`."""
    llm_instance.set_repitition_penalty(1.5)
    assert llm_instance.repitition_penalty == 1.5
|
||
|
|
||
|
|
||
|
# Test for setting no_repeat_ngram_size
|
||
|
def test_llm_set_no_repeat_ngram_size(llm_instance):
    """`set_no_repeat_ngram_size` should update `no_repeat_ngram_size`."""
    llm_instance.set_no_repeat_ngram_size(6)
    assert llm_instance.no_repeat_ngram_size == 6
|
||
|
|
||
|
|
||
|
# Test for setting temperature
|
||
|
def test_llm_set_temperature(llm_instance):
    """`set_temperature` should update the `temperature` attribute."""
    llm_instance.set_temperature(0.8)
    assert llm_instance.temperature == 0.8
|
||
|
|
||
|
|
||
|
# Test for setting top_k
|
||
|
def test_llm_set_top_k(llm_instance):
    """`set_top_k` should update the `top_k` attribute."""
    llm_instance.set_top_k(50)
    assert llm_instance.top_k == 50
|
||
|
|
||
|
|
||
|
# Test for setting top_p
|
||
|
def test_llm_set_top_p(llm_instance):
    """`set_top_p` should update the `top_p` attribute."""
    llm_instance.set_top_p(0.9)
    assert llm_instance.top_p == 0.9
|
||
|
|
||
|
|
||
|
# Test for setting quantize
|
||
|
def test_llm_set_quantize(llm_instance):
    """`set_quantize(True)` should leave `quantize` as True."""
    enabled = True
    llm_instance.set_quantize(enabled)
    assert llm_instance.quantize is enabled
|
||
|
|
||
|
|
||
|
# Test for setting quantization_config
|
||
|
def test_llm_set_quantization_config(llm_instance):
    """`set_quantization_config` should store the supplied config mapping."""
    config = dict(
        load_in_4bit=False,
        bnb_4bit_use_double_quant=False,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
    )
    llm_instance.set_quantization_config(config)
    assert llm_instance.quantization_config == config
|
||
|
|
||
|
|
||
|
# Test for setting model_id
|
||
|
def test_llm_set_model_id(llm_instance):
    """`set_model_id` should update the `model_id` attribute."""
    llm_instance.set_model_id("EleutherAI/gpt-neo-2.7B")
    assert llm_instance.model_id == "EleutherAI/gpt-neo-2.7B"
|
||
|
|
||
|
|
||
|
# Test for setting model
|
||
|
@patch("swarms.models.huggingface.AutoModelForCausalLM.from_pretrained")
def test_llm_set_model(mock_model, llm_instance):
    """Call `set_model` with the patched loader and check the `model` attribute.

    NOTE(review): the mock itself (not its return value) is handed to
    `set_model`; confirm the assertion matches how `set_model` stores it.
    """
    expected_model = "mocked model"
    mock_model.return_value = expected_model
    llm_instance.set_model(mock_model)
    assert llm_instance.model == expected_model
|
||
|
|
||
|
|
||
|
# Test for setting tokenizer
|
||
|
@patch("swarms.models.huggingface.AutoTokenizer.from_pretrained")
def test_llm_set_tokenizer(mock_tokenizer, llm_instance):
    """Call `set_tokenizer` with the patched loader and check `tokenizer`.

    NOTE(review): the mock itself (not its return value) is handed to
    `set_tokenizer`; confirm the assertion matches how it is stored.
    """
    expected_tokenizer = "mocked tokenizer"
    mock_tokenizer.return_value = expected_tokenizer
    llm_instance.set_tokenizer(mock_tokenizer)
    assert llm_instance.tokenizer == expected_tokenizer
|
||
|
|
||
|
|
||
|
# Test for setting logger
|
||
|
def test_llm_set_logger(llm_instance):
    """`set_logger` should store the logger that was passed in."""
    replacement_logger = logging.getLogger("test_logger")
    llm_instance.set_logger(replacement_logger)
    assert llm_instance.logger == replacement_logger
|
||
|
|
||
|
|
||
|
# Test for saving model
|
||
|
@patch("torch.save")
def test_llm_save_model(mock_save, llm_instance):
    """`save_model` should call `torch.save` exactly once."""
    target_path = "path/to/save"
    llm_instance.save_model(target_path)
    mock_save.assert_called_once()
|
||
|
|
||
|
|
||
|
# Test for print_dashboard
|
||
|
@patch("builtins.print")
def test_llm_print_dashboard(mock_print, llm_instance):
    """`print_dashboard` should call the builtin `print` at least once."""
    task = "test task"
    llm_instance.print_dashboard(task)
    mock_print.assert_called()
|
||
|
|
||
|
|
||
|
# Test for __call__ method
|
||
|
@patch("swarms.models.huggingface.HuggingfaceLLM.run")
def test_llm_call(mock_run, llm_instance):
    """Calling the instance should return what the patched `run` returns."""
    expected_output = "mocked output"
    mock_run.return_value = expected_output
    output = llm_instance("test task")
    assert output == expected_output
|