You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
80 lines
2.3 KiB
80 lines
2.3 KiB
6 months ago
|
import queue
|
||
|
import subprocess
|
||
|
import threading
|
||
|
|
||
|
import pytest
|
||
|
|
||
|
from swarms.tools.prebuilt.code_interpreter import ( # Adjust the import according to your project structure
|
||
|
SubprocessCodeInterpreter,
|
||
|
)
|
||
|
|
||
|
|
||
|
# Fixture for the SubprocessCodeInterpreter instance
|
||
|
@pytest.fixture
def interpreter():
    """Build a default-constructed SubprocessCodeInterpreter for the requesting test."""
    instance = SubprocessCodeInterpreter()
    return instance
|
||
|
|
||
|
|
||
|
# Test for correct initialization
|
||
|
def test_initialization(interpreter):
    """Verify the attribute values of a freshly constructed interpreter."""
    # Scalar state: empty start command, no child process, debug mode off.
    assert interpreter.start_cmd == ""
    assert interpreter.process is None
    assert not interpreter.debug_mode

    # Coordination primitives must be the standard-library types.
    out_queue = interpreter.output_queue
    assert isinstance(out_queue, queue.Queue)
    done_flag = interpreter.done
    assert isinstance(done_flag, threading.Event)
|
||
|
|
||
|
|
||
|
# Test for starting and terminating process
|
||
|
def test_start_and_terminate_process(interpreter):
    """Start a short-lived child process and verify that terminate() ends it.

    The test sets ``start_cmd`` to a trivial ``echo`` command, starts the
    process, checks that ``interpreter.process`` is a ``subprocess.Popen``,
    then terminates it and asserts the child has an exit status.

    Raises:
        subprocess.TimeoutExpired: if the child has not exited within the
            bounded wait after ``terminate()``; this fails the test.
    """
    interpreter.start_cmd = "echo Hello"
    interpreter.start_process()
    assert isinstance(interpreter.process, subprocess.Popen)

    interpreter.terminate()

    # If terminate() only signals the child (as Popen.terminate() does), an
    # immediate poll() can still return None because the process has not been
    # reaped yet. Wait with a bound so the check below is deterministic
    # instead of racing the operating system.
    interpreter.process.wait(timeout=5)

    assert interpreter.process.poll() is not None  # Process should be terminated
|
||
|
|
||
|
|
||
|
# Test preprocess_code method
|
||
|
def test_preprocess_code(interpreter):
    """Check that preprocess_code returns the given snippet unchanged."""
    snippet = "print('Hello, World!')"
    result = interpreter.preprocess_code(snippet)
    # NOTE(review): placeholder expectation (identity); tighten once the
    # intended behavior of preprocess_code is confirmed.
    assert result == snippet
|
||
|
|
||
|
|
||
|
# Test detect_active_line method
|
||
|
def test_detect_active_line(interpreter):
    """Check that detect_active_line returns None for an arbitrary text line."""
    sample = "Some line of code"
    detected = interpreter.detect_active_line(sample)
    # NOTE(review): expectation depends on the implementation; adjust if
    # detect_active_line is meant to recognise this input.
    assert detected is None
|
||
|
|
||
|
|
||
|
# Test detect_end_of_execution method
|
||
|
def test_detect_end_of_execution(interpreter):
    """Check that detect_end_of_execution returns None for an arbitrary text line."""
    sample = "End of execution line"
    outcome = interpreter.detect_end_of_execution(sample)
    # NOTE(review): expectation depends on the implementation; adjust if
    # detect_end_of_execution is meant to recognise this input.
    assert outcome is None
|
||
|
|
||
|
|
||
|
# Test line_postprocessor method
|
||
|
def test_line_postprocessor(interpreter):
    """Check that line_postprocessor returns an output line unchanged."""
    raw = "Some output line"
    processed = interpreter.line_postprocessor(raw)
    # NOTE(review): identity is assumed here; adjust if the implementation
    # is meant to transform or filter lines.
    assert processed == raw
|
||
|
|
||
|
|
||
|
# Test handle_stream_output method
|
||
|
def test_handle_stream_output(interpreter, monkeypatch):
    """Placeholder for a handle_stream_output test.

    NOTE(review): this test never calls ``interpreter.handle_stream_output``
    and contains no assertions, so it currently passes vacuously. It only
    swaps ``sys.stdout`` for a generator object while the test runs.
    """

    # This requires more complex setup, including monkeypatching and
    # simulating stream output. Example setup:
    def mock_readline():
        # Produces one output line followed by an empty string.
        yield from ("output line", "")

    fake_stdout = mock_readline()
    monkeypatch.setattr("sys.stdout", fake_stdout)
    # More test code needed here to simulate and assert the behavior of handle_stream_output
|