parent
3e6baf25f0
commit
e06652ed8b
@ -0,0 +1,23 @@
|
||||
from swarms import Agent
|
||||
|
||||
from swarms_tools.finance.dex_screener import (
|
||||
fetch_dex_screener_profiles,
|
||||
)
|
||||
|
||||
# Initialize the agent
|
||||
agent = Agent(
|
||||
agent_name="Financial-Analysis-Agent",
|
||||
agent_description="Personal finance advisor agent",
|
||||
max_loops=1,
|
||||
model_name="gpt-4o",
|
||||
dynamic_temperature_enabled=True,
|
||||
user_name="swarms_corp",
|
||||
return_step_meta=False,
|
||||
output_type="str", # "json", "dict", "csv" OR "string" "yaml" and
|
||||
auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task
|
||||
interactive=False,
|
||||
)
|
||||
|
||||
token_profiles = fetch_dex_screener_profiles()
|
||||
prompt = f"Using data from DexScreener, analyze the latest tokens and provide a detailed analysis with top 5 tokens based on their potential, considering both their profiles and recent boosts. The token profiles are sourced from DexScreener's token profiles API, while the token boosts are sourced from DexScreener's latest token boosts API. {str(token_profiles)}"
|
||||
agent.run(prompt)
|
@ -0,0 +1,216 @@
|
||||
from swarms.structs.matrix_swarm import AgentMatrix, AgentOutput
|
||||
from swarms import Agent
|
||||
|
||||
|
||||
def create_test_matrix(rows: int, cols: int) -> AgentMatrix:
|
||||
"""Helper function to create a test agent matrix"""
|
||||
agents = [
|
||||
[
|
||||
Agent(
|
||||
agent_name=f"TestAgent-{i}-{j}",
|
||||
system_prompt="Test prompt",
|
||||
)
|
||||
for j in range(cols)
|
||||
]
|
||||
for i in range(rows)
|
||||
]
|
||||
return AgentMatrix(agents)
|
||||
|
||||
|
||||
def test_init():
|
||||
"""Test AgentMatrix initialization"""
|
||||
# Test valid initialization
|
||||
matrix = create_test_matrix(2, 2)
|
||||
assert isinstance(matrix, AgentMatrix)
|
||||
assert len(matrix.agents) == 2
|
||||
assert len(matrix.agents[0]) == 2
|
||||
|
||||
# Test invalid initialization
|
||||
try:
|
||||
AgentMatrix([[1, 2], [3, 4]]) # Non-agent elements
|
||||
assert False, "Should raise ValueError"
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
try:
|
||||
AgentMatrix([]) # Empty matrix
|
||||
assert False, "Should raise ValueError"
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
|
||||
def test_transpose():
|
||||
"""Test matrix transpose operation"""
|
||||
matrix = create_test_matrix(2, 3)
|
||||
transposed = matrix.transpose()
|
||||
|
||||
assert len(transposed.agents) == 3 # Original cols become rows
|
||||
assert len(transposed.agents[0]) == 2 # Original rows become cols
|
||||
|
||||
# Verify agent positions
|
||||
for i in range(2):
|
||||
for j in range(3):
|
||||
assert (
|
||||
matrix.agents[i][j].agent_name
|
||||
== transposed.agents[j][i].agent_name
|
||||
)
|
||||
|
||||
|
||||
def test_add():
|
||||
"""Test matrix addition"""
|
||||
matrix1 = create_test_matrix(2, 2)
|
||||
matrix2 = create_test_matrix(2, 2)
|
||||
|
||||
result = matrix1.add(matrix2)
|
||||
assert len(result.agents) == 2
|
||||
assert len(result.agents[0]) == 2
|
||||
|
||||
# Test incompatible dimensions
|
||||
matrix3 = create_test_matrix(2, 3)
|
||||
try:
|
||||
matrix1.add(matrix3)
|
||||
assert False, "Should raise ValueError"
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
|
||||
def test_scalar_multiply():
|
||||
"""Test scalar multiplication"""
|
||||
matrix = create_test_matrix(2, 2)
|
||||
scalar = 3
|
||||
result = matrix.scalar_multiply(scalar)
|
||||
|
||||
assert len(result.agents) == 2
|
||||
assert len(result.agents[0]) == 2 * scalar
|
||||
|
||||
# Verify agent duplication
|
||||
for i in range(len(result.agents)):
|
||||
for j in range(0, len(result.agents[0]), scalar):
|
||||
original_agent = matrix.agents[i][j // scalar]
|
||||
for k in range(scalar):
|
||||
assert (
|
||||
result.agents[i][j + k].agent_name
|
||||
== original_agent.agent_name
|
||||
)
|
||||
|
||||
|
||||
def test_multiply():
|
||||
"""Test matrix multiplication"""
|
||||
matrix1 = create_test_matrix(2, 3)
|
||||
matrix2 = create_test_matrix(3, 2)
|
||||
inputs = ["test query 1", "test query 2"]
|
||||
|
||||
result = matrix1.multiply(matrix2, inputs)
|
||||
assert len(result) == 2 # Number of rows in first matrix
|
||||
assert len(result[0]) == 2 # Number of columns in second matrix
|
||||
|
||||
# Verify output structure
|
||||
for row in result:
|
||||
for output in row:
|
||||
assert isinstance(output, AgentOutput)
|
||||
assert isinstance(output.input_query, str)
|
||||
assert isinstance(output.metadata, dict)
|
||||
|
||||
|
||||
def test_subtract():
|
||||
"""Test matrix subtraction"""
|
||||
matrix1 = create_test_matrix(2, 2)
|
||||
matrix2 = create_test_matrix(2, 2)
|
||||
|
||||
result = matrix1.subtract(matrix2)
|
||||
assert len(result.agents) == 2
|
||||
assert len(result.agents[0]) == 2
|
||||
|
||||
|
||||
def test_identity():
|
||||
"""Test identity matrix creation"""
|
||||
matrix = create_test_matrix(3, 3)
|
||||
identity = matrix.identity(3)
|
||||
|
||||
assert len(identity.agents) == 3
|
||||
assert len(identity.agents[0]) == 3
|
||||
|
||||
# Verify diagonal elements are from original matrix
|
||||
for i in range(3):
|
||||
assert (
|
||||
identity.agents[i][i].agent_name
|
||||
== matrix.agents[i][i].agent_name
|
||||
)
|
||||
|
||||
# Verify non-diagonal elements are zero agents
|
||||
for j in range(3):
|
||||
if i != j:
|
||||
assert identity.agents[i][j].agent_name.startswith(
|
||||
"Zero-Agent"
|
||||
)
|
||||
|
||||
|
||||
def test_determinant():
|
||||
"""Test determinant calculation"""
|
||||
# Test 1x1 matrix
|
||||
matrix1 = create_test_matrix(1, 1)
|
||||
det1 = matrix1.determinant()
|
||||
assert det1 is not None
|
||||
|
||||
# Test 2x2 matrix
|
||||
matrix2 = create_test_matrix(2, 2)
|
||||
det2 = matrix2.determinant()
|
||||
assert det2 is not None
|
||||
|
||||
# Test non-square matrix
|
||||
matrix3 = create_test_matrix(2, 3)
|
||||
try:
|
||||
matrix3.determinant()
|
||||
assert False, "Should raise ValueError"
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
|
||||
def test_save_to_file(tmp_path):
|
||||
"""Test saving matrix to file"""
|
||||
import os
|
||||
|
||||
matrix = create_test_matrix(2, 2)
|
||||
file_path = os.path.join(tmp_path, "test_matrix.json")
|
||||
|
||||
matrix.save_to_file(file_path)
|
||||
assert os.path.exists(file_path)
|
||||
|
||||
# Verify file contents
|
||||
import json
|
||||
|
||||
with open(file_path, "r") as f:
|
||||
data = json.load(f)
|
||||
assert "agents" in data
|
||||
assert "outputs" in data
|
||||
assert len(data["agents"]) == 2
|
||||
assert len(data["agents"][0]) == 2
|
||||
|
||||
|
||||
def run_all_tests():
|
||||
"""Run all test functions"""
|
||||
test_functions = [
|
||||
test_init,
|
||||
test_transpose,
|
||||
test_add,
|
||||
test_scalar_multiply,
|
||||
test_multiply,
|
||||
test_subtract,
|
||||
test_identity,
|
||||
test_determinant,
|
||||
]
|
||||
|
||||
for test_func in test_functions:
|
||||
try:
|
||||
test_func()
|
||||
print(f"✅ {test_func.__name__} passed")
|
||||
except AssertionError as e:
|
||||
print(f"❌ {test_func.__name__} failed: {str(e)}")
|
||||
except Exception as e:
|
||||
print(
|
||||
f"❌ {test_func.__name__} failed with exception: {str(e)}"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
run_all_tests()
|
@ -0,0 +1,306 @@
|
||||
import json
|
||||
from typing import Any, List
|
||||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from swarms import Agent
|
||||
|
||||
|
||||
class AgentOutput(BaseModel):
|
||||
"""
|
||||
Schema for capturing metadata and results of an agent run.
|
||||
"""
|
||||
|
||||
agent_name: str = Field(..., description="Name of the agent.")
|
||||
input_query: str = Field(
|
||||
..., description="Input query provided to the agent."
|
||||
)
|
||||
output_result: Any = Field(
|
||||
..., description="Result produced by the agent."
|
||||
)
|
||||
metadata: dict = Field(
|
||||
..., description="Additional metadata about the agent run."
|
||||
)
|
||||
|
||||
|
||||
class MatrixSwarm:
|
||||
"""
|
||||
A class to manage a matrix of agents and perform matrix operations similar to linear algebra.
|
||||
"""
|
||||
|
||||
def __init__(self, agents: List[List[Agent]]):
|
||||
"""
|
||||
Initializes the MatrixSwarm with a 2D list of agents.
|
||||
Args:
|
||||
agents (List[List[Agent]]): 2D list of agents representing the matrix.
|
||||
"""
|
||||
if not agents or not all(
|
||||
isinstance(row, list) for row in agents
|
||||
):
|
||||
raise ValueError("Agents must be provided as a 2D list.")
|
||||
if not all(
|
||||
isinstance(agent, Agent)
|
||||
for row in agents
|
||||
for agent in row
|
||||
):
|
||||
raise ValueError(
|
||||
"All elements of the matrix must be instances of `Agent`."
|
||||
)
|
||||
self.agents = agents
|
||||
self.outputs = [] # List to store outputs as AgentOutput
|
||||
|
||||
def validate_dimensions(self, other: "MatrixSwarm") -> None:
|
||||
"""
|
||||
Validates that two matrices have compatible dimensions for operations.
|
||||
|
||||
Args:
|
||||
other (MatrixSwarm): Another MatrixSwarm.
|
||||
|
||||
Raises:
|
||||
ValueError: If dimensions are incompatible.
|
||||
"""
|
||||
if len(self.agents) != len(other.agents) or len(
|
||||
self.agents[0]
|
||||
) != len(other.agents[0]):
|
||||
raise ValueError(
|
||||
"Matrix dimensions are incompatible for this operation."
|
||||
)
|
||||
|
||||
def transpose(self) -> "MatrixSwarm":
|
||||
"""
|
||||
Transposes the matrix of agents (swap rows and columns).
|
||||
|
||||
Returns:
|
||||
MatrixSwarm: A new transposed MatrixSwarm.
|
||||
"""
|
||||
transposed_agents = [
|
||||
[self.agents[j][i] for j in range(len(self.agents))]
|
||||
for i in range(len(self.agents[0]))
|
||||
]
|
||||
return MatrixSwarm(transposed_agents)
|
||||
|
||||
def add(self, other: "MatrixSwarm") -> "MatrixSwarm":
|
||||
"""
|
||||
Adds two matrices element-wise.
|
||||
|
||||
Args:
|
||||
other (MatrixSwarm): Another MatrixSwarm to add.
|
||||
|
||||
Returns:
|
||||
MatrixSwarm: A new MatrixSwarm resulting from the addition.
|
||||
"""
|
||||
self.validate_dimensions(other)
|
||||
added_agents = [
|
||||
[self.agents[i][j] for j in range(len(self.agents[i]))]
|
||||
for i in range(len(self.agents))
|
||||
]
|
||||
return MatrixSwarm(added_agents)
|
||||
|
||||
def scalar_multiply(self, scalar: int) -> "MatrixSwarm":
|
||||
"""
|
||||
Scales the agents by duplicating them scalar times along the row.
|
||||
|
||||
Args:
|
||||
scalar (int): The scalar multiplier.
|
||||
|
||||
Returns:
|
||||
MatrixSwarm: A new MatrixSwarm where each agent is repeated scalar times along the row.
|
||||
"""
|
||||
scaled_agents = [
|
||||
[agent for _ in range(scalar) for agent in row]
|
||||
for row in self.agents
|
||||
]
|
||||
return MatrixSwarm(scaled_agents)
|
||||
|
||||
def multiply(
|
||||
self, other: "MatrixSwarm", inputs: List[str]
|
||||
) -> List[List[AgentOutput]]:
|
||||
"""
|
||||
Multiplies two matrices (dot product between rows and columns).
|
||||
|
||||
Args:
|
||||
other (MatrixSwarm): Another MatrixSwarm for multiplication.
|
||||
inputs (List[str]): A list of input queries for the agents.
|
||||
|
||||
Returns:
|
||||
List[List[AgentOutput]]: A resulting matrix of outputs after multiplication.
|
||||
"""
|
||||
if len(self.agents[0]) != len(other.agents):
|
||||
raise ValueError(
|
||||
"Matrix dimensions are incompatible for multiplication."
|
||||
)
|
||||
|
||||
results = []
|
||||
for i, row in enumerate(self.agents):
|
||||
row_results = []
|
||||
for col_idx in range(len(other.agents[0])):
|
||||
col = [
|
||||
other.agents[row_idx][col_idx]
|
||||
for row_idx in range(len(other.agents))
|
||||
]
|
||||
query = inputs[
|
||||
i
|
||||
] # Input query for the corresponding row
|
||||
intermediate_result = []
|
||||
|
||||
for agent_r, agent_c in zip(row, col):
|
||||
try:
|
||||
result = agent_r.run(query)
|
||||
intermediate_result.append(result)
|
||||
except Exception as e:
|
||||
intermediate_result.append(f"Error: {e}")
|
||||
|
||||
# Aggregate outputs from dot product
|
||||
combined_result = " ".join(
|
||||
intermediate_result
|
||||
) # Example aggregation
|
||||
row_results.append(
|
||||
AgentOutput(
|
||||
agent_name=f"DotProduct-{i}-{col_idx}",
|
||||
input_query=query,
|
||||
output_result=combined_result,
|
||||
metadata={"row": i, "col": col_idx},
|
||||
)
|
||||
)
|
||||
results.append(row_results)
|
||||
return results
|
||||
|
||||
def subtract(self, other: "MatrixSwarm") -> "MatrixSwarm":
|
||||
"""
|
||||
Subtracts two matrices element-wise.
|
||||
|
||||
Args:
|
||||
other (MatrixSwarm): Another MatrixSwarm to subtract.
|
||||
|
||||
Returns:
|
||||
MatrixSwarm: A new MatrixSwarm resulting from the subtraction.
|
||||
"""
|
||||
self.validate_dimensions(other)
|
||||
subtracted_agents = [
|
||||
[self.agents[i][j] for j in range(len(self.agents[i]))]
|
||||
for i in range(len(self.agents))
|
||||
]
|
||||
return MatrixSwarm(subtracted_agents)
|
||||
|
||||
def identity(self, size: int) -> "MatrixSwarm":
|
||||
"""
|
||||
Creates an identity matrix of agents with size `size`.
|
||||
|
||||
Args:
|
||||
size (int): Size of the identity matrix (NxN).
|
||||
|
||||
Returns:
|
||||
MatrixSwarm: An identity MatrixSwarm.
|
||||
"""
|
||||
identity_agents = [
|
||||
[
|
||||
(
|
||||
self.agents[i][j]
|
||||
if i == j
|
||||
else Agent(
|
||||
agent_name=f"Zero-Agent-{i}-{j}",
|
||||
system_prompt="",
|
||||
)
|
||||
)
|
||||
for j in range(size)
|
||||
]
|
||||
for i in range(size)
|
||||
]
|
||||
return MatrixSwarm(identity_agents)
|
||||
|
||||
def determinant(self) -> Any:
|
||||
"""
|
||||
Computes the determinant of a square MatrixSwarm.
|
||||
|
||||
Returns:
|
||||
Any: Determinant of the matrix (as agent outputs).
|
||||
"""
|
||||
if len(self.agents) != len(self.agents[0]):
|
||||
raise ValueError(
|
||||
"Determinant can only be computed for square matrices."
|
||||
)
|
||||
|
||||
# Recursive determinant calculation (example using placeholder logic)
|
||||
if len(self.agents) == 1:
|
||||
return self.agents[0][0].run("Compute determinant")
|
||||
|
||||
det_result = 0
|
||||
for i in range(len(self.agents)):
|
||||
submatrix = MatrixSwarm(
|
||||
[row[:i] + row[i + 1 :] for row in self.agents[1:]]
|
||||
)
|
||||
cofactor = ((-1) ** i) * self.agents[0][i].run(
|
||||
"Compute determinant"
|
||||
)
|
||||
det_result += cofactor * submatrix.determinant()
|
||||
return det_result
|
||||
|
||||
def save_to_file(self, path: str) -> None:
|
||||
"""
|
||||
Saves the agent matrix structure and metadata to a file.
|
||||
|
||||
Args:
|
||||
path (str): File path to save the matrix.
|
||||
"""
|
||||
try:
|
||||
matrix_data = {
|
||||
"agents": [
|
||||
[agent.agent_name for agent in row]
|
||||
for row in self.agents
|
||||
],
|
||||
"outputs": [output.dict() for output in self.outputs],
|
||||
}
|
||||
with open(path, "w") as f:
|
||||
json.dump(matrix_data, f, indent=4)
|
||||
logger.info(f"MatrixSwarm saved to {path}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error saving MatrixSwarm: {e}")
|
||||
|
||||
|
||||
# # Example usage
|
||||
# if __name__ == "__main__":
|
||||
# from swarms.prompts.finance_agent_sys_prompt import (
|
||||
# FINANCIAL_AGENT_SYS_PROMPT,
|
||||
# )
|
||||
|
||||
# # Create a 3x3 matrix of agents
|
||||
# agents = [
|
||||
# [
|
||||
# Agent(
|
||||
# agent_name=f"Agent-{i}-{j}",
|
||||
# system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
|
||||
# model_name="gpt-4o-mini",
|
||||
# max_loops=1,
|
||||
# autosave=True,
|
||||
# dashboard=False,
|
||||
# verbose=True,
|
||||
# dynamic_temperature_enabled=True,
|
||||
# saved_state_path=f"agent_{i}_{j}.json",
|
||||
# user_name="swarms_corp",
|
||||
# retry_attempts=1,
|
||||
# context_length=200000,
|
||||
# return_step_meta=False,
|
||||
# output_type="string",
|
||||
# streaming_on=False,
|
||||
# )
|
||||
# for j in range(3)
|
||||
# ]
|
||||
# for i in range(3)
|
||||
# ]
|
||||
|
||||
# # Initialize the matrix
|
||||
# agent_matrix = MatrixSwarm(agents)
|
||||
|
||||
# # Example queries
|
||||
# inputs = [
|
||||
# "Explain Roth IRA benefits",
|
||||
# "Differences between ETFs and mutual funds",
|
||||
# "How to create a diversified portfolio",
|
||||
# ]
|
||||
|
||||
# # Run agents
|
||||
# outputs = agent_matrix.multiply(agent_matrix.transpose(), inputs)
|
||||
|
||||
# # Save results
|
||||
# agent_matrix.save_to_file("agent_matrix_results.json")
|
@ -0,0 +1,216 @@
|
||||
from swarms.structs.matrix_swarm import MatrixSwarm, AgentOutput
|
||||
from swarms import Agent
|
||||
|
||||
|
||||
def create_test_matrix(rows: int, cols: int) -> MatrixSwarm:
|
||||
"""Helper function to create a test agent matrix"""
|
||||
agents = [
|
||||
[
|
||||
Agent(
|
||||
agent_name=f"TestAgent-{i}-{j}",
|
||||
system_prompt="Test prompt",
|
||||
)
|
||||
for j in range(cols)
|
||||
]
|
||||
for i in range(rows)
|
||||
]
|
||||
return MatrixSwarm(agents)
|
||||
|
||||
|
||||
def test_init():
|
||||
"""Test MatrixSwarm initialization"""
|
||||
# Test valid initialization
|
||||
matrix = create_test_matrix(2, 2)
|
||||
assert isinstance(matrix, MatrixSwarm)
|
||||
assert len(matrix.agents) == 2
|
||||
assert len(matrix.agents[0]) == 2
|
||||
|
||||
# Test invalid initialization
|
||||
try:
|
||||
MatrixSwarm([[1, 2], [3, 4]]) # Non-agent elements
|
||||
assert False, "Should raise ValueError"
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
try:
|
||||
MatrixSwarm([]) # Empty matrix
|
||||
assert False, "Should raise ValueError"
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
|
||||
def test_transpose():
|
||||
"""Test matrix transpose operation"""
|
||||
matrix = create_test_matrix(2, 3)
|
||||
transposed = matrix.transpose()
|
||||
|
||||
assert len(transposed.agents) == 3 # Original cols become rows
|
||||
assert len(transposed.agents[0]) == 2 # Original rows become cols
|
||||
|
||||
# Verify agent positions
|
||||
for i in range(2):
|
||||
for j in range(3):
|
||||
assert (
|
||||
matrix.agents[i][j].agent_name
|
||||
== transposed.agents[j][i].agent_name
|
||||
)
|
||||
|
||||
|
||||
def test_add():
|
||||
"""Test matrix addition"""
|
||||
matrix1 = create_test_matrix(2, 2)
|
||||
matrix2 = create_test_matrix(2, 2)
|
||||
|
||||
result = matrix1.add(matrix2)
|
||||
assert len(result.agents) == 2
|
||||
assert len(result.agents[0]) == 2
|
||||
|
||||
# Test incompatible dimensions
|
||||
matrix3 = create_test_matrix(2, 3)
|
||||
try:
|
||||
matrix1.add(matrix3)
|
||||
assert False, "Should raise ValueError"
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
|
||||
def test_scalar_multiply():
|
||||
"""Test scalar multiplication"""
|
||||
matrix = create_test_matrix(2, 2)
|
||||
scalar = 3
|
||||
result = matrix.scalar_multiply(scalar)
|
||||
|
||||
assert len(result.agents) == 2
|
||||
assert len(result.agents[0]) == 2 * scalar
|
||||
|
||||
# Verify agent duplication
|
||||
for i in range(len(result.agents)):
|
||||
for j in range(0, len(result.agents[0]), scalar):
|
||||
original_agent = matrix.agents[i][j // scalar]
|
||||
for k in range(scalar):
|
||||
assert (
|
||||
result.agents[i][j + k].agent_name
|
||||
== original_agent.agent_name
|
||||
)
|
||||
|
||||
|
||||
def test_multiply():
|
||||
"""Test matrix multiplication"""
|
||||
matrix1 = create_test_matrix(2, 3)
|
||||
matrix2 = create_test_matrix(3, 2)
|
||||
inputs = ["test query 1", "test query 2"]
|
||||
|
||||
result = matrix1.multiply(matrix2, inputs)
|
||||
assert len(result) == 2 # Number of rows in first matrix
|
||||
assert len(result[0]) == 2 # Number of columns in second matrix
|
||||
|
||||
# Verify output structure
|
||||
for row in result:
|
||||
for output in row:
|
||||
assert isinstance(output, AgentOutput)
|
||||
assert isinstance(output.input_query, str)
|
||||
assert isinstance(output.metadata, dict)
|
||||
|
||||
|
||||
def test_subtract():
|
||||
"""Test matrix subtraction"""
|
||||
matrix1 = create_test_matrix(2, 2)
|
||||
matrix2 = create_test_matrix(2, 2)
|
||||
|
||||
result = matrix1.subtract(matrix2)
|
||||
assert len(result.agents) == 2
|
||||
assert len(result.agents[0]) == 2
|
||||
|
||||
|
||||
def test_identity():
|
||||
"""Test identity matrix creation"""
|
||||
matrix = create_test_matrix(3, 3)
|
||||
identity = matrix.identity(3)
|
||||
|
||||
assert len(identity.agents) == 3
|
||||
assert len(identity.agents[0]) == 3
|
||||
|
||||
# Verify diagonal elements are from original matrix
|
||||
for i in range(3):
|
||||
assert (
|
||||
identity.agents[i][i].agent_name
|
||||
== matrix.agents[i][i].agent_name
|
||||
)
|
||||
|
||||
# Verify non-diagonal elements are zero agents
|
||||
for j in range(3):
|
||||
if i != j:
|
||||
assert identity.agents[i][j].agent_name.startswith(
|
||||
"Zero-Agent"
|
||||
)
|
||||
|
||||
|
||||
def test_determinant():
|
||||
"""Test determinant calculation"""
|
||||
# Test 1x1 matrix
|
||||
matrix1 = create_test_matrix(1, 1)
|
||||
det1 = matrix1.determinant()
|
||||
assert det1 is not None
|
||||
|
||||
# Test 2x2 matrix
|
||||
matrix2 = create_test_matrix(2, 2)
|
||||
det2 = matrix2.determinant()
|
||||
assert det2 is not None
|
||||
|
||||
# Test non-square matrix
|
||||
matrix3 = create_test_matrix(2, 3)
|
||||
try:
|
||||
matrix3.determinant()
|
||||
assert False, "Should raise ValueError"
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
|
||||
def test_save_to_file(tmp_path):
|
||||
"""Test saving matrix to file"""
|
||||
import os
|
||||
|
||||
matrix = create_test_matrix(2, 2)
|
||||
file_path = os.path.join(tmp_path, "test_matrix.json")
|
||||
|
||||
matrix.save_to_file(file_path)
|
||||
assert os.path.exists(file_path)
|
||||
|
||||
# Verify file contents
|
||||
import json
|
||||
|
||||
with open(file_path, "r") as f:
|
||||
data = json.load(f)
|
||||
assert "agents" in data
|
||||
assert "outputs" in data
|
||||
assert len(data["agents"]) == 2
|
||||
assert len(data["agents"][0]) == 2
|
||||
|
||||
|
||||
def run_all_tests():
|
||||
"""Run all test functions"""
|
||||
test_functions = [
|
||||
test_init,
|
||||
test_transpose,
|
||||
test_add,
|
||||
test_scalar_multiply,
|
||||
test_multiply,
|
||||
test_subtract,
|
||||
test_identity,
|
||||
test_determinant,
|
||||
]
|
||||
|
||||
for test_func in test_functions:
|
||||
try:
|
||||
test_func()
|
||||
print(f"✅ {test_func.__name__} passed")
|
||||
except AssertionError as e:
|
||||
print(f"❌ {test_func.__name__} failed: {str(e)}")
|
||||
except Exception as e:
|
||||
print(
|
||||
f"❌ {test_func.__name__} failed with exception: {str(e)}"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
run_all_tests()
|
Loading…
Reference in new issue