@ -1,15 +1,14 @@
import asyncio
import concurrent . futures
import re
import sys
from collections import Counter
from multiprocessing import Pool
from typing import Any , List
from typing import Any , Callable , List , Optional
from loguru import logger
from swarms . structs . agent import Agent
from swarms . structs . conversation import Conversation
from swarms . utils . file_processing import create_file
# Configure loguru logger with advanced settings
logger . remove ( )
@ -96,7 +95,6 @@ def most_frequent(
for i in clist :
current_frequency = sum ( cmp_func ( i , item ) for item in clist )
print ( current_frequency )
if current_frequency > counter :
counter = current_frequency
num = i
@ -104,7 +102,7 @@ def most_frequent(
return num , counter
def majority_voting ( answers : li st) :
def majority_voting ( answers : List [ str] ) :
"""
Performs majority voting on a list of answers and returns the most common answer .
@ -115,7 +113,12 @@ def majority_voting(answers: list):
The most common answer in the list .
"""
counter = Counter ( answers )
if counter :
answer = counter . most_common ( 1 ) [ 0 ] [ 0 ]
else :
answer = " I don ' t know "
return answer
@ -124,13 +127,11 @@ class MajorityVoting:
Class representing a majority voting system for agents .
Args :
agents ( List [ Agent ] ) : A list of agents to use in the majority voting system .
concurrent ( bool , optional ) : Whether to run the agents concurrently . Defaults to False .
multithreaded ( bool , optional ) : Whether to run the agents using multithreading . Defaults to False .
multiprocess ( bool , optional ) : Whether to run the agents using multiprocessing . Defaults to False .
asynchronous ( bool , optional ) : Whether to run the agents asynchronously . Defaults to False .
output_parser ( callable , optional ) : A callable function to parse the output of the majority voting system . Defaults to None .
agents ( list ) : A list of agents to be used in the majority voting system .
output_parser ( function , optional ) : A function used to parse the output of the agents .
If not provided , the default majority voting function is used .
autosave ( bool , optional ) : A boolean indicating whether to autosave the conversation to a file .
verbose ( bool , optional ) : A boolean indicating whether to enable verbose logging .
Examples :
>> > from swarms . structs . agent import Agent
>> > from swarms . structs . majority_voting import MajorityVoting
@ -148,21 +149,13 @@ class MajorityVoting:
def __init__ (
self ,
agents : List [ Agent ] ,
concurrent : bool = False ,
multithreaded : bool = False ,
multiprocess : bool = False ,
asynchronous : bool = False ,
output_parser : callable = None ,
output_parser : Optional [ Callable ] = majority_voting ,
autosave : bool = False ,
verbose : bool = False ,
* args ,
* * kwargs ,
) :
self . agents = agents
self . concurrent = concurrent
self . multithreaded = multithreaded
self . multiprocess = multiprocess
self . asynchronous = asynchronous
self . output_parser = output_parser
self . autosave = autosave
self . verbose = verbose
@ -173,7 +166,7 @@ class MajorityVoting:
# If autosave is enabled, save the conversation to a file
if self . autosave :
self . conversation . save ( )
create_file ( str ( self . conversation ) , " majority_voting.json " )
# Log the agents
logger . info ( " Initializing majority voting system " )
@ -198,69 +191,37 @@ class MajorityVoting:
"""
# Route to each agent
if self . concurrent :
with concurrent . futures . ThreadPoolExecutor ( ) as executor :
# Log the agents
logger . info ( " Running agents concurrently " )
futures = [
executor . submit ( agent . run , task , * args )
for agent in self . agents
]
results = [
future . result ( )
for future in concurrent . futures . as_completed (
futures
)
]
elif self . multithreaded :
logger . info ( " Running agents using multithreading " )
with concurrent . futures . ThreadPoolExecutor ( ) as executor :
results = [
executor . submit ( agent . run , task , * args )
for agent in self . agents
]
results = [ future . result ( ) for future in results ]
elif self . multiprocess :
logger . info ( " Running agents using multiprocessing " )
with Pool ( ) as pool :
results = pool . starmap (
Agent . run ,
[ ( agent , task , * args ) for agent in self . agents ] ,
)
elif self . asynchronous :
loop = asyncio . get_event_loop ( )
tasks = [
loop . run_in_executor ( None , agent . run , task , * args )
for agent in self . agents
]
results = loop . run_until_complete ( asyncio . gather ( * tasks ) )
loop . close ( )
else :
results = [
agent . run ( task , * args ) for agent in self . agents
for future in concurrent . futures . as_completed ( futures )
]
# Add responses to conversation and log them
for agent , response in zip ( self . agents , results ) :
logger . info ( f " [ { agent . agent_id } ][ { response } ] " )
response = (
response if isinstance ( response , list ) else [ response ]
)
response = response if isinstance ( response , list ) else [ response ]
self . conversation . add ( agent . agent_name , response )
logger . info ( f " [ { agent . agent_id } ][ { response } ] " )
logger . info ( f " [Agent][Name: { agent . agent_name } ][Response: { response } ] " )
# Perform majority voting on the conversation
majority_vote = majority_voting ( self . conversation . responses )
responses = [
message [ " content " ]
for message in self . conversation . conversation_history
if message [ " role " ] == " agent "
]
# Log the majority vote
logger . info ( f " Majority vote: { majority_vote } " )
# If an output parser is provided, parse the responses
if self . output_parser is not None :
majority_vote = self . output_parser ( responses , * args , * * kwargs )
else :
majority_vote = majority_voting ( responses )
# If an output parser is provided, parse the output
if self . output_parser :
majority_vote = self . output_parser (
majority_vote , * args , * * kwargs
)
# Return the majority vote
return majority_vote