From 6b8010132356e1b7a2d149e0e1360b7468183e86 Mon Sep 17 00:00:00 2001 From: Aksh Parekh Date: Sat, 18 Oct 2025 15:12:41 -0700 Subject: [PATCH 1/9] [FEAT] Majority Voting Streaming Feature Added streaming callback support to the majority voting agent. --- swarms/structs/majority_voting.py | 37 ++++++++++++++++++++++++++++--- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/swarms/structs/majority_voting.py b/swarms/structs/majority_voting.py index dc9ad9d8..85ff31d7 100644 --- a/swarms/structs/majority_voting.py +++ b/swarms/structs/majority_voting.py @@ -13,6 +13,7 @@ from swarms.utils.history_output_formatter import ( ) from swarms.utils.loguru_logger import initialize_logger from swarms.utils.output_types import OutputType +from typing import Callable, Optional logger = initialize_logger(log_folder="majority_voting") @@ -65,9 +66,16 @@ def default_consensus_agent( system_prompt: str = None, description: str = "An agent that uses consensus to generate a final answer.", model_name: str = "gpt-4.1", + streaming_callback: Optional[Callable[[str], None]] = None, *args, **kwargs, ): + # If streaming_on is not None, force it to True; else, set to False + if streaming_callback is not None: + streaming_on_value = True + else: + streaming_on_value = False + return Agent( agent_name=name, agent_description=description, @@ -76,6 +84,7 @@ def default_consensus_agent( system_prompt=system_prompt, dynamic_context_window=True, dynamic_temperature_enabled=True, + streaming_on=streaming_on_value, *args, **kwargs, ) @@ -159,7 +168,7 @@ class MajorityVoting: title="Majority Voting", ) - def run(self, task: str, *args, **kwargs) -> List[Any]: + def run(self, task: str, streaming_callback: Optional[Callable[[str, str, bool], None]] = None, *args, **kwargs) -> List[Any]: """ Runs the majority voting system with multi-loop functionality and returns the majority vote. @@ -179,6 +188,7 @@ class MajorityVoting: ) for i in range(self.max_loops): + output = run_agents_concurrently( agents=self.agents, task=self.conversation.get_str(), @@ -190,10 +200,31 @@ class MajorityVoting: role=agent.agent_name, content=output, ) - - # Now run the consensus agent + + # Set streaming_on for the consensus agent based on the provided streaming_callback + self.consensus_agent.streaming_on = streaming_callback is not None + + # Instead of a simple passthrough wrapper, match the callback invocation pattern from the provided reference for the consensus agent: + consensus_agent_name = self.consensus_agent.agent_name + + if streaming_callback is not None: + def consensus_streaming_callback(chunk: str): + """Wrapper for consensus agent streaming callback.""" + try: + if chunk is not None and chunk.strip(): + streaming_callback(consensus_agent_name, chunk, False) + except Exception as callback_error: + if self.verbose: + logger.warning( + f"[STREAMING] Callback failed for {consensus_agent_name}: {str(callback_error)}" + ) + else: + consensus_streaming_callback = None + + # Run the consensus agent with the streaming callback, if any consensus_output = self.consensus_agent.run( task=(f"History: {self.conversation.get_str()}"), + streaming_callback=consensus_streaming_callback, ) self.conversation.add( From d54857eacf5e880bd2f63f190c7f41bfa9b5234f Mon Sep 17 00:00:00 2001 From: Aksh Parekh Date: Sat, 18 Oct 2025 15:13:33 -0700 Subject: [PATCH 2/9] Create test_majority_voting_streaming.py --- tests/structs/test_majority_voting_streaming.py | 1 + 1 file changed, 1 insertion(+) create mode 100644 tests/structs/test_majority_voting_streaming.py diff --git a/tests/structs/test_majority_voting_streaming.py b/tests/structs/test_majority_voting_streaming.py new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/tests/structs/test_majority_voting_streaming.py @@ -0,0 +1 @@ + From d81f2ffaf7c144baf3d82f6d37ad2fb78fa120f7 Mon Sep 17 00:00:00 2001 From: Aksh Parekh Date: Sat, 18 Oct 2025 15:14:05 -0700 Subject: [PATCH 3/9] [TEST] Streaming Callback for Majority Voting with custom Streaming Function --- .../structs/test_majority_voting_streaming.py | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/tests/structs/test_majority_voting_streaming.py b/tests/structs/test_majority_voting_streaming.py index 8b137891..2872d75c 100644 --- a/tests/structs/test_majority_voting_streaming.py +++ b/tests/structs/test_majority_voting_streaming.py @@ -1 +1,57 @@ +from swarms import Agent +from swarms.prompts.finance_agent_sys_prompt import ( + FINANCIAL_AGENT_SYS_PROMPT, +) +from swarms.structs.majority_voting import MajorityVoting +from dotenv import load_dotenv + +def streaming_callback(agent_name: str, chunk: str, is_final: bool): + # Chunk buffer static per call (reset each session) + if not hasattr(streaming_callback, "_buffer"): + streaming_callback._buffer = "" + streaming_callback._buffer_size = 0 + + min_chunk_size = 512 # or any large chunk size you want + + if chunk: + streaming_callback._buffer += chunk + streaming_callback._buffer_size += len(chunk) + if streaming_callback._buffer_size >= min_chunk_size or is_final: + if streaming_callback._buffer: + print(streaming_callback._buffer, end="", flush=True) + streaming_callback._buffer = "" + streaming_callback._buffer_size = 0 + if is_final: + print() + +load_dotenv() + + +# Initialize the agent +agent = Agent( + agent_name="Financial-Analysis-Agent", + agent_description="Personal finance advisor agent", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT, + max_loops=1, + model_name="gpt-4.1", + dynamic_temperature_enabled=True, + user_name="swarms_corp", + retry_attempts=3, + context_length=8192, + return_step_meta=False, + output_type="str", # "json", "dict", "csv" OR "string" "yaml" and + auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task + max_tokens=4000, # max output tokens + saved_state_path="agent_00.json", + interactive=False, + streaming_on=True, #if concurrent agents want to be streamed +) + +swarm = MajorityVoting(agents=[agent, agent, agent]) + +swarm.run( + "Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.", + streaming_callback=streaming_callback, + +) From 090d731dab9d358576a4366a1b6b2a61d1b76c2e Mon Sep 17 00:00:00 2001 From: Aksh Parekh Date: Sat, 18 Oct 2025 15:18:47 -0700 Subject: [PATCH 4/9] [DOCS] Streaming for Majority Voting Added an example demonstrating majority voting with a custom streaming callback function, including agent initialization and usage. --- docs/swarms/structs/majorityvoting.md | 63 +++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/docs/swarms/structs/majorityvoting.md b/docs/swarms/structs/majorityvoting.md index 5e62ffef..26e6c10e 100644 --- a/docs/swarms/structs/majorityvoting.md +++ b/docs/swarms/structs/majorityvoting.md @@ -854,3 +854,66 @@ for i, result in enumerate(concurrent_results, 1): else: print(f"Result: {result}") ``` +### Example 4: Majorty Voting with Custom Streaming Features + +This example demonstrates streaming callback with a custom streaming function. + +```python +from swarms import Agent +from swarms.prompts.finance_agent_sys_prompt import ( + FINANCIAL_AGENT_SYS_PROMPT, +) +from swarms.structs.majority_voting import MajorityVoting +from dotenv import load_dotenv + +def streaming_callback(agent_name: str, chunk: str, is_final: bool): + # Chunk buffer static per call (reset each session) + if not hasattr(streaming_callback, "_buffer"): + streaming_callback._buffer = "" + streaming_callback._buffer_size = 0 + + min_chunk_size = 512 # or any large chunk size you want + + if chunk: + streaming_callback._buffer += chunk + streaming_callback._buffer_size += len(chunk) + if streaming_callback._buffer_size >= min_chunk_size or is_final: + if streaming_callback._buffer: + print(streaming_callback._buffer, end="", flush=True) + streaming_callback._buffer = "" + streaming_callback._buffer_size = 0 + if is_final: + print() + +load_dotenv() + + +# Initialize the agent +agent = Agent( + agent_name="Financial-Analysis-Agent", + agent_description="Personal finance advisor agent", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT, + max_loops=1, + model_name="gpt-4.1", + dynamic_temperature_enabled=True, + user_name="swarms_corp", + retry_attempts=3, + context_length=8192, + return_step_meta=False, + output_type="str", # "json", "dict", "csv" OR "string" "yaml" and + auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task + max_tokens=4000, # max output tokens + saved_state_path="agent_00.json", + interactive=False, + streaming_on=True, #if concurrent agents want to be streamed +) + +swarm = MajorityVoting(agents=[agent, agent, agent]) + +swarm.run( + "Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.", + streaming_callback=streaming_callback, + +) + +``` From de76a3ad394531e22aafbc6d75f7923d5b7b57d0 Mon Sep 17 00:00:00 2001 From: Aksh Parekh Date: Tue, 21 Oct 2025 19:16:56 -0700 Subject: [PATCH 5/9] Update test_majority_voting_streaming.py --- .../structs/test_majority_voting_streaming.py | 56 ------------------- 1 file changed, 56 deletions(-) diff --git a/tests/structs/test_majority_voting_streaming.py b/tests/structs/test_majority_voting_streaming.py index 2872d75c..8b137891 100644 --- a/tests/structs/test_majority_voting_streaming.py +++ b/tests/structs/test_majority_voting_streaming.py @@ -1,57 +1 @@ -from swarms import Agent -from swarms.prompts.finance_agent_sys_prompt import ( - FINANCIAL_AGENT_SYS_PROMPT, -) -from swarms.structs.majority_voting import MajorityVoting -from dotenv import load_dotenv - -def streaming_callback(agent_name: str, chunk: str, is_final: bool): - # Chunk buffer static per call (reset each session) - if not hasattr(streaming_callback, "_buffer"): - streaming_callback._buffer = "" - streaming_callback._buffer_size = 0 - - min_chunk_size = 512 # or any large chunk size you want - - if chunk: - streaming_callback._buffer += chunk - streaming_callback._buffer_size += len(chunk) - if streaming_callback._buffer_size >= min_chunk_size or is_final: - if streaming_callback._buffer: - print(streaming_callback._buffer, end="", flush=True) - streaming_callback._buffer = "" - streaming_callback._buffer_size = 0 - if is_final: - print() - -load_dotenv() - - -# Initialize the agent -agent = Agent( - agent_name="Financial-Analysis-Agent", - agent_description="Personal finance advisor agent", - system_prompt=FINANCIAL_AGENT_SYS_PROMPT, - max_loops=1, - model_name="gpt-4.1", - dynamic_temperature_enabled=True, - user_name="swarms_corp", - retry_attempts=3, - context_length=8192, - return_step_meta=False, - output_type="str", # "json", "dict", "csv" OR "string" "yaml" and - auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task - max_tokens=4000, # max output tokens - saved_state_path="agent_00.json", - interactive=False, - streaming_on=True, #if concurrent agents want to be streamed -) - -swarm = MajorityVoting(agents=[agent, agent, agent]) - -swarm.run( - "Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.", - streaming_callback=streaming_callback, - -) From 946ae70af939124f8d22e49d60f326bb7f7ee0cf Mon Sep 17 00:00:00 2001 From: Aksh Parekh Date: Tue, 21 Oct 2025 19:18:03 -0700 Subject: [PATCH 6/9] [TEST] Majority Voting with Streaming Feature --- tests/structs/test_majority_voting.py | 206 +++++++------------------- 1 file changed, 55 insertions(+), 151 deletions(-) diff --git a/tests/structs/test_majority_voting.py b/tests/structs/test_majority_voting.py index dcd25f0b..ae5fc998 100644 --- a/tests/structs/test_majority_voting.py +++ b/tests/structs/test_majority_voting.py @@ -1,152 +1,56 @@ -from unittest.mock import MagicMock - -import pytest - -from swarms.structs.agent import Agent +from swarms import Agent +from swarms.prompts.finance_agent_sys_prompt import ( + FINANCIAL_AGENT_SYS_PROMPT, +) from swarms.structs.majority_voting import MajorityVoting - - -def test_majority_voting_run_concurrent(mocker): - # Create mock agents - agent1 = MagicMock(spec=Agent) - agent2 = MagicMock(spec=Agent) - agent3 = MagicMock(spec=Agent) - - # Create mock majority voting - mv = MajorityVoting( - agents=[agent1, agent2, agent3], - concurrent=True, - multithreaded=False, - ) - - # Create mock conversation - conversation = MagicMock() - mv.conversation = conversation - - # Create mock results - results = ["Paris", "Paris", "Lyon"] - - # Mock agent.run method - agent1.run.return_value = results[0] - agent2.run.return_value = results[1] - agent3.run.return_value = results[2] - - # Run majority voting - majority_vote = mv.run("What is the capital of France?") - - # Assert agent.run method was called with the correct task - agent1.run.assert_called_once_with( - "What is the capital of France?" - ) - agent2.run.assert_called_once_with( - "What is the capital of France?" - ) - agent3.run.assert_called_once_with( - "What is the capital of France?" - ) - - # Assert conversation.add method was called with the correct responses - conversation.add.assert_any_call(agent1.agent_name, results[0]) - conversation.add.assert_any_call(agent2.agent_name, results[1]) - conversation.add.assert_any_call(agent3.agent_name, results[2]) - - # Assert majority vote is correct - assert majority_vote is not None - - -def test_majority_voting_run_multithreaded(mocker): - # Create mock agents - agent1 = MagicMock(spec=Agent) - agent2 = MagicMock(spec=Agent) - agent3 = MagicMock(spec=Agent) - - # Create mock majority voting - mv = MajorityVoting( - agents=[agent1, agent2, agent3], - concurrent=False, - multithreaded=True, - ) - - # Create mock conversation - conversation = MagicMock() - mv.conversation = conversation - - # Create mock results - results = ["Paris", "Paris", "Lyon"] - - # Mock agent.run method - agent1.run.return_value = results[0] - agent2.run.return_value = results[1] - agent3.run.return_value = results[2] - - # Run majority voting - majority_vote = mv.run("What is the capital of France?") - - # Assert agent.run method was called with the correct task - agent1.run.assert_called_once_with( - "What is the capital of France?" - ) - agent2.run.assert_called_once_with( - "What is the capital of France?" - ) - agent3.run.assert_called_once_with( - "What is the capital of France?" - ) - - # Assert conversation.add method was called with the correct responses - conversation.add.assert_any_call(agent1.agent_name, results[0]) - conversation.add.assert_any_call(agent2.agent_name, results[1]) - conversation.add.assert_any_call(agent3.agent_name, results[2]) - - # Assert majority vote is correct - assert majority_vote is not None - - -@pytest.mark.asyncio -async def test_majority_voting_run_asynchronous(mocker): - # Create mock agents - agent1 = MagicMock(spec=Agent) - agent2 = MagicMock(spec=Agent) - agent3 = MagicMock(spec=Agent) - - # Create mock majority voting - mv = MajorityVoting( - agents=[agent1, agent2, agent3], - concurrent=False, - multithreaded=False, - asynchronous=True, - ) - - # Create mock conversation - conversation = MagicMock() - mv.conversation = conversation - - # Create mock results - results = ["Paris", "Paris", "Lyon"] - - # Mock agent.run method - agent1.run.return_value = results[0] - agent2.run.return_value = results[1] - agent3.run.return_value = results[2] - - # Run majority voting - majority_vote = await mv.run("What is the capital of France?") - - # Assert agent.run method was called with the correct task - agent1.run.assert_called_once_with( - "What is the capital of France?" - ) - agent2.run.assert_called_once_with( - "What is the capital of France?" - ) - agent3.run.assert_called_once_with( - "What is the capital of France?" - ) - - # Assert conversation.add method was called with the correct responses - conversation.add.assert_any_call(agent1.agent_name, results[0]) - conversation.add.assert_any_call(agent2.agent_name, results[1]) - conversation.add.assert_any_call(agent3.agent_name, results[2]) - - # Assert majority vote is correct - assert majority_vote is not None +from dotenv import load_dotenv + +def streaming_callback(agent_name: str, chunk: str, is_final: bool): + # Chunk buffer static per call (reset each session) + if not hasattr(streaming_callback, "_buffer"): + streaming_callback._buffer = "" + streaming_callback._buffer_size = 0 + + min_chunk_size = 512 # or any large chunk size you want + + if chunk: + streaming_callback._buffer += chunk + streaming_callback._buffer_size += len(chunk) + if streaming_callback._buffer_size >= min_chunk_size or is_final: + if streaming_callback._buffer: + print(streaming_callback._buffer, end="", flush=True) + streaming_callback._buffer = "" + streaming_callback._buffer_size = 0 + if is_final: + print() + +load_dotenv() + + +# Initialize the agent +agent = Agent( + agent_name="Financial-Analysis-Agent", + agent_description="Personal finance advisor agent", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT, + max_loops=1, + model_name="gpt-4.1", + dynamic_temperature_enabled=True, + user_name="swarms_corp", + retry_attempts=3, + context_length=8192, + return_step_meta=False, + output_type="str", # "json", "dict", "csv" OR "string" "yaml" and + auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task + max_tokens=4000, # max output tokens + saved_state_path="agent_00.json", + interactive=False, + streaming_on=True, #if concurrent agents want to be streamed +) + +swarm = MajorityVoting(agents=[agent, agent, agent]) + +swarm.run( + "Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.", + streaming_callback=streaming_callback, + +) From 0dd6d250883da3a238b047f4593332b8a8a7e6b0 Mon Sep 17 00:00:00 2001 From: Aksh Parekh Date: Tue, 21 Oct 2025 19:21:54 -0700 Subject: [PATCH 7/9] [CLEANUP] --- tests/structs/test_majority_voting_streaming.py | 1 - 1 file changed, 1 deletion(-) delete mode 100644 tests/structs/test_majority_voting_streaming.py diff --git a/tests/structs/test_majority_voting_streaming.py b/tests/structs/test_majority_voting_streaming.py deleted file mode 100644 index 8b137891..00000000 --- a/tests/structs/test_majority_voting_streaming.py +++ /dev/null @@ -1 +0,0 @@ - From 52dbfbfe0562525aad194f308cfc40597851d884 Mon Sep 17 00:00:00 2001 From: Aksh Parekh Date: Wed, 22 Oct 2025 19:15:20 -0700 Subject: [PATCH 8/9] [Add] Old Tests with New Tests --- tests/structs/test_majority_voting.py | 217 +++++++++++++++++++++----- 1 file changed, 181 insertions(+), 36 deletions(-) diff --git a/tests/structs/test_majority_voting.py b/tests/structs/test_majority_voting.py index ae5fc998..709946f9 100644 --- a/tests/structs/test_majority_voting.py +++ b/tests/structs/test_majority_voting.py @@ -1,11 +1,157 @@ -from swarms import Agent -from swarms.prompts.finance_agent_sys_prompt import ( - FINANCIAL_AGENT_SYS_PROMPT, -) +from unittest.mock import MagicMock + +import pytest + +from swarms.structs.agent import Agent from swarms.structs.majority_voting import MajorityVoting -from dotenv import load_dotenv -def streaming_callback(agent_name: str, chunk: str, is_final: bool): + +def test_majority_voting_run_concurrent(mocker): + # Create mock agents + agent1 = MagicMock(spec=Agent) + agent2 = MagicMock(spec=Agent) + agent3 = MagicMock(spec=Agent) + + # Create mock majority voting + mv = MajorityVoting( + agents=[agent1, agent2, agent3], + concurrent=True, + multithreaded=False, + ) + + # Create mock conversation + conversation = MagicMock() + mv.conversation = conversation + + # Create mock results + results = ["Paris", "Paris", "Lyon"] + + # Mock agent.run method + agent1.run.return_value = results[0] + agent2.run.return_value = results[1] + agent3.run.return_value = results[2] + + # Run majority voting + majority_vote = mv.run("What is the capital of France?") + + # Assert agent.run method was called with the correct task + agent1.run.assert_called_once_with( + "What is the capital of France?" + ) + agent2.run.assert_called_once_with( + "What is the capital of France?" + ) + agent3.run.assert_called_once_with( + "What is the capital of France?" + ) + + # Assert conversation.add method was called with the correct responses + conversation.add.assert_any_call(agent1.agent_name, results[0]) + conversation.add.assert_any_call(agent2.agent_name, results[1]) + conversation.add.assert_any_call(agent3.agent_name, results[2]) + + # Assert majority vote is correct + assert majority_vote is not None + + +def test_majority_voting_run_multithreaded(mocker): + # Create mock agents + agent1 = MagicMock(spec=Agent) + agent2 = MagicMock(spec=Agent) + agent3 = MagicMock(spec=Agent) + + # Create mock majority voting + mv = MajorityVoting( + agents=[agent1, agent2, agent3], + concurrent=False, + multithreaded=True, + ) + + # Create mock conversation + conversation = MagicMock() + mv.conversation = conversation + + # Create mock results + results = ["Paris", "Paris", "Lyon"] + + # Mock agent.run method + agent1.run.return_value = results[0] + agent2.run.return_value = results[1] + agent3.run.return_value = results[2] + + # Run majority voting + majority_vote = mv.run("What is the capital of France?") + + # Assert agent.run method was called with the correct task + agent1.run.assert_called_once_with( + "What is the capital of France?" + ) + agent2.run.assert_called_once_with( + "What is the capital of France?" + ) + agent3.run.assert_called_once_with( + "What is the capital of France?" + ) + + # Assert conversation.add method was called with the correct responses + conversation.add.assert_any_call(agent1.agent_name, results[0]) + conversation.add.assert_any_call(agent2.agent_name, results[1]) + conversation.add.assert_any_call(agent3.agent_name, results[2]) + + # Assert majority vote is correct + assert majority_vote is not None + + +@pytest.mark.asyncio +async def test_majority_voting_run_asynchronous(mocker): + # Create mock agents + agent1 = MagicMock(spec=Agent) + agent2 = MagicMock(spec=Agent) + agent3 = MagicMock(spec=Agent) + + # Create mock majority voting + mv = MajorityVoting( + agents=[agent1, agent2, agent3], + concurrent=False, + multithreaded=False, + asynchronous=True, + ) + + # Create mock conversation + conversation = MagicMock() + mv.conversation = conversation + + # Create mock results + results = ["Paris", "Paris", "Lyon"] + + # Mock agent.run method + agent1.run.return_value = results[0] + agent2.run.return_value = results[1] + agent3.run.return_value = results[2] + + # Run majority voting + majority_vote = await mv.run("What is the capital of France?") + + # Assert agent.run method was called with the correct task + agent1.run.assert_called_once_with( + "What is the capital of France?" + ) + agent2.run.assert_called_once_with( + "What is the capital of France?" + ) + agent3.run.assert_called_once_with( + "What is the capital of France?" + ) + + # Assert conversation.add method was called with the correct responses + conversation.add.assert_any_call(agent1.agent_name, results[0]) + conversation.add.assert_any_call(agent2.agent_name, results[1]) + conversation.add.assert_any_call(agent3.agent_name, results[2]) + + # Assert majority vote is correct + assert majority_vote is not None + +def test_streaming_majority_voting(agent_name: str, chunk: str, is_final: bool): # Chunk buffer static per call (reset each session) if not hasattr(streaming_callback, "_buffer"): streaming_callback._buffer = "" @@ -24,33 +170,32 @@ def streaming_callback(agent_name: str, chunk: str, is_final: bool): if is_final: print() -load_dotenv() - - -# Initialize the agent -agent = Agent( - agent_name="Financial-Analysis-Agent", - agent_description="Personal finance advisor agent", - system_prompt=FINANCIAL_AGENT_SYS_PROMPT, - max_loops=1, - model_name="gpt-4.1", - dynamic_temperature_enabled=True, - user_name="swarms_corp", - retry_attempts=3, - context_length=8192, - return_step_meta=False, - output_type="str", # "json", "dict", "csv" OR "string" "yaml" and - auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task - max_tokens=4000, # max output tokens - saved_state_path="agent_00.json", - interactive=False, - streaming_on=True, #if concurrent agents want to be streamed -) - -swarm = MajorityVoting(agents=[agent, agent, agent]) - -swarm.run( - "Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.", - streaming_callback=streaming_callback, - -) + load_dotenv() + + + # Initialize the agent + agent = Agent( + agent_name="Financial-Analysis-Agent", + agent_description="Personal finance advisor agent", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT, + max_loops=1, + model_name="gpt-4.1", + dynamic_temperature_enabled=True, + user_name="swarms_corp", + retry_attempts=3, + context_length=8192, + return_step_meta=False, + output_type="str", # "json", "dict", "csv" OR "string" "yaml" and + auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task + max_tokens=4000, # max output tokens + saved_state_path="agent_00.json", + interactive=False, + streaming_on=True, #if concurrent agents want to be streamed + ) + + swarm = MajorityVoting(agents=[agent, agent, agent]) + + return swarm.run( + "Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.", + streaming_callback=streaming_callback, + ) From 7f3316100c70f2befa385453cb1b1868ed0fec40 Mon Sep 17 00:00:00 2001 From: Aksh Parekh Date: Thu, 23 Oct 2025 16:30:02 -0700 Subject: [PATCH 9/9] [TEST-EDITS] Added Testing Terminology --- tests/structs/test_majority_voting.py | 101 ++++++++++++++------------ 1 file changed, 55 insertions(+), 46 deletions(-) diff --git a/tests/structs/test_majority_voting.py b/tests/structs/test_majority_voting.py index 709946f9..1ee8fea3 100644 --- a/tests/structs/test_majority_voting.py +++ b/tests/structs/test_majority_voting.py @@ -151,51 +151,60 @@ async def test_majority_voting_run_asynchronous(mocker): # Assert majority vote is correct assert majority_vote is not None -def test_streaming_majority_voting(agent_name: str, chunk: str, is_final: bool): - # Chunk buffer static per call (reset each session) - if not hasattr(streaming_callback, "_buffer"): - streaming_callback._buffer = "" - streaming_callback._buffer_size = 0 - - min_chunk_size = 512 # or any large chunk size you want - - if chunk: - streaming_callback._buffer += chunk - streaming_callback._buffer_size += len(chunk) - if streaming_callback._buffer_size >= min_chunk_size or is_final: - if streaming_callback._buffer: - print(streaming_callback._buffer, end="", flush=True) +def test_streaming_majority_voting(): + """ + Test the streaming_majority_voting with logging/try-except and assertion. + """ + logs = [] + def streaming_callback(agent_name: str, chunk: str, is_final: bool): + # Chunk buffer static per call (reset each session) + if not hasattr(streaming_callback, "_buffer"): streaming_callback._buffer = "" streaming_callback._buffer_size = 0 - if is_final: - print() - - load_dotenv() - - - # Initialize the agent - agent = Agent( - agent_name="Financial-Analysis-Agent", - agent_description="Personal finance advisor agent", - system_prompt=FINANCIAL_AGENT_SYS_PROMPT, - max_loops=1, - model_name="gpt-4.1", - dynamic_temperature_enabled=True, - user_name="swarms_corp", - retry_attempts=3, - context_length=8192, - return_step_meta=False, - output_type="str", # "json", "dict", "csv" OR "string" "yaml" and - auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task - max_tokens=4000, # max output tokens - saved_state_path="agent_00.json", - interactive=False, - streaming_on=True, #if concurrent agents want to be streamed - ) - - swarm = MajorityVoting(agents=[agent, agent, agent]) - - return swarm.run( - "Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.", - streaming_callback=streaming_callback, - ) + + min_chunk_size = 512 # or any large chunk size you want + + if chunk: + streaming_callback._buffer += chunk + streaming_callback._buffer_size += len(chunk) + if streaming_callback._buffer_size >= min_chunk_size or is_final: + if streaming_callback._buffer: + print(streaming_callback._buffer, end="", flush=True) + logs.append(streaming_callback._buffer) + streaming_callback._buffer = "" + streaming_callback._buffer_size = 0 + if is_final: + print() + + try: + # Initialize the agent + agent = Agent( + agent_name="Financial-Analysis-Agent", + agent_description="Personal finance advisor agent", + system_prompt="You are a financial analysis agent.", # replaced missing const + max_loops=1, + model_name="gpt-4.1", + dynamic_temperature_enabled=True, + user_name="swarms_corp", + retry_attempts=3, + context_length=8192, + return_step_meta=False, + output_type="str", # "json", "dict", "csv" OR "string" "yaml" and + auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task + max_tokens=4000, # max output tokens + saved_state_path="agent_00.json", + interactive=False, + streaming_on=True, #if concurrent agents want to be streamed + ) + + swarm = MajorityVoting(agents=[agent, agent, agent]) + + result = swarm.run( + "Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.", + streaming_callback=streaming_callback, + ) + assert result is not None + except Exception as e: + print("Error in test_streaming_majority_voting:", e) + print("Logs so far:", logs) + raise