Merge pull request #1147 from aparekh02/mvstream

[FEAT] [TEST] [DOCS] Streaming Callback for Majority Voting
pull/1173/head
Kye Gomez 7 days ago committed by GitHub
commit f5b9544351
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -854,3 +854,66 @@ for i, result in enumerate(concurrent_results, 1):
else:
print(f"Result: {result}")
```
### Example 4: Majorty Voting with Custom Streaming Features
This example demonstrates streaming callback with a custom streaming function.
```python
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
from swarms.structs.majority_voting import MajorityVoting
from dotenv import load_dotenv
def streaming_callback(agent_name: str, chunk: str, is_final: bool):
# Chunk buffer static per call (reset each session)
if not hasattr(streaming_callback, "_buffer"):
streaming_callback._buffer = ""
streaming_callback._buffer_size = 0
min_chunk_size = 512 # or any large chunk size you want
if chunk:
streaming_callback._buffer += chunk
streaming_callback._buffer_size += len(chunk)
if streaming_callback._buffer_size >= min_chunk_size or is_final:
if streaming_callback._buffer:
print(streaming_callback._buffer, end="", flush=True)
streaming_callback._buffer = ""
streaming_callback._buffer_size = 0
if is_final:
print()
load_dotenv()
# Initialize the agent
agent = Agent(
agent_name="Financial-Analysis-Agent",
agent_description="Personal finance advisor agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1,
model_name="gpt-4.1",
dynamic_temperature_enabled=True,
user_name="swarms_corp",
retry_attempts=3,
context_length=8192,
return_step_meta=False,
output_type="str", # "json", "dict", "csv" OR "string" "yaml" and
auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task
max_tokens=4000, # max output tokens
saved_state_path="agent_00.json",
interactive=False,
streaming_on=True, #if concurrent agents want to be streamed
)
swarm = MajorityVoting(agents=[agent, agent, agent])
swarm.run(
"Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.",
streaming_callback=streaming_callback,
)
```

@ -13,6 +13,7 @@ from swarms.utils.history_output_formatter import (
)
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.output_types import OutputType
from typing import Callable, Optional
logger = initialize_logger(log_folder="majority_voting")
@ -65,9 +66,16 @@ def default_consensus_agent(
system_prompt: str = None,
description: str = "An agent that uses consensus to generate a final answer.",
model_name: str = "gpt-4.1",
streaming_callback: Optional[Callable[[str], None]] = None,
*args,
**kwargs,
):
# If streaming_on is not None, force it to True; else, set to False
if streaming_callback is not None:
streaming_on_value = True
else:
streaming_on_value = False
return Agent(
agent_name=name,
agent_description=description,
@ -76,6 +84,7 @@ def default_consensus_agent(
system_prompt=system_prompt,
dynamic_context_window=True,
dynamic_temperature_enabled=True,
streaming_on=streaming_on_value,
*args,
**kwargs,
)
@ -159,7 +168,7 @@ class MajorityVoting:
title="Majority Voting",
)
def run(self, task: str, *args, **kwargs) -> List[Any]:
def run(self, task: str, streaming_callback: Optional[Callable[[str, str, bool], None]] = None, *args, **kwargs) -> List[Any]:
"""
Runs the majority voting system with multi-loop functionality and returns the majority vote.
@ -179,6 +188,7 @@ class MajorityVoting:
)
for i in range(self.max_loops):
output = run_agents_concurrently(
agents=self.agents,
task=self.conversation.get_str(),
@ -190,10 +200,31 @@ class MajorityVoting:
role=agent.agent_name,
content=output,
)
# Now run the consensus agent
# Set streaming_on for the consensus agent based on the provided streaming_callback
self.consensus_agent.streaming_on = streaming_callback is not None
# Instead of a simple passthrough wrapper, match the callback invocation pattern from the provided reference for the consensus agent:
consensus_agent_name = self.consensus_agent.agent_name
if streaming_callback is not None:
def consensus_streaming_callback(chunk: str):
"""Wrapper for consensus agent streaming callback."""
try:
if chunk is not None and chunk.strip():
streaming_callback(consensus_agent_name, chunk, False)
except Exception as callback_error:
if self.verbose:
logger.warning(
f"[STREAMING] Callback failed for {consensus_agent_name}: {str(callback_error)}"
)
else:
consensus_streaming_callback = None
# Run the consensus agent with the streaming callback, if any
consensus_output = self.consensus_agent.run(
task=(f"History: {self.conversation.get_str()}"),
streaming_callback=consensus_streaming_callback,
)
self.conversation.add(

@ -199,21 +199,63 @@ def test_majority_voting_different_output_types():
max_loops=1,
)
# Test different output types
for output_type in ["dict", "string", "list"]:
mv = MajorityVoting(
name=f"Output-Type-Test-{output_type}",
description=f"Testing output type: {output_type}",
agents=[
security_expert,
compliance_officer,
privacy_advocate,
],
# Assert majority vote is correct
assert majority_vote is not None
def test_streaming_majority_voting():
"""
Test the streaming_majority_voting with logging/try-except and assertion.
"""
logs = []
def streaming_callback(agent_name: str, chunk: str, is_final: bool):
# Chunk buffer static per call (reset each session)
if not hasattr(streaming_callback, "_buffer"):
streaming_callback._buffer = ""
streaming_callback._buffer_size = 0
min_chunk_size = 512 # or any large chunk size you want
if chunk:
streaming_callback._buffer += chunk
streaming_callback._buffer_size += len(chunk)
if streaming_callback._buffer_size >= min_chunk_size or is_final:
if streaming_callback._buffer:
print(streaming_callback._buffer, end="", flush=True)
logs.append(streaming_callback._buffer)
streaming_callback._buffer = ""
streaming_callback._buffer_size = 0
if is_final:
print()
try:
# Initialize the agent
agent = Agent(
agent_name="Financial-Analysis-Agent",
agent_description="Personal finance advisor agent",
system_prompt="You are a financial analysis agent.", # replaced missing const
max_loops=1,
output_type=output_type,
model_name="gpt-4.1",
dynamic_temperature_enabled=True,
user_name="swarms_corp",
retry_attempts=3,
context_length=8192,
return_step_meta=False,
output_type="str", # "json", "dict", "csv" OR "string" "yaml" and
auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task
max_tokens=4000, # max output tokens
saved_state_path="agent_00.json",
interactive=False,
streaming_on=True, #if concurrent agents want to be streamed
)
result = mv.run(
"What are the key considerations for implementing GDPR compliance in our data processing systems?"
swarm = MajorityVoting(agents=[agent, agent, agent])
result = swarm.run(
"Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.",
streaming_callback=streaming_callback,
)
assert result is not None
except Exception as e:
print("Error in test_streaming_majority_voting:", e)
print("Logs so far:", logs)
raise

Loading…
Cancel
Save