@ -6,6 +6,7 @@ from loguru import logger
from swarms . structs . agent import Agent
from swarms . structs . conversation import Conversation
from swarms . structs . ma_utils import create_agent_map
from swarms . utils . generate_keys import generate_api_key
from swarms . utils . history_output_formatter import (
history_output_formatter ,
@ -24,12 +25,6 @@ class AgentNotFoundError(InteractiveGroupChatError):
pass
class NoMentionedAgentsError ( InteractiveGroupChatError ) :
""" Raised when no agents are mentioned in the task """
pass
class InvalidTaskFormatError ( InteractiveGroupChatError ) :
""" Raised when the task format is invalid """
@ -294,14 +289,7 @@ class InteractiveGroupChat:
# Initialize conversation history
self . conversation = Conversation ( time_enabled = True )
# Create a mapping of agent names to agents for easy lookup
self . agent_map = { }
for agent in agents :
if isinstance ( agent , Agent ) :
self . agent_map [ agent . agent_name ] = agent
elif callable ( agent ) :
# For callable functions, use the function name as the agent name
self . agent_map [ agent . __name__ ] = agent
self . agent_map = create_agent_map ( self . agents )
self . _validate_initialization ( )
self . _setup_conversation_context ( )
@ -398,7 +386,7 @@ class InteractiveGroupChat:
Start an interactive terminal session for chatting with agents .
This method creates a REPL ( Read - Eval - Print Loop ) that allows users to :
- Chat with agents using @mentions
- Chat with agents using @mentions ( optional )
- See available agents and their descriptions
- Exit the session using ' exit ' or ' quit '
- Get help using ' help ' or ' ? '
@ -426,7 +414,9 @@ class InteractiveGroupChat:
print ( " - Type ' help ' or ' ? ' for help " )
print ( " - Type ' exit ' or ' quit ' to end the session " )
print ( " - Type ' speaker ' to change speaker function " )
print ( " - Use @agent_name to mention agents " )
print (
" - Use @agent_name to mention specific agents (optional) "
)
print ( " \n Start chatting: " )
while True :
@ -441,9 +431,11 @@ class InteractiveGroupChat:
if user_input . lower ( ) in [ " help " , " ? " ] :
print ( " \n Help: " )
print ( " 1. Mention agents using @agent_name " )
print (
" 2. You can mention multiple agents in one task "
" 1. You can mention specific agents using @agent_name (optional) "
)
print (
" 2. If no agents are mentioned, they will be selected automatically "
)
print ( " 3. Available agents: " )
for name in self . agent_map :
@ -513,10 +505,6 @@ class InteractiveGroupChat:
print ( " \n Chat: " )
# print(response)
except NoMentionedAgentsError :
print (
" \n Error: Please mention at least one agent using @agent_name "
)
except AgentNotFoundError as e :
print ( f " \n Error: { str ( e ) } " )
except Exception as e :
@ -699,13 +687,13 @@ Remember: You are part of a team. Your response should reflect that you've read,
def _extract_mentions ( self , task : str ) - > List [ str ] :
"""
Extracts @mentions from the task .
Extracts @mentions from the task . If no mentions are found , returns all available agents .
Args :
task ( str ) : The input task
Returns :
List [ str ] : List of mentioned agent names
List [ str ] : List of mentioned agent names or all agent names if no mentions
Raises :
InvalidtaskFormatError : If the task format is invalid
@ -713,11 +701,17 @@ Remember: You are part of a team. Your response should reflect that you've read,
try :
# Find all @mentions using regex
mentions = re . findall ( r " @( \ w+) " , task )
return [
valid_mentions = [
mention
for mention in mentions
if mention in self . agent_map
]
# If no valid mentions found, return all available agents
if not valid_mentions :
return list ( self . agent_map . keys ( ) )
return valid_mentions
except Exception as e :
logger . error ( f " Error extracting mentions: { e } " )
raise InvalidTaskFormatError ( f " Invalid task format: { e } " )
@ -810,6 +804,149 @@ Remember: You are part of a team. Your response should reflect that you've read,
# Fallback to original order
return mentioned_agents
def _process_dynamic_speakers (
self ,
mentioned_agents : List [ str ] ,
img : Optional [ str ] ,
imgs : Optional [ List [ str ] ] ,
) - > None :
"""
Process responses using the dynamic speaker function .
"""
# Get strategy from speaker state (default to sequential)
strategy = self . speaker_state . get ( " strategy " , " sequential " )
# Track which agents have spoken to ensure all get a chance
spoken_agents = set ( )
last_response = " "
max_iterations = (
len ( mentioned_agents ) * 3
) # Allow more iterations for parallel
iteration = 0
while iteration < max_iterations and len ( spoken_agents ) < len (
mentioned_agents
) :
# Determine next speaker(s) using dynamic function
next_speakers = self . speaker_function (
mentioned_agents ,
last_response ,
strategy = strategy ,
* * self . speaker_state ,
)
# Handle both single agent and multiple agents
if isinstance ( next_speakers , str ) :
next_speakers = [ next_speakers ]
# Filter out invalid agents
valid_next_speakers = [
agent
for agent in next_speakers
if agent in mentioned_agents
]
if not valid_next_speakers :
# If no valid mentions found, randomly select from unspoken agents
unspoken_agents = [
agent
for agent in mentioned_agents
if agent not in spoken_agents
]
if unspoken_agents :
valid_next_speakers = [
random . choice ( unspoken_agents )
]
else :
# All agents have spoken, break the loop
break
# Process agents based on strategy
if strategy == " sequential " :
self . _process_sequential_speakers (
valid_next_speakers , spoken_agents , img , imgs
)
elif strategy == " parallel " :
self . _process_parallel_speakers (
valid_next_speakers , spoken_agents , img , imgs
)
iteration + = 1
def _process_sequential_speakers (
self ,
speakers : List [ str ] ,
spoken_agents : set ,
img : Optional [ str ] ,
imgs : Optional [ List [ str ] ] ,
) - > None :
"""
Process speakers sequentially .
"""
for next_speaker in speakers :
if next_speaker in spoken_agents :
continue # Skip if already spoken
response = self . _get_agent_response (
next_speaker , img , imgs
)
if response :
spoken_agents . add ( next_speaker )
break # Only process one agent in sequential mode
def _process_parallel_speakers (
self ,
speakers : List [ str ] ,
spoken_agents : set ,
img : Optional [ str ] ,
imgs : Optional [ List [ str ] ] ,
) - > None :
"""
Process speakers in parallel .
"""
import concurrent . futures
# Get responses from all valid agents
responses = [ ]
with concurrent . futures . ThreadPoolExecutor ( ) as executor :
future_to_agent = {
executor . submit (
self . _get_agent_response , agent , img , imgs
) : agent
for agent in speakers
if agent not in spoken_agents
}
for future in concurrent . futures . as_completed (
future_to_agent
) :
agent = future_to_agent [ future ]
try :
response = future . result ( )
if response :
responses . append ( response )
spoken_agents . add ( agent )
except Exception as e :
logger . error (
f " Error getting response from { agent } : { e } "
)
def _process_static_speakers (
self ,
mentioned_agents : List [ str ] ,
img : Optional [ str ] ,
imgs : Optional [ List [ str ] ] ,
) - > None :
"""
Process responses using a static speaker function .
"""
speaking_order = self . _get_speaking_order ( mentioned_agents )
logger . info ( f " Speaking order determined: { speaking_order } " )
# Get responses from mentioned agents in the determined order
for agent_name in speaking_order :
self . _get_agent_response ( agent_name , img , imgs )
def run (
self ,
task : str ,
@ -817,151 +954,33 @@ Remember: You are part of a team. Your response should reflect that you've read,
imgs : Optional [ List [ str ] ] = None ,
) - > str :
"""
Process a task and get responses from mentioned agents .
If interactive mode is enabled , this will be called by start_interactive_session ( ) .
Otherwise , it can be called directly for single task processing .
Process a task and get responses from agents . If no agents are mentioned ,
randomly selects agents to participate .
"""
try :
# Extract mentioned agents
mentioned_agents = self . _extract_mentions ( task )
if not mentioned_agents :
raise NoMentionedAgentsError (
" No valid agents mentioned in the task "
)
# Extract mentioned agents (or all agents if none mentioned)
if " @ " in task :
mentioned_agents = self . _extract_mentions ( task )
else :
pass
# Add user task to conversation
self . conversation . add ( role = " User " , content = task )
# Handle dynamic speaker function differently
# Process responses based on speaker function type
if self . speaker_function == random_dynamic_speaker :
# Get strategy from speaker state (default to sequential)
strategy = self . speaker_state . get (
" strategy " , " sequential "
self . _process_dynamic_speakers (
mentioned_agents , img , imgs
)
# For dynamic speaker, we'll determine the next speaker after each response
# Track which agents have spoken to ensure all get a chance
spoken_agents = set ( )
last_response = " "
max_iterations = (
len ( mentioned_agents ) * 3
) # Allow more iterations for parallel
iteration = 0
while iteration < max_iterations and len (
spoken_agents
) < len ( mentioned_agents ) :
# Determine next speaker(s) using dynamic function
next_speakers = self . speaker_function (
mentioned_agents , # Use all mentioned agents, not remaining_agents
last_response ,
strategy = strategy ,
* * self . speaker_state ,
)
# Handle both single agent and multiple agents
if isinstance ( next_speakers , str ) :
next_speakers = [ next_speakers ]
# Filter out invalid agents
valid_next_speakers = [
agent
for agent in next_speakers
if agent in mentioned_agents
]
if not valid_next_speakers :
# If no valid mentions found, randomly select from unspoken agents
unspoken_agents = [
agent
for agent in mentioned_agents
if agent not in spoken_agents
]
if unspoken_agents :
valid_next_speakers = [
random . choice ( unspoken_agents )
]
else :
# All agents have spoken, break the loop
break
# Process agents based on strategy
if strategy == " sequential " :
# Process one agent at a time
for next_speaker in valid_next_speakers :
if next_speaker in spoken_agents :
continue # Skip if already spoken
response = self . _get_agent_response (
next_speaker , img , imgs
)
if response :
last_response = response
spoken_agents . add ( next_speaker )
break # Only process one agent in sequential mode
elif strategy == " parallel " :
# Process all mentioned agents in parallel
import concurrent . futures
# Get responses from all valid agents
responses = [ ]
with concurrent . futures . ThreadPoolExecutor ( ) as executor :
future_to_agent = {
executor . submit (
self . _get_agent_response ,
agent ,
img ,
imgs ,
) : agent
for agent in valid_next_speakers
if agent not in spoken_agents
}
for (
future
) in concurrent . futures . as_completed (
future_to_agent
) :
agent = future_to_agent [ future ]
try :
response = future . result ( )
if response :
responses . append ( response )
spoken_agents . add ( agent )
except Exception as e :
logger . error (
f " Error getting response from { agent } : { e } "
)
# Combine responses for next iteration
if responses :
last_response = " \n \n " . join ( responses )
iteration + = 1
else :
# For non-dynamic speaker functions, use the original logic
speaking_order = self . _get_speaking_order (
mentioned_agents
self . _process_static_speakers (
mentioned_agents , img , imgs
)
logger . info (
f " Speaking order determined: { speaking_order } "
)
# Get responses from mentioned agents in the determined order
for agent_name in speaking_order :
response = self . _get_agent_response (
agent_name , img , imgs
)
return history_output_formatter (
self . conversation , self . output_type
)
except InteractiveGroupChatError as e :
logger . error ( f " GroupChat error: { e } " )
raise
except Exception as e :
logger . error ( f " Unexpected error: { e } " )
raise InteractiveGroupChatError (