You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
599 lines
17 KiB
599 lines
17 KiB
3 weeks ago
|
import asyncio
|
||
|
from swarms import Agent
|
||
|
from swarm_models import OpenAIChat
|
||
|
import os
|
||
|
import time
|
||
|
import json
|
||
|
import yaml
|
||
|
import tempfile
|
||
|
|
||
|
|
||
|
def test_basic_agent_functionality():
    """Smoke-test agent construction and a single one-loop run.

    Builds an ``Agent`` backed by ``OpenAIChat`` ("gpt-4o"), runs one prompt,
    and asserts that a response came back and that the constructor arguments
    are readable as attributes on the agent.
    """
    print("\nTesting basic agent functionality...")

    expected_name = "Test-Agent"
    llm = OpenAIChat(model_name="gpt-4o")
    agent = Agent(agent_name=expected_name, llm=llm, max_loops=1)

    answer = agent.run("What is 2+2?")
    assert answer is not None, "Agent response should not be None"

    # The values passed to the constructor should be visible on the instance.
    assert agent.agent_name == expected_name, "Agent name not set correctly"
    assert agent.max_loops == 1, "Max loops not set correctly"
    assert agent.llm is not None, "LLM not initialized"

    print("✓ Basic agent functionality test passed")
|
||
|
|
||
|
|
||
|
def test_memory_management():
    """Exercise the agent's memory helpers.

    Adds an entry with ``add_memory`` and confirms it appears in
    ``short_memory.return_history_as_string()``, calls ``memory_query``, and
    checks that ``check_available_tokens`` returns an ``int``.
    """
    print("\nTesting memory management...")

    llm = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Memory-Test-Agent",
        llm=llm,
        max_loops=1,
        context_length=8192,
    )

    # An added entry must be retrievable from the history string.
    entry = "Test memory entry"
    agent.add_memory(entry)
    history = agent.short_memory.return_history_as_string()
    assert entry in history

    # The query result is not inspected; the call only has to complete.
    agent.memory_query("Test query")

    available = agent.check_available_tokens()
    assert isinstance(available, int), "Token count should be an integer"

    print("✓ Memory management test passed")
|
||
|
|
||
|
|
||
|
def test_agent_output_formats():
    """Run one task under every listed ``output_type`` and validate the result.

    For each entry in the table below, the response must be non-``None`` and
    an instance of the declared Python type. ``"yaml"`` responses must also
    parse with ``yaml.safe_load``; ``"json"`` and ``"list"`` responses must
    parse with ``json.loads``.

    Raises:
        AssertionError: If a response is missing, has an unexpected type, or
            cannot be parsed in its declared format.
    """
    print("\nTesting all output formats...")

    model = OpenAIChat(model_name="gpt-4o")
    test_task = "Say hello!"

    # output_type -> Python type the response is expected to have.
    output_types = {
        "str": str,
        "string": str,
        "list": str,  # JSON string containing list
        "json": str,  # JSON string
        "dict": dict,
        "yaml": str,
    }

    for output_type, expected_type in output_types.items():
        agent = Agent(
            agent_name=f"{output_type.capitalize()}-Output-Agent",
            llm=model,
            max_loops=1,
            output_type=output_type,
        )

        response = agent.run(test_task)
        assert (
            response is not None
        ), f"{output_type} output should not be None"

        # Enforce the declared type up front: a non-str value handed to
        # json.loads would raise TypeError rather than the intended
        # "Invalid JSON" failure below.
        assert isinstance(response, expected_type), (
            f"{output_type} output should be {expected_type.__name__}, "
            f"got {type(response).__name__}"
        )

        if output_type == "yaml":
            # Verify YAML can be parsed
            try:
                yaml.safe_load(response)
            except yaml.YAMLError as err:
                # Explicit raise survives ``python -O`` and keeps the parser
                # error attached as the cause.
                raise AssertionError(
                    f"Invalid YAML output for {output_type}"
                ) from err
            print(f"✓ {output_type} output valid")
        elif output_type in ("json", "list"):
            # Verify JSON can be parsed
            try:
                json.loads(response)
            except json.JSONDecodeError as err:
                raise AssertionError(
                    f"Invalid JSON output for {output_type}"
                ) from err
            print(f"✓ {output_type} output valid")

    print("✓ Output formats test passed")
|
||
|
|
||
|
|
||
|
def test_agent_state_management():
    """Check explicit save/load and autosave of agent state.

    Inside a temporary directory this test:

    1. runs an agent, adds a memory entry, calls ``save()`` and checks the
       state file exists;
    2. loads that file into a second agent and checks both messages appear in
       its short-memory history;
    3. runs a third agent created with ``autosave=True`` and checks its state
       file exists after a short wait.
    """
    print("\nTesting state management...")

    llm = OpenAIChat(model_name="gpt-4o")

    # All state files live in a throwaway directory removed on exit.
    with tempfile.TemporaryDirectory() as temp_dir:
        state_path = os.path.join(temp_dir, "agent_state.json")

        # --- explicit save ---
        writer = Agent(
            agent_name="State-Test-Agent",
            llm=llm,
            max_loops=1,
            saved_state_path=state_path,
        )
        writer.run("Remember this: Test message 1")
        writer.add_memory("Test message 2")
        writer.save()
        assert os.path.exists(state_path), "State file not created"

        # --- load into a fresh agent ---
        reader = Agent(
            agent_name="State-Test-Agent", llm=llm, max_loops=1
        )
        reader.load(state_path)

        restored = reader.short_memory.return_history_as_string()
        expectations = (
            ("Test message 1", "State not loaded correctly"),
            ("Test message 2", "Memory not loaded correctly"),
        )
        for needle, failure in expectations:
            assert needle in restored, failure

        # --- autosave ---
        autosave_path = os.path.join(temp_dir, "autosave_state.json")
        autosaver = Agent(
            agent_name="Autosave-Test-Agent",
            llm=llm,
            max_loops=1,
            saved_state_path=autosave_path,
            autosave=True,
        )
        autosaver.run("Test autosave")
        time.sleep(2)  # Wait for autosave
        assert os.path.exists(autosave_path), "Autosave file not created"

    print("✓ State management test passed")
|
||
|
|
||
|
|
||
|
def test_agent_tools_and_execution():
    """Check adding and removing tools, then run the agent once.

    Starts with one tool, adds a lambda (expects two tools), removes the
    original tool (expects one), and finally asserts that ``run`` returns a
    non-``None`` response.
    """
    print("\nTesting tools and execution...")

    def sample_tool(x: int, y: int) -> int:
        """Sample tool that adds two numbers"""
        return x + y

    llm = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Tools-Test-Agent",
        llm=llm,
        max_loops=1,
        tools=[sample_tool],
    )

    # Adding a second tool should grow the list to two entries.
    agent.add_tool(lambda x: x * 2)
    tool_count = len(agent.tools)
    assert tool_count == 2, "Tool not added correctly"

    # Removing the original tool should leave only the lambda.
    agent.remove_tool(sample_tool)
    tool_count = len(agent.tools)
    assert tool_count == 1, "Tool not removed correctly"

    # NOTE(review): sample_tool has already been removed at this point, so
    # this only checks that a response is produced — confirm that is intended.
    result = agent.run("Calculate 2 + 2 using the sample tool")
    assert result is not None, "Tool execution failed"

    print("✓ Tools and execution test passed")
|
||
|
|
||
|
|
||
|
def test_agent_concurrent_execution():
    """Check ``bulk_run`` and ``run_concurrent_tasks`` on one agent.

    ``bulk_run`` receives a list of ``{"task": ...}`` dicts and must return
    one non-``None`` response per task; ``run_concurrent_tasks`` receives
    three plain strings and must return three results.
    """
    print("\nTesting concurrent execution...")

    llm = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Concurrent-Test-Agent", llm=llm, max_loops=1
    )

    # Bulk run: one dict per prompt.
    prompts = ("Count to 3", "Say hello", "Tell a short joke")
    tasks = [{"task": prompt} for prompt in prompts]

    responses = agent.bulk_run(tasks)
    assert len(responses) == len(tasks), "Not all tasks completed"
    every_task_answered = all(item is not None for item in responses)
    assert every_task_answered, "Some tasks failed"

    # Concurrent run: plain task strings.
    concurrent_inputs = ["Task 1", "Task 2", "Task 3"]
    concurrent_responses = agent.run_concurrent_tasks(concurrent_inputs)
    assert (
        len(concurrent_responses) == 3
    ), "Not all concurrent tasks completed"

    print("✓ Concurrent execution test passed")
|
||
|
|
||
|
|
||
|
def test_agent_error_handling():
    """Check that the agent stays usable after being fed invalid tool input.

    ``parse_and_execute_tools`` is called with a string that is not valid
    JSON. Both outcomes are accepted — the call may raise or may handle the
    input internally — and whichever happens is reported. The agent must then
    still return a non-``None`` response from ``run``.

    Raises:
        AssertionError: If the follow-up ``run`` returns ``None``.
    """
    print("\nTesting error handling...")

    model = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Error-Test-Agent",
        llm=model,
        max_loops=1,
        retry_attempts=3,
        retry_interval=1,
    )

    # Test invalid tool execution
    try:
        agent.parse_and_execute_tools("invalid_json")
    except Exception as err:
        # Deliberately broad: any failure on invalid input is acceptable
        # here. Report it instead of swallowing it silently so the test log
        # shows which path was taken.
        print(
            "✓ Invalid tool execution raised as expected: "
            f"{type(err).__name__}: {err}"
        )
    else:
        print("✓ Invalid tool execution handled")

    # Test recovery after error
    response = agent.run("Continue after error")
    assert response is not None, "Agent failed to recover after error"

    print("✓ Error handling test passed")
|
||
|
|
||
|
|
||
|
def test_agent_configuration():
    """Check the configuration update and export methods.

    Calls ``update_system_prompt``, ``update_max_loops`` and
    ``update_loop_interval``, verifies the two numeric attributes changed,
    and checks that ``to_dict`` returns a ``dict`` and ``to_yaml`` a ``str``.
    """
    print("\nTesting agent configuration...")

    llm = OpenAIChat(model_name="gpt-4o")
    settings = dict(
        agent_name="Config-Test-Agent",
        llm=llm,
        max_loops=1,
        temperature=0.7,
        max_tokens=4000,
        context_length=8192,
    )
    agent = Agent(**settings)

    # Apply the updates ...
    agent.update_system_prompt("New system prompt")
    agent.update_max_loops(2)
    agent.update_loop_interval(2)

    # ... and confirm the numeric ones are reflected on the instance.
    assert agent.max_loops == 2, "Max loops not updated"
    assert agent.loop_interval == 2, "Loop interval not updated"

    # Dict export.
    exported = agent.to_dict()
    assert isinstance(exported, dict), "Configuration export failed"

    # YAML export.
    exported_yaml = agent.to_yaml()
    assert isinstance(exported_yaml, str), "YAML export failed"

    print("✓ Configuration test passed")
|
||
|
|
||
|
|
||
|
def test_agent_with_stopping_condition():
    """Run an agent configured with a custom ``stopping_condition`` callable.

    The callable returns ``True`` when the upper-cased response contains
    "STOP". The test only asserts that ``run`` returns a non-``None`` value.
    """
    print("\nTesting agent with stopping condition...")

    def custom_stopping_condition(response: str) -> bool:
        upper_cased = response.upper()
        return "STOP" in upper_cased

    llm = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Stopping-Condition-Agent",
        llm=llm,
        max_loops=5,
        stopping_condition=custom_stopping_condition,
    )

    result = agent.run("Count up until you see the word STOP")
    assert result is not None, "Stopping condition test failed"
    print("✓ Stopping condition test passed")
|
||
|
|
||
|
|
||
|
def test_agent_with_retry_mechanism():
    """Run an agent constructed with retry settings.

    The agent is created with ``retry_attempts=3`` and ``retry_interval=1``;
    the test asserts only that a single ``run`` returns a non-``None`` value.
    """
    print("\nTesting agent retry mechanism...")

    llm = OpenAIChat(model_name="gpt-4o")
    retry_settings = {"retry_attempts": 3, "retry_interval": 1}
    agent = Agent(
        agent_name="Retry-Test-Agent",
        llm=llm,
        max_loops=1,
        **retry_settings,
    )

    result = agent.run("Tell me a joke.")
    assert result is not None, "Retry mechanism test failed"
    print("✓ Retry mechanism test passed")
|
||
|
|
||
|
|
||
|
def test_bulk_and_filtered_operations():
    """Check ``bulk_run`` and response filtering.

    ``bulk_run`` must return as many responses as tasks submitted. After
    ``add_response_filter("color")``, the result of ``filtered_run`` must
    contain the marker ``"[FILTERED]"``.
    """
    print("\nTesting bulk and filtered operations...")

    llm = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Bulk-Filter-Test-Agent", llm=llm, max_loops=1
    )

    # Bulk run: one dict per prompt.
    prompts = ("What is 2+2?", "Name a color", "Count to 3")
    bulk_tasks = [{"task": prompt} for prompt in prompts]
    bulk_responses = agent.bulk_run(bulk_tasks)
    submitted = len(bulk_tasks)
    assert (
        len(bulk_responses) == submitted
    ), "Bulk run should return same number of responses as tasks"

    # Response filtering.
    agent.add_response_filter("color")
    question = "What is your favorite color?"
    filtered_response = agent.filtered_run(question)
    assert "[FILTERED]" in filtered_response, "Response filter not applied"

    print("✓ Bulk and filtered operations test passed")
|
||
|
|
||
|
|
||
|
async def test_async_operations():
    """Check the agent's awaitable ``arun`` entry point.

    Awaits a single ``arun`` call, then gathers three ``arun`` calls with
    ``asyncio.gather`` and checks one result came back per task.
    """
    print("\nTesting async operations...")

    llm = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Async-Test-Agent", llm=llm, max_loops=1
    )

    # Single awaited run.
    single = await agent.arun("What is 1+1?")
    assert single is not None, "Async run failed"

    # Several runs awaited together.
    tasks = ["Task 1", "Task 2", "Task 3"]
    pending = [agent.arun(prompt) for prompt in tasks]
    gathered = await asyncio.gather(*pending)
    assert len(gathered) == len(tasks), "Not all async tasks completed"

    print("✓ Async operations test passed")
|
||
|
|
||
|
|
||
|
def test_memory_and_state_persistence():
    """Check that a memory entry survives a save/load round trip.

    One agent adds a memory entry and saves its state to a temporary file; a
    second agent loads that file and must show the entry in its short-memory
    history string.
    """
    print("\nTesting memory and state persistence...")

    with tempfile.TemporaryDirectory() as temp_dir:
        state_path = os.path.join(temp_dir, "test_state.json")

        llm = OpenAIChat(model_name="gpt-4o")
        agent_name = "Memory-State-Test-Agent"

        # First agent: writes memory and persists it.
        source_agent = Agent(
            agent_name=agent_name,
            llm=llm,
            max_loops=1,
            saved_state_path=state_path,
            context_length=8192,
            autosave=True,
        )
        source_agent.add_memory("Important fact: The sky is blue")
        source_agent.memory_query("What color is the sky?")
        source_agent.save()

        # Second agent: starts empty and loads the saved state.
        restored_agent = Agent(
            agent_name=agent_name,
            llm=llm,
            max_loops=1,
        )
        restored_agent.load(state_path)

        memory_content = restored_agent.short_memory.return_history_as_string()
        assert "sky is blue" in memory_content, "Memory not properly persisted"

    print("✓ Memory and state persistence test passed")
|
||
|
|
||
|
|
||
|
def test_sentiment_and_evaluation():
    """Run an agent wired with mock sentiment-analyzer and evaluator callables.

    The test makes two ``run`` calls and passes as long as neither raises;
    the responses themselves are not inspected.
    """
    print("\nTesting sentiment analysis and evaluation...")

    def mock_sentiment_analyzer(text):
        """Mock sentiment analyzer that returns a score between 0 and 1"""
        if "positive" in text.lower():
            return 0.7
        return 0.3

    def mock_evaluator(response):
        """Mock evaluator that checks response quality"""
        if len(response) > 10:
            return "GOOD"
        return "BAD"

    llm = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Sentiment-Eval-Test-Agent",
        llm=llm,
        max_loops=1,
        sentiment_analyzer=mock_sentiment_analyzer,
        sentiment_threshold=0.5,
        evaluator=mock_evaluator,
    )

    # One run per hook: first for sentiment analysis, then for evaluation.
    for prompt in (
        "Generate a positive message",
        "Generate a detailed response",
    ):
        agent.run(prompt)

    print("✓ Sentiment and evaluation test passed")
|
||
|
|
||
|
|
||
|
def test_tool_management():
    """Check that ``add_tool``, ``remove_tool`` and ``add_tools`` update ``agent.tools``.

    Expected tool counts: 1 at construction, 2 after adding one, 1 after
    removing one, 3 after adding two more in a single call.
    """
    print("\nTesting tool management...")

    def tool1(x: int) -> int:
        """Sample tool 1"""
        return x * 2

    def tool2(x: int) -> int:
        """Sample tool 2"""
        return x + 2

    llm = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Tool-Test-Agent",
        llm=llm,
        max_loops=1,
        tools=[tool1],
    )

    # Single add: 1 -> 2 tools.
    agent.add_tool(tool2)
    count_after_add = len(agent.tools)
    assert count_after_add == 2, "Tool not added correctly"

    # Single remove: 2 -> 1 tools.
    agent.remove_tool(tool1)
    count_after_remove = len(agent.tools)
    assert count_after_remove == 1, "Tool not removed correctly"

    # Batch add: 1 -> 3 tools.
    agent.add_tools([tool1, tool2])
    count_after_batch = len(agent.tools)
    assert count_after_batch == 3, "Multiple tools not added correctly"

    print("✓ Tool management test passed")
|
||
|
|
||
|
|
||
|
def test_system_prompt_and_configuration():
    """Check that configuration setters change the matching attributes.

    Covers ``update_system_prompt`` (``system_prompt``), ``update_max_loops``
    (``max_loops``) and ``update_loop_interval`` (``loop_interval``), then
    checks that ``to_dict`` returns a ``dict``.
    """
    print("\nTesting system prompt and configuration...")

    llm = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Config-Test-Agent", llm=llm, max_loops=1
    )

    # System prompt.
    replacement_prompt = "You are a helpful assistant."
    agent.update_system_prompt(replacement_prompt)
    assert agent.system_prompt == replacement_prompt, "System prompt not updated"

    # Loop count.
    agent.update_max_loops(5)
    assert agent.max_loops == 5, "Max loops not updated"

    # Loop interval.
    agent.update_loop_interval(2)
    assert agent.loop_interval == 2, "Loop interval not updated"

    # Dict export.
    exported = agent.to_dict()
    assert isinstance(exported, dict), "Configuration export failed"

    print("✓ System prompt and configuration test passed")
|
||
|
|
||
|
|
||
|
def test_agent_with_dynamic_temperature():
    """Run an agent created with ``dynamic_temperature_enabled=True``.

    The agent uses ``max_loops=2``; the test asserts only that ``run``
    returns a non-``None`` value.
    """
    print("\nTesting agent with dynamic temperature...")

    llm = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Dynamic-Temp-Agent",
        llm=llm,
        max_loops=2,
        dynamic_temperature_enabled=True,
    )

    story = agent.run("Generate a creative story.")
    assert story is not None, "Dynamic temperature test failed"
    print("✓ Dynamic temperature test passed")
|
||
|
|
||
|
|
||
|
def run_all_tests():
    """Run every test in this module and print a pass/fail summary.

    Synchronous tests are called directly; ``test_async_operations`` is
    driven with ``asyncio.run``. Any exception raised by a test (including
    ``AssertionError``) is reported and counted as a failure, so one failing
    test does not stop the rest from running.

    Returns:
        bool: ``True`` if every test passed, ``False`` otherwise.
    """
    print("Starting Extended Agent functional tests...\n")

    test_functions = [
        test_basic_agent_functionality,
        test_memory_management,
        test_agent_output_formats,
        test_agent_state_management,
        test_agent_tools_and_execution,
        test_agent_concurrent_execution,
        test_agent_error_handling,
        test_agent_configuration,
        test_agent_with_stopping_condition,
        test_agent_with_retry_mechanism,
        test_agent_with_dynamic_temperature,
        test_bulk_and_filtered_operations,
        test_memory_and_state_persistence,
        test_sentiment_and_evaluation,
        test_tool_management,
        test_system_prompt_and_configuration,
    ]

    # Run synchronous tests
    total_tests = len(test_functions) + 1  # +1 for async test
    passed_tests = 0

    for test in test_functions:
        try:
            test()
        except Exception as e:
            # Top-level boundary: report and keep going with the next test.
            print(f"✗ Test {test.__name__} failed: {e}")
        else:
            passed_tests += 1

    # Run async test
    try:
        asyncio.run(test_async_operations())
    except Exception as e:
        print(f"✗ Async operations test failed: {e}")
    else:
        passed_tests += 1

    print("\nExtended Test Summary:")
    print(f"Total Tests: {total_tests}")
    print(f"Passed: {passed_tests}")
    print(f"Failed: {total_tests - passed_tests}")
    print(f"Success Rate: {(passed_tests/total_tests)*100:.2f}%")

    # Let callers (and the script entry point) detect failures.
    return passed_tests == total_tests


if __name__ == "__main__":
    # Exit non-zero when any test failed so shells and CI can detect it;
    # a run that only printed failures would otherwise still exit 0.
    raise SystemExit(0 if run_all_tests() else 1)
|