dynamical temp for flow class and groupchat class

pull/77/head
Kye 1 year ago
parent b1516944dd
commit 0dcb7f9549

@ -1,13 +1,3 @@
message='Request to OpenAI API' method=post path=https://api.openai.com/v1/chat/completions
api_version=None data='{"messages": [{"role": "user", "content": "Generate a 10,000 word blog on health and wellness."}], "model": "gpt-3.5-turbo", "temperature": 0.5}' message='Post details'
Converted retries value: 2 -> Retry(total=2, connect=None, read=None, redirect=None, status=None)
Starting new HTTPS connection (1): api.openai.com:443
https://api.openai.com:443 "POST /v1/chat/completions HTTP/1.1" 200 None
message='OpenAI API response' path=https://api.openai.com/v1/chat/completions processing_ms=13345 request_id=ad9fa4aaf27128538fdb603e44576e17 response_code=200
message='Request to OpenAI API' method=post path=https://api.openai.com/v1/chat/completions
api_version=None data='{"messages": [{"role": "user", "content": "Title: Achieving Optimal Health and Wellness: A Comprehensive Guide\\n\\nIntroduction:\\n\\nIn today\'s fast-paced world, maintaining good health and wellness has become a paramount concern for many individuals. The pursuit of a healthy lifestyle encompasses various aspects of our lives, including physical, mental, and emotional well-being. This blog aims to provide a comprehensive guide to achieving optimal health and wellness, offering valuable insights, practical tips, and evidence-based information to support your journey towards a healthier and happier life.\\n\\nTable of Contents:\\n\\n1. Understanding Health and Wellness\\n a. Definitions and distinctions\\n b. The importance of a holistic approach\\n\\n2. Physical Health\\n a. The significance of regular exercise\\n b. Balanced nutrition and healthy eating habits\\n c. Importance of adequate sleep and rest\\n d. Maintaining a healthy weight\\n e. Preventive measures and screenings\\n\\n3. Mental Health\\n a. Recognizing and managing stress\\n b. Building resilience and coping mechanisms\\n c. Strategies for improving focus and concentration\\n d. Promoting positive self-esteem and body image\\n e. Seeking professional help when needed\\n\\n4. Emotional Well-being\\n a. Nurturing healthy relationships\\n b. Techniques for managing emotions effectively\\n c. Practicing gratitude and mindfulness\\n d. Engaging in activities that bring joy and fulfillment\\n e. Cultivating emotional intelligence\\n\\n5. Spiritual Wellness\\n a. Exploring different spiritual practices\\n b. Finding meaning and purpose in life\\n c. Meditation and mindfulness techniques\\n d. Connecting with nature and the universe\\n e. Embracing personal beliefs and values\\n\\n6. Social Wellness\\n a. The importance of social connections\\n b. Building a support network\\n c. Effective communication skills\\n d. Active involvement in the community\\n e. Balancing social interactions and alone time\\n\\n7. Environmental Wellness\\n a. Promoting a clean and sustainable environment\\n b. Reducing exposure to toxins and pollutants\\n c. Creating a calming and organized living space\\n d. Incorporating nature into everyday life\\n e. Adopting eco-friendly practices\\n\\n8. Occupational Wellness\\n a. Finding fulfillment in your career\\n b. Achieving work-life balance\\n c. Setting realistic goals and managing time effectively\\n d. Developing skills and continuous learning\\n e. Embracing creativity and innovation\\n\\n9. Financial Wellness\\n a. Creating a budget and managing finances\\n b. Saving and investing for the future\\n c. Reducing debt and financial stress\\n d. Seeking professional advice for financial planning\\n e. Balancing material and non-material aspirations\\n\\n10. Integrating Health and Wellness into Daily Life\\n a. Setting goals and creating a personalized plan\\n b. Overcoming obstacles and maintaining motivation\\n c. Building healthy habits and routines\\n d. Tracking progress and celebrating achievements\\n e. Embracing a lifelong commitment to health and wellness\\n\\nConclusion:\\n\\nAchieving optimal health and wellness is a lifelong journey that requires dedication, self-awareness, and continuous effort. By adopting a holistic approach and addressing various dimensions of well-being, we can enhance our quality of life and experience a profound sense of fulfillment. Remember, small changes can make a big difference, so start implementing the tips and strategies shared in this blog to embark on your path towards a healthier, happier you."}], "model": "gpt-3.5-turbo", "temperature": 0.5}' message='Post details'
https://api.openai.com:443 "POST /v1/chat/completions HTTP/1.1" 200 None
message='OpenAI API response' path=https://api.openai.com/v1/chat/completions processing_ms=1528 request_id=f0c0b4c521f98a3031aeea3e6b099810 response_code=200
Error in sys.excepthook:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/humbug/report.py", line 505, in _hook
@ -18,9 +8,16 @@ TypeError: format_exception() got an unexpected keyword argument 'etype'
Original exception was:
Traceback (most recent call last):
File "/Users/defalt/Desktop/Athena/research/swarms/flow.py", line 20, in <module>
out = flow.run("Generate a 10,000 word blog on health and wellness.")
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/defalt/Desktop/Athena/research/swarms/swarms/structs/flow.py", line 185, in run
time.sleep(self.loop_interval)
KeyboardInterrupt
File "/Users/defalt/Desktop/Athena/research/swarms/groupchat.py", line 23, in <module>
response = chat.run("Captain Price", "Hello, how are you John?")
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/defalt/Desktop/Athena/research/swarms/swarms/swarms/groupchat.py", line 60, in run
responses[agent_name] = agent.run(message)
^^^^^^^^^^^^^^^^^^
File "/Users/defalt/Desktop/Athena/research/swarms/swarms/agents/simple_agent.py", line 35, in run
response = self.flow.run(task)
^^^^^^^^^^^^^^^^^^^
File "/Users/defalt/Desktop/Athena/research/swarms/swarms/structs/flow.py", line 151, in run
if self.dashboard:
^^^^^^^^^^^^^^
AttributeError: 'Flow' object has no attribute 'dashboard'

@ -0,0 +1,24 @@
from swarms.structs import Flow
from swarms.models import OpenAIChat
from swarms.swarms.groupchat import GroupChat
from swarms.agents import SimpleAgent
api_key = ""
llm = OpenAIChat(
openai_api_key=api_key,
)
agent1 = SimpleAgent("Captain Price", Flow(llm=llm, max_loops=4))
agent2 = SimpleAgent("John Mactavis", Flow(llm=llm, max_loops=4))
# Create a groupchat with the 2 agents
chat = GroupChat([agent1, agent2])
# Assign duties to the agents
chat.assign_duty(agent1.name, "Buy the groceries")
chat.assign_duty(agent2.name, "Clean the house")
# Initate a chat
response = chat.run("Captain Price", "Hello, how are you John?")
print(response)

@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "1.9.2"
version = "1.9.3"
description = "Swarms - Pytorch"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]

@ -22,4 +22,4 @@ agent = SimpleAgent(
)
out = agent.run("Generate a 10,000 word blog on health and wellness.")
print(out)
print(out)

@ -0,0 +1,521 @@
import os
from typing import Optional
import json
import os
import shutil
import time
import xml.etree.ElementTree as ET
import zipfile
from tempfile import mkdtemp
from typing import Dict, Optional
from urllib.parse import urlparse
import pyautogui
import requests
import semver
import undetected_chromedriver as uc # type: ignore
import yaml
from extension import load_extension
from pydantic import BaseModel
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from tqdm import tqdm
def _is_blank_agent(agent_name: str) -> bool:
with open(f"agents/{agent_name}.py", "r") as agent_file:
agent_data = agent_file.read()
with open("src/template.py", "r") as template_file:
template_data = template_file.read()
return agent_data == template_data
def record(agent_name: str, autotab_ext_path: Optional[str] = None):
if not os.path.exists("agents"):
os.makedirs("agents")
if os.path.exists(f"agents/{agent_name}.py") and config.environment != "local":
if not _is_blank_agent(agent_name=agent_name):
raise Exception(f"Agent with name {agent_name} already exists")
driver = get_driver( # noqa: F841
autotab_ext_path=autotab_ext_path,
record_mode=True,
)
# Need to keep a reference to the driver so that it doesn't get garbage collected
with open("src/template.py", "r") as file:
data = file.read()
with open(f"agents/{agent_name}.py", "w") as file:
file.write(data)
print(
"\033[34mYou have the Python debugger open, you can run commands in it like you would in a normal Python shell.\033[0m"
)
print(
"\033[34mTo exit, type 'q' and press enter. For a list of commands type '?' and press enter.\033[0m"
)
breakpoint()
if __name__ == "__main__":
record("agent")
def extract_domain_from_url(url: str):
# url = http://username:password@hostname:port/path?arg=value#anchor
parsed_url = urlparse(url)
hostname = parsed_url.hostname
if hostname is None:
raise ValueError(f"Could not extract hostname from url {url}")
if hostname.startswith("www."):
hostname = hostname[4:]
return hostname
class AutotabChromeDriver(uc.Chrome):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def find_element_with_retry(
self, by=By.ID, value: Optional[str] = None
) -> WebElement:
try:
return super().find_element(by, value)
except Exception as e:
# TODO: Use an LLM to retry, finding a similar element on the DOM
breakpoint()
raise e
def open_plugin(driver: AutotabChromeDriver):
print("Opening plugin sidepanel")
driver.execute_script("document.activeElement.blur();")
pyautogui.press("esc")
pyautogui.hotkey("command", "shift", "y", interval=0.05) # mypy: ignore
def open_plugin_and_login(driver: AutotabChromeDriver):
if config.autotab_api_key is not None:
backend_url = (
"http://localhost:8000"
if config.environment == "local"
else "https://api.autotab.com"
)
driver.get(f"{backend_url}/auth/signin-api-key-page")
response = requests.post(
f"{backend_url}/auth/signin-api-key",
json={"api_key": config.autotab_api_key},
)
cookie = response.json()
if response.status_code != 200:
if response.status_code == 401:
raise Exception("Invalid API key")
else:
raise Exception(
f"Error {response.status_code} from backend while logging you in with your API key: {response.text}"
)
cookie["name"] = cookie["key"]
del cookie["key"]
driver.add_cookie(cookie)
driver.get("https://www.google.com")
open_plugin(driver)
else:
print("No autotab API key found, heading to autotab.com to sign up")
url = (
"http://localhost:3000/dashboard"
if config.environment == "local"
else "https://autotab.com/dashboard"
)
driver.get(url)
time.sleep(0.5)
open_plugin(driver)
def get_driver(
autotab_ext_path: Optional[str] = None, record_mode: bool = False
) -> AutotabChromeDriver:
options = webdriver.ChromeOptions()
options.add_argument("--no-sandbox") # Necessary for running
options.add_argument(
"--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
)
options.add_argument("--enable-webgl")
options.add_argument("--enable-3d-apis")
options.add_argument("--enable-clipboard-read-write")
options.add_argument("--disable-popup-blocking")
if autotab_ext_path is None:
load_extension()
options.add_argument("--load-extension=./src/extension/autotab")
else:
options.add_argument(f"--load-extension={autotab_ext_path}")
options.add_argument("--allow-running-insecure-content")
options.add_argument("--disable-web-security")
options.add_argument(f"--user-data-dir={mkdtemp()}")
options.binary_location = config.chrome_binary_location
driver = AutotabChromeDriver(options=options)
if record_mode:
open_plugin_and_login(driver)
return driver
class SiteCredentials(BaseModel):
name: Optional[str] = None
email: Optional[str] = None
password: Optional[str] = None
login_with_google_account: Optional[str] = None
login_url: Optional[str] = None
def __init__(self, **data) -> None:
super().__init__(**data)
if self.name is None:
self.name = self.email
class GoogleCredentials(BaseModel):
credentials: Dict[str, SiteCredentials]
def __init__(self, **data) -> None:
super().__init__(**data)
for cred in self.credentials.values():
cred.login_url = "https://accounts.google.com/v3/signin"
@property
def default(self) -> SiteCredentials:
if "default" not in self.credentials:
if len(self.credentials) == 1:
return list(self.credentials.values())[0]
raise Exception("No default credentials found in config")
return self.credentials["default"]
class Config(BaseModel):
autotab_api_key: Optional[str]
credentials: Dict[str, SiteCredentials]
google_credentials: GoogleCredentials
chrome_binary_location: str
environment: str
@classmethod
def load_from_yaml(cls, path: str):
with open(path, "r") as config_file:
config = yaml.safe_load(config_file)
_credentials = {}
for domain, creds in config.get("credentials", {}).items():
if "login_url" not in creds:
creds["login_url"] = f"https://{domain}/login"
site_creds = SiteCredentials(**creds)
_credentials[domain] = site_creds
for alt in creds.get("alts", []):
_credentials[alt] = site_creds
google_credentials = {}
for creds in config.get("google_credentials", []):
credentials: SiteCredentials = SiteCredentials(**creds)
google_credentials[credentials.name] = credentials
chrome_binary_location = config.get("chrome_binary_location")
if chrome_binary_location is None:
raise Exception("Must specify chrome_binary_location in config")
autotab_api_key = config.get("autotab_api_key")
if autotab_api_key == "...":
autotab_api_key = None
return cls(
autotab_api_key=autotab_api_key,
credentials=_credentials,
google_credentials=GoogleCredentials(credentials=google_credentials),
chrome_binary_location=config.get("chrome_binary_location"),
environment=config.get("environment", "prod"),
)
def get_site_credentials(self, domain: str) -> SiteCredentials:
credentials = self.credentials[domain].copy()
return credentials
config = Config.load_from_yaml(".autotab.yaml")
def is_signed_in_to_google(driver):
cookies = driver.get_cookies()
return len([c for c in cookies if c["name"] == "SAPISID"]) != 0
def google_login(
driver, credentials: Optional[SiteCredentials] = None, navigate: bool = True
):
print("Logging in to Google")
if navigate:
driver.get("https://accounts.google.com/")
time.sleep(1)
if is_signed_in_to_google(driver):
print("Already signed in to Google")
return
if os.path.exists("google_cookies.json"):
print("cookies exist, doing loading")
with open("google_cookies.json", "r") as f:
google_cookies = json.load(f)
for cookie in google_cookies:
if "expiry" in cookie:
cookie["expires"] = cookie["expiry"]
del cookie["expiry"]
driver.execute_cdp_cmd("Network.setCookie", cookie)
time.sleep(1)
driver.refresh()
time.sleep(2)
if not credentials:
credentials = config.google_credentials.default
if credentials is None:
raise Exception("No credentials provided for Google login")
email_input = driver.find_element(By.CSS_SELECTOR, "[type='email']")
email_input.send_keys(credentials.email)
email_input.send_keys(Keys.ENTER)
WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.CSS_SELECTOR, "[type='password']"))
)
password_input = driver.find_element(By.CSS_SELECTOR, "[type='password']")
password_input.send_keys(credentials.password)
password_input.send_keys(Keys.ENTER)
time.sleep(1.5)
print("Successfully logged in to Google")
cookies = driver.get_cookies()
if not is_signed_in_to_google(driver):
# Probably wanted to have us solve a captcha, or 2FA or confirm recovery details
print("Need 2FA help to log in to Google")
# TODO: Show screenshot it to the user
breakpoint()
if not os.path.exists("google_cookies.json"):
print("Setting Google cookies for future use")
# Log out to have access to the right cookies
driver.get("https://accounts.google.com/Logout")
time.sleep(2)
cookies = driver.get_cookies()
cookie_names = ["__Host-GAPS", "SMSV", "NID", "ACCOUNT_CHOOSER"]
google_cookies = [
cookie
for cookie in cookies
if cookie["domain"] in [".google.com", "accounts.google.com"]
and cookie["name"] in cookie_names
]
with open("google_cookies.json", "w") as f:
json.dump(google_cookies, f)
# Log back in
login_button = driver.find_element(
By.CSS_SELECTOR, f"[data-identifier='{credentials.email}']"
)
login_button.click()
time.sleep(1)
password_input = driver.find_element(By.CSS_SELECTOR, "[type='password']")
password_input.send_keys(credentials.password)
password_input.send_keys(Keys.ENTER)
time.sleep(3)
print("Successfully copied Google cookies for the future")
def login(driver, url: str):
domain = extract_domain_from_url(url)
credentials = config.get_site_credentials(domain)
login_url = credentials.login_url
if credentials.login_with_google_account:
google_credentials = config.google_credentials.credentials[
credentials.login_with_google_account
]
_login_with_google(driver, login_url, google_credentials)
else:
_login(driver, login_url, credentials=credentials)
def _login(driver, url: str, credentials: SiteCredentials):
print(f"Logging in to {url}")
driver.get(url)
time.sleep(2)
email_input = driver.find_element(By.NAME, "email")
email_input.send_keys(credentials.email)
password_input = driver.find_element(By.NAME, "password")
password_input.send_keys(credentials.password)
password_input.send_keys(Keys.ENTER)
time.sleep(3)
print(f"Successfully logged in to {url}")
def _login_with_google(driver, url: str, google_credentials: SiteCredentials):
print(f"Logging in to {url} with Google")
google_login(driver, credentials=google_credentials)
driver.get(url)
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
main_window = driver.current_window_handle
xpath = "//*[contains(text(), 'Continue with Google') or contains(text(), 'Sign in with Google') or contains(@title, 'Sign in with Google')]"
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.XPATH, xpath)))
driver.find_element(
By.XPATH,
xpath,
).click()
driver.switch_to.window(driver.window_handles[-1])
driver.find_element(
By.XPATH, f"//*[contains(text(), '{google_credentials.email}')]"
).click()
driver.switch_to.window(main_window)
time.sleep(5)
print(f"Successfully logged in to {url}")
def update():
print("updating extension...")
# Download the autotab.crx file
response = requests.get(
"https://github.com/Planetary-Computers/autotab-extension/raw/main/autotab.crx",
stream=True,
)
# Check if the directory exists, if not create it
if os.path.exists("src/extension/.autotab"):
shutil.rmtree("src/extension/.autotab")
os.makedirs("src/extension/.autotab")
# Open the file in write binary mode
total_size = int(response.headers.get("content-length", 0))
block_size = 1024 # 1 Kibibyte
t = tqdm(total=total_size, unit="iB", unit_scale=True)
with open("src/extension/.autotab/autotab.crx", "wb") as f:
for data in response.iter_content(block_size):
t.update(len(data))
f.write(data)
t.close()
if total_size != 0 and t.n != total_size:
print("ERROR, something went wrong")
# Unzip the file
with zipfile.ZipFile("src/extension/.autotab/autotab.crx", "r") as zip_ref:
zip_ref.extractall("src/extension/.autotab")
os.remove("src/extension/.autotab/autotab.crx")
if os.path.exists("src/extension/autotab"):
shutil.rmtree("src/extension/autotab")
os.rename("src/extension/.autotab", "src/extension/autotab")
def should_update():
if not os.path.exists("src/extension/autotab"):
return True
# Fetch the XML file
response = requests.get(
"https://raw.githubusercontent.com/Planetary-Computers/autotab-extension/main/update.xml"
)
xml_content = response.content
# Parse the XML file
root = ET.fromstring(xml_content)
namespaces = {"ns": "http://www.google.com/update2/response"} # add namespaces
xml_version = root.find(".//ns:app/ns:updatecheck", namespaces).get("version")
# Load the local JSON file
with open("src/extension/autotab/manifest.json", "r") as f:
json_content = json.load(f)
json_version = json_content["version"]
# Compare versions
return semver.compare(xml_version, json_version) > 0
def load_extension():
should_update() and update()
if __name__ == "__main__":
print("should update:", should_update())
update()
def play(agent_name: Optional[str] = None):
if agent_name is None:
agent_files = os.listdir("agents")
if len(agent_files) == 0:
raise Exception("No agents found in agents/ directory")
elif len(agent_files) == 1:
agent_file = agent_files[0]
else:
print("Found multiple agent files, please select one:")
for i, file in enumerate(agent_files, start=1):
print(f"{i}. {file}")
selected = int(input("Select a file by number: ")) - 1
agent_file = agent_files[selected]
else:
agent_file = f"{agent_name}.py"
os.system(f"python agents/{agent_file}")
if __name__ == "__main__":
play()
"""
chrome_binary_location: /Applications/Google Chrome.app/Contents/MacOS/Google Chrome
autotab_api_key: ... # Go to https://autotab.com/dashboard to get your API key, or
# run `autotab record` with this field blank and you will be prompted to log in to autotab
# Optional, programmatically login to services using "Login with Google" authentication
google_credentials:
- name: default
email: ...
password: ...
# Optional, specify alternative accounts to use with Google login on a per-service basis
- email: you@gmail.com # Credentials without a name use email as key
password: ...
credentials:
notion.so:
alts:
- notion.com
login_with_google_account: default
figma.com:
email: ...
password: ...
airtable.com:
login_with_google_account: you@gmail.com
"""

@ -13,10 +13,11 @@ class SimpleAgent:
>>> from swarms.agents.simple_agent import SimpleAgent
>>> from swarms.structs import Flow
>>> from swarms.models import OpenAIChat
>>> api_key = "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"
>>> api_key = ""
>>> llm = OpenAIChat()
"""
def __init__(
self,
name: str,

@ -91,5 +91,3 @@ class AbstractModel(ABC):
TOKENS: {_num_tokens}
Tokens/SEC: {_time_for_generation}
"""

@ -5,6 +5,7 @@ from typing import Any, Callable, Dict, List, Optional, Tuple, Generator
from termcolor import colored
import inspect
# Custome stopping condition
def stop_when_repeats(response: str) -> bool:
# Stop if the word stop appears in the response
@ -40,6 +41,7 @@ class Flow:
retry_attempts (int): The number of retry attempts
retry_interval (int): The interval between retry attempts
interactive (bool): Whether or not to run in interactive mode
dashboard (bool): Whether or not to print the dashboard
**kwargs (Any): Any additional keyword arguments
Example:
@ -68,6 +70,8 @@ class Flow:
retry_attempts: int = 3,
retry_interval: int = 1,
interactive: bool = False,
dashboard: bool = False,
dynamic_temperature: bool = False,
**kwargs: Any,
):
self.llm = llm
@ -82,6 +86,9 @@ class Flow:
self.task = None
self.stopping_token = "<DONE>"
self.interactive = interactive
self.dashboard = dashboard
self.dynamic_temperature = dynamic_temperature
def provide_feedback(self, feedback: str) -> None:
"""Allow users to provide feedback on the responses."""
@ -99,6 +106,16 @@ class Flow:
response = self.llm(prompt, **kwargs)
return response
def dynamic_temperature(self):
"""
1. Check the self.llm object for the temperature
2. If the temperature is not present, then use the default temperature
3. If the temperature is present, then dynamically change the temperature
4. for every loop you can randomly change the temperature on a scale from 0.0 to 1.0
"""
pass
def format_prompt(self, template, **kwargs: Any) -> str:
"""Format the template with the provided kwargs using f-string interpolation."""
return template.format(**kwargs)
@ -108,20 +125,20 @@ class Flow:
init_signature = inspect.signature(self.llm.__init__)
params = init_signature.parameters
params_str_list = []
for name, param in params.items():
if name == 'self':
if name == "self":
continue
if hasattr(self.llm, name):
value = getattr(self.llm, name)
else:
value = self.llm.__dict__.get(name, 'Unknown')
params_str_list.append(f" {name.capitalize().replace('_', ' ')}: {value}")
return '\n'.join(params_str_list)
value = self.llm.__dict__.get(name, "Unknown")
params_str_list.append(
f" {name.capitalize().replace('_', ' ')}: {value}"
)
return "\n".join(params_str_list)
def run(self, task: str):
"""
@ -143,29 +160,35 @@ class Flow:
model_config = self.get_llm_init_params()
dashboard = print(colored(f"""
Flow Dashboard
--------------------------------------------
# If dashboard = True then print the dashboard
if self.dashboard:
dashboard = print(
colored(
f"""
Flow loop is initializing for {self.max_loops} with the following configuration:
Flow Dashboard
--------------------------------------------
{model_config}
----------------------------------------
Flow loop is initializing for {self.max_loops} with the following configuration:
Flow Configuration:
Task: {task}
Max Loops: {self.max_loops}
Stopping Condition: {self.stopping_condition}
Loop Interval: {self.loop_interval}
Retry Attempts: {self.retry_attempts}
Retry Interval: {self.retry_interval}
Interactive: {self.interactive}
{model_config}
----------------------------------------
----------------------------------------""", "green"))
Flow Configuration:
Task: {task}
Max Loops: {self.max_loops}
Stopping Condition: {self.stopping_condition}
Loop Interval: {self.loop_interval}
Retry Attempts: {self.retry_attempts}
Retry Interval: {self.retry_interval}
Interactive: {self.interactive}
----------------------------------------""",
"green",
)
)
print(dashboard)
print(dashboard)
for i in range(self.max_loops):
print(colored(f"\nLoop {i+1} of {self.max_loops}", "blue"))

@ -0,0 +1,87 @@
from swarms.agents import SimpleAgent
from termcolor import colored
class GroupChat:
"""
Groupchat
Args:
agents (list): List of agents
dashboard (bool): Whether to print a dashboard or not
Example:
>>> from swarms.structs import Flow
>>> from swarms.models import OpenAIChat
>>> from swarms.swarms.groupchat import GroupChat
>>> from swarms.agents import SimpleAgent
>>> api_key = ""
>>> llm = OpenAIChat()
>>> agent1 = SimpleAgent("Captain Price", Flow(llm=llm, max_loops=4))
>>> agent2 = SimpleAgent("John Mactavis", Flow(llm=llm, max_loops=4))
>>> chat = GroupChat([agent1, agent2])
>>> chat.assign_duty(agent1.name, "Buy the groceries")
>>> chat.assign_duty(agent2.name, "Clean the house")
>>> response = chat.run("Captain Price", "Hello, how are you John?")
>>> print(response)
"""
def __init__(self, agents, dashboard: bool = False):
# Ensure that all provided agents are instances of simpleagents
if not all(isinstance(agent, SimpleAgent) for agent in agents):
raise ValueError("All agents must be instances of SimpleAgent")
self.agents = {agent.name: agent for agent in agents}
# Dictionary to store duties for each agent
self.duties = {}
# Dictionary to store roles for each agent
self.roles = {}
self.dashboard = dashboard
def assign_duty(self, agent_name, duty):
"""Assigns duty to the agent"""
if agent_name not in self.agents:
raise ValueError(f"No agent named {agent_name} found.")
def assign_role(self, agent_name, role):
"""Assigns a role to the specified agent"""
if agent_name not in self.agents:
raise ValueError(f"No agent named {agent_name} found")
self.roles[agent_name] = role
def run(self, sender_name: str, message: str):
"""Runs the groupchat"""
if self.dashboard:
metrics = print(
colored(
f"""
Groupchat Configuration:
------------------------
Agents: {self.agents}
Message: {message}
Sender: {sender_name}
""",
"red",
)
)
print(metrics)
responses = {}
for agent_name, agent in self.agents.items():
if agent_name != sender_name:
if agent_name in self.duties:
message += f"Your duty is {self.duties[agent_name]}"
if agent_name in self.roles:
message += (
f"You are the {self.roles[agent_name]} in this conversation"
)
responses[agent_name] = agent.run(message)
return responses
Loading…
Cancel
Save