From d242335396755ed10d9bf495bd5275f6c02697d1 Mon Sep 17 00:00:00 2001 From: Zack Date: Wed, 25 Oct 2023 00:05:27 -0500 Subject: [PATCH] chore: Run black fromater Former-commit-id: 354ca80f293422f24a3ad4acd2bb3df1090b8fd3 --- apps/BingBot/bing_bot.py | 63 +++++----- apps/BingBot/cogs/edgegpt.py | 92 ++++++++++---- apps/BingBot/cogs/event.py | 216 +++++++++++++++++++++++--------- apps/BingBot/cogs/help.py | 33 +++-- apps/BingBot/compose.yaml | 2 +- apps/BingBot/core/classes.py | 1 + apps/BingBot/src/imageCreate.py | 17 ++- apps/BingBot/src/log.py | 25 ++-- apps/BingBot/src/response.py | 139 +++++++++++++++----- apps/MythGen/main.py | 32 +++-- apps/discord.py | 16 ++- bingchat.py | 19 --- 12 files changed, 457 insertions(+), 198 deletions(-) delete mode 100644 bingchat.py diff --git a/apps/BingBot/bing_bot.py b/apps/BingBot/bing_bot.py index 4a562411..233ca9a7 100644 --- a/apps/BingBot/bing_bot.py +++ b/apps/BingBot/bing_bot.py @@ -9,41 +9,46 @@ from dotenv import load_dotenv load_dotenv() -bot = commands.Bot(command_prefix='!', intents = discord.Intents.all()) +bot = commands.Bot(command_prefix="!", intents=discord.Intents.all()) # init loggger logger = src.log.setup_logger(__name__) + def restart_bot(): # Replace current process with new instance of bot.py os.execl(sys.executable, sys.executable, "bot.py") + def check_verion() -> None: # Read the requirements.txt file and add each line to a list - with open('requirements.txt') as f: + with open("requirements.txt") as f: required = f.read().splitlines() # For each library listed in requirements.txt, check if the corresponding version is installed for package in required: # Use the pkg_resources library to get information about the installed version of the library - package_name, package_verion = package.split('==') + package_name, package_verion = package.split("==") installed = pkg_resources.get_distribution(package_name) # Extract the library name and version number name, version = installed.project_name, installed.version # Compare the version number to see if it matches the one in requirements.txt - if package != f'{name}=={version}': - logger.error(f'{name} version {version} is installed but does not match the requirements') + if package != f"{name}=={version}": + logger.error( + f"{name} version {version} is installed but does not match the requirements" + ) sys.exit() + @bot.event async def on_ready(): bot_status = discord.Status.online - bot_activity = discord.Activity(type=discord.ActivityType.playing, name = "bing.com") - await bot.change_presence(status = bot_status, activity = bot_activity) - for Filename in os.listdir('./cogs'): - if Filename.endswith('.py'): - await bot.load_extension(f'cogs.{Filename[:-3]}') - logger.info(f'{bot.user} is now running!') + bot_activity = discord.Activity(type=discord.ActivityType.playing, name="bing.com") + await bot.change_presence(status=bot_status, activity=bot_activity) + for Filename in os.listdir("./cogs"): + if Filename.endswith(".py"): + await bot.load_extension(f"cogs.{Filename[:-3]}") + logger.info(f"{bot.user} is now running!") print("Bot is Up and Ready!") try: synced = await bot.tree.sync() @@ -51,39 +56,44 @@ async def on_ready(): except Exception as e: print(e) + # Load command -@commands.is_owner() +@commands.is_owner() @bot.command() async def load(ctx, extension): - await bot.load_extension(f'cogs.{extension}') - await ctx.author.send(f'> **Loaded {extension} done.**') + await bot.load_extension(f"cogs.{extension}") + await ctx.author.send(f"> **Loaded {extension} done.**") + # Unload command @commands.is_owner() @bot.command() async def unload(ctx, extension): - await bot.unload_extension(f'cogs.{extension}') - await ctx.author.send(f'> **Un-Loaded {extension} done.**') + await bot.unload_extension(f"cogs.{extension}") + await ctx.author.send(f"> **Un-Loaded {extension} done.**") + # Empty discord_bot.log file @commands.is_owner() @bot.command() async def clean(ctx): - open('discord_bot.log', 'w').close() - await ctx.author.send(f'> **Successfully emptied the file!**') + open("discord_bot.log", "w").close() + await ctx.author.send(f"> **Successfully emptied the file!**") + # Get discord_bot.log file @commands.is_owner() @bot.command() async def getLog(ctx): try: - with open('discord_bot.log', 'rb') as f: + with open("discord_bot.log", "rb") as f: file = discord.File(f) await ctx.author.send(file=file) await ctx.author.send("> **Send successfully!**") except: await ctx.author.send("> **Send failed!**") + # Upload new Bing cookies and restart the bot @commands.is_owner() @bot.command() @@ -92,11 +102,11 @@ async def upload(ctx): for attachment in ctx.message.attachments: if str(attachment)[-4:] == ".txt": content = await attachment.read() - with open("cookies.json", "w", encoding = "utf-8") as f: - json.dump(json.loads(content), f, indent = 2) + with open("cookies.json", "w", encoding="utf-8") as f: + json.dump(json.loads(content), f, indent=2) if not isinstance(ctx.channel, discord.abc.PrivateChannel): await ctx.message.delete() - await ctx.author.send(f'> **Upload new cookies successfully!**') + await ctx.author.send(f"> **Upload new cookies successfully!**") logger.warning("\x1b[31mCookies has been setup successfully\x1b[0m") restart_bot() else: @@ -104,12 +114,7 @@ async def upload(ctx): else: await ctx.author.send("> **Didn't get any file.**") -if __name__ == '__main__': + +if __name__ == "__main__": check_verion() bot.run(os.getenv("DISCORD_BOT_TOKEN")) - - - - - - diff --git a/apps/BingBot/cogs/edgegpt.py b/apps/BingBot/cogs/edgegpt.py index 683780db..6b303418 100644 --- a/apps/BingBot/cogs/edgegpt.py +++ b/apps/BingBot/cogs/edgegpt.py @@ -19,6 +19,7 @@ users_chatbot = {} users_image_generator = {} user_conversation_style = {} + async def init_chatbot(user_id): with open("./cookies.json", encoding="utf-8") as file: cookie_json = json.load(file) @@ -26,7 +27,7 @@ async def init_chatbot(user_id): if cookie.get("name") == "_U": auth_cookie = cookie.get("value") break - + auth_cookie = os.environ.get("AUTH_COOKIE") auth_cookie_SRCHHPGUSR = os.environ.get("AUTH_COOKIE_SRCHHPGUSR") # auth_cookie_SRCHHPGUSR = os.environ.get("AUTH_COOKIE_SRCHHPGUSR") @@ -34,6 +35,7 @@ async def init_chatbot(user_id): users_image_generator[user_id] = ImageGenAsync(auth_cookie, quiet=True) user_conversation_style[user_id] = "balanced" + class UserChatbot: def __init__(self, cookies): self.chatbot = Chatbot(cookies=cookies) @@ -47,6 +49,7 @@ class UserChatbot: async def reset(self): await self.chatbot.reset() + class EdgeGPT(Cog_Extension): # Chat with Bing @app_commands.command(name="bing", description="Have a chat with Bing") @@ -57,7 +60,7 @@ class EdgeGPT(Cog_Extension): await set_using_send(interaction.user.id, False) using = await get_using_send(interaction.user.id) if not using: - await interaction.response.defer(ephemeral=False, thinking=True) + await interaction.response.defer(ephemeral=False, thinking=True) username = str(interaction.user) usermessage = message channel = str(interaction.channel) @@ -65,11 +68,17 @@ class EdgeGPT(Cog_Extension): if user_id not in users_chatbot: await init_chatbot(interaction.user.id) conversation_style = user_conversation_style[user_id] - logger.info(f"\x1b[31m{username}\x1b[0m : '{usermessage}' ({channel}) [Style: {conversation_style}]") - await users_chatbot[user_id].send_message(interaction, usermessage, conversation_style) + logger.info( + f"\x1b[31m{username}\x1b[0m : '{usermessage}' ({channel}) [Style: {conversation_style}]" + ) + await users_chatbot[user_id].send_message( + interaction, usermessage, conversation_style + ) else: await interaction.response.defer(ephemeral=True, thinking=True) - await interaction.followup.send("> **Please wait for your last conversation to finish.**") + await interaction.followup.send( + "> **Please wait for your last conversation to finish.**" + ) # Reset Bing conversation @app_commands.command(name="reset", description="Reset Bing conversation") @@ -81,25 +90,49 @@ class EdgeGPT(Cog_Extension): await interaction.followup.send("> **Info: Reset finish.**") logger.warning("\x1b[31mBing has been successfully reset\x1b[0m") except: - await interaction.followup.send(f"> **You don't have any conversation yet.**") + await interaction.followup.send( + f"> **You don't have any conversation yet.**" + ) logger.exception("Bing reset failed.") # Switch conversation style @app_commands.command(name="switch_style", description="Switch conversation style") - @app_commands.choices(style=[app_commands.Choice(name="Creative", value="creative"), app_commands.Choice(name="Balanced", value="balanced"), app_commands.Choice(name="Precise", value="precise")]) - async def switch_style(self, interaction: discord.Interaction, style: app_commands.Choice[str]): + @app_commands.choices( + style=[ + app_commands.Choice(name="Creative", value="creative"), + app_commands.Choice(name="Balanced", value="balanced"), + app_commands.Choice(name="Precise", value="precise"), + ] + ) + async def switch_style( + self, interaction: discord.Interaction, style: app_commands.Choice[str] + ): await interaction.response.defer(ephemeral=True, thinking=True) user_id = interaction.user.id if user_id not in users_chatbot: await init_chatbot(user_id) user_conversation_style[user_id] = style.value - await interaction.followup.send(f"> **Info: successfull switch conversation style to {style.value}.**") - logger.warning(f"\x1b[31mConversation style has been successfully switch to {style.value}\x1b[0m") - + await interaction.followup.send( + f"> **Info: successfull switch conversation style to {style.value}.**" + ) + logger.warning( + f"\x1b[31mConversation style has been successfully switch to {style.value}\x1b[0m" + ) + # Set and delete personal Bing Cookies @app_commands.command(name="bing_cookies", description="Set or delete Bing Cookies") - @app_commands.choices(choice=[app_commands.Choice(name="set", value="set"), app_commands.Choice(name="delete", value="delete")]) - async def cookies_setting(self, interaction: discord.Interaction, choice: app_commands.Choice[str], cookies_file: Optional[discord.Attachment]=None): + @app_commands.choices( + choice=[ + app_commands.Choice(name="set", value="set"), + app_commands.Choice(name="delete", value="delete"), + ] + ) + async def cookies_setting( + self, + interaction: discord.Interaction, + choice: app_commands.Choice[str], + cookies_file: Optional[discord.Attachment] = None, + ): await interaction.response.defer(ephemeral=True, thinking=True) user_id = interaction.user.id if choice.value == "set": @@ -111,11 +144,15 @@ class EdgeGPT(Cog_Extension): break users_image_generator[user_id] = ImageGenAsync(auth_cookie, quiet=True) users_chatbot[user_id] = UserChatbot(cookies=content) - user_conversation_style[user_id] = "balanced" + user_conversation_style[user_id] = "balanced" await interaction.followup.send("> **Upload successful!**") - logger.warning(f"\x1b[31m{interaction.user} set Bing Cookies successful\x1b[0m") + logger.warning( + f"\x1b[31m{interaction.user} set Bing Cookies successful\x1b[0m" + ) except: - await interaction.followup.send("> **Please upload your Bing Cookies.**") + await interaction.followup.send( + "> **Please upload your Bing Cookies.**" + ) else: try: del users_chatbot[user_id] @@ -124,10 +161,14 @@ class EdgeGPT(Cog_Extension): await interaction.followup.send("> **Delete finish.**") logger.warning(f"\x1b[31m{interaction.user} delete Cookies\x1b[0m") except: - await interaction.followup.send("> **You don't have any Bing Cookies.**") + await interaction.followup.send( + "> **You don't have any Bing Cookies.**" + ) # Create images - @app_commands.command(name="create_image", description="generate image by Bing image creator") + @app_commands.command( + name="create_image", description="generate image by Bing image creator" + ) async def create_image(self, interaction: discord.Interaction, *, prompt: str): user_id = interaction.user.id if interaction.user.id not in users_chatbot: @@ -137,12 +178,19 @@ class EdgeGPT(Cog_Extension): except: await set_using_create(user_id, False) using = await get_using_create(user_id) - if not using: - logger.info(f"\x1b[31m{interaction.user}\x1b[0m : '{prompt}' ({interaction.channel}) [BingImageCreator]") - await users_chatbot[user_id].create_image(interaction, prompt, users_image_generator[user_id] ) + if not using: + logger.info( + f"\x1b[31m{interaction.user}\x1b[0m : '{prompt}' ({interaction.channel}) [BingImageCreator]" + ) + await users_chatbot[user_id].create_image( + interaction, prompt, users_image_generator[user_id] + ) else: await interaction.response.defer(ephemeral=True, thinking=True) - await interaction.followup.send("> **Please wait for your last image to create finish.**") + await interaction.followup.send( + "> **Please wait for your last image to create finish.**" + ) + async def setup(bot): await bot.add_cog(EdgeGPT(bot)) diff --git a/apps/BingBot/cogs/event.py b/apps/BingBot/cogs/event.py index f42b6e5b..9b81cf30 100644 --- a/apps/BingBot/cogs/event.py +++ b/apps/BingBot/cogs/event.py @@ -25,64 +25,101 @@ with open("./cookies.json", encoding="utf-8") as file: cookies = json.load(file) chatbot = Chatbot(cookies=cookies) + # To add suggest responses class MyView(discord.ui.View): - def __init__(self, chatbot: Chatbot, suggest_responses:list): + def __init__(self, chatbot: Chatbot, suggest_responses: list): super().__init__(timeout=120) # Add buttons for label in suggest_responses: button = discord.ui.Button(label=label) + # Button event - async def callback(interaction: discord.Interaction, button: discord.ui.Button): - await interaction.response.defer(ephemeral=False, thinking=True) - # When click the button, all buttons will disable. - for child in self.children: - child.disabled = True - await interaction.followup.edit_message(message_id=interaction.message.id, view=self) - username = str(interaction.user) - usermessage = button.label - channel = str(interaction.channel) - logger.info(f"\x1b[31m{username}\x1b[0m : '{usermessage}' ({channel}) [Style: {conversation_style}] [button]") - task = asyncio.create_task(send_message(chatbot, interaction, usermessage)) - await asyncio.gather(task) + async def callback( + interaction: discord.Interaction, button: discord.ui.Button + ): + await interaction.response.defer(ephemeral=False, thinking=True) + # When click the button, all buttons will disable. + for child in self.children: + child.disabled = True + await interaction.followup.edit_message( + message_id=interaction.message.id, view=self + ) + username = str(interaction.user) + usermessage = button.label + channel = str(interaction.channel) + logger.info( + f"\x1b[31m{username}\x1b[0m : '{usermessage}' ({channel}) [Style: {conversation_style}] [button]" + ) + task = asyncio.create_task( + send_message(chatbot, interaction, usermessage) + ) + await asyncio.gather(task) + self.add_item(button) self.children[-1].callback = partial(callback, button=button) + + # Show Dropdown class DropdownView(discord.ui.View): def __init__(self): super().__init__(timeout=180) options = [ - discord.SelectOption(label="Creative", description="Switch conversation style to Creative", emoji='🎨'), - discord.SelectOption(label="Balanced", description="Switch conversation style to Balanced", emoji='⚖️'), - discord.SelectOption(label="Precise", description="Switch conversation style to Precise", emoji='🔎'), - discord.SelectOption(label="Reset", description="Reset conversation", emoji="🔄") + discord.SelectOption( + label="Creative", + description="Switch conversation style to Creative", + emoji="🎨", + ), + discord.SelectOption( + label="Balanced", + description="Switch conversation style to Balanced", + emoji="⚖️", + ), + discord.SelectOption( + label="Precise", + description="Switch conversation style to Precise", + emoji="🔎", + ), + discord.SelectOption( + label="Reset", description="Reset conversation", emoji="🔄" + ), ] dropdown = discord.ui.Select( - placeholder="Choose setting", - min_values=1, - max_values=1, - options=options + placeholder="Choose setting", min_values=1, max_values=1, options=options ) dropdown.callback = self.dropdown_callback self.add_item(dropdown) + # Dropdown event async def dropdown_callback(self, interaction: discord.Interaction): await interaction.response.defer(ephemeral=False, thinking=True) - if interaction.data['values'][0] == "Creative": + if interaction.data["values"][0] == "Creative": await set_conversation_style("creative") - await interaction.followup.send(f"> **Info: successfull switch conversation style to *{interaction.data['values'][0]}*.**") - logger.warning(f"\x1b[31mConversation style has been successfully switch to {interaction.data['values'][0]}\x1b[0m") - elif interaction.data['values'][0] == "Balanced": + await interaction.followup.send( + f"> **Info: successfull switch conversation style to *{interaction.data['values'][0]}*.**" + ) + logger.warning( + f"\x1b[31mConversation style has been successfully switch to {interaction.data['values'][0]}\x1b[0m" + ) + elif interaction.data["values"][0] == "Balanced": await set_conversation_style("balanced") - await interaction.followup.send(f"> **Info: successfull switch conversation style to *{interaction.data['values'][0]}*.**") - logger.warning(f"\x1b[31mConversation style has been successfully switch to {interaction.data['values'][0]}\x1b[0m") - elif interaction.data['values'][0] == "Precise": + await interaction.followup.send( + f"> **Info: successfull switch conversation style to *{interaction.data['values'][0]}*.**" + ) + logger.warning( + f"\x1b[31mConversation style has been successfully switch to {interaction.data['values'][0]}\x1b[0m" + ) + elif interaction.data["values"][0] == "Precise": await set_conversation_style("precise") - await interaction.followup.send(f"> **Info: successfull switch conversation style to *{interaction.data['values'][0]}*.**") - logger.warning(f"\x1b[31mConversation style has been successfully switch to {interaction.data['values'][0]}\x1b[0m") + await interaction.followup.send( + f"> **Info: successfull switch conversation style to *{interaction.data['values'][0]}*.**" + ) + logger.warning( + f"\x1b[31mConversation style has been successfully switch to {interaction.data['values'][0]}\x1b[0m" + ) else: await chatbot.reset() await interaction.followup.send(f"> **Info: Reset finish.**") @@ -90,46 +127,66 @@ class DropdownView(discord.ui.View): # disable dropdown after select for dropdown in self.children: dropdown.disabled = True - await interaction.followup.edit_message(message_id=interaction.message.id, view=self) + await interaction.followup.edit_message( + message_id=interaction.message.id, view=self + ) + # Set conversation style async def set_conversation_style(style: str): global conversation_style conversation_style = style + + async def set_chatbot(cookies): global chatbot chatbot = Chatbot(cookies=cookies) + async def send_message(chatbot: Chatbot, message, user_message: str): async with sem: if isinstance(message, discord.message.Message): await message.channel.typing() - reply = '' - text = '' - link_embed = '' + reply = "" + text = "" + link_embed = "" images_embed = [] all_url = [] try: - # Change conversation style + # Change conversation style if conversation_style == "creative": - reply = await chatbot.ask(prompt=user_message, conversation_style=ConversationStyle.creative, simplify_response=True) + reply = await chatbot.ask( + prompt=user_message, + conversation_style=ConversationStyle.creative, + simplify_response=True, + ) elif conversation_style == "precise": - reply = await chatbot.ask(prompt=user_message, conversation_style=ConversationStyle.precise, simplify_response=True) + reply = await chatbot.ask( + prompt=user_message, + conversation_style=ConversationStyle.precise, + simplify_response=True, + ) else: - reply = await chatbot.ask(prompt=user_message, conversation_style=ConversationStyle.balanced, simplify_response=True) + reply = await chatbot.ask( + prompt=user_message, + conversation_style=ConversationStyle.balanced, + simplify_response=True, + ) # Get reply text text = f"{reply['text']}" - text = re.sub(r'\[\^(\d+)\^\]', lambda match: '', text) + text = re.sub(r"\[\^(\d+)\^\]", lambda match: "", text) # Get the URL, if available try: - if len(reply['sources']) != 0: - for i, url in enumerate(reply['sources'], start=1): - if len(url['providerDisplayName']) == 0: + if len(reply["sources"]) != 0: + for i, url in enumerate(reply["sources"], start=1): + if len(url["providerDisplayName"]) == 0: all_url.append(f"{i}. {url['seeMoreUrl']}") else: - all_url.append(f"{i}. [{url['providerDisplayName']}]({url['seeMoreUrl']})") + all_url.append( + f"{i}. [{url['providerDisplayName']}]({url['seeMoreUrl']})" + ) link_text = "\n".join(all_url) link_embed = discord.Embed(description=link_text) except: @@ -147,45 +204,80 @@ async def send_message(chatbot: Chatbot, message, user_message: str): while len(response) > 2000: temp = response[:2000] response = response[2000:] - if isinstance(message, discord.interactions.Interaction): + if isinstance(message, discord.interactions.Interaction): await message.followup.send(temp) - else: + else: await message.channel.send(temp) # Get the image, if available try: if len(link_embed) == 0: - all_image = re.findall("https?://[\w\./]+", str(reply["sources_text"])) - [images_embed.append(discord.Embed(url="https://www.bing.com/").set_image(url=image_link)) for image_link in all_image] + all_image = re.findall( + "https?://[\w\./]+", str(reply["sources_text"]) + ) + [ + images_embed.append( + discord.Embed(url="https://www.bing.com/").set_image( + url=image_link + ) + ) + for image_link in all_image + ] except: pass - + if USE_SUGGEST_RESPONSES: suggest_responses = reply["suggestions"] if images_embed: if isinstance(message, discord.interactions.Interaction): - await message.followup.send(response, view=MyView(chatbot, suggest_responses), embeds=images_embed, wait=True) + await message.followup.send( + response, + view=MyView(chatbot, suggest_responses), + embeds=images_embed, + wait=True, + ) else: - await message.channel.send(response, view=MyView(chatbot, suggest_responses), embeds=images_embed) + await message.channel.send( + response, + view=MyView(chatbot, suggest_responses), + embeds=images_embed, + ) elif link_embed: if isinstance(message, discord.interactions.Interaction): - await message.followup.send(response, view=MyView(chatbot, suggest_responses), embed=link_embed, wait=True) + await message.followup.send( + response, + view=MyView(chatbot, suggest_responses), + embed=link_embed, + wait=True, + ) else: - await message.channel.send(response, view=MyView(chatbot, suggest_responses), embed=link_embed) + await message.channel.send( + response, + view=MyView(chatbot, suggest_responses), + embed=link_embed, + ) else: if isinstance(message, discord.interactions.Interaction): - await message.followup.send(response, view=MyView(chatbot, suggest_responses), wait=True) + await message.followup.send( + response, view=MyView(chatbot, suggest_responses), wait=True + ) else: - await message.channel.send(response, view=MyView(chatbot, suggest_responses)) + await message.channel.send( + response, view=MyView(chatbot, suggest_responses) + ) else: if images_embed: if isinstance(message, discord.interactions.Interaction): - await message.followup.send(response, embeds=images_embed, wait=True) + await message.followup.send( + response, embeds=images_embed, wait=True + ) else: await message.channel.send(response, embeds=images_embed) elif link_embed: if isinstance(message, discord.interactions.Interaction): - await message.followup.send(response, embed=link_embed, wait=True) + await message.followup.send( + response, embed=link_embed, wait=True + ) else: await message.channel.send(response, embed=link_embed) else: @@ -200,6 +292,7 @@ async def send_message(chatbot: Chatbot, message, user_message: str): await message.channel.send(f">>> **Error: {e}**") logger.exception(f"Error while sending message: {e}") + class Event(Cog_Extension): @commands.Cog.listener() async def on_message(self, message: discord.Message): @@ -207,17 +300,22 @@ class Event(Cog_Extension): return if self.bot.user in message.mentions: if not MENTION_CHANNEL_ID or message.channel.id == MENTION_CHANNEL_ID: - content = re.sub(r'<@.*?>', '', message.content).strip() + content = re.sub(r"<@.*?>", "", message.content).strip() if len(content) > 0: username = str(message.author) channel = str(message.channel) - logger.info(f"\x1b[31m{username}\x1b[0m : '{content}' ({channel}) [Style: {conversation_style}]") + logger.info( + f"\x1b[31m{username}\x1b[0m : '{content}' ({channel}) [Style: {conversation_style}]" + ) task = asyncio.create_task(send_message(chatbot, message, content)) await asyncio.gather(task) else: await message.channel.send(view=DropdownView()) elif MENTION_CHANNEL_ID is not None: - await message.channel.send(f"> **Can only be mentioned at <#{self.bot.get_channel(MENTION_CHANNEL_ID).id}>**") + await message.channel.send( + f"> **Can only be mentioned at <#{self.bot.get_channel(MENTION_CHANNEL_ID).id}>**" + ) + async def setup(bot): await bot.add_cog(Event(bot)) diff --git a/apps/BingBot/cogs/help.py b/apps/BingBot/cogs/help.py index 16ecff78..eef6a61d 100644 --- a/apps/BingBot/cogs/help.py +++ b/apps/BingBot/cogs/help.py @@ -2,16 +2,35 @@ import discord from core.classes import Cog_Extension from discord import app_commands + class Help(Cog_Extension): - @app_commands.command(name = "help", description = "Show how to use") + @app_commands.command(name="help", description="Show how to use") async def help(self, interaction: discord.Interaction): - embed=discord.Embed(title="Help", description="[see more](https://github.com/FuseFairy/DiscordBot-EdgeGPT/blob/main/README.md)\n\n**COMMANDS -**") - embed.add_field(name="/bing_cookies", value="Set and delete your Bing Cookies.", inline=False) + embed = discord.Embed( + title="Help", + description="[see more](https://github.com/FuseFairy/DiscordBot-EdgeGPT/blob/main/README.md)\n\n**COMMANDS -**", + ) + embed.add_field( + name="/bing_cookies", + value="Set and delete your Bing Cookies.", + inline=False, + ) embed.add_field(name="/bing", value="Chat with Bing.", inline=False) - embed.add_field(name="/reset", value="Reset your Bing conversation.", inline=False) - embed.add_field(name="/switch_style", value="Switch your Bing conversation style.", inline=False) - embed.add_field(name="/create_image", value="Generate image by Bing Image Creator.", inline=False) + embed.add_field( + name="/reset", value="Reset your Bing conversation.", inline=False + ) + embed.add_field( + name="/switch_style", + value="Switch your Bing conversation style.", + inline=False, + ) + embed.add_field( + name="/create_image", + value="Generate image by Bing Image Creator.", + inline=False, + ) await interaction.response.send_message(embed=embed) + async def setup(bot): - await bot.add_cog(Help(bot)) \ No newline at end of file + await bot.add_cog(Help(bot)) diff --git a/apps/BingBot/compose.yaml b/apps/BingBot/compose.yaml index f574f912..64e278fa 100644 --- a/apps/BingBot/compose.yaml +++ b/apps/BingBot/compose.yaml @@ -1,7 +1,7 @@ version: '3' services: - discord_edgegpt: + BingBot: build: . environment: - DISCORD_BOT_TOKEN=${DISCORD_BOT_TOKEN} diff --git a/apps/BingBot/core/classes.py b/apps/BingBot/core/classes.py index 8dfdb114..23c4cbb1 100644 --- a/apps/BingBot/core/classes.py +++ b/apps/BingBot/core/classes.py @@ -1,5 +1,6 @@ from discord.ext import commands + class Cog_Extension(commands.Cog): def __init__(self, bot): self.bot = bot diff --git a/apps/BingBot/src/imageCreate.py b/apps/BingBot/src/imageCreate.py index b88d1d4b..0b68b44d 100644 --- a/apps/BingBot/src/imageCreate.py +++ b/apps/BingBot/src/imageCreate.py @@ -5,10 +5,14 @@ from src import log logger = log.setup_logger(__name__) using_func = {} + async def get_using_create(user_id): return using_func[user_id] + + async def set_using_create(user_id, status: bool): - using_func[user_id] = status + using_func[user_id] = status + async def create_image(interaction: discord.Interaction, prompt: str, image_generator): await interaction.response.defer(ephemeral=False, thinking=True) @@ -16,10 +20,15 @@ async def create_image(interaction: discord.Interaction, prompt: str, image_gene try: embeds = [] prompts = f"> **{prompt}** - <@{str(interaction.user.id)}> (***BingImageCreator***)\n\n" - # Fetches image links + # Fetches image links images = await image_generator.get_images(prompt) # Add embed to list of embeds - [embeds.append(discord.Embed(url="https://www.bing.com/").set_image(url=image_link)) for image_link in images] + [ + embeds.append( + discord.Embed(url="https://www.bing.com/").set_image(url=image_link) + ) + for image_link in images + ] await interaction.followup.send(prompts, embeds=embeds, wait=True) except asyncio.TimeoutError: await interaction.followup.send("> **Error: Request timed out.**") @@ -28,4 +37,4 @@ async def create_image(interaction: discord.Interaction, prompt: str, image_gene await interaction.followup.send(f"> **Error: {e}**") logger.exception(f"Error while create image: {e}") finally: - using_func[interaction.user.id] = False \ No newline at end of file + using_func[interaction.user.id] = False diff --git a/apps/BingBot/src/log.py b/apps/BingBot/src/log.py index fba4e94d..ed04a4a3 100644 --- a/apps/BingBot/src/log.py +++ b/apps/BingBot/src/log.py @@ -4,18 +4,17 @@ import logging.handlers class CustomFormatter(logging.Formatter): - LEVEL_COLORS = [ - (logging.DEBUG, '\x1b[40;1m'), - (logging.INFO, '\x1b[34;1m'), - (logging.WARNING, '\x1b[33;1m'), - (logging.ERROR, '\x1b[31m'), - (logging.CRITICAL, '\x1b[41m'), + (logging.DEBUG, "\x1b[40;1m"), + (logging.INFO, "\x1b[34;1m"), + (logging.WARNING, "\x1b[33;1m"), + (logging.ERROR, "\x1b[31m"), + (logging.CRITICAL, "\x1b[41m"), ] FORMATS = { level: logging.Formatter( - f'\x1b[30;1m%(asctime)s\x1b[0m {color}%(levelname)-8s\x1b[0m \x1b[35m%(name)s\x1b[0m -> %(message)s', - '%Y-%m-%d %H:%M:%S' + f"\x1b[30;1m%(asctime)s\x1b[0m {color}%(levelname)-8s\x1b[0m \x1b[35m%(name)s\x1b[0m -> %(message)s", + "%Y-%m-%d %H:%M:%S", ) for level, color in LEVEL_COLORS } @@ -28,7 +27,7 @@ class CustomFormatter(logging.Formatter): # Override the traceback to always print in red if record.exc_info: text = formatter.formatException(record.exc_info) - record.exc_text = f'\x1b[31m{text}\x1b[0m' + record.exc_text = f"\x1b[31m{text}\x1b[0m" output = formatter.format(record) @@ -37,9 +36,9 @@ class CustomFormatter(logging.Formatter): return output -def setup_logger(module_name:str) -> logging.Logger: +def setup_logger(module_name: str) -> logging.Logger: # create logger - library, _, _ = module_name.partition('.py') + library, _, _ = module_name.partition(".py") logger = logging.getLogger(library) logger.setLevel(logging.INFO) @@ -50,12 +49,12 @@ def setup_logger(module_name:str) -> logging.Logger: console_handler.setFormatter(CustomFormatter()) # specify that the log file path is the same as `main.py` file path grandparent_dir = os.path.abspath(__file__ + "/../../") - log_name='discord_bot.log' + log_name = "discord_bot.log" log_path = os.path.join(grandparent_dir, log_name) # create local log handler log_handler = logging.handlers.RotatingFileHandler( filename=log_path, - encoding='utf-8', + encoding="utf-8", maxBytes=32 * 1024 * 1024, # 32 MiB backupCount=2, # Rotate through 5 files ) diff --git a/apps/BingBot/src/response.py b/apps/BingBot/src/response.py index 371622b8..47960a73 100644 --- a/apps/BingBot/src/response.py +++ b/apps/BingBot/src/response.py @@ -8,79 +8,127 @@ USE_SUGGEST_RESPONSES = True logger = log.setup_logger(__name__) using_func = {} + # To add suggest responses class MyView(discord.ui.View): - def __init__(self, interaction: discord.Interaction, chatbot: Chatbot, conversation_style:str, suggest_responses:list): + def __init__( + self, + interaction: discord.Interaction, + chatbot: Chatbot, + conversation_style: str, + suggest_responses: list, + ): super().__init__(timeout=120) - self.button_author =interaction.user.id + self.button_author = interaction.user.id # Add buttons for label in suggest_responses: button = discord.ui.Button(label=label) + # Button event - async def callback(interaction: discord.Interaction, button_author: int, button: discord.ui.Button): + async def callback( + interaction: discord.Interaction, + button_author: int, + button: discord.ui.Button, + ): if interaction.user.id != button_author: await interaction.response.defer(ephemeral=True, thinking=True) - await interaction.followup.send("You don't have permission to press this button.") + await interaction.followup.send( + "You don't have permission to press this button." + ) elif not using_func[interaction.user.id]: await interaction.response.defer(ephemeral=False, thinking=True) # When click the button, all buttons will disable. for child in self.children: child.disabled = True - await interaction.followup.edit_message(message_id=interaction.message.id, view=self) + await interaction.followup.edit_message( + message_id=interaction.message.id, view=self + ) username = str(interaction.user) usermessage = button.label channel = str(interaction.channel) - logger.info(f"\x1b[31m{username}\x1b[0m : '{usermessage}' ({channel}) [Style: {conversation_style}] [button]") - await send_message(chatbot, interaction, usermessage, conversation_style) + logger.info( + f"\x1b[31m{username}\x1b[0m : '{usermessage}' ({channel}) [Style: {conversation_style}] [button]" + ) + await send_message( + chatbot, interaction, usermessage, conversation_style + ) else: await interaction.response.defer(ephemeral=True, thinking=True) - await interaction.followup.send("Please wait for your last conversation to finish.") + await interaction.followup.send( + "Please wait for your last conversation to finish." + ) + self.add_item(button) - self.children[-1].callback = partial(callback, button_author=self.button_author, button=button) + self.children[-1].callback = partial( + callback, button_author=self.button_author, button=button + ) + async def get_using_send(user_id): return using_func[user_id] + + async def set_using_send(user_id, status: bool): using_func[user_id] = status -async def send_message(chatbot: Chatbot, interaction: discord.Interaction, user_message: str, conversation_style: str): + +async def send_message( + chatbot: Chatbot, + interaction: discord.Interaction, + user_message: str, + conversation_style: str, +): using_func[interaction.user.id] = True - reply = '' - text = '' - link_embed = '' + reply = "" + text = "" + link_embed = "" images_embed = [] all_url = [] try: # Change conversation style if conversation_style == "creative": - reply = await chatbot.ask(prompt=user_message, conversation_style=ConversationStyle.creative, simplify_response=True) + reply = await chatbot.ask( + prompt=user_message, + conversation_style=ConversationStyle.creative, + simplify_response=True, + ) elif conversation_style == "precise": - reply = await chatbot.ask(prompt=user_message, conversation_style=ConversationStyle.precise, simplify_response=True) + reply = await chatbot.ask( + prompt=user_message, + conversation_style=ConversationStyle.precise, + simplify_response=True, + ) else: - reply = await chatbot.ask(prompt=user_message, conversation_style=ConversationStyle.balanced, simplify_response=True) + reply = await chatbot.ask( + prompt=user_message, + conversation_style=ConversationStyle.balanced, + simplify_response=True, + ) # Get reply text text = f"{reply['text']}" - text = re.sub(r'\[\^(\d+)\^\]', lambda match: '', text) - + text = re.sub(r"\[\^(\d+)\^\]", lambda match: "", text) + # Get the URL, if available try: - if len(reply['sources']) != 0: - for i, url in enumerate(reply['sources'], start=1): - if len(url['providerDisplayName']) == 0: + if len(reply["sources"]) != 0: + for i, url in enumerate(reply["sources"], start=1): + if len(url["providerDisplayName"]) == 0: all_url.append(f"{i}. {url['seeMoreUrl']}") else: - all_url.append(f"{i}. [{url['providerDisplayName']}]({url['seeMoreUrl']})") + all_url.append( + f"{i}. [{url['providerDisplayName']}]({url['seeMoreUrl']})" + ) link_text = "\n".join(all_url) link_embed = discord.Embed(description=link_text) except: pass - + # Set the final message user_message = user_message.replace("\n", "") ask = f"> **{user_message}** - <@{str(interaction.user.id)}> (***style: {conversation_style}***)\n\n" response = f"{ask}{text}" - + # Discord limit about 2000 characters for a message while len(response) > 2000: temp = response[:2000] @@ -91,27 +139,56 @@ async def send_message(chatbot: Chatbot, interaction: discord.Interaction, user_ try: if len(link_embed) == 0: all_image = re.findall("https?://[\w\./]+", str(reply["sources_text"])) - [images_embed.append(discord.Embed(url="https://www.bing.com/").set_image(url=image_link)) for image_link in all_image] + [ + images_embed.append( + discord.Embed(url="https://www.bing.com/").set_image( + url=image_link + ) + ) + for image_link in all_image + ] except: pass # Add all suggest responses in list if USE_SUGGEST_RESPONSES: suggest_responses = reply["suggestions"] if images_embed: - await interaction.followup.send(response, view=MyView(interaction, chatbot, conversation_style, suggest_responses), embeds=images_embed, wait=True) + await interaction.followup.send( + response, + view=MyView( + interaction, chatbot, conversation_style, suggest_responses + ), + embeds=images_embed, + wait=True, + ) elif link_embed: - await interaction.followup.send(response, view=MyView(interaction, chatbot, conversation_style, suggest_responses), embed=link_embed, wait=True) + await interaction.followup.send( + response, + view=MyView( + interaction, chatbot, conversation_style, suggest_responses + ), + embed=link_embed, + wait=True, + ) else: - await interaction.followup.send(response, view=MyView(interaction, chatbot, conversation_style, suggest_responses), wait=True) + await interaction.followup.send( + response, + view=MyView( + interaction, chatbot, conversation_style, suggest_responses + ), + wait=True, + ) else: if images_embed: - await interaction.followup.send(response, embeds=images_embed, wait=True) + await interaction.followup.send( + response, embeds=images_embed, wait=True + ) elif link_embed: await interaction.followup.send(response, embed=link_embed, wait=True) else: await interaction.followup.send(response, wait=True) except Exception as e: - await interaction.followup.send(f">>> **Error: {e}**") - logger.exception(f"Error while sending message: {e}") + await interaction.followup.send(f">>> **Error: {e}**") + logger.exception(f"Error while sending message: {e}") finally: using_func[interaction.user.id] = False diff --git a/apps/MythGen/main.py b/apps/MythGen/main.py index 96d02e3c..1c684286 100644 --- a/apps/MythGen/main.py +++ b/apps/MythGen/main.py @@ -6,13 +6,15 @@ import gradio as gr from BingImageCreator import ImageGen from swarms.models.bing_chat import BingChat -# from swarms.models.bingchat import BingChat +# from swarms.models.bingchat import BingChat dotenv.load_dotenv(".env") # Initialize the EdgeGPTModel cookie = os.environ.get("BING_COOKIE") auth = os.environ.get("AUTH_COOKIE") -model = BingChat(cookies_path="./cookies.json", bing_cookie="BING_COOKIE",auth_cookie="AUTH_COOKIE") +model = BingChat( + cookies_path="./cookies.json", bing_cookie="BING_COOKIE", auth_cookie="AUTH_COOKIE" +) response = model("Generate") @@ -23,34 +25,39 @@ latest_caption = "" standard_suffix = "" storyboard = [] + def generate_images_with_bingchat(caption): img_path = model.create_img(caption) img_urls = model.images(caption) return img_urls + def generate_single_caption(text): prompt = f"A comic about {text}." response = model(text) return response + def interpret_text_with_gpt(text, suffix): return generate_single_caption(f"{text} {suffix}") + def create_standard_suffix(original_prompt): return f"In the style of {original_prompt}" + def gradio_interface(text=None, next_button_clicked=False): global accumulated_story, latest_caption, standard_suffix, storyboard - + if not standard_suffix: standard_suffix = create_standard_suffix(text) - + if next_button_clicked: new_caption = generate_single_caption(latest_caption + " " + standard_suffix) new_urls = generate_images_with_bingchat(new_caption) latest_caption = new_caption storyboard.append((new_urls, new_caption)) - + elif text: caption = generate_single_caption(text + " " + standard_suffix) comic_panel_urls = generate_images_with_bingchat(caption) @@ -60,18 +67,25 @@ def gradio_interface(text=None, next_button_clicked=False): storyboard_html = "" for urls, cap in storyboard: for url in urls: - storyboard_html += f'{cap}
{cap}
' + storyboard_html += ( + f'{cap}
{cap}
' + ) return storyboard_html + if __name__ == "__main__": iface = gr.Interface( fn=gradio_interface, inputs=[ - gr.inputs.Textbox(default="Type your story concept here", optional=True, label="Story Concept"), - gr.inputs.Checkbox(label="Generate Next Part") + gr.inputs.Textbox( + default="Type your story concept here", + optional=True, + label="Story Concept", + ), + gr.inputs.Checkbox(label="Generate Next Part"), ], outputs=[gr.outputs.HTML()], - live=False # Submit button will appear + live=False, # Submit button will appear ) iface.launch() diff --git a/apps/discord.py b/apps/discord.py index eebc48c8..4d4fcd71 100644 --- a/apps/discord.py +++ b/apps/discord.py @@ -5,6 +5,7 @@ import os from dotenv import load_dotenv from invoke import Executor + class BotCommands(commands.Cog): def __init__(self, bot): self.bot = bot @@ -46,7 +47,7 @@ class BotCommands(commands.Cog): """starts listening to voice in the voice channel that the bot is in.""" if ctx.voice_client: # create a wavesink to record the audio - sink = discord.sinks.wavesink('audio.wav') + sink = discord.sinks.wavesink("audio.wav") # start recording ctx.voice_client.start_recording(sink) await ctx.send("started listening and recording.") @@ -68,7 +69,11 @@ class BotCommands(commands.Cog): print("done generating images!") # list all files in the save_directory - all_files = [os.path.join(root, file) for root, _, files in os.walk(os.environ("SAVE_DIRECTORY")) for file in files] + all_files = [ + os.path.join(root, file) + for root, _, files in os.walk(os.environ("SAVE_DIRECTORY")) + for file in files + ] # sort files by their creation time (latest first) sorted_files = sorted(all_files, key=os.path.getctime, reverse=True) @@ -82,7 +87,9 @@ class BotCommands(commands.Cog): # await ctx.send(files=[storage_service.upload(filepath) for filepath in latest_files]) except asyncio.timeouterror: - await ctx.send("the request took too long! it might have been censored or you're out of boosts. please try entering the prompt again.") + await ctx.send( + "the request took too long! it might have been censored or you're out of boosts. please try entering the prompt again." + ) except Exception as e: await ctx.send(f"an error occurred: {e}") @@ -107,10 +114,11 @@ class BotCommands(commands.Cog): else: await ctx.send(f"an error occurred: {error}") + class Bot: def __init__(self, llm, command_prefix="!"): load_dotenv() - + intents = discord.Intents.default() intents.messages = True intents.guilds = True diff --git a/bingchat.py b/bingchat.py deleted file mode 100644 index f4b91cd7..00000000 --- a/bingchat.py +++ /dev/null @@ -1,19 +0,0 @@ -import os -from swarms.models.bing_chat import BingChat -from dotenv import load_dotenv - -load_dotenv() - -# Initialize the EdgeGPTModel -edgegpt = BingChat(cookies_path="./cookies.json") -cookie = os.environ.get("BING_COOKIE") -auth = os.environ.get("AUTH_COOKIE") - -# Use the worker to process a task -# task = "hi" -img_task = "Sunset over mountains" - -# response = edgegpt(task) -response = edgegpt.create_img(auth_cookie=cookie,auth_cookie_SRCHHPGUSR=auth,prompt=img_task) - -print(response)