parent
e1b0ba42eb
commit
9ebdc9f939
@ -1,26 +0,0 @@
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pyo3::types::IntoPyDict;
|
||||
|
||||
#[test]
|
||||
fn test_execute_on_device() {
|
||||
let gil = Python::acquire_gil();
|
||||
let py = gil.python();
|
||||
|
||||
// Define a Python module for testing
|
||||
let rust_cuda = PyModule::new(py, "rust_cuda").unwrap();
|
||||
rust_cuda.add_function(wrap_pyfunction!(execute_on_device, rust_cuda).unwrap()).unwrap();
|
||||
|
||||
// Test the execute_on_device function
|
||||
let result: PyResult<f32> = rust_cuda.call1("execute_on_device", (0, 1.0f32, 2.0f32)).unwrap().extract().unwrap();
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_execute_cuda() {
|
||||
// Test the execute_cuda function
|
||||
let result = execute_cuda(0, 1.0f32, 2.0f32);
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
}
|
@ -1,62 +0,0 @@
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pyo3::types::IntoPyDict;
|
||||
|
||||
#[test]
|
||||
fn test_process_python_modules() {
|
||||
let gil = Python::acquire_gil();
|
||||
let py = gil.python();
|
||||
|
||||
// Define a Python module for testing
|
||||
let code = r#"
|
||||
def test_function():
|
||||
return "Hello, World!"
|
||||
"#;
|
||||
let test_module = PyModule::new(py, "test_module").unwrap();
|
||||
test_module.add_function(wrap_pyfunction!(test_function, test_module).unwrap()).unwrap();
|
||||
test_module.add(py, "test_function", code).unwrap();
|
||||
|
||||
// Define a PythonModule struct for testing
|
||||
let test_python_module = PythonModule {
|
||||
name: "test_module",
|
||||
function: "test_function",
|
||||
};
|
||||
|
||||
// Test the process_python_modules function
|
||||
let result = process_python_modules(vec![test_python_module], 1);
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_process_python_modules_import_error() {
|
||||
// Define a PythonModule struct with a non-existent module
|
||||
let test_python_module = PythonModule {
|
||||
name: "non_existent_module",
|
||||
function: "test_function",
|
||||
};
|
||||
|
||||
// Test the process_python_modules function
|
||||
let result = process_python_modules(vec![test_python_module], 1);
|
||||
assert!(matches!(result, Err(PythonError::ImportError(_))));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_process_python_modules_function_error() {
|
||||
let gil = Python::acquire_gil();
|
||||
let py = gil.python();
|
||||
|
||||
// Define a Python module for testing
|
||||
let test_module = PyModule::new(py, "test_module").unwrap();
|
||||
|
||||
// Define a PythonModule struct with a non-existent function
|
||||
let test_python_module = PythonModule {
|
||||
name: "test_module",
|
||||
function: "non_existent_function",
|
||||
};
|
||||
|
||||
// Test the process_python_modules function
|
||||
let result = process_python_modules(vec![test_python_module], 1);
|
||||
assert!(matches!(result, Err(PythonError::FunctionError(_))));
|
||||
}
|
||||
}
|
@ -1,50 +0,0 @@
|
||||
import pytest
|
||||
|
||||
from swarms.models import OpenAIChat
|
||||
from swarms.structs import NonlinearWorkflow, Task
|
||||
|
||||
|
||||
class TestNonlinearWorkflow:
|
||||
def test_add_task(self):
|
||||
llm = OpenAIChat(openai_api_key="")
|
||||
task = Task(llm, "What's the weather in miami")
|
||||
workflow = NonlinearWorkflow()
|
||||
workflow.add(task)
|
||||
assert task.name in workflow.tasks
|
||||
assert task.name in workflow.edges
|
||||
|
||||
def test_run_without_tasks(self):
|
||||
workflow = NonlinearWorkflow()
|
||||
# No exception should be raised
|
||||
workflow.run()
|
||||
|
||||
def test_run_with_single_task(self):
|
||||
llm = OpenAIChat(openai_api_key="")
|
||||
task = Task(llm, "What's the weather in miami")
|
||||
workflow = NonlinearWorkflow()
|
||||
workflow.add(task)
|
||||
# No exception should be raised
|
||||
workflow.run()
|
||||
|
||||
def test_run_with_circular_dependency(self):
|
||||
llm = OpenAIChat(openai_api_key="")
|
||||
task1 = Task(llm, "What's the weather in miami")
|
||||
task2 = Task(llm, "What's the weather in new york")
|
||||
workflow = NonlinearWorkflow()
|
||||
workflow.add(task1, task2.name)
|
||||
workflow.add(task2, task1.name)
|
||||
with pytest.raises(
|
||||
Exception, match="Circular dependency detected"
|
||||
):
|
||||
workflow.run()
|
||||
|
||||
def test_run_with_stopping_token(self):
|
||||
llm = OpenAIChat(openai_api_key="")
|
||||
task1 = Task(llm, "What's the weather in miami")
|
||||
task2 = Task(llm, "What's the weather in new york")
|
||||
workflow = NonlinearWorkflow(stopping_token="stop")
|
||||
workflow.add(task1)
|
||||
workflow.add(task2)
|
||||
# Assuming that task1's execute method returns "stop"
|
||||
# No exception should be raised
|
||||
workflow.run()
|
Loading…
Reference in new issue