diff --git a/group_chat_example.py b/group_chat_example.py new file mode 100644 index 00000000..1278e439 --- /dev/null +++ b/group_chat_example.py @@ -0,0 +1,36 @@ +from swarms.structs.agent import Agent +from swarms.structs.groupchat import GroupChat + + +if __name__ == "__main__": + + # Example agents + agent1 = Agent( + agent_name="Financial-Analysis-Agent", + system_prompt="You are a financial analyst specializing in investment strategies.", + model_name="gpt-4o", + max_loops=1, + dynamic_temperature_enabled=True, + ) + + agent2 = Agent( + agent_name="Tax-Adviser-Agent", + system_prompt="You are a tax adviser who provides clear and concise guidance on tax-related queries.", + model_name="gpt-4o", + max_loops=1, + dynamic_temperature_enabled=True, + ) + + agents = [agent1, agent2] + + chat = GroupChat( + name="Investment Advisory", + description="Financial and tax analysis group", + agents=agents, + max_loops=1, + ) + + history = chat.run( + "How to optimize tax strategy for investments?" + ) + print(history) diff --git a/majority_voting_example.py b/majority_voting_example.py index 047b4878..5c4edeb1 100644 --- a/majority_voting_example.py +++ b/majority_voting_example.py @@ -7,22 +7,22 @@ agents = [ agent_description="Personal finance advisor focused on market analysis", system_prompt="You are a financial advisor specializing in market analysis and investment opportunities.", max_loops=1, - model_name="gpt-4o" + model_name="gpt-4o", ), Agent( - agent_name="Risk-Assessment-Agent", + agent_name="Risk-Assessment-Agent", agent_description="Risk analysis and portfolio management expert", system_prompt="You are a risk assessment expert focused on evaluating investment risks and portfolio diversification.", max_loops=1, - model_name="gpt-4o" + model_name="gpt-4o", ), Agent( agent_name="Tech-Investment-Agent", agent_description="Technology sector investment specialist", system_prompt="You are a technology investment specialist focused on AI, emerging tech, and growth opportunities.", max_loops=1, - model_name="gpt-4o" - ) + model_name="gpt-4o", + ), ] @@ -31,7 +31,7 @@ consensus_agent = Agent( agent_description="Consensus agent focused on analyzing investment advice", system_prompt="You are a consensus agent focused on analyzing investment advice and providing a final answer.", max_loops=1, - model_name="gpt-4o" + model_name="gpt-4o", ) # Create majority voting system @@ -40,13 +40,12 @@ majority_voting = MajorityVoting( description="Multi-agent system for investment advice", agents=agents, verbose=True, - consensus_agent=consensus_agent + consensus_agent=consensus_agent, ) # Run the analysis with majority voting result = majority_voting.run( task="Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.", - correct_answer="" # Optional evaluation metric ) print(result) diff --git a/pyproject.toml b/pyproject.toml index c2b703c0..fc570420 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "7.1.7" +version = "7.1.9" description = "Swarms - TGSC" license = "MIT" authors = ["Kye Gomez "] @@ -73,21 +73,9 @@ docstring_parser = "0.16" # TODO: tiktoken = "*" networkx = "*" aiofiles = "*" -# chromadb = "*" rich = "*" numpy = "*" litellm = "*" -# sentence-transformers = "*" - - -# # All optional dependencies for convenience -# all = [ -# "torch", -# "transformers", -# "litellm" -# ] - - [tool.poetry.scripts] swarms = "swarms.cli.main:main" diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index 89d10fac..6a42f47e 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -14,9 +14,6 @@ from swarms.structs.graph_workflow import ( NodeType, ) from swarms.structs.groupchat import ( - AgentResponse, - ChatHistory, - ChatTurn, GroupChat, expertise_based, ) @@ -138,9 +135,6 @@ __all__ = [ "run_agents_with_tasks_concurrently", "showcase_available_agents", "GroupChat", - "ChatHistory", - "ChatTurn", - "AgentResponse", "expertise_based", "MultiAgentRouter", "MemeAgentGenerator", diff --git a/swarms/structs/conversation.py b/swarms/structs/conversation.py index a86a6d3b..ec648aed 100644 --- a/swarms/structs/conversation.py +++ b/swarms/structs/conversation.py @@ -405,10 +405,23 @@ class Conversation(BaseStructure): visible_messages.append(message) return visible_messages + def get_last_message_as_string(self): + # fetch the last message from the conversation history with the agent name and the message of the agent + return f"{self.conversation_history[-1]['role']}: {self.conversation_history[-1]['content']}" -# # Example usage -# conversation = Conversation() -# conversation.add("user", "Hello, how are you?") + def return_messages_as_list(self): + # we must concat the role and the content of the message + return [ + f"{message['role']}: {message['content']}" + for message in self.conversation_history + ] + + +# Example usage +conversation = Conversation() +conversation.add("user", "Hello, how are you?") +# print(conversation.get_last_message_as_string()) +# print(conversation.return_messages_as_list()) # conversation.add("assistant", "I am doing well, thanks.") # # print(conversation.to_json()) # print(type(conversation.to_dict())) diff --git a/swarms/structs/groupchat.py b/swarms/structs/groupchat.py index 05d96260..45e19639 100644 --- a/swarms/structs/groupchat.py +++ b/swarms/structs/groupchat.py @@ -1,14 +1,12 @@ import concurrent.futures from datetime import datetime -import os from typing import Callable, List -from dotenv import load_dotenv from loguru import logger from pydantic import BaseModel, Field -from swarm_models import OpenAIChat from swarms.structs.agent import Agent +from swarms.structs.conversation import Conversation class AgentResponse(BaseModel): @@ -20,21 +18,6 @@ class AgentResponse(BaseModel): preceding_context: List[str] = Field(default_factory=list) -class ChatTurn(BaseModel): - turn_number: int - responses: List[AgentResponse] - task: str - timestamp: datetime = Field(default_factory=datetime.now) - - -class ChatHistory(BaseModel): - turns: List[ChatTurn] - total_messages: int - name: str - description: str - start_time: datetime = Field(default_factory=datetime.now) - - SpeakerFunction = Callable[[List[str], "Agent"], bool] @@ -114,10 +97,129 @@ def most_recent(history: List[str], agent: Agent) -> bool: ) +def sentiment_based(history: List[str], agent: Agent) -> bool: + """ + Sentiment based speaker function. + An agent speaks if the last message has a sentiment matching their personality. + """ + if not history: + return True + + last_message = history[-1].lower() + positive_words = [ + "good", + "great", + "excellent", + "happy", + "positive", + ] + negative_words = [ + "bad", + "poor", + "terrible", + "unhappy", + "negative", + ] + + is_positive = any(word in last_message for word in positive_words) + is_negative = any(word in last_message for word in negative_words) + + # Assuming agent has a "personality" trait in description + agent_is_positive = "positive" in agent.description.lower() + + return is_positive if agent_is_positive else is_negative + + +def length_based(history: List[str], agent: Agent) -> bool: + """ + Length based speaker function. + An agent speaks if the last message is longer/shorter than a threshold. + """ + if not history: + return True + + last_message = history[-1] + threshold = 100 + + # Some agents prefer long messages, others short + prefers_long = "detailed" in agent.description.lower() + message_is_long = len(last_message) > threshold + + return message_is_long if prefers_long else not message_is_long + + +def question_based(history: List[str], agent: Agent) -> bool: + """ + Question based speaker function. + An agent speaks if the last message contains a question. + """ + if not history: + return True + + last_message = history[-1] + question_indicators = [ + "?", + "what", + "how", + "why", + "when", + "where", + "who", + ] + + return any( + indicator in last_message.lower() + for indicator in question_indicators + ) + + +def topic_based(history: List[str], agent: Agent) -> bool: + """ + Topic based speaker function. + An agent speaks if their expertise matches the current conversation topic. + """ + if not history: + return True + + # Look at last 3 messages to determine topic + recent_messages = history[-3:] if len(history) >= 3 else history + combined_text = " ".join(msg.lower() for msg in recent_messages) + + # Extract expertise topics from agent description + expertise_topics = [ + word.lower() + for word in agent.description.split() + if len(word) > 4 + ] # Simple topic extraction + + return any(topic in combined_text for topic in expertise_topics) + + class GroupChat: """ - GroupChat class to enable multiple agents to communicate in a synchronous group chat. - Each agent is aware of all other agents, every message exchanged, and the social context. + A class that manages conversations between multiple AI agents. + + This class facilitates group chat interactions between multiple agents, where agents + can communicate with each other based on a specified speaker function. It handles + conversation flow, message history, and agent coordination. + + Attributes: + name (str): Name of the group chat + description (str): Description of the group chat's purpose + agents (List[Agent]): List of Agent instances participating in the chat + speaker_fn (SpeakerFunction): Function determining which agents can speak + max_loops (int): Maximum number of conversation turns + conversation (Conversation): Stores the chat history + + Args: + name (str, optional): Name of the group chat. Defaults to "GroupChat". + description (str, optional): Description of the chat. Defaults to "A group chat for multiple agents". + agents (List[Agent], optional): List of participating agents. Defaults to empty list. + speaker_fn (SpeakerFunction, optional): Speaker selection function. Defaults to round_robin. + max_loops (int, optional): Maximum conversation turns. Defaults to 1. + + Raises: + ValueError: If invalid initialization parameters are provided """ def __init__( @@ -127,244 +229,277 @@ class GroupChat: agents: List[Agent] = [], speaker_fn: SpeakerFunction = round_robin, max_loops: int = 1, + rules: str = "", ): - """ - Initialize the GroupChat. - - Args: - name (str): Name of the group chat. - description (str): Description of the purpose of the group chat. - agents (List[Agent]): A list of agents participating in the chat. - speaker_fn (SpeakerFunction): The function to determine which agent should speak next. - max_loops (int): Maximum number of turns in the chat. - """ self.name = name self.description = description self.agents = agents self.speaker_fn = speaker_fn self.max_loops = max_loops - self.chat_history = ChatHistory( - turns=[], - total_messages=0, - name=name, - description=description, - ) - - def _get_response_sync( - self, agent: Agent, prompt: str, turn_number: int - ) -> AgentResponse: - """ - Get the response from an agent synchronously. - - Args: - agent (Agent): The agent responding. - prompt (str): The message triggering the response. - turn_number (int): The current turn number. + self.conversation = Conversation(time_enabled=True) + self.rules = rules - Returns: - AgentResponse: The agent's response captured in a structured format. - """ - try: - # Provide the agent with information about the chat and other agents - chat_info = f"Chat Name: {self.name}\nChat Description: {self.description}\nAgents in Chat: {[a.agent_name for a in self.agents]}" - context = f"""You are {agent.agent_name} - Conversation History: - \n{chat_info} - Other agents: {[a.agent_name for a in self.agents if a != agent]} - Previous messages: {self.get_full_chat_history()} - """ # Updated line - - message = agent.run( - task=f"From {agent.agent_name}: {context} \n {prompt}" - ) - return AgentResponse( - agent_name=agent.name, - role=agent.system_prompt, - message=message, - turn_number=turn_number, - preceding_context=self.get_recent_messages(3), - ) - except Exception as e: - logger.error(f"Error from {agent.name}: {e}") - return AgentResponse( - agent_name=agent.name, - role=agent.system_prompt, - message=f"Error generating response: {str(e)}", - turn_number=turn_number, - preceding_context=[], - ) + self.reliability_check() - def get_full_chat_history(self) -> str: + def reliability_check(self): """ - Get the full chat history formatted for agent context. + Validates the group chat configuration. - Returns: - str: The full chat history with sender names. + Raises: + ValueError: If any required components are missing or invalid """ - messages = [] - for turn in self.chat_history.turns: - for response in turn.responses: - messages.append( - f"{response.agent_name}: {response.message}" + if not self.agents: + raise ValueError("No agents provided") + if self.speaker_fn is None: + raise ValueError("No speaker function provided") + if self.max_loops <= 0: + raise ValueError("Max loops must be greater than 0") + for agent in self.agents: + if not isinstance(agent, Agent): + raise ValueError( + f"Invalid agent type: {type(agent)}. Must be Agent instance" ) - return "\n".join(messages) - def get_recent_messages(self, n: int = 3) -> List[str]: + def run(self, task: str, img: str = None, *args, **kwargs) -> str: """ - Get the most recent messages in the chat. + Executes a conversation between agents about the given task. Args: - n (int): The number of recent messages to retrieve. + task (str): The task or topic for agents to discuss + img (str, optional): Image input for the conversation. Defaults to None. + *args: Additional positional arguments + **kwargs: Additional keyword arguments Returns: - List[str]: The most recent messages in the chat. - """ - messages = [] - for turn in self.chat_history.turns[-n:]: - for response in turn.responses: - messages.append( - f"{response.agent_name}: {response.message}" - ) - return messages + str: Complete conversation history as a string - def run(self, task: str) -> ChatHistory: + Raises: + ValueError: If task is empty or invalid + Exception: If any error occurs during conversation """ - Run the group chat, feeding the context of previous turns into each new turn. + if not task or not isinstance(task, str): + raise ValueError("Task must be a non-empty string") - Args: - task (str): The initial message to start the chat. + # Initialize conversation with context + agent_context = f"Group Chat Name: {self.name}\nGroup Chat Description: {self.description}\nRules: {self.rules}\n Other agents: {', '.join([a.agent_name for a in self.agents])}" + self.conversation.add(role="system", content=agent_context) + self.conversation.add(role="User", content=task) + + print( + f"....... conversation history: \n {self.conversation.return_history_as_string()}" + ) - Returns: - ChatHistory: The history of the chat. - """ try: - logger.info( - f"Starting chat '{self.name}' with task: {task}" - ) + turn = 0 + consecutive_silent_turns = 0 + max_silent_turns = 2 # End conversation if no one speaks for this many turns + + while turn < self.max_loops: + context = self.conversation.return_messages_as_list() + + # Get agents who should speak this turn + speaking_agents = [ + agent + for agent in self.agents + if self.speaker_fn(context, agent) + ] + + if not speaking_agents: + consecutive_silent_turns += 1 + if consecutive_silent_turns >= max_silent_turns: + logger.debug( + "Multiple silent turns, ending conversation" + ) + break + continue - for turn in range(self.max_loops): - current_turn = ChatTurn( - turn_number=turn, responses=[], task=task + consecutive_silent_turns = ( + 0 # Reset counter when agents speak ) - # Get context from previous turns - context = self.get_full_chat_history() + # Process each speaking agent + for agent in speaking_agents: + try: + # Build context-aware prompt + prompt = ( + f"You're {agent.agent_name} participating in a group chat.\n" + f"Chat Purpose: {self.description}\n" + f"Current Discussion: {task}\n" + f"Chat History:\n{self.conversation.return_history_as_string()}\n" + f"As {agent.agent_name}, please provide your response:" + ) - # Combine task with context for agents - contextualized_task = ( - f"{task}\n\nPrevious conversation:\n{context}" - if context - else task - ) + print( + f"....... what the agent sees prompt: \n {prompt}" + ) - for agent in self.agents: - if self.speaker_fn( - self.get_recent_messages(), agent - ): - response = self._get_response_sync( - agent, contextualized_task, turn + message = agent.run( + task=prompt, + img=img, + *args, + **kwargs, ) - current_turn.responses.append(response) - self.chat_history.total_messages += 1 - logger.debug( - f"Turn {turn}, {agent.name} responded" + + if not message or message.isspace(): + logger.warning( + f"Empty response from {agent.agent_name}, skipping" + ) + continue + + self.conversation.add( + role=agent.agent_name, content=message ) - self.chat_history.turns.append(current_turn) + logger.info( + f"Turn {turn}, {agent.agent_name} responded" + ) + + except Exception as e: + logger.error( + f"Error from {agent.agent_name}: {e}" + ) + # Continue with other agents instead of crashing + continue + + turn += 1 + + # Check if conversation has reached a natural conclusion + last_messages = ( + context[-3:] if len(context) >= 3 else context + ) + if all( + "conclusion" in msg.lower() + for msg in last_messages + ): + logger.debug( + "Natural conversation conclusion detected" + ) + break + + return self.conversation.return_history_as_string() - return self.chat_history except Exception as e: logger.error(f"Error in chat: {e}") - raise e + raise - def batched_run(self, tasks: List[str], *args, **kwargs): + def batched_run( + self, tasks: List[str], *args, **kwargs + ) -> List[str]: """ - Run the group chat with a batch of tasks. + Runs multiple tasks in sequence. Args: - tasks (List[str]): The list of tasks to run in the chat. + tasks (List[str]): List of tasks to process + *args: Additional positional arguments + **kwargs: Additional keyword arguments Returns: - List[ChatHistory]: The history of each chat. + List[str]: List of conversation histories for each task + + Raises: + ValueError: If tasks list is empty or invalid """ + if not tasks or not isinstance(tasks, list): + raise ValueError( + "Tasks must be a non-empty list of strings" + ) return [self.run(task, *args, **kwargs) for task in tasks] - def concurrent_run(self, tasks: List[str], *args, **kwargs): + def concurrent_run( + self, tasks: List[str], *args, **kwargs + ) -> List[str]: """ - Run the group chat with a batch of tasks concurrently using a thread pool. + Runs multiple tasks concurrently using threads. Args: - tasks (List[str]): The list of tasks to run in the chat. + tasks (List[str]): List of tasks to process + *args: Additional positional arguments + **kwargs: Additional keyword arguments Returns: - List[ChatHistory]: The history of each chat. + List[str]: List of conversation histories for each task + + Raises: + ValueError: If tasks list is empty or invalid + RuntimeError: If concurrent execution fails """ - with concurrent.futures.ThreadPoolExecutor() as executor: - return list( - executor.map( - lambda task: self.run(task, *args, **kwargs), - tasks, - ) + if not tasks or not isinstance(tasks, list): + raise ValueError( + "Tasks must be a non-empty list of strings" ) + try: + with concurrent.futures.ThreadPoolExecutor() as executor: + return list( + executor.map( + lambda task: self.run(task, *args, **kwargs), + tasks, + ) + ) + except Exception as e: + logger.error(f"Error in concurrent execution: {e}") + raise RuntimeError( + f"Concurrent execution failed: {str(e)}" + ) -if __name__ == "__main__": - - load_dotenv() - - # Get the OpenAI API key from the environment variable - api_key = os.getenv("OPENAI_API_KEY") - - # Create an instance of the OpenAIChat class - model = OpenAIChat( - openai_api_key=api_key, - model_name="gpt-4o-mini", - temperature=0.1, - ) - - # Example agents - agent1 = Agent( - agent_name="Financial-Analysis-Agent", - system_prompt="You are a financial analyst specializing in investment strategies.", - llm=model, - max_loops=1, - autosave=False, - dashboard=False, - verbose=True, - dynamic_temperature_enabled=True, - user_name="swarms_corp", - retry_attempts=1, - context_length=200000, - output_type="string", - streaming_on=False, - ) - - agent2 = Agent( - agent_name="Tax-Adviser-Agent", - system_prompt="You are a tax adviser who provides clear and concise guidance on tax-related queries.", - llm=model, - max_loops=1, - autosave=False, - dashboard=False, - verbose=True, - dynamic_temperature_enabled=True, - user_name="swarms_corp", - retry_attempts=1, - context_length=200000, - output_type="string", - streaming_on=False, - ) - - agents = [agent1, agent2] - - chat = GroupChat( - name="Investment Advisory", - description="Financial and tax analysis group", - agents=agents, - speaker_fn=expertise_based, - ) - history = chat.run( - "How to optimize tax strategy for investments?" - ) - print(history.model_dump_json(indent=2)) +# if __name__ == "__main__": + +# load_dotenv() + +# # Get the OpenAI API key from the environment variable +# api_key = os.getenv("OPENAI_API_KEY") + +# # Create an instance of the OpenAIChat class +# model = OpenAIChat( +# openai_api_key=api_key, +# model_name="gpt-4o-mini", +# temperature=0.1, +# ) + +# # Example agents +# agent1 = Agent( +# agent_name="Financial-Analysis-Agent", +# system_prompt="You are a financial analyst specializing in investment strategies.", +# llm=model, +# max_loops=1, +# autosave=False, +# dashboard=False, +# verbose=True, +# dynamic_temperature_enabled=True, +# user_name="swarms_corp", +# retry_attempts=1, +# context_length=200000, +# output_type="string", +# streaming_on=False, +# ) + +# agent2 = Agent( +# agent_name="Tax-Adviser-Agent", +# system_prompt="You are a tax adviser who provides clear and concise guidance on tax-related queries.", +# llm=model, +# max_loops=1, +# autosave=False, +# dashboard=False, +# verbose=True, +# dynamic_temperature_enabled=True, +# user_name="swarms_corp", +# retry_attempts=1, +# context_length=200000, +# output_type="string", +# streaming_on=False, +# ) + +# agents = [agent1, agent2] + +# chat = GroupChat( +# name="Investment Advisory", +# description="Financial and tax analysis group", +# agents=agents, +# speaker_fn=expertise_based, +# ) + +# history = chat.run( +# "How to optimize tax strategy for investments?" +# ) +# print(history.model_dump_json(indent=2)) diff --git a/swarms/structs/majority_voting.py b/swarms/structs/majority_voting.py index dd8deac4..9f19e031 100644 --- a/swarms/structs/majority_voting.py +++ b/swarms/structs/majority_voting.py @@ -176,9 +176,7 @@ class MajorityVoting: title="Majority Voting", ) - def run( - self, task: str, correct_answer: str, *args, **kwargs - ) -> List[Any]: + def run(self, task: str, *args, **kwargs) -> List[Any]: """ Runs the majority voting system and returns the majority vote. @@ -205,7 +203,7 @@ class MajorityVoting: responses = self.conversation.return_history_as_string() print(responses) - + prompt = f"""Conduct a detailed majority voting analysis on the following conversation: {responses} @@ -224,9 +222,7 @@ class MajorityVoting: # If an output parser is provided, parse the responses if self.consensus_agent is not None: - majority_vote = self.consensus_agent.run( - prompt - ) + majority_vote = self.consensus_agent.run(prompt) self.conversation.add( self.consensus_agent.agent_name, majority_vote diff --git a/swarms/structs/task.py b/swarms/structs/task.py deleted file mode 100644 index fc73dea9..00000000 --- a/swarms/structs/task.py +++ /dev/null @@ -1,396 +0,0 @@ -import json -import sched -import time -from datetime import datetime -from typing import Any, Callable, ClassVar, Dict, List, Union - -from pydantic import BaseModel, Field - -from swarms.structs.agent import Agent -from swarms.structs.conversation import Conversation -from swarms.structs.omni_agent_types import AgentType -from typing import Optional -from swarms.utils.loguru_logger import initialize_logger - -logger = initialize_logger(log_folder="task") - - -class Task(BaseModel): - """ - Task class for running a task in a sequential workflow. - - Attributes: - description (str): Description of the task. - agent (Union[Callable, Agent]): Agent or callable object to run the task. - args (List[Any]): Arguments to pass to the agent or callable object. - kwargs (Dict[str, Any]): Keyword arguments to pass to the agent or callable object. - result (Any): Result of the task. - history (List[Any]): History of the task. - schedule_time (datetime): Time to schedule the task. - scheduler (sched.scheduler): Scheduler to schedule the task. - trigger (Callable): Trigger to run the task. - action (Callable): Action to run the task. - condition (Callable): Condition to run the task. - priority (int): Priority of the task. - dependencies (List[Task]): List of tasks that need to be completed before this task can be executed. - - Methods: - execute: Execute the task by calling the agent or model with the arguments and keyword arguments. - handle_scheduled_task: Handles the execution of a scheduled task. - set_trigger: Sets the trigger for the task. - set_action: Sets the action for the task. - set_condition: Sets the condition for the task. - is_completed: Checks whether the task has been completed. - add_dependency: Adds a task to the list of dependencies. - set_priority: Sets the priority of the task. - check_dependency_completion: Checks whether all the dependencies have been completed. - - - Examples: - >>> from swarms.structs import Task, Agent - >>> from swarm_models import OpenAIChat - >>> agent = Agent(llm=OpenAIChat(openai_api_key=""), max_loops=1, dashboard=False) - >>> task = Task(description="What's the weather in miami", agent=agent) - >>> task.run() - - >>> task.result - - """ - - name: Optional[str] = "Task" - description: Optional[str] = ( - "A task is a unit of work that needs to be completed for a workflow to progress." - ) - agent: Optional[Union[Callable, Agent, AgentType]] = Field( - None, - description="Agent or callable object to run the task", - ) - result: Optional[Any] = None - history: List[Any] = Field(default_factory=list) - schedule_time: Optional[datetime] = Field( - None, - description="Time to schedule the task", - ) - scheduler: ClassVar[sched.scheduler] = sched.scheduler( - time.time, time.sleep - ) - trigger: Optional[Callable] = Field( - None, - description="Trigger to run the task", - ) - action: Optional[Callable] = Field( - None, - description="Action to run the task", - ) - condition: Optional[Callable] = Field( - None, - description="Condition to run the task", - ) - priority: Optional[int] = Field( - 0.4, - description="Priority of the task", - ) - dependencies: List["Task"] = Field(default_factory=list) - args: List[Any] = Field(default_factory=list) - kwargs: Dict[str, Any] = Field(default_factory=dict) - - class Config: - arbitrary_types_allowed = True - - # We need to check that the agent exists - - def step(self, task: str = None, *args, **kwargs): - """ - Execute the task by calling the agent or model with the arguments and - keyword arguments. You can add images to the agent by passing the - path to the image as a keyword argument. - - - Examples: - >>> from swarms.structs import Task, Agent - >>> from swarm_models import OpenAIChat - >>> agent = Agent(llm=OpenAIChat(openai_api_key=""), max_loops=1, dashboard=False) - >>> task = Task(description="What's the weather in miami", agent=agent) - >>> task.run() - >>> task.result - - """ - - logger.info(f"Running task: {task}") - - # Check dependencies - if not self.check_dependency_completion(): - logger.info( - f"Task {self.description} is waiting for dependencies to complete" - ) - return None - - # Check the condition before executing the task - if self.condition is not None: - try: - condition_result = self.condition() - if not condition_result: - logger.info( - f"Completion not met for the task: {task} Skipping execution" - ) - return None - except Exception as error: - logger.error(f"[ERROR][Task] {error}") - return None - - # Execute the task - if self.trigger is None or self.trigger(): - try: - logger.info(f"Executing task: {task}") - self.result = self.agent.run(task, *args, **kwargs) - - # Ensure the result is either a string or a dict - if isinstance(self.result, str): - logger.info(f"Task result: {self.result}") - elif isinstance(self.result, dict): - logger.info(f"Task result: {self.result}") - else: - logger.error( - "Task result must be either a string or a dict" - ) - - # Add the result to the history - self.history.append(self.result) - - # If an action is specified, execute it - if self.action is not None: - try: - logger.info( - f"Executing action for task: {task}" - ) - self.action() - except Exception as error: - logger.error(f"[ERROR][Task] {error}") - except Exception as error: - logger.error(f"[ERROR][Task] {error}") - else: - logger.info(f"Task {task} is not triggered") - - def run(self, task: str = None, *args, **kwargs): - now = datetime.now() - - # If the task is scheduled for the future, schedule it - if self.schedule_time and self.schedule_time > now: - delay = (self.schedule_time - now).total_seconds() - logger.info( - f"Scheduling task: {self.description} for {self.schedule_time}" - ) - self.scheduler.enter( - delay, - 1, - self.step, - argument=(task, args, kwargs), - ) - self.scheduler.run() - - # We need to return the result - else: - # If no scheduling or the time has already passed run the task - return self.step(task, *args, **kwargs) - - def handle_scheduled_task(self): - """ - Handles the execution of a scheduled task. - - If the schedule time is not set or has already passed, the task is executed immediately. - Otherwise, the task is scheduled to be executed at the specified schedule time. - """ - logger.info( - f"[INFO][Task] Handling scheduled task: {self.description}" - ) - try: - if ( - self.schedule_time is None - or self.schedule_time <= datetime.now() - ): - self.execute() - - else: - delay = ( - self.schedule_time - datetime.now() - ).total_seconds() - self.scheduler.enter(delay, 1, self.execute) - self.scheduler_run() - except Exception as error: - logger.error(f"[ERROR][Task] {error}") - - def set_trigger(self, trigger: Callable): - """ - Sets the trigger for the task. - - Args: - trigger (Callable): The trigger to set. - """ - self.trigger = trigger - - def set_action(self, action: Callable): - """ - Sets the action for the task. - - Args: - action (Callable): The action to set. - """ - self.action = action - - def set_condition(self, condition: Callable): - """ - Sets the condition for the task. - - Args: - condition (Callable): The condition to set. - """ - self.condition = condition - - def is_completed(self): - """Is the task completed? - - Returns: - _type_: _description_ - """ - return self.result is not None - - def add_dependency(self, task): - """Adds a task to the list of dependencies. - - Args: - task (_type_): _description_ - """ - self.dependencies.append(task) - - def set_priority(self, priority: int): - """Sets the priority of the task. - - Args: - priority (int): _description_ - """ - self.priority = priority - - def check_dependency_completion(self): - """ - Checks whether all the dependencies have been completed. - - Returns: - bool: True if all the dependencies have been completed, False otherwise. - """ - logger.info("[INFO][Task] Checking dependency completion") - try: - for task in self.dependencies: - if not task.is_completed(): - return False - except Exception as error: - logger.error( - f"[ERROR][Task][check_dependency_completion] {error}" - ) - - def context( - self, - task: "Task" = None, - context: List["Task"] = None, - *args, - **kwargs, - ): - """ - Set the context for the task. - - Args: - context (str): The context to set. - """ - # For sequential workflow, sequentially add the context of the previous task in the list - new_context = Conversation(time_enabled=True, *args, **kwargs) - - if context: - for task in context: - description = ( - task.description - if task.description is not None - else "" - ) - - result = ( - task.result if task.result is not None else "" - ) - - # Add the context of the task to the conversation - new_context.add( - task.agent.agent_name, f"{description} {result}" - ) - - elif task: - description = ( - task.description - if task.description is not None - else "" - ) - result = task.result if task.result is not None else "" - new_context.add( - task.agent.agent_name, f"{description} {result}" - ) - - prompt = new_context.return_history_as_string() - - # Add to history - return self.history.append(prompt) - - def to_dict(self): - """ - Convert the task to a dictionary. - - Returns: - dict: The task as a dictionary. - """ - return self.model_dump_json(indent=4) - - def save_to_file(self, file_path: str): - """ - Save the task to a file. - - Args: - file_path (str): The path to the file to save the task to. - """ - with open(file_path, "w") as file: - file.write(self.to_json(indent=4)) - - @classmethod - def load_from_file(cls, file_path: str): - """ - Load a task from a file. - - Args: - file_path (str): The path to the file to load the task from. - - Returns: - Task: The task loaded from the file. - """ - with open(file_path, "r") as file: - task_dict = json.load(file) - return Task(**task_dict) - - def schedule_task_with_sched( - function: Callable, run_date: datetime - ) -> None: - now = datetime.now() - - if run_date <= now: - raise ValueError("run_date must be in the future") - - # Calculate the delay in seconds - delay = (run_date - now).total_seconds() - - scheduler = sched.scheduler(time.time, time.sleep) - - # Schedule the function - scheduler.enter(delay, 1, function) - - # Start the scheduler - scheduler.run(delay, 1, function) - - # Start the scheduler - logger.info(f"Task scheduled for {run_date}") - scheduler.run() - - return None diff --git a/tests/structs/test_groupchat.py b/tests/structs/test_groupchat.py index 08bceec5..a9146160 100644 --- a/tests/structs/test_groupchat.py +++ b/tests/structs/test_groupchat.py @@ -1,147 +1,388 @@ -import os -from dotenv import load_dotenv -from swarm_models import OpenAIChat from swarms.structs.agent import Agent -from swarms.structs.groupchat import GroupChat, expertise_based - - -def setup_test_agents(): - model = OpenAIChat( - openai_api_key=os.getenv("OPENAI_API_KEY"), - model_name="gpt-4", - temperature=0.1, - ) - - return [ - Agent( - agent_name="Agent1", - system_prompt="You only respond with 'A'", - llm=model, +from swarms.structs.groupchat import ( + GroupChat, + round_robin, + expertise_based, + random_selection, + sentiment_based, + length_based, + question_based, + topic_based, +) +from datetime import datetime +import time + + +class TestReport: + def __init__(self): + self.results = [] + self.start_time = None + self.end_time = None + + def add_result(self, test_name, passed, message="", duration=0): + self.results.append( + { + "test_name": test_name, + "passed": passed, + "message": message, + "duration": duration, + } + ) + + def start(self): + self.start_time = datetime.now() + + def end(self): + self.end_time = datetime.now() + + def generate_report(self): + total_tests = len(self.results) + passed_tests = sum(1 for r in self.results if r["passed"]) + failed_tests = total_tests - passed_tests + duration = (self.end_time - self.start_time).total_seconds() + + report = "\n" + "=" * 50 + "\n" + report += "GROUP CHAT TEST SUITE REPORT\n" + report += "=" * 50 + "\n\n" + report += f"Test Run: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}\n" + report += f"Duration: {duration:.2f} seconds\n" + report += f"Total Tests: {total_tests}\n" + report += f"Passed: {passed_tests}\n" + report += f"Failed: {failed_tests}\n" + report += ( + f"Success Rate: {(passed_tests/total_tests)*100:.1f}%\n\n" + ) + + report += "Detailed Test Results:\n" + report += "-" * 50 + "\n" + + for result in self.results: + status = "✓" if result["passed"] else "✗" + report += f"{status} {result['test_name']} ({result['duration']:.2f}s)\n" + if result["message"]: + report += f" {result['message']}\n" + + return report + + +def create_test_agents(num_agents, diverse_prompts=False): + """Helper function to create test agents with diverse prompts""" + agents = [] + specialties = [ + ( + "Finance", + "You are a financial expert focusing on investment strategies and market analysis. Be concise and data-driven in your responses.", ), - Agent( - agent_name="Agent2", - system_prompt="You only respond with 'B'", - llm=model, + ( + "Tech", + "You are a technology expert specializing in AI and cybersecurity. Use technical terms and provide practical examples.", ), - Agent( - agent_name="Agent3", - system_prompt="You only respond with 'C'", - llm=model, + ( + "Healthcare", + "You are a healthcare professional with expertise in public health. Focus on evidence-based information and patient care.", + ), + ( + "Marketing", + "You are a marketing strategist focusing on digital trends. Be creative and audience-focused in your responses.", + ), + ( + "Legal", + "You are a legal expert specializing in corporate law. Be precise and reference relevant regulations.", ), ] + for i in range(num_agents): + specialty, base_prompt = specialties[i % len(specialties)] + if diverse_prompts: + # Add personality traits and communication style to make responses more diverse + traits = [ + "Be analytical and data-focused", + "Use analogies and examples", + "Be concise and direct", + "Ask thought-provoking questions", + "Provide practical applications", + ] + prompt = f"{base_prompt} {traits[i % len(traits)]}" + else: + prompt = base_prompt + + agents.append( + Agent( + agent_name=f"{specialty}-Agent-{i+1}", + system_prompt=prompt, + model_name="gpt-4", + max_loops=1, + temperature=0.7, # Add temperature to increase response variety + ) + ) + return agents + + +def test_basic_groupchat(report): + """Test basic GroupChat initialization and conversation""" + start_time = time.time() + + try: + agents = create_test_agents(2) + chat = GroupChat( + name="Test Chat", + description="A test group chat", + agents=agents, + max_loops=2, + ) + + result = chat.run("Say hello!") + report.add_result( + "Basic GroupChat Test", + True, + duration=time.time() - start_time, + ) + + except Exception as e: + report.add_result( + "Basic GroupChat Test", + False, + message=str(e), + duration=time.time() - start_time, + ) + + +def test_speaker_functions(report): + """Test all available speaker functions with enhanced prompts""" + speaker_functions = { + "round_robin": ( + round_robin, + "What are your thoughts on sustainable practices?", + ), + "expertise_based": ( + expertise_based, + "Discuss the impact of AI on your field.", + ), + "random_selection": ( + random_selection, + "How do you approach problem-solving?", + ), + "sentiment_based": ( + sentiment_based, + "Share your positive outlook on future trends.", + ), + "length_based": ( + length_based, + "Provide a detailed analysis of recent developments.", + ), + "question_based": ( + question_based, + "What challenges do you foresee in your industry?", + ), + "topic_based": ( + topic_based, + "How does digital transformation affect your sector?", + ), + } -def test_round_robin_speaking(): - chat = GroupChat(agents=setup_test_agents()) - history = chat.run("Say your letter") - - # Verify agents speak in order - responses = [ - r.message for t in history.turns for r in t.responses - ] - assert responses == ["A", "B", "C"] * (len(history.turns)) - - -def test_concurrent_processing(): - chat = GroupChat(agents=setup_test_agents()) - tasks = ["Task1", "Task2", "Task3"] - histories = chat.concurrent_run(tasks) - - assert len(histories) == len(tasks) - for history in histories: - assert history.total_messages > 0 - - -def test_expertise_based_speaking(): - agents = setup_test_agents() - chat = GroupChat(agents=agents, speaker_fn=expertise_based) - - # Test each agent's expertise trigger - for agent in agents: - history = chat.run(f"Trigger {agent.system_prompt}") - first_response = history.turns[0].responses[0] - assert first_response.agent_name == agent.agent_name - - -def test_max_loops_limit(): - max_loops = 3 - chat = GroupChat(agents=setup_test_agents(), max_loops=max_loops) - history = chat.run("Test message") - - assert len(history.turns) == max_loops - - -def test_error_handling(): - broken_agent = Agent( - agent_name="BrokenAgent", - system_prompt="You raise errors", - llm=None, - ) - - chat = GroupChat(agents=[broken_agent]) - history = chat.run("Trigger error") + for name, (func, prompt) in speaker_functions.items(): + start_time = time.time() + try: + # Create agents with diverse prompts for this test + agents = create_test_agents(3, diverse_prompts=True) + chat = GroupChat( + name=f"{name.title()} Test", + description=f"Testing {name} speaker function with diverse responses", + agents=agents, + speaker_fn=func, + max_loops=2, + rules="1. Be unique in your responses\n2. Build on others' points\n3. Stay relevant to your expertise", + ) + + result = chat.run(prompt) + report.add_result( + f"Speaker Function - {name}", + True, + duration=time.time() - start_time, + ) - assert "Error" in history.turns[0].responses[0].message + except Exception as e: + report.add_result( + f"Speaker Function - {name}", + False, + message=str(e), + duration=time.time() - start_time, + ) -def test_conversation_context(): - agents = setup_test_agents() - complex_prompt = "Previous message refers to A. Now trigger B. Finally discuss C." +def test_varying_agent_counts(report): + """Test GroupChat with different numbers of agents""" + agent_counts = [1, 3, 5, 7] - chat = GroupChat(agents=agents, speaker_fn=expertise_based) - history = chat.run(complex_prompt) + for count in agent_counts: + start_time = time.time() + try: + agents = create_test_agents(count) + chat = GroupChat( + name=f"{count}-Agent Test", agents=agents, max_loops=2 + ) + + result = chat.run("Introduce yourselves briefly.") + report.add_result( + f"Agent Count Test - {count} agents", + True, + duration=time.time() - start_time, + ) - responses = [ - r.agent_name for t in history.turns for r in t.responses + except Exception as e: + report.add_result( + f"Agent Count Test - {count} agents", + False, + message=str(e), + duration=time.time() - start_time, + ) + + +def test_error_cases(report): + """Test error handling with expanded cases""" + test_cases = [ + ("Empty Agents List", lambda: GroupChat(agents=[])), + ( + "Invalid Max Loops", + lambda: GroupChat( + agents=[create_test_agents(1)[0]], max_loops=0 + ), + ), + ( + "Empty Task", + lambda: GroupChat(agents=[create_test_agents(1)[0]]).run( + "" + ), + ), + ( + "None Task", + lambda: GroupChat(agents=[create_test_agents(1)[0]]).run( + None + ), + ), + ( + "Invalid Speaker Function", + lambda: GroupChat( + agents=[create_test_agents(1)[0]], + speaker_fn=lambda x, y: "not a boolean", # This should raise ValueError + ), + ), ] - assert all(agent.agent_name in responses for agent in agents) - - -def test_large_agent_group(): - large_group = setup_test_agents() * 5 # 15 agents - chat = GroupChat(agents=large_group) - history = chat.run("Test scaling") - - assert history.total_messages > len(large_group) - - -def test_long_conversations(): - chat = GroupChat(agents=setup_test_agents(), max_loops=50) - history = chat.run("Long conversation test") - - assert len(history.turns) == 50 - assert history.total_messages > 100 + for name, test_func in test_cases: + start_time = time.time() + try: + test_func() + report.add_result( + f"Error Case - {name}", + False, + message="Expected ValueError not raised", + duration=time.time() - start_time, + ) + except ( + ValueError, + TypeError, + ): # Include TypeError for invalid speaker function + report.add_result( + f"Error Case - {name}", + True, + duration=time.time() - start_time, + ) + except Exception as e: + report.add_result( + f"Error Case - {name}", + False, + message=f"Unexpected error: {str(e)}", + duration=time.time() - start_time, + ) + + +def test_concurrent_execution(report): + """Test concurrent execution with various task counts""" + start_time = time.time() + + try: + agents = create_test_agents(3) + chat = GroupChat( + name="Concurrent Test", agents=agents, max_loops=1 + ) + + tasks = [ + "Task 1: Introduce yourself", + "Task 2: What's your specialty?", + "Task 3: How can you help?", + "Task 4: What are your limitations?", + "Task 5: Give an example of your expertise", + ] + + results = chat.concurrent_run(tasks) + report.add_result( + "Concurrent Execution Test", + True, + message=f"Successfully completed {len(results)} tasks", + duration=time.time() - start_time, + ) + + except Exception as e: + report.add_result( + "Concurrent Execution Test", + False, + message=str(e), + duration=time.time() - start_time, + ) + + +def test_conversation_rules(report): + """Test GroupChat with different conversation rules""" + start_time = time.time() + + try: + agents = create_test_agents(3, diverse_prompts=True) + chat = GroupChat( + name="Rules Test", + description="Testing conversation with specific rules", + agents=agents, + max_loops=2, + rules=""" + 1. Keep responses under 50 words + 2. Always be professional + 3. Stay on topic + 4. Provide unique perspectives + 5. Build on previous responses + """, + ) + + result = chat.run( + "How can we ensure ethical AI development across different sectors?" + ) + report.add_result( + "Conversation Rules Test", + True, + duration=time.time() - start_time, + ) + + except Exception as e: + report.add_result( + "Conversation Rules Test", + False, + message=str(e), + duration=time.time() - start_time, + ) -def test_stress_batched_runs(): - chat = GroupChat(agents=setup_test_agents()) - tasks = ["Task"] * 100 - histories = chat.batched_run(tasks) - assert len(histories) == len(tasks) - total_messages = sum(h.total_messages for h in histories) - assert total_messages > len(tasks) * 3 +if __name__ == "__main__": + report = TestReport() + report.start() + print("Starting Enhanced GroupChat Test Suite...\n") -if __name__ == "__main__": - load_dotenv() - - functions = [ - test_round_robin_speaking, - test_concurrent_processing, - test_expertise_based_speaking, - test_max_loops_limit, - test_error_handling, - test_conversation_context, - test_large_agent_group, - test_long_conversations, - test_stress_batched_runs, - ] + # Run all tests + test_basic_groupchat(report) + test_speaker_functions(report) + test_varying_agent_counts(report) + test_error_cases(report) + test_concurrent_execution(report) + test_conversation_rules(report) - for func in functions: - try: - print(f"Running {func.__name__}...") - func() - print("✓ Passed") - except Exception as e: - print(f"✗ Failed: {str(e)}") + report.end() + print(report.generate_report())