pull/776/head
Kye Gomez 2 months ago
parent d8e06900a4
commit 43c5845bbd

@ -0,0 +1,36 @@
from swarms.structs.agent import Agent
from swarms.structs.groupchat import GroupChat
if __name__ == "__main__":
# Example agents
agent1 = Agent(
agent_name="Financial-Analysis-Agent",
system_prompt="You are a financial analyst specializing in investment strategies.",
model_name="gpt-4o",
max_loops=1,
dynamic_temperature_enabled=True,
)
agent2 = Agent(
agent_name="Tax-Adviser-Agent",
system_prompt="You are a tax adviser who provides clear and concise guidance on tax-related queries.",
model_name="gpt-4o",
max_loops=1,
dynamic_temperature_enabled=True,
)
agents = [agent1, agent2]
chat = GroupChat(
name="Investment Advisory",
description="Financial and tax analysis group",
agents=agents,
max_loops=1,
)
history = chat.run(
"How to optimize tax strategy for investments?"
)
print(history)

@ -7,22 +7,22 @@ agents = [
agent_description="Personal finance advisor focused on market analysis",
system_prompt="You are a financial advisor specializing in market analysis and investment opportunities.",
max_loops=1,
model_name="gpt-4o"
model_name="gpt-4o",
),
Agent(
agent_name="Risk-Assessment-Agent",
agent_description="Risk analysis and portfolio management expert",
system_prompt="You are a risk assessment expert focused on evaluating investment risks and portfolio diversification.",
max_loops=1,
model_name="gpt-4o"
model_name="gpt-4o",
),
Agent(
agent_name="Tech-Investment-Agent",
agent_description="Technology sector investment specialist",
system_prompt="You are a technology investment specialist focused on AI, emerging tech, and growth opportunities.",
max_loops=1,
model_name="gpt-4o"
)
model_name="gpt-4o",
),
]
@ -31,7 +31,7 @@ consensus_agent = Agent(
agent_description="Consensus agent focused on analyzing investment advice",
system_prompt="You are a consensus agent focused on analyzing investment advice and providing a final answer.",
max_loops=1,
model_name="gpt-4o"
model_name="gpt-4o",
)
# Create majority voting system
@ -40,13 +40,12 @@ majority_voting = MajorityVoting(
description="Multi-agent system for investment advice",
agents=agents,
verbose=True,
consensus_agent=consensus_agent
consensus_agent=consensus_agent,
)
# Run the analysis with majority voting
result = majority_voting.run(
task="Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.",
correct_answer="" # Optional evaluation metric
)
print(result)

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "7.1.7"
version = "7.1.9"
description = "Swarms - TGSC"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]
@ -73,21 +73,9 @@ docstring_parser = "0.16" # TODO:
tiktoken = "*"
networkx = "*"
aiofiles = "*"
# chromadb = "*"
rich = "*"
numpy = "*"
litellm = "*"
# sentence-transformers = "*"
# # All optional dependencies for convenience
# all = [
# "torch",
# "transformers",
# "litellm"
# ]
[tool.poetry.scripts]
swarms = "swarms.cli.main:main"

@ -14,9 +14,6 @@ from swarms.structs.graph_workflow import (
NodeType,
)
from swarms.structs.groupchat import (
AgentResponse,
ChatHistory,
ChatTurn,
GroupChat,
expertise_based,
)
@ -138,9 +135,6 @@ __all__ = [
"run_agents_with_tasks_concurrently",
"showcase_available_agents",
"GroupChat",
"ChatHistory",
"ChatTurn",
"AgentResponse",
"expertise_based",
"MultiAgentRouter",
"MemeAgentGenerator",

@ -405,10 +405,23 @@ class Conversation(BaseStructure):
visible_messages.append(message)
return visible_messages
def get_last_message_as_string(self):
# fetch the last message from the conversation history with the agent name and the message of the agent
return f"{self.conversation_history[-1]['role']}: {self.conversation_history[-1]['content']}"
# # Example usage
# conversation = Conversation()
# conversation.add("user", "Hello, how are you?")
def return_messages_as_list(self):
# we must concat the role and the content of the message
return [
f"{message['role']}: {message['content']}"
for message in self.conversation_history
]
# Example usage
conversation = Conversation()
conversation.add("user", "Hello, how are you?")
# print(conversation.get_last_message_as_string())
# print(conversation.return_messages_as_list())
# conversation.add("assistant", "I am doing well, thanks.")
# # print(conversation.to_json())
# print(type(conversation.to_dict()))

@ -1,14 +1,12 @@
import concurrent.futures
from datetime import datetime
import os
from typing import Callable, List
from dotenv import load_dotenv
from loguru import logger
from pydantic import BaseModel, Field
from swarm_models import OpenAIChat
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
class AgentResponse(BaseModel):
@ -20,21 +18,6 @@ class AgentResponse(BaseModel):
preceding_context: List[str] = Field(default_factory=list)
class ChatTurn(BaseModel):
turn_number: int
responses: List[AgentResponse]
task: str
timestamp: datetime = Field(default_factory=datetime.now)
class ChatHistory(BaseModel):
turns: List[ChatTurn]
total_messages: int
name: str
description: str
start_time: datetime = Field(default_factory=datetime.now)
SpeakerFunction = Callable[[List[str], "Agent"], bool]
@ -114,10 +97,129 @@ def most_recent(history: List[str], agent: Agent) -> bool:
)
def sentiment_based(history: List[str], agent: Agent) -> bool:
"""
Sentiment based speaker function.
An agent speaks if the last message has a sentiment matching their personality.
"""
if not history:
return True
last_message = history[-1].lower()
positive_words = [
"good",
"great",
"excellent",
"happy",
"positive",
]
negative_words = [
"bad",
"poor",
"terrible",
"unhappy",
"negative",
]
is_positive = any(word in last_message for word in positive_words)
is_negative = any(word in last_message for word in negative_words)
# Assuming agent has a "personality" trait in description
agent_is_positive = "positive" in agent.description.lower()
return is_positive if agent_is_positive else is_negative
def length_based(history: List[str], agent: Agent) -> bool:
"""
Length based speaker function.
An agent speaks if the last message is longer/shorter than a threshold.
"""
if not history:
return True
last_message = history[-1]
threshold = 100
# Some agents prefer long messages, others short
prefers_long = "detailed" in agent.description.lower()
message_is_long = len(last_message) > threshold
return message_is_long if prefers_long else not message_is_long
def question_based(history: List[str], agent: Agent) -> bool:
"""
Question based speaker function.
An agent speaks if the last message contains a question.
"""
if not history:
return True
last_message = history[-1]
question_indicators = [
"?",
"what",
"how",
"why",
"when",
"where",
"who",
]
return any(
indicator in last_message.lower()
for indicator in question_indicators
)
def topic_based(history: List[str], agent: Agent) -> bool:
"""
Topic based speaker function.
An agent speaks if their expertise matches the current conversation topic.
"""
if not history:
return True
# Look at last 3 messages to determine topic
recent_messages = history[-3:] if len(history) >= 3 else history
combined_text = " ".join(msg.lower() for msg in recent_messages)
# Extract expertise topics from agent description
expertise_topics = [
word.lower()
for word in agent.description.split()
if len(word) > 4
] # Simple topic extraction
return any(topic in combined_text for topic in expertise_topics)
class GroupChat:
"""
GroupChat class to enable multiple agents to communicate in a synchronous group chat.
Each agent is aware of all other agents, every message exchanged, and the social context.
A class that manages conversations between multiple AI agents.
This class facilitates group chat interactions between multiple agents, where agents
can communicate with each other based on a specified speaker function. It handles
conversation flow, message history, and agent coordination.
Attributes:
name (str): Name of the group chat
description (str): Description of the group chat's purpose
agents (List[Agent]): List of Agent instances participating in the chat
speaker_fn (SpeakerFunction): Function determining which agents can speak
max_loops (int): Maximum number of conversation turns
conversation (Conversation): Stores the chat history
Args:
name (str, optional): Name of the group chat. Defaults to "GroupChat".
description (str, optional): Description of the chat. Defaults to "A group chat for multiple agents".
agents (List[Agent], optional): List of participating agents. Defaults to empty list.
speaker_fn (SpeakerFunction, optional): Speaker selection function. Defaults to round_robin.
max_loops (int, optional): Maximum conversation turns. Defaults to 1.
Raises:
ValueError: If invalid initialization parameters are provided
"""
def __init__(
@ -127,178 +229,206 @@ class GroupChat:
agents: List[Agent] = [],
speaker_fn: SpeakerFunction = round_robin,
max_loops: int = 1,
rules: str = "",
):
"""
Initialize the GroupChat.
Args:
name (str): Name of the group chat.
description (str): Description of the purpose of the group chat.
agents (List[Agent]): A list of agents participating in the chat.
speaker_fn (SpeakerFunction): The function to determine which agent should speak next.
max_loops (int): Maximum number of turns in the chat.
"""
self.name = name
self.description = description
self.agents = agents
self.speaker_fn = speaker_fn
self.max_loops = max_loops
self.chat_history = ChatHistory(
turns=[],
total_messages=0,
name=name,
description=description,
self.conversation = Conversation(time_enabled=True)
self.rules = rules
self.reliability_check()
def reliability_check(self):
"""
Validates the group chat configuration.
Raises:
ValueError: If any required components are missing or invalid
"""
if not self.agents:
raise ValueError("No agents provided")
if self.speaker_fn is None:
raise ValueError("No speaker function provided")
if self.max_loops <= 0:
raise ValueError("Max loops must be greater than 0")
for agent in self.agents:
if not isinstance(agent, Agent):
raise ValueError(
f"Invalid agent type: {type(agent)}. Must be Agent instance"
)
def _get_response_sync(
self, agent: Agent, prompt: str, turn_number: int
) -> AgentResponse:
def run(self, task: str, img: str = None, *args, **kwargs) -> str:
"""
Get the response from an agent synchronously.
Executes a conversation between agents about the given task.
Args:
agent (Agent): The agent responding.
prompt (str): The message triggering the response.
turn_number (int): The current turn number.
task (str): The task or topic for agents to discuss
img (str, optional): Image input for the conversation. Defaults to None.
*args: Additional positional arguments
**kwargs: Additional keyword arguments
Returns:
AgentResponse: The agent's response captured in a structured format.
str: Complete conversation history as a string
Raises:
ValueError: If task is empty or invalid
Exception: If any error occurs during conversation
"""
try:
# Provide the agent with information about the chat and other agents
chat_info = f"Chat Name: {self.name}\nChat Description: {self.description}\nAgents in Chat: {[a.agent_name for a in self.agents]}"
context = f"""You are {agent.agent_name}
Conversation History:
\n{chat_info}
Other agents: {[a.agent_name for a in self.agents if a != agent]}
Previous messages: {self.get_full_chat_history()}
""" # Updated line
if not task or not isinstance(task, str):
raise ValueError("Task must be a non-empty string")
message = agent.run(
task=f"From {agent.agent_name}: {context} \n {prompt}"
)
return AgentResponse(
agent_name=agent.name,
role=agent.system_prompt,
message=message,
turn_number=turn_number,
preceding_context=self.get_recent_messages(3),
)
except Exception as e:
logger.error(f"Error from {agent.name}: {e}")
return AgentResponse(
agent_name=agent.name,
role=agent.system_prompt,
message=f"Error generating response: {str(e)}",
turn_number=turn_number,
preceding_context=[],
# Initialize conversation with context
agent_context = f"Group Chat Name: {self.name}\nGroup Chat Description: {self.description}\nRules: {self.rules}\n Other agents: {', '.join([a.agent_name for a in self.agents])}"
self.conversation.add(role="system", content=agent_context)
self.conversation.add(role="User", content=task)
print(
f"....... conversation history: \n {self.conversation.return_history_as_string()}"
)
def get_full_chat_history(self) -> str:
"""
Get the full chat history formatted for agent context.
try:
turn = 0
consecutive_silent_turns = 0
max_silent_turns = 2 # End conversation if no one speaks for this many turns
while turn < self.max_loops:
context = self.conversation.return_messages_as_list()
# Get agents who should speak this turn
speaking_agents = [
agent
for agent in self.agents
if self.speaker_fn(context, agent)
]
if not speaking_agents:
consecutive_silent_turns += 1
if consecutive_silent_turns >= max_silent_turns:
logger.debug(
"Multiple silent turns, ending conversation"
)
break
continue
Returns:
str: The full chat history with sender names.
"""
messages = []
for turn in self.chat_history.turns:
for response in turn.responses:
messages.append(
f"{response.agent_name}: {response.message}"
consecutive_silent_turns = (
0 # Reset counter when agents speak
)
return "\n".join(messages)
def get_recent_messages(self, n: int = 3) -> List[str]:
"""
Get the most recent messages in the chat.
# Process each speaking agent
for agent in speaking_agents:
try:
# Build context-aware prompt
prompt = (
f"You're {agent.agent_name} participating in a group chat.\n"
f"Chat Purpose: {self.description}\n"
f"Current Discussion: {task}\n"
f"Chat History:\n{self.conversation.return_history_as_string()}\n"
f"As {agent.agent_name}, please provide your response:"
)
Args:
n (int): The number of recent messages to retrieve.
print(
f"....... what the agent sees prompt: \n {prompt}"
)
Returns:
List[str]: The most recent messages in the chat.
"""
messages = []
for turn in self.chat_history.turns[-n:]:
for response in turn.responses:
messages.append(
f"{response.agent_name}: {response.message}"
message = agent.run(
task=prompt,
img=img,
*args,
**kwargs,
)
return messages
def run(self, task: str) -> ChatHistory:
"""
Run the group chat, feeding the context of previous turns into each new turn.
if not message or message.isspace():
logger.warning(
f"Empty response from {agent.agent_name}, skipping"
)
continue
Args:
task (str): The initial message to start the chat.
self.conversation.add(
role=agent.agent_name, content=message
)
Returns:
ChatHistory: The history of the chat.
"""
try:
logger.info(
f"Starting chat '{self.name}' with task: {task}"
f"Turn {turn}, {agent.agent_name} responded"
)
for turn in range(self.max_loops):
current_turn = ChatTurn(
turn_number=turn, responses=[], task=task
except Exception as e:
logger.error(
f"Error from {agent.agent_name}: {e}"
)
# Continue with other agents instead of crashing
continue
# Get context from previous turns
context = self.get_full_chat_history()
turn += 1
# Combine task with context for agents
contextualized_task = (
f"{task}\n\nPrevious conversation:\n{context}"
if context
else task
# Check if conversation has reached a natural conclusion
last_messages = (
context[-3:] if len(context) >= 3 else context
)
for agent in self.agents:
if self.speaker_fn(
self.get_recent_messages(), agent
if all(
"conclusion" in msg.lower()
for msg in last_messages
):
response = self._get_response_sync(
agent, contextualized_task, turn
)
current_turn.responses.append(response)
self.chat_history.total_messages += 1
logger.debug(
f"Turn {turn}, {agent.name} responded"
"Natural conversation conclusion detected"
)
break
self.chat_history.turns.append(current_turn)
return self.conversation.return_history_as_string()
return self.chat_history
except Exception as e:
logger.error(f"Error in chat: {e}")
raise e
raise
def batched_run(self, tasks: List[str], *args, **kwargs):
def batched_run(
self, tasks: List[str], *args, **kwargs
) -> List[str]:
"""
Run the group chat with a batch of tasks.
Runs multiple tasks in sequence.
Args:
tasks (List[str]): The list of tasks to run in the chat.
tasks (List[str]): List of tasks to process
*args: Additional positional arguments
**kwargs: Additional keyword arguments
Returns:
List[ChatHistory]: The history of each chat.
List[str]: List of conversation histories for each task
Raises:
ValueError: If tasks list is empty or invalid
"""
if not tasks or not isinstance(tasks, list):
raise ValueError(
"Tasks must be a non-empty list of strings"
)
return [self.run(task, *args, **kwargs) for task in tasks]
def concurrent_run(self, tasks: List[str], *args, **kwargs):
def concurrent_run(
self, tasks: List[str], *args, **kwargs
) -> List[str]:
"""
Run the group chat with a batch of tasks concurrently using a thread pool.
Runs multiple tasks concurrently using threads.
Args:
tasks (List[str]): The list of tasks to run in the chat.
tasks (List[str]): List of tasks to process
*args: Additional positional arguments
**kwargs: Additional keyword arguments
Returns:
List[ChatHistory]: The history of each chat.
List[str]: List of conversation histories for each task
Raises:
ValueError: If tasks list is empty or invalid
RuntimeError: If concurrent execution fails
"""
if not tasks or not isinstance(tasks, list):
raise ValueError(
"Tasks must be a non-empty list of strings"
)
try:
with concurrent.futures.ThreadPoolExecutor() as executor:
return list(
executor.map(
@ -306,65 +436,70 @@ class GroupChat:
tasks,
)
)
if __name__ == "__main__":
load_dotenv()
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
openai_api_key=api_key,
model_name="gpt-4o-mini",
temperature=0.1,
)
# Example agents
agent1 = Agent(
agent_name="Financial-Analysis-Agent",
system_prompt="You are a financial analyst specializing in investment strategies.",
llm=model,
max_loops=1,
autosave=False,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
user_name="swarms_corp",
retry_attempts=1,
context_length=200000,
output_type="string",
streaming_on=False,
)
agent2 = Agent(
agent_name="Tax-Adviser-Agent",
system_prompt="You are a tax adviser who provides clear and concise guidance on tax-related queries.",
llm=model,
max_loops=1,
autosave=False,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
user_name="swarms_corp",
retry_attempts=1,
context_length=200000,
output_type="string",
streaming_on=False,
except Exception as e:
logger.error(f"Error in concurrent execution: {e}")
raise RuntimeError(
f"Concurrent execution failed: {str(e)}"
)
agents = [agent1, agent2]
chat = GroupChat(
name="Investment Advisory",
description="Financial and tax analysis group",
agents=agents,
speaker_fn=expertise_based,
)
history = chat.run(
"How to optimize tax strategy for investments?"
)
print(history.model_dump_json(indent=2))
# if __name__ == "__main__":
# load_dotenv()
# # Get the OpenAI API key from the environment variable
# api_key = os.getenv("OPENAI_API_KEY")
# # Create an instance of the OpenAIChat class
# model = OpenAIChat(
# openai_api_key=api_key,
# model_name="gpt-4o-mini",
# temperature=0.1,
# )
# # Example agents
# agent1 = Agent(
# agent_name="Financial-Analysis-Agent",
# system_prompt="You are a financial analyst specializing in investment strategies.",
# llm=model,
# max_loops=1,
# autosave=False,
# dashboard=False,
# verbose=True,
# dynamic_temperature_enabled=True,
# user_name="swarms_corp",
# retry_attempts=1,
# context_length=200000,
# output_type="string",
# streaming_on=False,
# )
# agent2 = Agent(
# agent_name="Tax-Adviser-Agent",
# system_prompt="You are a tax adviser who provides clear and concise guidance on tax-related queries.",
# llm=model,
# max_loops=1,
# autosave=False,
# dashboard=False,
# verbose=True,
# dynamic_temperature_enabled=True,
# user_name="swarms_corp",
# retry_attempts=1,
# context_length=200000,
# output_type="string",
# streaming_on=False,
# )
# agents = [agent1, agent2]
# chat = GroupChat(
# name="Investment Advisory",
# description="Financial and tax analysis group",
# agents=agents,
# speaker_fn=expertise_based,
# )
# history = chat.run(
# "How to optimize tax strategy for investments?"
# )
# print(history.model_dump_json(indent=2))

@ -176,9 +176,7 @@ class MajorityVoting:
title="Majority Voting",
)
def run(
self, task: str, correct_answer: str, *args, **kwargs
) -> List[Any]:
def run(self, task: str, *args, **kwargs) -> List[Any]:
"""
Runs the majority voting system and returns the majority vote.
@ -224,9 +222,7 @@ class MajorityVoting:
# If an output parser is provided, parse the responses
if self.consensus_agent is not None:
majority_vote = self.consensus_agent.run(
prompt
)
majority_vote = self.consensus_agent.run(prompt)
self.conversation.add(
self.consensus_agent.agent_name, majority_vote

@ -1,396 +0,0 @@
import json
import sched
import time
from datetime import datetime
from typing import Any, Callable, ClassVar, Dict, List, Union
from pydantic import BaseModel, Field
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.structs.omni_agent_types import AgentType
from typing import Optional
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger(log_folder="task")
class Task(BaseModel):
"""
Task class for running a task in a sequential workflow.
Attributes:
description (str): Description of the task.
agent (Union[Callable, Agent]): Agent or callable object to run the task.
args (List[Any]): Arguments to pass to the agent or callable object.
kwargs (Dict[str, Any]): Keyword arguments to pass to the agent or callable object.
result (Any): Result of the task.
history (List[Any]): History of the task.
schedule_time (datetime): Time to schedule the task.
scheduler (sched.scheduler): Scheduler to schedule the task.
trigger (Callable): Trigger to run the task.
action (Callable): Action to run the task.
condition (Callable): Condition to run the task.
priority (int): Priority of the task.
dependencies (List[Task]): List of tasks that need to be completed before this task can be executed.
Methods:
execute: Execute the task by calling the agent or model with the arguments and keyword arguments.
handle_scheduled_task: Handles the execution of a scheduled task.
set_trigger: Sets the trigger for the task.
set_action: Sets the action for the task.
set_condition: Sets the condition for the task.
is_completed: Checks whether the task has been completed.
add_dependency: Adds a task to the list of dependencies.
set_priority: Sets the priority of the task.
check_dependency_completion: Checks whether all the dependencies have been completed.
Examples:
>>> from swarms.structs import Task, Agent
>>> from swarm_models import OpenAIChat
>>> agent = Agent(llm=OpenAIChat(openai_api_key=""), max_loops=1, dashboard=False)
>>> task = Task(description="What's the weather in miami", agent=agent)
>>> task.run()
>>> task.result
"""
name: Optional[str] = "Task"
description: Optional[str] = (
"A task is a unit of work that needs to be completed for a workflow to progress."
)
agent: Optional[Union[Callable, Agent, AgentType]] = Field(
None,
description="Agent or callable object to run the task",
)
result: Optional[Any] = None
history: List[Any] = Field(default_factory=list)
schedule_time: Optional[datetime] = Field(
None,
description="Time to schedule the task",
)
scheduler: ClassVar[sched.scheduler] = sched.scheduler(
time.time, time.sleep
)
trigger: Optional[Callable] = Field(
None,
description="Trigger to run the task",
)
action: Optional[Callable] = Field(
None,
description="Action to run the task",
)
condition: Optional[Callable] = Field(
None,
description="Condition to run the task",
)
priority: Optional[int] = Field(
0.4,
description="Priority of the task",
)
dependencies: List["Task"] = Field(default_factory=list)
args: List[Any] = Field(default_factory=list)
kwargs: Dict[str, Any] = Field(default_factory=dict)
class Config:
arbitrary_types_allowed = True
# We need to check that the agent exists
def step(self, task: str = None, *args, **kwargs):
"""
Execute the task by calling the agent or model with the arguments and
keyword arguments. You can add images to the agent by passing the
path to the image as a keyword argument.
Examples:
>>> from swarms.structs import Task, Agent
>>> from swarm_models import OpenAIChat
>>> agent = Agent(llm=OpenAIChat(openai_api_key=""), max_loops=1, dashboard=False)
>>> task = Task(description="What's the weather in miami", agent=agent)
>>> task.run()
>>> task.result
"""
logger.info(f"Running task: {task}")
# Check dependencies
if not self.check_dependency_completion():
logger.info(
f"Task {self.description} is waiting for dependencies to complete"
)
return None
# Check the condition before executing the task
if self.condition is not None:
try:
condition_result = self.condition()
if not condition_result:
logger.info(
f"Completion not met for the task: {task} Skipping execution"
)
return None
except Exception as error:
logger.error(f"[ERROR][Task] {error}")
return None
# Execute the task
if self.trigger is None or self.trigger():
try:
logger.info(f"Executing task: {task}")
self.result = self.agent.run(task, *args, **kwargs)
# Ensure the result is either a string or a dict
if isinstance(self.result, str):
logger.info(f"Task result: {self.result}")
elif isinstance(self.result, dict):
logger.info(f"Task result: {self.result}")
else:
logger.error(
"Task result must be either a string or a dict"
)
# Add the result to the history
self.history.append(self.result)
# If an action is specified, execute it
if self.action is not None:
try:
logger.info(
f"Executing action for task: {task}"
)
self.action()
except Exception as error:
logger.error(f"[ERROR][Task] {error}")
except Exception as error:
logger.error(f"[ERROR][Task] {error}")
else:
logger.info(f"Task {task} is not triggered")
def run(self, task: str = None, *args, **kwargs):
now = datetime.now()
# If the task is scheduled for the future, schedule it
if self.schedule_time and self.schedule_time > now:
delay = (self.schedule_time - now).total_seconds()
logger.info(
f"Scheduling task: {self.description} for {self.schedule_time}"
)
self.scheduler.enter(
delay,
1,
self.step,
argument=(task, args, kwargs),
)
self.scheduler.run()
# We need to return the result
else:
# If no scheduling or the time has already passed run the task
return self.step(task, *args, **kwargs)
def handle_scheduled_task(self):
"""
Handles the execution of a scheduled task.
If the schedule time is not set or has already passed, the task is executed immediately.
Otherwise, the task is scheduled to be executed at the specified schedule time.
"""
logger.info(
f"[INFO][Task] Handling scheduled task: {self.description}"
)
try:
if (
self.schedule_time is None
or self.schedule_time <= datetime.now()
):
self.execute()
else:
delay = (
self.schedule_time - datetime.now()
).total_seconds()
self.scheduler.enter(delay, 1, self.execute)
self.scheduler_run()
except Exception as error:
logger.error(f"[ERROR][Task] {error}")
def set_trigger(self, trigger: Callable):
"""
Sets the trigger for the task.
Args:
trigger (Callable): The trigger to set.
"""
self.trigger = trigger
def set_action(self, action: Callable):
"""
Sets the action for the task.
Args:
action (Callable): The action to set.
"""
self.action = action
def set_condition(self, condition: Callable):
"""
Sets the condition for the task.
Args:
condition (Callable): The condition to set.
"""
self.condition = condition
def is_completed(self):
"""Is the task completed?
Returns:
_type_: _description_
"""
return self.result is not None
def add_dependency(self, task):
"""Adds a task to the list of dependencies.
Args:
task (_type_): _description_
"""
self.dependencies.append(task)
def set_priority(self, priority: int):
"""Sets the priority of the task.
Args:
priority (int): _description_
"""
self.priority = priority
def check_dependency_completion(self):
"""
Checks whether all the dependencies have been completed.
Returns:
bool: True if all the dependencies have been completed, False otherwise.
"""
logger.info("[INFO][Task] Checking dependency completion")
try:
for task in self.dependencies:
if not task.is_completed():
return False
except Exception as error:
logger.error(
f"[ERROR][Task][check_dependency_completion] {error}"
)
def context(
self,
task: "Task" = None,
context: List["Task"] = None,
*args,
**kwargs,
):
"""
Set the context for the task.
Args:
context (str): The context to set.
"""
# For sequential workflow, sequentially add the context of the previous task in the list
new_context = Conversation(time_enabled=True, *args, **kwargs)
if context:
for task in context:
description = (
task.description
if task.description is not None
else ""
)
result = (
task.result if task.result is not None else ""
)
# Add the context of the task to the conversation
new_context.add(
task.agent.agent_name, f"{description} {result}"
)
elif task:
description = (
task.description
if task.description is not None
else ""
)
result = task.result if task.result is not None else ""
new_context.add(
task.agent.agent_name, f"{description} {result}"
)
prompt = new_context.return_history_as_string()
# Add to history
return self.history.append(prompt)
def to_dict(self):
"""
Convert the task to a dictionary.
Returns:
dict: The task as a dictionary.
"""
return self.model_dump_json(indent=4)
def save_to_file(self, file_path: str):
"""
Save the task to a file.
Args:
file_path (str): The path to the file to save the task to.
"""
with open(file_path, "w") as file:
file.write(self.to_json(indent=4))
@classmethod
def load_from_file(cls, file_path: str):
"""
Load a task from a file.
Args:
file_path (str): The path to the file to load the task from.
Returns:
Task: The task loaded from the file.
"""
with open(file_path, "r") as file:
task_dict = json.load(file)
return Task(**task_dict)
def schedule_task_with_sched(
function: Callable, run_date: datetime
) -> None:
now = datetime.now()
if run_date <= now:
raise ValueError("run_date must be in the future")
# Calculate the delay in seconds
delay = (run_date - now).total_seconds()
scheduler = sched.scheduler(time.time, time.sleep)
# Schedule the function
scheduler.enter(delay, 1, function)
# Start the scheduler
scheduler.run(delay, 1, function)
# Start the scheduler
logger.info(f"Task scheduled for {run_date}")
scheduler.run()
return None

@ -1,147 +1,388 @@
import os
from dotenv import load_dotenv
from swarm_models import OpenAIChat
from swarms.structs.agent import Agent
from swarms.structs.groupchat import GroupChat, expertise_based
from swarms.structs.groupchat import (
GroupChat,
round_robin,
expertise_based,
random_selection,
sentiment_based,
length_based,
question_based,
topic_based,
)
from datetime import datetime
import time
class TestReport:
def __init__(self):
self.results = []
self.start_time = None
self.end_time = None
def add_result(self, test_name, passed, message="", duration=0):
self.results.append(
{
"test_name": test_name,
"passed": passed,
"message": message,
"duration": duration,
}
)
def setup_test_agents():
model = OpenAIChat(
openai_api_key=os.getenv("OPENAI_API_KEY"),
model_name="gpt-4",
temperature=0.1,
def start(self):
self.start_time = datetime.now()
def end(self):
self.end_time = datetime.now()
def generate_report(self):
total_tests = len(self.results)
passed_tests = sum(1 for r in self.results if r["passed"])
failed_tests = total_tests - passed_tests
duration = (self.end_time - self.start_time).total_seconds()
report = "\n" + "=" * 50 + "\n"
report += "GROUP CHAT TEST SUITE REPORT\n"
report += "=" * 50 + "\n\n"
report += f"Test Run: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}\n"
report += f"Duration: {duration:.2f} seconds\n"
report += f"Total Tests: {total_tests}\n"
report += f"Passed: {passed_tests}\n"
report += f"Failed: {failed_tests}\n"
report += (
f"Success Rate: {(passed_tests/total_tests)*100:.1f}%\n\n"
)
return [
Agent(
agent_name="Agent1",
system_prompt="You only respond with 'A'",
llm=model,
report += "Detailed Test Results:\n"
report += "-" * 50 + "\n"
for result in self.results:
status = "" if result["passed"] else ""
report += f"{status} {result['test_name']} ({result['duration']:.2f}s)\n"
if result["message"]:
report += f" {result['message']}\n"
return report
def create_test_agents(num_agents, diverse_prompts=False):
"""Helper function to create test agents with diverse prompts"""
agents = []
specialties = [
(
"Finance",
"You are a financial expert focusing on investment strategies and market analysis. Be concise and data-driven in your responses.",
),
Agent(
agent_name="Agent2",
system_prompt="You only respond with 'B'",
llm=model,
(
"Tech",
"You are a technology expert specializing in AI and cybersecurity. Use technical terms and provide practical examples.",
),
Agent(
agent_name="Agent3",
system_prompt="You only respond with 'C'",
llm=model,
(
"Healthcare",
"You are a healthcare professional with expertise in public health. Focus on evidence-based information and patient care.",
),
(
"Marketing",
"You are a marketing strategist focusing on digital trends. Be creative and audience-focused in your responses.",
),
(
"Legal",
"You are a legal expert specializing in corporate law. Be precise and reference relevant regulations.",
),
]
def test_round_robin_speaking():
chat = GroupChat(agents=setup_test_agents())
history = chat.run("Say your letter")
# Verify agents speak in order
responses = [
r.message for t in history.turns for r in t.responses
for i in range(num_agents):
specialty, base_prompt = specialties[i % len(specialties)]
if diverse_prompts:
# Add personality traits and communication style to make responses more diverse
traits = [
"Be analytical and data-focused",
"Use analogies and examples",
"Be concise and direct",
"Ask thought-provoking questions",
"Provide practical applications",
]
assert responses == ["A", "B", "C"] * (len(history.turns))
prompt = f"{base_prompt} {traits[i % len(traits)]}"
else:
prompt = base_prompt
agents.append(
Agent(
agent_name=f"{specialty}-Agent-{i+1}",
system_prompt=prompt,
model_name="gpt-4",
max_loops=1,
temperature=0.7, # Add temperature to increase response variety
)
)
return agents
def test_concurrent_processing():
chat = GroupChat(agents=setup_test_agents())
tasks = ["Task1", "Task2", "Task3"]
histories = chat.concurrent_run(tasks)
assert len(histories) == len(tasks)
for history in histories:
assert history.total_messages > 0
def test_basic_groupchat(report):
"""Test basic GroupChat initialization and conversation"""
start_time = time.time()
try:
agents = create_test_agents(2)
chat = GroupChat(
name="Test Chat",
description="A test group chat",
agents=agents,
max_loops=2,
)
def test_expertise_based_speaking():
agents = setup_test_agents()
chat = GroupChat(agents=agents, speaker_fn=expertise_based)
result = chat.run("Say hello!")
report.add_result(
"Basic GroupChat Test",
True,
duration=time.time() - start_time,
)
# Test each agent's expertise trigger
for agent in agents:
history = chat.run(f"Trigger {agent.system_prompt}")
first_response = history.turns[0].responses[0]
assert first_response.agent_name == agent.agent_name
except Exception as e:
report.add_result(
"Basic GroupChat Test",
False,
message=str(e),
duration=time.time() - start_time,
)
def test_max_loops_limit():
max_loops = 3
chat = GroupChat(agents=setup_test_agents(), max_loops=max_loops)
history = chat.run("Test message")
def test_speaker_functions(report):
"""Test all available speaker functions with enhanced prompts"""
speaker_functions = {
"round_robin": (
round_robin,
"What are your thoughts on sustainable practices?",
),
"expertise_based": (
expertise_based,
"Discuss the impact of AI on your field.",
),
"random_selection": (
random_selection,
"How do you approach problem-solving?",
),
"sentiment_based": (
sentiment_based,
"Share your positive outlook on future trends.",
),
"length_based": (
length_based,
"Provide a detailed analysis of recent developments.",
),
"question_based": (
question_based,
"What challenges do you foresee in your industry?",
),
"topic_based": (
topic_based,
"How does digital transformation affect your sector?",
),
}
assert len(history.turns) == max_loops
for name, (func, prompt) in speaker_functions.items():
start_time = time.time()
try:
# Create agents with diverse prompts for this test
agents = create_test_agents(3, diverse_prompts=True)
chat = GroupChat(
name=f"{name.title()} Test",
description=f"Testing {name} speaker function with diverse responses",
agents=agents,
speaker_fn=func,
max_loops=2,
rules="1. Be unique in your responses\n2. Build on others' points\n3. Stay relevant to your expertise",
)
result = chat.run(prompt)
report.add_result(
f"Speaker Function - {name}",
True,
duration=time.time() - start_time,
)
def test_error_handling():
broken_agent = Agent(
agent_name="BrokenAgent",
system_prompt="You raise errors",
llm=None,
except Exception as e:
report.add_result(
f"Speaker Function - {name}",
False,
message=str(e),
duration=time.time() - start_time,
)
chat = GroupChat(agents=[broken_agent])
history = chat.run("Trigger error")
assert "Error" in history.turns[0].responses[0].message
def test_varying_agent_counts(report):
"""Test GroupChat with different numbers of agents"""
agent_counts = [1, 3, 5, 7]
for count in agent_counts:
start_time = time.time()
try:
agents = create_test_agents(count)
chat = GroupChat(
name=f"{count}-Agent Test", agents=agents, max_loops=2
)
def test_conversation_context():
agents = setup_test_agents()
complex_prompt = "Previous message refers to A. Now trigger B. Finally discuss C."
result = chat.run("Introduce yourselves briefly.")
report.add_result(
f"Agent Count Test - {count} agents",
True,
duration=time.time() - start_time,
)
except Exception as e:
report.add_result(
f"Agent Count Test - {count} agents",
False,
message=str(e),
duration=time.time() - start_time,
)
chat = GroupChat(agents=agents, speaker_fn=expertise_based)
history = chat.run(complex_prompt)
responses = [
r.agent_name for t in history.turns for r in t.responses
def test_error_cases(report):
"""Test error handling with expanded cases"""
test_cases = [
("Empty Agents List", lambda: GroupChat(agents=[])),
(
"Invalid Max Loops",
lambda: GroupChat(
agents=[create_test_agents(1)[0]], max_loops=0
),
),
(
"Empty Task",
lambda: GroupChat(agents=[create_test_agents(1)[0]]).run(
""
),
),
(
"None Task",
lambda: GroupChat(agents=[create_test_agents(1)[0]]).run(
None
),
),
(
"Invalid Speaker Function",
lambda: GroupChat(
agents=[create_test_agents(1)[0]],
speaker_fn=lambda x, y: "not a boolean", # This should raise ValueError
),
),
]
assert all(agent.agent_name in responses for agent in agents)
for name, test_func in test_cases:
start_time = time.time()
try:
test_func()
report.add_result(
f"Error Case - {name}",
False,
message="Expected ValueError not raised",
duration=time.time() - start_time,
)
except (
ValueError,
TypeError,
): # Include TypeError for invalid speaker function
report.add_result(
f"Error Case - {name}",
True,
duration=time.time() - start_time,
)
except Exception as e:
report.add_result(
f"Error Case - {name}",
False,
message=f"Unexpected error: {str(e)}",
duration=time.time() - start_time,
)
def test_concurrent_execution(report):
"""Test concurrent execution with various task counts"""
start_time = time.time()
def test_large_agent_group():
large_group = setup_test_agents() * 5 # 15 agents
chat = GroupChat(agents=large_group)
history = chat.run("Test scaling")
try:
agents = create_test_agents(3)
chat = GroupChat(
name="Concurrent Test", agents=agents, max_loops=1
)
assert history.total_messages > len(large_group)
tasks = [
"Task 1: Introduce yourself",
"Task 2: What's your specialty?",
"Task 3: How can you help?",
"Task 4: What are your limitations?",
"Task 5: Give an example of your expertise",
]
results = chat.concurrent_run(tasks)
report.add_result(
"Concurrent Execution Test",
True,
message=f"Successfully completed {len(results)} tasks",
duration=time.time() - start_time,
)
except Exception as e:
report.add_result(
"Concurrent Execution Test",
False,
message=str(e),
duration=time.time() - start_time,
)
def test_long_conversations():
chat = GroupChat(agents=setup_test_agents(), max_loops=50)
history = chat.run("Long conversation test")
assert len(history.turns) == 50
assert history.total_messages > 100
def test_conversation_rules(report):
"""Test GroupChat with different conversation rules"""
start_time = time.time()
try:
agents = create_test_agents(3, diverse_prompts=True)
chat = GroupChat(
name="Rules Test",
description="Testing conversation with specific rules",
agents=agents,
max_loops=2,
rules="""
1. Keep responses under 50 words
2. Always be professional
3. Stay on topic
4. Provide unique perspectives
5. Build on previous responses
""",
)
def test_stress_batched_runs():
chat = GroupChat(agents=setup_test_agents())
tasks = ["Task"] * 100
histories = chat.batched_run(tasks)
result = chat.run(
"How can we ensure ethical AI development across different sectors?"
)
report.add_result(
"Conversation Rules Test",
True,
duration=time.time() - start_time,
)
assert len(histories) == len(tasks)
total_messages = sum(h.total_messages for h in histories)
assert total_messages > len(tasks) * 3
except Exception as e:
report.add_result(
"Conversation Rules Test",
False,
message=str(e),
duration=time.time() - start_time,
)
if __name__ == "__main__":
load_dotenv()
functions = [
test_round_robin_speaking,
test_concurrent_processing,
test_expertise_based_speaking,
test_max_loops_limit,
test_error_handling,
test_conversation_context,
test_large_agent_group,
test_long_conversations,
test_stress_batched_runs,
]
report = TestReport()
report.start()
for func in functions:
try:
print(f"Running {func.__name__}...")
func()
print("✓ Passed")
except Exception as e:
print(f"✗ Failed: {str(e)}")
print("Starting Enhanced GroupChat Test Suite...\n")
# Run all tests
test_basic_groupchat(report)
test_speaker_functions(report)
test_varying_agent_counts(report)
test_error_cases(report)
test_concurrent_execution(report)
test_conversation_rules(report)
report.end()
print(report.generate_report())

Loading…
Cancel
Save