diff --git a/swarms/structs/majority_voting.py b/swarms/structs/majority_voting.py index dc9ad9d8..85ff31d7 100644 --- a/swarms/structs/majority_voting.py +++ b/swarms/structs/majority_voting.py @@ -13,6 +13,7 @@ from swarms.utils.history_output_formatter import ( ) from swarms.utils.loguru_logger import initialize_logger from swarms.utils.output_types import OutputType +from typing import Callable, Optional logger = initialize_logger(log_folder="majority_voting") @@ -65,9 +66,16 @@ def default_consensus_agent( system_prompt: str = None, description: str = "An agent that uses consensus to generate a final answer.", model_name: str = "gpt-4.1", + streaming_callback: Optional[Callable[[str], None]] = None, *args, **kwargs, ): + # If streaming_on is not None, force it to True; else, set to False + if streaming_callback is not None: + streaming_on_value = True + else: + streaming_on_value = False + return Agent( agent_name=name, agent_description=description, @@ -76,6 +84,7 @@ def default_consensus_agent( system_prompt=system_prompt, dynamic_context_window=True, dynamic_temperature_enabled=True, + streaming_on=streaming_on_value, *args, **kwargs, ) @@ -159,7 +168,7 @@ class MajorityVoting: title="Majority Voting", ) - def run(self, task: str, *args, **kwargs) -> List[Any]: + def run(self, task: str, streaming_callback: Optional[Callable[[str, str, bool], None]] = None, *args, **kwargs) -> List[Any]: """ Runs the majority voting system with multi-loop functionality and returns the majority vote. @@ -179,6 +188,7 @@ class MajorityVoting: ) for i in range(self.max_loops): + output = run_agents_concurrently( agents=self.agents, task=self.conversation.get_str(), @@ -190,10 +200,31 @@ class MajorityVoting: role=agent.agent_name, content=output, ) - - # Now run the consensus agent + + # Set streaming_on for the consensus agent based on the provided streaming_callback + self.consensus_agent.streaming_on = streaming_callback is not None + + # Instead of a simple passthrough wrapper, match the callback invocation pattern from the provided reference for the consensus agent: + consensus_agent_name = self.consensus_agent.agent_name + + if streaming_callback is not None: + def consensus_streaming_callback(chunk: str): + """Wrapper for consensus agent streaming callback.""" + try: + if chunk is not None and chunk.strip(): + streaming_callback(consensus_agent_name, chunk, False) + except Exception as callback_error: + if self.verbose: + logger.warning( + f"[STREAMING] Callback failed for {consensus_agent_name}: {str(callback_error)}" + ) + else: + consensus_streaming_callback = None + + # Run the consensus agent with the streaming callback, if any consensus_output = self.consensus_agent.run( task=(f"History: {self.conversation.get_str()}"), + streaming_callback=consensus_streaming_callback, ) self.conversation.add(