[CLEANUP][InteractiveGroupChat]

pull/981/head
Kye Gomez 3 days ago
parent 27a57c1f09
commit dd7ac00dcb

@ -1,4 +1,3 @@
import json
import os import os
from swarms_client import SwarmsClient from swarms_client import SwarmsClient
from swarms_client.types import AgentSpecParam from swarms_client.types import AgentSpecParam
@ -12,7 +11,7 @@ agent_spec = AgentSpecParam(
agent_name="doctor_agent", agent_name="doctor_agent",
description="A virtual doctor agent that provides evidence-based, safe, and empathetic medical advice for common health questions. Always reminds users to consult a healthcare professional for diagnoses or prescriptions.", description="A virtual doctor agent that provides evidence-based, safe, and empathetic medical advice for common health questions. Always reminds users to consult a healthcare professional for diagnoses or prescriptions.",
task="What is the best medicine for a cold?", task="What is the best medicine for a cold?",
model_name="claude-3-5-sonnet-20241022", model_name="claude-4-sonnet-20250514",
system_prompt=( system_prompt=(
"You are a highly knowledgeable, ethical, and empathetic virtual doctor. " "You are a highly knowledgeable, ethical, and empathetic virtual doctor. "
"Always provide evidence-based, safe, and practical medical advice. " "Always provide evidence-based, safe, and practical medical advice. "
@ -26,15 +25,15 @@ agent_spec = AgentSpecParam(
role="doctor", role="doctor",
) )
# response = client.agent.run( response = client.agent.run(
# agent_config=agent_spec, agent_config=agent_spec,
# task="What is the best medicine for a cold?", task="What is the best medicine for a cold?",
# ) )
# print(response) print(response)
print(json.dumps(client.models.list_available(), indent=4)) # print(json.dumps(client.models.list_available(), indent=4))
print(json.dumps(client.health.check(), indent=4)) # print(json.dumps(client.health.check(), indent=4))
print(json.dumps(client.swarms.get_logs(), indent=4)) # print(json.dumps(client.swarms.get_logs(), indent=4))
print(json.dumps(client.client.rate.get_limits(), indent=4)) # print(json.dumps(client.client.rate.get_limits(), indent=4))
print(json.dumps(client.swarms.check_available(), indent=4)) # print(json.dumps(client.swarms.check_available(), indent=4))

@ -138,7 +138,7 @@ class ConcurrentWorkflow(BaseSwarm):
self, self,
name: str = "ConcurrentWorkflow", name: str = "ConcurrentWorkflow",
description: str = "Execution of multiple agents concurrently", description: str = "Execution of multiple agents concurrently",
agents: List[Union[Agent, Callable]] = [], agents: List[Union[Agent, Callable]] = None,
metadata_output_path: str = "agent_metadata.json", metadata_output_path: str = "agent_metadata.json",
auto_save: bool = True, auto_save: bool = True,
output_type: str = "dict-all-except-first", output_type: str = "dict-all-except-first",

@ -1,6 +1,7 @@
import re
import random import random
from typing import Callable, List, Union, Optional import re
import traceback
from typing import Callable, List, Optional, Union
from loguru import logger from loguru import logger
@ -183,60 +184,33 @@ speaker_functions = {
class InteractiveGroupChat: class InteractiveGroupChat:
""" """
An interactive group chat system that enables conversations with multiple agents using @mentions. InteractiveGroupChat enables collaborative conversations among multiple agents (or callables)
with flexible speaker selection strategies, conversation history, and interactive terminal sessions.
This class allows users to interact with multiple agents by mentioning them using @agent_name syntax. Features:
When multiple agents are mentioned, they can see and respond to each other's tasks. - Supports multiple agents (LLMs or callable functions)
- Customizable speaker selection (round robin, random, priority, dynamic, or custom)
- Maintains conversation history with time stamps
- Interactive REPL session for human-in-the-loop group chat
- Agents can @mention each other to request input or delegate tasks
- Automatic prompt augmentation to teach agents collaborative protocols
Attributes: Args:
id (str): Unique identifier for the group chat (default: generated)
name (str): Name of the group chat name (str): Name of the group chat
description (str): Description of the group chat's purpose description (str): Description of the group chat's purpose
agents (List[Union[Agent, Callable]]): List of Agent instances or callable functions agents (List[Union[Agent, Callable]]): List of agent objects or callables
max_loops (int): Maximum number of conversation turns max_loops (int): Maximum number of conversation loops per run
conversation (Conversation): Stores the chat history output_type (str): Output format for conversation history ("dict", "str", etc.)
agent_map (Dict[str, Union[Agent, Callable]]): Mapping of agent names to their instances interactive (bool): If True, enables interactive terminal session
speaker_function (Callable): Function to determine speaking order speaker_function (Optional[Union[str, Callable]]): Speaker selection strategy
speaker_state (dict): State for speaker functions that need it speaker_state (Optional[dict]): State/config for the speaker function
Args:
name (str, optional): Name of the group chat. Defaults to "InteractiveGroupChat".
description (str, optional): Description of the chat. Defaults to "An interactive group chat for multiple agents".
agents (List[Union[Agent, Callable]], optional): List of participating agents or callables. Defaults to empty list.
max_loops (int, optional): Maximum conversation turns. Defaults to 1.
output_type (str, optional): Type of output format. Defaults to "string".
interactive (bool, optional): Whether to enable interactive terminal mode. Defaults to False.
speaker_function (Union[str, Callable], optional): Function to determine speaking order. Can be:
- A string name: "round-robin-speaker", "random-speaker", "priority-speaker", "random-dynamic-speaker"
- A custom callable function
- None (defaults to round_robin_speaker)
speaker_state (dict, optional): Initial state for speaker function. Defaults to empty dict.
Raises: Raises:
ValueError: If invalid initialization parameters are provided ValueError: If required arguments are missing or invalid
InvalidSpeakerFunctionError: If the speaker function is invalid InvalidSpeakerFunctionError: If an invalid speaker function is provided
InteractiveGroupChatError: For interactive session errors
Examples: AgentNotFoundError: If an agent is not found by name
# Initialize with string-based speaker function
group_chat = InteractiveGroupChat(
agents=[agent1, agent2, agent3],
speaker_function="random-speaker"
)
# Initialize with priority speaker function
group_chat = InteractiveGroupChat(
agents=[agent1, agent2, agent3],
speaker_function="priority-speaker",
speaker_state={"priorities": {"agent1": 3, "agent2": 2, "agent3": 1}}
)
# Initialize with dynamic speaker function (agents mention each other)
group_chat = InteractiveGroupChat(
agents=[agent1, agent2, agent3],
speaker_function="random-dynamic-speaker"
)
# Change speaker function during runtime
group_chat.set_speaker_function("round-robin-speaker")
""" """
def __init__( def __init__(
@ -246,11 +220,25 @@ class InteractiveGroupChat:
description: str = "An interactive group chat for multiple agents", description: str = "An interactive group chat for multiple agents",
agents: List[Union[Agent, Callable]] = [], agents: List[Union[Agent, Callable]] = [],
max_loops: int = 1, max_loops: int = 1,
output_type: str = "string", output_type: str = "dict",
interactive: bool = False, interactive: bool = False,
speaker_function: Optional[Union[str, Callable]] = None, speaker_function: Optional[Union[str, Callable]] = None,
speaker_state: Optional[dict] = None, speaker_state: Optional[dict] = None,
): ):
"""
Initialize the InteractiveGroupChat.
Args:
id (str): Unique identifier for the group chat.
name (str): Name of the group chat.
description (str): Description of the group chat.
agents (List[Union[Agent, Callable]]): List of agent objects or callables.
max_loops (int): Maximum number of conversation loops per run.
output_type (str): Output format for conversation history.
interactive (bool): If True, enables interactive terminal session.
speaker_function (Optional[Union[str, Callable]]): Speaker selection strategy.
speaker_state (Optional[dict]): State/config for the speaker function.
"""
self.id = id self.id = id
self.name = name self.name = name
self.description = description self.description = description
@ -258,37 +246,49 @@ class InteractiveGroupChat:
self.max_loops = max_loops self.max_loops = max_loops
self.output_type = output_type self.output_type = output_type
self.interactive = interactive self.interactive = interactive
self.speaker_function = speaker_function
self.speaker_state = speaker_state
self.setup()
def _setup_speaker_function(self):
# Speaker function configuration # Speaker function configuration
if speaker_function is None: if self.speaker_function is None:
self.speaker_function = round_robin_speaker self.speaker_function = round_robin_speaker
elif isinstance(speaker_function, str): elif isinstance(self.speaker_function, str):
if speaker_function not in speaker_functions: if self.speaker_function not in speaker_functions:
available_functions = ", ".join( available_functions = ", ".join(
speaker_functions.keys() speaker_functions.keys()
) )
raise InvalidSpeakerFunctionError( raise InvalidSpeakerFunctionError(
f"Invalid speaker function: '{speaker_function}'. " f"Invalid speaker function: '{self.speaker_function}'. "
f"Available functions: {available_functions}" f"Available functions: {available_functions}"
) )
self.speaker_function = speaker_functions[ self.speaker_function = speaker_functions[
speaker_function self.speaker_function
] ]
elif callable(speaker_function): elif callable(self.speaker_function):
self.speaker_function = speaker_function self.speaker_function = self.speaker_function
else: else:
raise InvalidSpeakerFunctionError( raise InvalidSpeakerFunctionError(
"Speaker function must be either a string, callable, or None" "Speaker function must be either a string, callable, or None"
) )
self.speaker_state = speaker_state or {"current_index": 0} self.speaker_state = self.speaker_state or {
"current_index": 0
}
# Validate speaker function def setup(self):
self._validate_speaker_function() """
Set up the group chat, including speaker function, conversation history,
agent mapping, and prompt augmentation.
"""
# Initialize conversation history # Initialize conversation history
self.conversation = Conversation(time_enabled=True) self.conversation = Conversation(time_enabled=True)
self._setup_speaker_function()
self.agent_map = create_agent_map(self.agents) self.agent_map = create_agent_map(self.agents)
self._validate_initialization() self._validate_initialization()
@ -350,16 +350,6 @@ class InteractiveGroupChat:
# Validate the speaker function # Validate the speaker function
self._validate_speaker_function() self._validate_speaker_function()
def set_priorities(self, priorities: dict) -> None:
"""
Set agent priorities for priority-based speaking order.
Args:
priorities: Dictionary mapping agent names to priority weights
"""
self.speaker_state["priorities"] = priorities
logger.info(f"Agent priorities set: {priorities}")
def get_available_speaker_functions(self) -> List[str]: def get_available_speaker_functions(self) -> List[str]:
""" """
Get a list of available speaker function names. Get a list of available speaker function names.
@ -519,32 +509,6 @@ class InteractiveGroupChat:
"The session will continue. You can type 'exit' to end it." "The session will continue. You can type 'exit' to end it."
) )
def _validate_speaker_function(self) -> None:
"""
Validates the speaker function.
Raises:
InvalidSpeakerFunctionError: If the speaker function is invalid
"""
if not callable(self.speaker_function):
raise InvalidSpeakerFunctionError(
"Speaker function must be callable"
)
# Test the speaker function with a dummy list
try:
test_result = self.speaker_function(
["test_agent"], **self.speaker_state
)
if not isinstance(test_result, str):
raise InvalidSpeakerFunctionError(
"Speaker function must return a string"
)
except Exception as e:
raise InvalidSpeakerFunctionError(
f"Speaker function validation failed: {e}"
)
def _validate_initialization(self) -> None: def _validate_initialization(self) -> None:
""" """
Validates the group chat configuration. Validates the group chat configuration.
@ -561,7 +525,10 @@ class InteractiveGroupChat:
raise ValueError("Max loops must be greater than 0") raise ValueError("Max loops must be greater than 0")
def _setup_conversation_context(self) -> None: def _setup_conversation_context(self) -> None:
"""Sets up the initial conversation context with group chat information.""" """
Sets up the initial conversation context with group chat information.
Adds a system message describing the group and its agents.
"""
agent_info = [] agent_info = []
for agent in self.agents: for agent in self.agents:
if isinstance(agent, Agent): if isinstance(agent, Agent):
@ -581,7 +548,10 @@ class InteractiveGroupChat:
self.conversation.add(role="System", content=context) self.conversation.add(role="System", content=context)
def _update_agent_prompts(self) -> None: def _update_agent_prompts(self) -> None:
"""Updates each agent's system prompt with information about other agents and the group chat.""" """
Updates each agent's system prompt with information about other agents and the group chat.
This includes collaborative instructions and @mention usage guidelines.
"""
agent_info = [] agent_info = []
for agent in self.agents: for agent in self.agents:
if isinstance(agent, Agent): if isinstance(agent, Agent):
@ -696,7 +666,7 @@ Remember: You are part of a team. Your response should reflect that you've read,
List[str]: List of mentioned agent names or all agent names if no mentions List[str]: List of mentioned agent names or all agent names if no mentions
Raises: Raises:
InvalidtaskFormatError: If the task format is invalid InvalidTaskFormatError: If the task format is invalid
""" """
try: try:
# Find all @mentions using regex # Find all @mentions using regex
@ -812,6 +782,14 @@ Remember: You are part of a team. Your response should reflect that you've read,
) -> None: ) -> None:
""" """
Process responses using the dynamic speaker function. Process responses using the dynamic speaker function.
Args:
mentioned_agents (List[str]): List of agent names to consider for speaking.
img (Optional[str]): Optional image input for the agents.
imgs (Optional[List[str]]): Optional list of images for the agents.
Returns:
None
""" """
# Get strategy from speaker state (default to sequential) # Get strategy from speaker state (default to sequential)
strategy = self.speaker_state.get("strategy", "sequential") strategy = self.speaker_state.get("strategy", "sequential")
@ -882,6 +860,15 @@ Remember: You are part of a team. Your response should reflect that you've read,
) -> None: ) -> None:
""" """
Process speakers sequentially. Process speakers sequentially.
Args:
speakers (List[str]): List of agent names to process in order.
spoken_agents (set): Set of agent names that have already spoken.
img (Optional[str]): Optional image input for the agents.
imgs (Optional[List[str]]): Optional list of images for the agents.
Returns:
None
""" """
for next_speaker in speakers: for next_speaker in speakers:
if next_speaker in spoken_agents: if next_speaker in spoken_agents:
@ -903,6 +890,15 @@ Remember: You are part of a team. Your response should reflect that you've read,
) -> None: ) -> None:
""" """
Process speakers in parallel. Process speakers in parallel.
Args:
speakers (List[str]): List of agent names to process in parallel.
spoken_agents (set): Set of agent names that have already spoken.
img (Optional[str]): Optional image input for the agents.
imgs (Optional[List[str]]): Optional list of images for the agents.
Returns:
None
""" """
import concurrent.futures import concurrent.futures
@ -939,6 +935,14 @@ Remember: You are part of a team. Your response should reflect that you've read,
) -> None: ) -> None:
""" """
Process responses using a static speaker function. Process responses using a static speaker function.
Args:
mentioned_agents (List[str]): List of agent names to process.
img (Optional[str]): Optional image input for the agents.
imgs (Optional[List[str]]): Optional list of images for the agents.
Returns:
None
""" """
speaking_order = self._get_speaking_order(mentioned_agents) speaking_order = self._get_speaking_order(mentioned_agents)
logger.info(f"Speaking order determined: {speaking_order}") logger.info(f"Speaking order determined: {speaking_order}")
@ -956,6 +960,17 @@ Remember: You are part of a team. Your response should reflect that you've read,
""" """
Process a task and get responses from agents. If no agents are mentioned, Process a task and get responses from agents. If no agents are mentioned,
randomly selects agents to participate. randomly selects agents to participate.
Args:
task (str): The user input or task to process.
img (Optional[str]): Optional image input for the agents.
imgs (Optional[List[str]]): Optional list of images for the agents.
Returns:
str: The formatted conversation history (format depends on output_type).
Raises:
InteractiveGroupChatError: If an unexpected error occurs.
""" """
try: try:
# Extract mentioned agents (or all agents if none mentioned) # Extract mentioned agents (or all agents if none mentioned)
@ -982,11 +997,21 @@ Remember: You are part of a team. Your response should reflect that you've read,
) )
except Exception as e: except Exception as e:
logger.error(f"Unexpected error: {e}") logger.error(
f"InteractiveGroupChat: Unexpected error: {e} Traceback: {traceback.format_exc()}"
)
raise InteractiveGroupChatError( raise InteractiveGroupChatError(
f"Unexpected error occurred: {str(e)}" f"InteractiveGroupChat: Unexpected error occurred: {str(e)} Traceback: {traceback.format_exc()}"
) )
def __call__(
self,
task: str,
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
):
return self.run(task=task, img=img, imgs=imgs)
def _get_agent_response( def _get_agent_response(
self, self,
agent_name: str, agent_name: str,
@ -997,12 +1022,15 @@ Remember: You are part of a team. Your response should reflect that you've read,
Get response from a specific agent. Get response from a specific agent.
Args: Args:
agent_name: Name of the agent to get response from agent_name (str): Name of the agent to get response from.
img: Optional image for the task img (Optional[str]): Optional image for the task.
imgs: Optional list of images for the task imgs (Optional[List[str]]): Optional list of images for the task.
Returns: Returns:
The agent's response or None if error Optional[str]: The agent's response or None if error.
Raises:
AgentNotFoundError: If the agent is not found.
""" """
agent = self.agent_map.get(agent_name) agent = self.agent_map.get(agent_name)
if not agent: if not agent:
@ -1063,20 +1091,3 @@ Remember: You are part of a collaborative team. Your response should demonstrate
return f"Error: Unable to generate response - {str(e)}" return f"Error: Unable to generate response - {str(e)}"
return None return None
def set_dynamic_strategy(self, strategy: str) -> None:
"""
Set the strategy for the random-dynamic-speaker function.
Args:
strategy: Either "sequential" or "parallel"
- "sequential": Process one agent at a time based on @mentions
- "parallel": Process all mentioned agents simultaneously
"""
if strategy not in ["sequential", "parallel"]:
raise ValueError(
"Strategy must be either 'sequential' or 'parallel'"
)
self.speaker_state["strategy"] = strategy
logger.info(f"Dynamic speaker strategy set to: {strategy}")

Loading…
Cancel
Save