code cleanup op

pull/1175/head
Kye Gomez 4 days ago
parent 156bc595df
commit 638e9e2ba2

@ -29,7 +29,7 @@ agent_router = AgentRouter(
"You are a medical lab and imaging interpretation agent. Take the patient's test results, imaging findings, and vitals, "
"and interpret them in context of their symptoms. Suggest relevant follow-up diagnostics or considerations for the physician."
),
),
),
],
)
@ -37,4 +37,4 @@ result = agent_router.run(
"I have a headache, fever, and cough. What could be wrong?"
)
print(result.agent_name)
print(result.agent_name)

@ -83,11 +83,13 @@ class AgentRouter:
f"Unexpected embedding response structure: {type(response.data[0])}"
)
else:
logger.error(f"Unexpected response structure: {response}")
logger.error(
f"Unexpected response structure: {response}"
)
raise ValueError(
f"Unexpected embedding response structure: {type(response)}"
)
return embedding_vector
except Exception as e:
@ -278,7 +280,7 @@ class AgentRouter:
except Exception as e:
logger.error(f"Error finding best agent: {str(e)}")
raise
def run(self, task: str) -> Optional[AgentType]:
"""
Run the agent router on a given task.

@ -168,7 +168,15 @@ class MajorityVoting:
title="Majority Voting",
)
def run(self, task: str, streaming_callback: Optional[Callable[[str, str, bool], None]] = None, *args, **kwargs) -> List[Any]:
def run(
self,
task: str,
streaming_callback: Optional[
Callable[[str, str, bool], None]
] = None,
*args,
**kwargs,
) -> List[Any]:
"""
Runs the majority voting system with multi-loop functionality and returns the majority vote.
@ -200,24 +208,30 @@ class MajorityVoting:
role=agent.agent_name,
content=output,
)
# Set streaming_on for the consensus agent based on the provided streaming_callback
self.consensus_agent.streaming_on = streaming_callback is not None
self.consensus_agent.streaming_on = (
streaming_callback is not None
)
# Instead of a simple passthrough wrapper, match the callback invocation pattern from the provided reference for the consensus agent:
consensus_agent_name = self.consensus_agent.agent_name
if streaming_callback is not None:
def consensus_streaming_callback(chunk: str):
"""Wrapper for consensus agent streaming callback."""
try:
if chunk is not None and chunk.strip():
streaming_callback(consensus_agent_name, chunk, False)
streaming_callback(
consensus_agent_name, chunk, False
)
except Exception as callback_error:
if self.verbose:
logger.warning(
f"[STREAMING] Callback failed for {consensus_agent_name}: {str(callback_error)}"
)
else:
consensus_streaming_callback = None

@ -12,11 +12,13 @@ def pdf_to_text(pdf_path: str) -> str:
FileNotFoundError: If the PDF file is not found at the specified path.
Exception: If there is an error in reading the PDF file.
"""
try:
import pypdf
except ImportError:
raise ImportError("pypdf is not installed. Please install it using `pip install pypdf`.")
raise ImportError(
"pypdf is not installed. Please install it using `pip install pypdf`."
)
try:
# Open the PDF file

@ -1,4 +1,4 @@
import pytest
# from unittest.mock import Mock, patch
from swarms.structs.agent_router import AgentRouter
@ -98,7 +98,7 @@ def test_generate_embedding_success():
"""Test successful embedding generation with real API."""
router = AgentRouter()
result = router._generate_embedding("test text")
assert result is not None
assert isinstance(result, list)
assert len(result) > 0
@ -118,7 +118,7 @@ def test_add_agent_success():
print_on=False,
streaming_on=True,
)
router.add_agent(agent)
assert len(router.agents) == 1
@ -126,16 +126,20 @@ def test_add_agent_success():
assert len(router.agent_metadata) == 1
assert router.agents[0] == agent
assert router.agent_metadata[0]["name"] == "test_agent"
# Test that agent can stream
streamed_chunks = []
def streaming_callback(chunk: str):
streamed_chunks.append(chunk)
response = agent.run("Say hello", streaming_callback=streaming_callback)
response = agent.run(
"Say hello", streaming_callback=streaming_callback
)
assert response is not None
assert len(streamed_chunks) > 0 or response != "", "Agent should stream or return response"
assert (
len(streamed_chunks) > 0 or response != ""
), "Agent should stream or return response"
def test_add_agents_multiple():
@ -173,17 +177,21 @@ def test_add_agents_multiple():
assert len(router.agents) == 3
assert len(router.agent_embeddings) == 3
assert len(router.agent_metadata) == 3
# Test that all agents can stream
for agent in agents:
streamed_chunks = []
def streaming_callback(chunk: str):
streamed_chunks.append(chunk)
response = agent.run("Say hi", streaming_callback=streaming_callback)
response = agent.run(
"Say hi", streaming_callback=streaming_callback
)
assert response is not None
assert len(streamed_chunks) > 0 or response != "", f"Agent {agent.agent_name} should stream or return response"
assert (
len(streamed_chunks) > 0 or response != ""
), f"Agent {agent.agent_name} should stream or return response"
def test_find_best_agent_success():
@ -214,19 +222,23 @@ def test_find_best_agent_success():
router.add_agent(agent2)
result = router.find_best_agent("Extract data from documents")
assert result is not None
assert result in [agent1, agent2]
# Test that the found agent can stream
streamed_chunks = []
def streaming_callback(chunk: str):
streamed_chunks.append(chunk)
response = result.run("Test task", streaming_callback=streaming_callback)
response = result.run(
"Test task", streaming_callback=streaming_callback
)
assert response is not None
assert len(streamed_chunks) > 0 or response != "", "Found agent should stream or return response"
assert (
len(streamed_chunks) > 0 or response != ""
), "Found agent should stream or return response"
def test_find_best_agent_no_agents():
@ -253,14 +265,16 @@ def test_update_agent_history_success():
)
router.add_agent(agent)
# Run agent to create history
streamed_chunks = []
def streaming_callback(chunk: str):
streamed_chunks.append(chunk)
agent.run("Hello, how are you?", streaming_callback=streaming_callback)
agent.run(
"Hello, how are you?", streaming_callback=streaming_callback
)
# Update agent history
router.update_agent_history("test_agent")
@ -322,7 +336,7 @@ def test_agent_router_edge_cases():
def test_router_with_agent_streaming():
"""Test that agents in router can stream when run."""
router = AgentRouter()
agent1 = Agent(
agent_name="streaming_agent1",
agent_description="Agent for testing streaming",
@ -333,7 +347,7 @@ def test_router_with_agent_streaming():
print_on=False,
streaming_on=True,
)
agent2 = Agent(
agent_name="streaming_agent2",
agent_description="Another agent for testing streaming",
@ -344,27 +358,32 @@ def test_router_with_agent_streaming():
print_on=False,
streaming_on=True,
)
router.add_agent(agent1)
router.add_agent(agent2)
# Test each agent streams
for agent in router.agents:
streamed_chunks = []
def streaming_callback(chunk: str):
if chunk:
streamed_chunks.append(chunk)
response = agent.run("Tell me a short joke", streaming_callback=streaming_callback)
response = agent.run(
"Tell me a short joke",
streaming_callback=streaming_callback,
)
assert response is not None
assert len(streamed_chunks) > 0 or response != "", f"Agent {agent.agent_name} should stream"
assert (
len(streamed_chunks) > 0 or response != ""
), f"Agent {agent.agent_name} should stream"
def test_router_find_and_run_with_streaming():
"""Test finding best agent and running it with streaming."""
router = AgentRouter()
agent1 = Agent(
agent_name="math_agent",
agent_description="Handles mathematical problems",
@ -375,7 +394,7 @@ def test_router_find_and_run_with_streaming():
print_on=False,
streaming_on=True,
)
agent2 = Agent(
agent_name="writing_agent",
agent_description="Handles writing tasks",
@ -386,23 +405,27 @@ def test_router_find_and_run_with_streaming():
print_on=False,
streaming_on=True,
)
router.add_agent(agent1)
router.add_agent(agent2)
# Find best agent for a math task
best_agent = router.find_best_agent("Solve 2 + 2")
if best_agent:
streamed_chunks = []
def streaming_callback(chunk: str):
if chunk:
streamed_chunks.append(chunk)
response = best_agent.run("What is 2 + 2?", streaming_callback=streaming_callback)
response = best_agent.run(
"What is 2 + 2?", streaming_callback=streaming_callback
)
assert response is not None
assert len(streamed_chunks) > 0 or response != "", "Best agent should stream when run"
assert (
len(streamed_chunks) > 0 or response != ""
), "Best agent should stream when run"
if __name__ == "__main__":
@ -426,12 +449,12 @@ if __name__ == "__main__":
test_router_with_agent_streaming,
test_router_find_and_run_with_streaming,
]
# Run all tests
print("Running all tests...")
passed = 0
failed = 0
for test_func in tests:
try:
print(f"\n{'='*60}")
@ -444,9 +467,10 @@ if __name__ == "__main__":
print(f"✗ FAILED: {test_func.__name__}")
print(f" Error: {str(e)}")
import traceback
traceback.print_exc()
failed += 1
print(f"\n{'='*60}")
print(f"Test Summary: {passed} passed, {failed} failed")
print(f"{'='*60}")

@ -202,12 +202,16 @@ def test_majority_voting_different_output_types():
# Assert majority vote is correct
assert majority_vote is not None
def test_streaming_majority_voting():
"""
Test the streaming_majority_voting with logging/try-except and assertion.
"""
logs = []
def streaming_callback(agent_name: str, chunk: str, is_final: bool):
def streaming_callback(
agent_name: str, chunk: str, is_final: bool
):
# Chunk buffer static per call (reset each session)
if not hasattr(streaming_callback, "_buffer"):
streaming_callback._buffer = ""
@ -218,7 +222,10 @@ def test_streaming_majority_voting():
if chunk:
streaming_callback._buffer += chunk
streaming_callback._buffer_size += len(chunk)
if streaming_callback._buffer_size >= min_chunk_size or is_final:
if (
streaming_callback._buffer_size >= min_chunk_size
or is_final
):
if streaming_callback._buffer:
print(streaming_callback._buffer, end="", flush=True)
logs.append(streaming_callback._buffer)
@ -226,7 +233,7 @@ def test_streaming_majority_voting():
streaming_callback._buffer_size = 0
if is_final:
print()
try:
# Initialize the agent
agent = Agent(
@ -245,11 +252,11 @@ def test_streaming_majority_voting():
max_tokens=4000, # max output tokens
saved_state_path="agent_00.json",
interactive=False,
streaming_on=True, #if concurrent agents want to be streamed
streaming_on=True, # if concurrent agents want to be streamed
)
swarm = MajorityVoting(agents=[agent, agent, agent])
result = swarm.run(
"Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.",
streaming_callback=streaming_callback,

Loading…
Cancel
Save