[TEST] Majority Voting with Streaming Feature

pull/1147/head
Aksh Parekh 2 weeks ago committed by GitHub
parent de76a3ad39
commit 946ae70af9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,152 +1,56 @@
from unittest.mock import MagicMock from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import (
import pytest FINANCIAL_AGENT_SYS_PROMPT,
)
from swarms.structs.agent import Agent
from swarms.structs.majority_voting import MajorityVoting from swarms.structs.majority_voting import MajorityVoting
from dotenv import load_dotenv
def test_majority_voting_run_concurrent(mocker): def streaming_callback(agent_name: str, chunk: str, is_final: bool):
# Create mock agents # Chunk buffer static per call (reset each session)
agent1 = MagicMock(spec=Agent) if not hasattr(streaming_callback, "_buffer"):
agent2 = MagicMock(spec=Agent) streaming_callback._buffer = ""
agent3 = MagicMock(spec=Agent) streaming_callback._buffer_size = 0
# Create mock majority voting min_chunk_size = 512 # or any large chunk size you want
mv = MajorityVoting(
agents=[agent1, agent2, agent3], if chunk:
concurrent=True, streaming_callback._buffer += chunk
multithreaded=False, streaming_callback._buffer_size += len(chunk)
) if streaming_callback._buffer_size >= min_chunk_size or is_final:
if streaming_callback._buffer:
# Create mock conversation print(streaming_callback._buffer, end="", flush=True)
conversation = MagicMock() streaming_callback._buffer = ""
mv.conversation = conversation streaming_callback._buffer_size = 0
if is_final:
# Create mock results print()
results = ["Paris", "Paris", "Lyon"]
load_dotenv()
# Mock agent.run method
agent1.run.return_value = results[0]
agent2.run.return_value = results[1] # Initialize the agent
agent3.run.return_value = results[2] agent = Agent(
agent_name="Financial-Analysis-Agent",
# Run majority voting agent_description="Personal finance advisor agent",
majority_vote = mv.run("What is the capital of France?") system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1,
# Assert agent.run method was called with the correct task model_name="gpt-4.1",
agent1.run.assert_called_once_with( dynamic_temperature_enabled=True,
"What is the capital of France?" user_name="swarms_corp",
) retry_attempts=3,
agent2.run.assert_called_once_with( context_length=8192,
"What is the capital of France?" return_step_meta=False,
) output_type="str", # "json", "dict", "csv" OR "string" "yaml" and
agent3.run.assert_called_once_with( auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task
"What is the capital of France?" max_tokens=4000, # max output tokens
) saved_state_path="agent_00.json",
interactive=False,
# Assert conversation.add method was called with the correct responses streaming_on=True, #if concurrent agents want to be streamed
conversation.add.assert_any_call(agent1.agent_name, results[0]) )
conversation.add.assert_any_call(agent2.agent_name, results[1])
conversation.add.assert_any_call(agent3.agent_name, results[2]) swarm = MajorityVoting(agents=[agent, agent, agent])
# Assert majority vote is correct swarm.run(
assert majority_vote is not None "Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.",
streaming_callback=streaming_callback,
def test_majority_voting_run_multithreaded(mocker): )
# Create mock agents
agent1 = MagicMock(spec=Agent)
agent2 = MagicMock(spec=Agent)
agent3 = MagicMock(spec=Agent)
# Create mock majority voting
mv = MajorityVoting(
agents=[agent1, agent2, agent3],
concurrent=False,
multithreaded=True,
)
# Create mock conversation
conversation = MagicMock()
mv.conversation = conversation
# Create mock results
results = ["Paris", "Paris", "Lyon"]
# Mock agent.run method
agent1.run.return_value = results[0]
agent2.run.return_value = results[1]
agent3.run.return_value = results[2]
# Run majority voting
majority_vote = mv.run("What is the capital of France?")
# Assert agent.run method was called with the correct task
agent1.run.assert_called_once_with(
"What is the capital of France?"
)
agent2.run.assert_called_once_with(
"What is the capital of France?"
)
agent3.run.assert_called_once_with(
"What is the capital of France?"
)
# Assert conversation.add method was called with the correct responses
conversation.add.assert_any_call(agent1.agent_name, results[0])
conversation.add.assert_any_call(agent2.agent_name, results[1])
conversation.add.assert_any_call(agent3.agent_name, results[2])
# Assert majority vote is correct
assert majority_vote is not None
@pytest.mark.asyncio
async def test_majority_voting_run_asynchronous(mocker):
# Create mock agents
agent1 = MagicMock(spec=Agent)
agent2 = MagicMock(spec=Agent)
agent3 = MagicMock(spec=Agent)
# Create mock majority voting
mv = MajorityVoting(
agents=[agent1, agent2, agent3],
concurrent=False,
multithreaded=False,
asynchronous=True,
)
# Create mock conversation
conversation = MagicMock()
mv.conversation = conversation
# Create mock results
results = ["Paris", "Paris", "Lyon"]
# Mock agent.run method
agent1.run.return_value = results[0]
agent2.run.return_value = results[1]
agent3.run.return_value = results[2]
# Run majority voting
majority_vote = await mv.run("What is the capital of France?")
# Assert agent.run method was called with the correct task
agent1.run.assert_called_once_with(
"What is the capital of France?"
)
agent2.run.assert_called_once_with(
"What is the capital of France?"
)
agent3.run.assert_called_once_with(
"What is the capital of France?"
)
# Assert conversation.add method was called with the correct responses
conversation.add.assert_any_call(agent1.agent_name, results[0])
conversation.add.assert_any_call(agent2.agent_name, results[1])
conversation.add.assert_any_call(agent3.agent_name, results[2])
# Assert majority vote is correct
assert majority_vote is not None

Loading…
Cancel
Save