pull/56/head
Kye 1 year ago
parent 7d888c6a71
commit 154f50cc25

@ -1,109 +1,49 @@
# from swarms.structs import Flow
# from swarms.models import OpenAIChat
# from swarms.swarms.groupchat import GroupChat
# from swarms.agents import SimpleAgent
from swarms import OpenAI, Flow
from swarms.swarms.groupchat import GroupChatManager, GroupChat
# api_key = ""
# llm = OpenAIChat(
# openai_api_key=api_key,
# )
api_key = ""
# agent1 = SimpleAgent("Captain Price", Flow(llm=llm, max_loops=4))
# agent2 = SimpleAgent("John Mactavis", Flow(llm=llm, max_loops=4))
# # Create a groupchat with the 2 agents
# chat = GroupChat([agent1, agent2])
# # Assign duties to the agents
# chat.assign_duty(agent1.name, "Buy the groceries")
# chat.assign_duty(agent2.name, "Clean the house")
# # Initate a chat
# response = chat.run("Captain Price", "Hello, how are you John?")
# print(response)
from swarms.models import OpenAIChat
from swarms.structs import Flow
import random
api_key = "" # Your API Key here
class GroupChat:
"""
GroupChat class that facilitates agent-to-agent communication using multiple instances of the Flow class.
"""
def __init__(self, agents: list):
self.agents = {f"agent_{i}": agent for i, agent in enumerate(agents)}
self.message_log = []
def add_agent(self, agent: Flow):
agent_id = f"agent_{len(self.agents)}"
self.agents[agent_id] = agent
def remove_agent(self, agent_id: str):
if agent_id in self.agents:
del self.agents[agent_id]
def send_message(self, sender_id: str, recipient_id: str, message: str):
if sender_id not in self.agents or recipient_id not in self.agents:
raise ValueError("Invalid sender or recipient ID.")
formatted_message = f"{sender_id} to {recipient_id}: {message}"
self.message_log.append(formatted_message)
recipient_agent = self.agents[recipient_id]
recipient_agent.run(message)
def broadcast_message(self, sender_id: str, message: str):
for agent_id, agent in self.agents.items():
if agent_id != sender_id:
self.send_message(sender_id, agent_id, message)
def get_message_log(self):
return self.message_log
class EnhancedGroupChatV2(GroupChat):
def __init__(self, agents: list):
super().__init__(agents)
def multi_round_conversation(self, rounds: int = 5):
"""
Initiate a multi-round conversation between agents.
Args:
rounds (int): The number of rounds of conversation.
"""
for _ in range(rounds):
# Randomly select a sender and recipient agent for the conversation
sender_id = random.choice(list(self.agents.keys()))
recipient_id = random.choice(list(self.agents.keys()))
while recipient_id == sender_id: # Ensure the recipient is not the sender
recipient_id = random.choice(list(self.agents.keys()))
# Generate a message (for simplicity, a generic message is used)
message = f"Hello {recipient_id}, how are you today?"
self.send_message(sender_id, recipient_id, message)
# Sample usage with EnhancedGroupChatV2
# Initialize the language model
llm = OpenAIChat(
llm = OpenAI(
openai_api_key=api_key,
temperature=0.5,
max_tokens=3000,
)
# Initialize two Flow agents
agent1 = Flow(llm=llm, max_loops=5, dashboard=True)
agent2 = Flow(llm=llm, max_loops=5, dashboard=True)
# Initialize the flow
flow1 = Flow(
llm=llm,
max_loops=1,
system_message="YOU ARE SILLY, YOU OFFER NOTHING OF VALUE",
name='silly',
dashboard=True,
)
flow2 = Flow(
llm=llm,
max_loops=1,
system_message="YOU ARE VERY SMART AND ANSWER RIDDLES",
name='detective',
dashboard=True,
)
flow3 = Flow(
llm=llm,
max_loops=1,
system_message="YOU MAKE RIDDLES",
name='riddler',
dashboard=True,
)
manager = Flow(
llm=llm,
max_loops=1,
system_message="YOU ARE A GROUP CHAT MANAGER",
name='manager',
dashboard=True,
)
# Create an enhanced group chat with the two agents
enhanced_group_chat_v2 = EnhancedGroupChatV2(agents=[agent1, agent2])
# Simulate multi-round agent to agent communication
enhanced_group_chat_v2.multi_round_conversation(rounds=5)
# Example usage:
agents = [flow1, flow2, flow3]
enhanced_group_chat_v2.get_message_log() # Get the conversation log
group_chat = GroupChat(agents=agents, messages=[], max_round=10)
chat_manager = GroupChatManager(groupchat=group_chat, selector = manager)
chat_history = chat_manager("Write me a riddle")

@ -34,6 +34,7 @@ When you have finished the task, and you feel as if you are done: output a speci
This will enable you to leave the flow loop.
"""
# Custome stopping condition
def stop_when_repeats(response: str) -> bool:
# Stop if the word stop appears in the response
@ -100,6 +101,8 @@ class Flow:
retry_interval: int = 1,
interactive: bool = False,
dashboard: bool = False,
name: str = "Flow agent",
system_message: str = FLOW_SYSTEM_PROMPT,
# tools: List[BaseTool] = None,
dynamic_temperature: bool = False,
**kwargs: Any,
@ -119,6 +122,8 @@ class Flow:
self.dashboard = dashboard
self.dynamic_temperature = dynamic_temperature
# self.tools = tools
self.system_message = system_message
self.name = name
def provide_feedback(self, feedback: str) -> None:
"""Allow users to provide feedback on the responses."""
@ -131,11 +136,6 @@ class Flow:
return self.stopping_condition(response)
return False
def __call__(self, prompt, **kwargs) -> str:
"""Invoke the flow by providing a template and its variables."""
response = self.llm(prompt, **kwargs)
return response
def dynamic_temperature(self):
"""
1. Check the self.llm object for the temperature
@ -282,6 +282,82 @@ class Flow:
return response # , history
def __call__(self, task: str, save: bool = True, **kwargs):
"""
Run the autonomous agent loop
Args:
task (str): The initial task to run
Flow:
1. Generate a response
2. Check stopping condition
3. If stopping condition is met, stop
4. If stopping condition is not met, generate a response
5. Repeat until stopping condition is met or max_loops is reached
Example:
>>> out = flow.run("Generate a 10,000 word blog on health and wellness.")
"""
# Start with a new history or continue from the last saved state
if not self.memory or not self.memory[-1]:
history = [f"Human: {task}"]
else:
history = self.memory[-1]
response = task
history = [f"Human: {task}"]
# If dashboard = True then print the dashboard
if self.dashboard:
self.print_dashboard(task)
# Start or continue the loop process
for i in range(len(history), self.max_loops):
print(colored(f"\nLoop {i+1} of {self.max_loops}", "blue"))
print("\n")
response = history[-1].split(": ", 1)[-1] # Get the last response
if self._check_stopping_condition(response) or parse_done_token(response):
break
# Adjust temperature, comment if no work
if self.dynamic_temperature:
self.dynamic_temperature()
attempt = 0
while attempt < self.retry_attempts:
try:
response = self.llm(
self.agent_history_prompt(FLOW_SYSTEM_PROMPT, response)
** kwargs,
)
# print(f"Next query: {response}")
# break
if self.interactive:
print(f"AI: {response}")
history.append(f"AI: {response}")
response = input("You: ")
history.append(f"Human: {response}")
else:
print(f"AI: {response}")
history.append(f"AI: {response}")
print(response)
break
except Exception as e:
logging.error(f"Error generating response: {e}")
attempt += 1
time.sleep(self.retry_interval)
history.append(response)
time.sleep(self.loop_interval)
self.memory.append(history)
if save:
self.save_state("flow_history.json")
return response # , history
def _run(self, **kwargs: Any) -> str:
"""Generate a result using the provided keyword args."""
task = self.format_prompt(**kwargs)
@ -304,6 +380,7 @@ class Flow:
Returns:
str: The agent history prompt
"""
system_prompt = system_prompt or self.system_message
agent_history_prompt = f"""
SYSTEM_PROMPT: {system_prompt}
@ -608,3 +685,22 @@ class Flow:
attempt += 1
time.sleep(retry_delay)
raise Exception("All retry attempts failed")
def generate_reply(self, history: str, **kwargs) -> str:
"""
Generate a response based on initial or task
"""
prompt = f"""
SYSTEM_PROMPT: {self.system_message}
History: {history}
Your response:
"""
response = self.llm(prompt, **kwargs)
return {"role": self.name, "content": response}
def update_system_message(self, system_message: str):
"""Upddate the system message"""
self.system_message = system_message

@ -1,89 +1,108 @@
from swarms.agents import SimpleAgent
from termcolor import colored
import logging
from dataclasses import dataclass
from typing import Dict, List
from swarms.structs.flow import Flow
logger = logging.getLogger(__name__)
@dataclass
class GroupChat:
"""
Groupchat
Args:
agents (list): List of agents
dashboard (bool): Whether to print a dashboard or not
Example:
>>> from swarms.structs import Flow
>>> from swarms.models import OpenAIChat
>>> from swarms.swarms.groupchat import GroupChat
>>> from swarms.agents import SimpleAgent
>>> api_key = ""
>>> llm = OpenAIChat()
>>> agent1 = SimpleAgent("Captain Price", Flow(llm=llm, max_loops=4))
>>> agent2 = SimpleAgent("John Mactavis", Flow(llm=llm, max_loops=4))
>>> chat = GroupChat([agent1, agent2])
>>> chat.assign_duty(agent1.name, "Buy the groceries")
>>> chat.assign_duty(agent2.name, "Clean the house")
>>> response = chat.run("Captain Price", "Hello, how are you John?")
>>> print(response)
"""
def __init__(self, agents, dashboard: bool = False):
# Ensure that all provided agents are instances of simpleagents
if not all(isinstance(agent, SimpleAgent) for agent in agents):
raise ValueError("All agents must be instances of SimpleAgent")
self.agents = {agent.name: agent for agent in agents}
# Dictionary to store duties for each agent
self.duties = {}
# Dictionary to store roles for each agent
self.roles = {}
self.dashboard = dashboard
def assign_duty(self, agent_name, duty):
"""Assigns duty to the agent"""
if agent_name not in self.agents:
raise ValueError(f"No agent named {agent_name} found.")
def assign_role(self, agent_name, role):
"""Assigns a role to the specified agent"""
if agent_name not in self.agents:
raise ValueError(f"No agent named {agent_name} found")
self.roles[agent_name] = role
def run(self, sender_name: str, message: str):
"""Runs the groupchat"""
if self.dashboard:
metrics = print(
colored(
f"""
Groupchat Configuration:
------------------------
Agents: {self.agents}
Message: {message}
Sender: {sender_name}
""",
"red",
)
"""A group chat class that contains a list of agents and the maximum number of rounds."""
agents: List[Flow]
messages: List[Dict]
max_round: int = 10
admin_name: str = "Admin" # the name of the admin agent
@property
def agent_names(self) -> List[str]:
"""Return the names of the agents in the group chat."""
return [agent.name for agent in self.agents]
def reset(self):
"""Reset the group chat."""
self.messages.clear()
def agent_by_name(self, name: str) -> Flow:
"""Find an agent whose name is contained within the given 'name' string."""
for agent in self.agents:
if agent.name in name:
return agent
raise ValueError(f"No agent found with a name contained in '{name}'.")
def next_agent(self, agent: Flow) -> Flow:
"""Return the next agent in the list."""
return self.agents[(self.agent_names.index(agent.name) + 1) % len(self.agents)]
def select_speaker_msg(self):
"""Return the message for selecting the next speaker."""
return f"""
You are in a role play game. The following roles are available:
{self._participant_roles()}.
Read the following conversation.
Then select the next role from {self.agent_names} to play. Only return the role.
"""
def select_speaker(self, last_speaker: Flow, selector: Flow):
"""Select the next speaker."""
selector.update_system_message(self.select_speaker_msg())
# Warn if GroupChat is underpopulated, without established changing behavior
n_agents = len(self.agent_names)
if n_agents < 3:
logger.warning(
f"GroupChat is underpopulated with {n_agents} agents. Direct communication would be more efficient."
)
print(metrics)
responses = {}
for agent_name, agent in self.agents.items():
if agent_name != sender_name:
if agent_name in self.duties:
message += f"Your duty is {self.duties[agent_name]}"
if agent_name in self.roles:
message += (
f"You are the {self.roles[agent_name]} in this conversation"
)
name = selector.generate_reply(
self.format_history(
self.messages
+ [
{
"role": "system",
"content": f"Read the above conversation. Then select the next most suitable role from {self.agent_names} to play. Only return the role.",
}
]
)
)
try:
return self.agent_by_name(name["content"])
except ValueError:
return self.next_agent(last_speaker)
def _participant_roles(self):
return "\n".join(
[f"{agent.name}: {agent.system_message}" for agent in self.agents]
)
def format_history(self, messages: List[Dict]) -> str:
formatted_messages = []
for message in messages:
formatted_message = f"'{message['role']}:{message['content']}"
formatted_messages.append(formatted_message)
return "\n".join(formatted_messages)
class GroupChatManager:
def __init__(self, groupchat: GroupChat, selector: Flow):
self.groupchat = groupchat
self.selector = selector
def __call__(self, task: str):
self.groupchat.messages.append({"role": self.selector.name, "content": task})
for i in range(self.groupchat.max_round):
speaker = self.groupchat.select_speaker(
last_speaker=self.selector, selector=self.selector
)
reply = speaker.generate_reply(
self.groupchat.format_history(self.groupchat.messages)
)
self.groupchat.messages.append(reply)
print(reply)
if i == self.groupchat.max_round - 1:
break
responses[agent_name] = agent.run(message)
return responses
return reply

Loading…
Cancel
Save