diff --git a/swarms/structs/flow.py b/swarms/structs/flow.py index bc11522b..7ebf8f88 100644 --- a/swarms/structs/flow.py +++ b/swarms/structs/flow.py @@ -6,11 +6,13 @@ TODO: - Add """ - +import asyncio +import copy import json import logging import time -from typing import Any, Callable, Dict, List, Optional, Tuple, Generator +from collections import defaultdict +from typing import Any, Callable, Dict, List, Optional, Tuple, Generator, Union from termcolor import colored import inspect import random @@ -92,6 +94,8 @@ class Flow: retry_interval: int = 1, interactive: bool = False, dashboard: bool = False, + name: str = "flow-agent", + system_message: str = "system", dynamic_temperature: bool = False, **kwargs: Any, ): @@ -109,6 +113,9 @@ class Flow: self.interactive = interactive self.dashboard = dashboard self.dynamic_temperature = dynamic_temperature + self.system_message = system_message + self._oai_messages = defaultdict(list) + self._oai_system_message = [{"content": system_message, "role": "system"}] def provide_feedback(self, feedback: str) -> None: """Allow users to provide feedback on the responses.""" @@ -453,3 +460,87 @@ class Flow: for token in tokens: time.sleep(0.1) yield token + + def generate_reply( + self, + messages: Optional[List[Dict]] = None, + sender: Optional['Flow'] = None, + exclude: Optional[List[Callable]] = None, + ) -> Union[str, Dict, None]: + """Reply based on the conversation history and the sender. + """ + assert messages is not None or sender is not None, "Either messages or sender must be provided." + if messages is None: + messages = self._oai_messages[sender] + + for reply_func_tuple in self._reply_func_list: + reply_func = reply_func_tuple["reply_func"] + if exclude and reply_func in exclude: + continue + if asyncio.coroutines.iscoroutinefunction(reply_func): + continue + if self._match_trigger(reply_func_tuple["trigger"], sender): + final, reply = reply_func(self, messages=messages, sender=sender, config=reply_func_tuple["config"]) + if final: + return reply + return self._default_auto_reply + + def generate_oai_reply( + self, + messages: Optional[List[Dict]] = None, + sender: Optional['Flow'] = None, + config: Optional[Any] = None, + ) -> Tuple[bool, Union[str, Dict, None]]: + """Generate a reply using autogen.oai.""" + llm_config = self.llm_config if config is None else config + if llm_config is False: + return False, None + if messages is None: + messages = self._oai_messages[sender] + + # TODO: #1143 handle token limit exceeded error + response = oai.ChatCompletion.create( + context=messages[-1].pop("context", None), messages=self._oai_system_message + messages, **llm_config + ) + return True, oai.ChatCompletion.extract_text_or_function_call(response)[0] + + def send( + self, + message: Union[Dict, str], + recipient: 'Flow', + request_reply: Optional[bool] = None, + silent: Optional[bool] = False, + ) -> bool: + """Send a message to another agent. + """ + # When the agent composes and sends the message, the role of the message is "assistant" + # unless it's "function". + valid = self._append_oai_message(message, "assistant", recipient) + if valid: + recipient.receive(message, self, request_reply, silent) + else: + raise ValueError( + "Message can't be converted into a valid ChatCompletion message. Either content or function_call must be provided." + ) + + + def last_message(self, agent: Optional['Flow'] = None) -> Dict: + """The last message exchanged with the agent. + + Args: + agent (Agent): The agent in the conversation. + If None and more than one agent's conversations are found, an error will be raised. + If None and only one conversation is found, the last message of the only conversation will be returned. + + Returns: + The last message exchanged with the agent. + """ + if agent is None: + n_conversations = len(self._oai_messages) + if n_conversations == 0: + return None + if n_conversations == 1: + for conversation in self._oai_messages.values(): + return conversation[-1] + raise ValueError("More than one conversation is found. Please specify the sender to get the last message.") + return self._oai_messages[agent][-1] diff --git a/swarms/swarms/groupchat.py b/swarms/swarms/groupchat.py index ad9ca40a..5feac0ae 100644 --- a/swarms/swarms/groupchat.py +++ b/swarms/swarms/groupchat.py @@ -6,10 +6,9 @@ import logging from .. import Flow logger = logging.getLogger(__name__) -from swarms.agents import SimpleAgent -from termcolor import colored +@dataclass class GroupChat: """A group chat class that contains a list of agents and the maximum number of rounds.""" @@ -143,87 +142,5 @@ class GroupChatManager(Flow): speaker.send(reply, self, request_reply=False) message = self.last_message(speaker) return True, None - """ - Groupchat - - Args: - agents (list): List of agents - dashboard (bool): Whether to print a dashboard or not - - Example: - >>> from swarms.structs import Flow - >>> from swarms.models import OpenAIChat - >>> from swarms.swarms.groupchat import GroupChat - >>> from swarms.agents import SimpleAgent - >>> api_key = "" - >>> llm = OpenAIChat() - >>> agent1 = SimpleAgent("Captain Price", Flow(llm=llm, max_loops=4)) - >>> agent2 = SimpleAgent("John Mactavis", Flow(llm=llm, max_loops=4)) - >>> chat = GroupChat([agent1, agent2]) - >>> chat.assign_duty(agent1.name, "Buy the groceries") - >>> chat.assign_duty(agent2.name, "Clean the house") - >>> response = chat.run("Captain Price", "Hello, how are you John?") - >>> print(response) - - - - """ - - def __init__(self, agents, dashboard: bool = False): - # Ensure that all provided agents are instances of simpleagents - if not all(isinstance(agent, SimpleAgent) for agent in agents): - raise ValueError("All agents must be instances of SimpleAgent") - self.agents = {agent.name: agent for agent in agents} - - # Dictionary to store duties for each agent - self.duties = {} - - # Dictionary to store roles for each agent - self.roles = {} - - self.dashboard = dashboard - - def assign_duty(self, agent_name, duty): - """Assigns duty to the agent""" - if agent_name not in self.agents: - raise ValueError(f"No agent named {agent_name} found.") - - def assign_role(self, agent_name, role): - """Assigns a role to the specified agent""" - if agent_name not in self.agents: - raise ValueError(f"No agent named {agent_name} found") - - self.roles[agent_name] = role - - def run(self, sender_name: str, message: str): - """Runs the groupchat""" - if self.dashboard: - metrics = print( - colored( - f""" - - Groupchat Configuration: - ------------------------ - - Agents: {self.agents} - Message: {message} - Sender: {sender_name} - """, - "red", - ) - ) - - print(metrics) - responses = {} - for agent_name, agent in self.agents.items(): - if agent_name != sender_name: - if agent_name in self.duties: - message += f"Your duty is {self.duties[agent_name]}" - if agent_name in self.roles: - message += ( - f"You are the {self.roles[agent_name]} in this conversation" - ) - responses[agent_name] = agent.run(message) - return responses