[FEAT] Majority Voting Streaming Feature

Added streaming callback support to the majority voting agent.
pull/1147/head
Aksh Parekh 3 weeks ago committed by GitHub
parent 3f1ed3d043
commit 6b80101323
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -13,6 +13,7 @@ from swarms.utils.history_output_formatter import (
)
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.output_types import OutputType
from typing import Callable, Optional
logger = initialize_logger(log_folder="majority_voting")
@ -65,9 +66,16 @@ def default_consensus_agent(
system_prompt: str = None,
description: str = "An agent that uses consensus to generate a final answer.",
model_name: str = "gpt-4.1",
streaming_callback: Optional[Callable[[str], None]] = None,
*args,
**kwargs,
):
# If streaming_on is not None, force it to True; else, set to False
if streaming_callback is not None:
streaming_on_value = True
else:
streaming_on_value = False
return Agent(
agent_name=name,
agent_description=description,
@ -76,6 +84,7 @@ def default_consensus_agent(
system_prompt=system_prompt,
dynamic_context_window=True,
dynamic_temperature_enabled=True,
streaming_on=streaming_on_value,
*args,
**kwargs,
)
@ -159,7 +168,7 @@ class MajorityVoting:
title="Majority Voting",
)
def run(self, task: str, *args, **kwargs) -> List[Any]:
def run(self, task: str, streaming_callback: Optional[Callable[[str, str, bool], None]] = None, *args, **kwargs) -> List[Any]:
"""
Runs the majority voting system with multi-loop functionality and returns the majority vote.
@ -179,6 +188,7 @@ class MajorityVoting:
)
for i in range(self.max_loops):
output = run_agents_concurrently(
agents=self.agents,
task=self.conversation.get_str(),
@ -190,10 +200,31 @@ class MajorityVoting:
role=agent.agent_name,
content=output,
)
# Now run the consensus agent
# Set streaming_on for the consensus agent based on the provided streaming_callback
self.consensus_agent.streaming_on = streaming_callback is not None
# Instead of a simple passthrough wrapper, match the callback invocation pattern from the provided reference for the consensus agent:
consensus_agent_name = self.consensus_agent.agent_name
if streaming_callback is not None:
def consensus_streaming_callback(chunk: str):
"""Wrapper for consensus agent streaming callback."""
try:
if chunk is not None and chunk.strip():
streaming_callback(consensus_agent_name, chunk, False)
except Exception as callback_error:
if self.verbose:
logger.warning(
f"[STREAMING] Callback failed for {consensus_agent_name}: {str(callback_error)}"
)
else:
consensus_streaming_callback = None
# Run the consensus agent with the streaming callback, if any
consensus_output = self.consensus_agent.run(
task=(f"History: {self.conversation.get_str()}"),
streaming_callback=consensus_streaming_callback,
)
self.conversation.add(

Loading…
Cancel
Save