diff --git a/docs/swarms/structs/majorityvoting.md b/docs/swarms/structs/majorityvoting.md index 5e62ffef..26e6c10e 100644 --- a/docs/swarms/structs/majorityvoting.md +++ b/docs/swarms/structs/majorityvoting.md @@ -854,3 +854,66 @@ for i, result in enumerate(concurrent_results, 1): else: print(f"Result: {result}") ``` +### Example 4: Majorty Voting with Custom Streaming Features + +This example demonstrates streaming callback with a custom streaming function. + +```python +from swarms import Agent +from swarms.prompts.finance_agent_sys_prompt import ( + FINANCIAL_AGENT_SYS_PROMPT, +) +from swarms.structs.majority_voting import MajorityVoting +from dotenv import load_dotenv + +def streaming_callback(agent_name: str, chunk: str, is_final: bool): + # Chunk buffer static per call (reset each session) + if not hasattr(streaming_callback, "_buffer"): + streaming_callback._buffer = "" + streaming_callback._buffer_size = 0 + + min_chunk_size = 512 # or any large chunk size you want + + if chunk: + streaming_callback._buffer += chunk + streaming_callback._buffer_size += len(chunk) + if streaming_callback._buffer_size >= min_chunk_size or is_final: + if streaming_callback._buffer: + print(streaming_callback._buffer, end="", flush=True) + streaming_callback._buffer = "" + streaming_callback._buffer_size = 0 + if is_final: + print() + +load_dotenv() + + +# Initialize the agent +agent = Agent( + agent_name="Financial-Analysis-Agent", + agent_description="Personal finance advisor agent", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT, + max_loops=1, + model_name="gpt-4.1", + dynamic_temperature_enabled=True, + user_name="swarms_corp", + retry_attempts=3, + context_length=8192, + return_step_meta=False, + output_type="str", # "json", "dict", "csv" OR "string" "yaml" and + auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task + max_tokens=4000, # max output tokens + saved_state_path="agent_00.json", + interactive=False, + streaming_on=True, #if concurrent agents want to be streamed +) + +swarm = MajorityVoting(agents=[agent, agent, agent]) + +swarm.run( + "Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.", + streaming_callback=streaming_callback, + +) + +``` diff --git a/swarms/structs/majority_voting.py b/swarms/structs/majority_voting.py index dc9ad9d8..85ff31d7 100644 --- a/swarms/structs/majority_voting.py +++ b/swarms/structs/majority_voting.py @@ -13,6 +13,7 @@ from swarms.utils.history_output_formatter import ( ) from swarms.utils.loguru_logger import initialize_logger from swarms.utils.output_types import OutputType +from typing import Callable, Optional logger = initialize_logger(log_folder="majority_voting") @@ -65,9 +66,16 @@ def default_consensus_agent( system_prompt: str = None, description: str = "An agent that uses consensus to generate a final answer.", model_name: str = "gpt-4.1", + streaming_callback: Optional[Callable[[str], None]] = None, *args, **kwargs, ): + # If streaming_on is not None, force it to True; else, set to False + if streaming_callback is not None: + streaming_on_value = True + else: + streaming_on_value = False + return Agent( agent_name=name, agent_description=description, @@ -76,6 +84,7 @@ def default_consensus_agent( system_prompt=system_prompt, dynamic_context_window=True, dynamic_temperature_enabled=True, + streaming_on=streaming_on_value, *args, **kwargs, ) @@ -159,7 +168,7 @@ class MajorityVoting: title="Majority Voting", ) - def run(self, task: str, *args, **kwargs) -> List[Any]: + def run(self, task: str, streaming_callback: Optional[Callable[[str, str, bool], None]] = None, *args, **kwargs) -> List[Any]: """ Runs the majority voting system with multi-loop functionality and returns the majority vote. @@ -179,6 +188,7 @@ class MajorityVoting: ) for i in range(self.max_loops): + output = run_agents_concurrently( agents=self.agents, task=self.conversation.get_str(), @@ -190,10 +200,31 @@ class MajorityVoting: role=agent.agent_name, content=output, ) - - # Now run the consensus agent + + # Set streaming_on for the consensus agent based on the provided streaming_callback + self.consensus_agent.streaming_on = streaming_callback is not None + + # Instead of a simple passthrough wrapper, match the callback invocation pattern from the provided reference for the consensus agent: + consensus_agent_name = self.consensus_agent.agent_name + + if streaming_callback is not None: + def consensus_streaming_callback(chunk: str): + """Wrapper for consensus agent streaming callback.""" + try: + if chunk is not None and chunk.strip(): + streaming_callback(consensus_agent_name, chunk, False) + except Exception as callback_error: + if self.verbose: + logger.warning( + f"[STREAMING] Callback failed for {consensus_agent_name}: {str(callback_error)}" + ) + else: + consensus_streaming_callback = None + + # Run the consensus agent with the streaming callback, if any consensus_output = self.consensus_agent.run( task=(f"History: {self.conversation.get_str()}"), + streaming_callback=consensus_streaming_callback, ) self.conversation.add( diff --git a/tests/structs/test_majority_voting.py b/tests/structs/test_majority_voting.py index 580028d5..46e2e452 100644 --- a/tests/structs/test_majority_voting.py +++ b/tests/structs/test_majority_voting.py @@ -199,21 +199,63 @@ def test_majority_voting_different_output_types(): max_loops=1, ) - # Test different output types - for output_type in ["dict", "string", "list"]: - mv = MajorityVoting( - name=f"Output-Type-Test-{output_type}", - description=f"Testing output type: {output_type}", - agents=[ - security_expert, - compliance_officer, - privacy_advocate, - ], + # Assert majority vote is correct + assert majority_vote is not None + +def test_streaming_majority_voting(): + """ + Test the streaming_majority_voting with logging/try-except and assertion. + """ + logs = [] + def streaming_callback(agent_name: str, chunk: str, is_final: bool): + # Chunk buffer static per call (reset each session) + if not hasattr(streaming_callback, "_buffer"): + streaming_callback._buffer = "" + streaming_callback._buffer_size = 0 + + min_chunk_size = 512 # or any large chunk size you want + + if chunk: + streaming_callback._buffer += chunk + streaming_callback._buffer_size += len(chunk) + if streaming_callback._buffer_size >= min_chunk_size or is_final: + if streaming_callback._buffer: + print(streaming_callback._buffer, end="", flush=True) + logs.append(streaming_callback._buffer) + streaming_callback._buffer = "" + streaming_callback._buffer_size = 0 + if is_final: + print() + + try: + # Initialize the agent + agent = Agent( + agent_name="Financial-Analysis-Agent", + agent_description="Personal finance advisor agent", + system_prompt="You are a financial analysis agent.", # replaced missing const max_loops=1, - output_type=output_type, + model_name="gpt-4.1", + dynamic_temperature_enabled=True, + user_name="swarms_corp", + retry_attempts=3, + context_length=8192, + return_step_meta=False, + output_type="str", # "json", "dict", "csv" OR "string" "yaml" and + auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task + max_tokens=4000, # max output tokens + saved_state_path="agent_00.json", + interactive=False, + streaming_on=True, #if concurrent agents want to be streamed ) - - result = mv.run( - "What are the key considerations for implementing GDPR compliance in our data processing systems?" + + swarm = MajorityVoting(agents=[agent, agent, agent]) + + result = swarm.run( + "Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.", + streaming_callback=streaming_callback, ) assert result is not None + except Exception as e: + print("Error in test_streaming_majority_voting:", e) + print("Logs so far:", logs) + raise