From 638e9e2ba2eb968fa4d3a48c509081f663c5236d Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Sun, 2 Nov 2025 20:52:08 -0800 Subject: [PATCH] code cleanup op --- .../agent_router_example.py | 4 +- swarms/structs/agent_router.py | 8 +- swarms/structs/majority_voting.py | 22 +++- swarms/utils/pdf_to_text.py | 6 +- tests/structs/test_agent_router.py | 110 +++++++++++------- tests/structs/test_majority_voting.py | 19 ++- 6 files changed, 109 insertions(+), 60 deletions(-) diff --git a/examples/multi_agent/agent_router_examples/agent_router_example.py b/examples/multi_agent/agent_router_examples/agent_router_example.py index e0586647..0a174aed 100644 --- a/examples/multi_agent/agent_router_examples/agent_router_example.py +++ b/examples/multi_agent/agent_router_examples/agent_router_example.py @@ -29,7 +29,7 @@ agent_router = AgentRouter( "You are a medical lab and imaging interpretation agent. Take the patient's test results, imaging findings, and vitals, " "and interpret them in context of their symptoms. Suggest relevant follow-up diagnostics or considerations for the physician." ), - ), + ), ], ) @@ -37,4 +37,4 @@ result = agent_router.run( "I have a headache, fever, and cough. What could be wrong?" ) -print(result.agent_name) \ No newline at end of file +print(result.agent_name) diff --git a/swarms/structs/agent_router.py b/swarms/structs/agent_router.py index 55876bb2..bb000fd1 100644 --- a/swarms/structs/agent_router.py +++ b/swarms/structs/agent_router.py @@ -83,11 +83,13 @@ class AgentRouter: f"Unexpected embedding response structure: {type(response.data[0])}" ) else: - logger.error(f"Unexpected response structure: {response}") + logger.error( + f"Unexpected response structure: {response}" + ) raise ValueError( f"Unexpected embedding response structure: {type(response)}" ) - + return embedding_vector except Exception as e: @@ -278,7 +280,7 @@ class AgentRouter: except Exception as e: logger.error(f"Error finding best agent: {str(e)}") raise - + def run(self, task: str) -> Optional[AgentType]: """ Run the agent router on a given task. diff --git a/swarms/structs/majority_voting.py b/swarms/structs/majority_voting.py index 85ff31d7..7659e721 100644 --- a/swarms/structs/majority_voting.py +++ b/swarms/structs/majority_voting.py @@ -168,7 +168,15 @@ class MajorityVoting: title="Majority Voting", ) - def run(self, task: str, streaming_callback: Optional[Callable[[str, str, bool], None]] = None, *args, **kwargs) -> List[Any]: + def run( + self, + task: str, + streaming_callback: Optional[ + Callable[[str, str, bool], None] + ] = None, + *args, + **kwargs, + ) -> List[Any]: """ Runs the majority voting system with multi-loop functionality and returns the majority vote. @@ -200,24 +208,30 @@ class MajorityVoting: role=agent.agent_name, content=output, ) - + # Set streaming_on for the consensus agent based on the provided streaming_callback - self.consensus_agent.streaming_on = streaming_callback is not None + self.consensus_agent.streaming_on = ( + streaming_callback is not None + ) # Instead of a simple passthrough wrapper, match the callback invocation pattern from the provided reference for the consensus agent: consensus_agent_name = self.consensus_agent.agent_name if streaming_callback is not None: + def consensus_streaming_callback(chunk: str): """Wrapper for consensus agent streaming callback.""" try: if chunk is not None and chunk.strip(): - streaming_callback(consensus_agent_name, chunk, False) + streaming_callback( + consensus_agent_name, chunk, False + ) except Exception as callback_error: if self.verbose: logger.warning( f"[STREAMING] Callback failed for {consensus_agent_name}: {str(callback_error)}" ) + else: consensus_streaming_callback = None diff --git a/swarms/utils/pdf_to_text.py b/swarms/utils/pdf_to_text.py index c54d178b..a7bf396c 100644 --- a/swarms/utils/pdf_to_text.py +++ b/swarms/utils/pdf_to_text.py @@ -12,11 +12,13 @@ def pdf_to_text(pdf_path: str) -> str: FileNotFoundError: If the PDF file is not found at the specified path. Exception: If there is an error in reading the PDF file. """ - + try: import pypdf except ImportError: - raise ImportError("pypdf is not installed. Please install it using `pip install pypdf`.") + raise ImportError( + "pypdf is not installed. Please install it using `pip install pypdf`." + ) try: # Open the PDF file diff --git a/tests/structs/test_agent_router.py b/tests/structs/test_agent_router.py index e045faba..80d9d214 100644 --- a/tests/structs/test_agent_router.py +++ b/tests/structs/test_agent_router.py @@ -1,4 +1,4 @@ -import pytest + # from unittest.mock import Mock, patch from swarms.structs.agent_router import AgentRouter @@ -98,7 +98,7 @@ def test_generate_embedding_success(): """Test successful embedding generation with real API.""" router = AgentRouter() result = router._generate_embedding("test text") - + assert result is not None assert isinstance(result, list) assert len(result) > 0 @@ -118,7 +118,7 @@ def test_add_agent_success(): print_on=False, streaming_on=True, ) - + router.add_agent(agent) assert len(router.agents) == 1 @@ -126,16 +126,20 @@ def test_add_agent_success(): assert len(router.agent_metadata) == 1 assert router.agents[0] == agent assert router.agent_metadata[0]["name"] == "test_agent" - + # Test that agent can stream streamed_chunks = [] - + def streaming_callback(chunk: str): streamed_chunks.append(chunk) - - response = agent.run("Say hello", streaming_callback=streaming_callback) + + response = agent.run( + "Say hello", streaming_callback=streaming_callback + ) assert response is not None - assert len(streamed_chunks) > 0 or response != "", "Agent should stream or return response" + assert ( + len(streamed_chunks) > 0 or response != "" + ), "Agent should stream or return response" def test_add_agents_multiple(): @@ -173,17 +177,21 @@ def test_add_agents_multiple(): assert len(router.agents) == 3 assert len(router.agent_embeddings) == 3 assert len(router.agent_metadata) == 3 - + # Test that all agents can stream for agent in agents: streamed_chunks = [] - + def streaming_callback(chunk: str): streamed_chunks.append(chunk) - - response = agent.run("Say hi", streaming_callback=streaming_callback) + + response = agent.run( + "Say hi", streaming_callback=streaming_callback + ) assert response is not None - assert len(streamed_chunks) > 0 or response != "", f"Agent {agent.agent_name} should stream or return response" + assert ( + len(streamed_chunks) > 0 or response != "" + ), f"Agent {agent.agent_name} should stream or return response" def test_find_best_agent_success(): @@ -214,19 +222,23 @@ def test_find_best_agent_success(): router.add_agent(agent2) result = router.find_best_agent("Extract data from documents") - + assert result is not None assert result in [agent1, agent2] - + # Test that the found agent can stream streamed_chunks = [] - + def streaming_callback(chunk: str): streamed_chunks.append(chunk) - - response = result.run("Test task", streaming_callback=streaming_callback) + + response = result.run( + "Test task", streaming_callback=streaming_callback + ) assert response is not None - assert len(streamed_chunks) > 0 or response != "", "Found agent should stream or return response" + assert ( + len(streamed_chunks) > 0 or response != "" + ), "Found agent should stream or return response" def test_find_best_agent_no_agents(): @@ -253,14 +265,16 @@ def test_update_agent_history_success(): ) router.add_agent(agent) - + # Run agent to create history streamed_chunks = [] - + def streaming_callback(chunk: str): streamed_chunks.append(chunk) - - agent.run("Hello, how are you?", streaming_callback=streaming_callback) + + agent.run( + "Hello, how are you?", streaming_callback=streaming_callback + ) # Update agent history router.update_agent_history("test_agent") @@ -322,7 +336,7 @@ def test_agent_router_edge_cases(): def test_router_with_agent_streaming(): """Test that agents in router can stream when run.""" router = AgentRouter() - + agent1 = Agent( agent_name="streaming_agent1", agent_description="Agent for testing streaming", @@ -333,7 +347,7 @@ def test_router_with_agent_streaming(): print_on=False, streaming_on=True, ) - + agent2 = Agent( agent_name="streaming_agent2", agent_description="Another agent for testing streaming", @@ -344,27 +358,32 @@ def test_router_with_agent_streaming(): print_on=False, streaming_on=True, ) - + router.add_agent(agent1) router.add_agent(agent2) - + # Test each agent streams for agent in router.agents: streamed_chunks = [] - + def streaming_callback(chunk: str): if chunk: streamed_chunks.append(chunk) - - response = agent.run("Tell me a short joke", streaming_callback=streaming_callback) + + response = agent.run( + "Tell me a short joke", + streaming_callback=streaming_callback, + ) assert response is not None - assert len(streamed_chunks) > 0 or response != "", f"Agent {agent.agent_name} should stream" + assert ( + len(streamed_chunks) > 0 or response != "" + ), f"Agent {agent.agent_name} should stream" def test_router_find_and_run_with_streaming(): """Test finding best agent and running it with streaming.""" router = AgentRouter() - + agent1 = Agent( agent_name="math_agent", agent_description="Handles mathematical problems", @@ -375,7 +394,7 @@ def test_router_find_and_run_with_streaming(): print_on=False, streaming_on=True, ) - + agent2 = Agent( agent_name="writing_agent", agent_description="Handles writing tasks", @@ -386,23 +405,27 @@ def test_router_find_and_run_with_streaming(): print_on=False, streaming_on=True, ) - + router.add_agent(agent1) router.add_agent(agent2) - + # Find best agent for a math task best_agent = router.find_best_agent("Solve 2 + 2") - + if best_agent: streamed_chunks = [] - + def streaming_callback(chunk: str): if chunk: streamed_chunks.append(chunk) - - response = best_agent.run("What is 2 + 2?", streaming_callback=streaming_callback) + + response = best_agent.run( + "What is 2 + 2?", streaming_callback=streaming_callback + ) assert response is not None - assert len(streamed_chunks) > 0 or response != "", "Best agent should stream when run" + assert ( + len(streamed_chunks) > 0 or response != "" + ), "Best agent should stream when run" if __name__ == "__main__": @@ -426,12 +449,12 @@ if __name__ == "__main__": test_router_with_agent_streaming, test_router_find_and_run_with_streaming, ] - + # Run all tests print("Running all tests...") passed = 0 failed = 0 - + for test_func in tests: try: print(f"\n{'='*60}") @@ -444,9 +467,10 @@ if __name__ == "__main__": print(f"✗ FAILED: {test_func.__name__}") print(f" Error: {str(e)}") import traceback + traceback.print_exc() failed += 1 - + print(f"\n{'='*60}") print(f"Test Summary: {passed} passed, {failed} failed") print(f"{'='*60}") diff --git a/tests/structs/test_majority_voting.py b/tests/structs/test_majority_voting.py index 46e2e452..e36e94a0 100644 --- a/tests/structs/test_majority_voting.py +++ b/tests/structs/test_majority_voting.py @@ -202,12 +202,16 @@ def test_majority_voting_different_output_types(): # Assert majority vote is correct assert majority_vote is not None + def test_streaming_majority_voting(): """ Test the streaming_majority_voting with logging/try-except and assertion. """ logs = [] - def streaming_callback(agent_name: str, chunk: str, is_final: bool): + + def streaming_callback( + agent_name: str, chunk: str, is_final: bool + ): # Chunk buffer static per call (reset each session) if not hasattr(streaming_callback, "_buffer"): streaming_callback._buffer = "" @@ -218,7 +222,10 @@ def test_streaming_majority_voting(): if chunk: streaming_callback._buffer += chunk streaming_callback._buffer_size += len(chunk) - if streaming_callback._buffer_size >= min_chunk_size or is_final: + if ( + streaming_callback._buffer_size >= min_chunk_size + or is_final + ): if streaming_callback._buffer: print(streaming_callback._buffer, end="", flush=True) logs.append(streaming_callback._buffer) @@ -226,7 +233,7 @@ def test_streaming_majority_voting(): streaming_callback._buffer_size = 0 if is_final: print() - + try: # Initialize the agent agent = Agent( @@ -245,11 +252,11 @@ def test_streaming_majority_voting(): max_tokens=4000, # max output tokens saved_state_path="agent_00.json", interactive=False, - streaming_on=True, #if concurrent agents want to be streamed + streaming_on=True, # if concurrent agents want to be streamed ) - + swarm = MajorityVoting(agents=[agent, agent, agent]) - + result = swarm.run( "Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.", streaming_callback=streaming_callback,