chore: Run black fromater

Former-commit-id: 354ca80f29
blog-agent
Zack 2 years ago
parent efb4bb876c
commit d242335396

@ -9,41 +9,46 @@ from dotenv import load_dotenv
load_dotenv()
bot = commands.Bot(command_prefix='!', intents = discord.Intents.all())
bot = commands.Bot(command_prefix="!", intents=discord.Intents.all())
# init loggger
logger = src.log.setup_logger(__name__)
def restart_bot():
# Replace current process with new instance of bot.py
os.execl(sys.executable, sys.executable, "bot.py")
def check_verion() -> None:
# Read the requirements.txt file and add each line to a list
with open('requirements.txt') as f:
with open("requirements.txt") as f:
required = f.read().splitlines()
# For each library listed in requirements.txt, check if the corresponding version is installed
for package in required:
# Use the pkg_resources library to get information about the installed version of the library
package_name, package_verion = package.split('==')
package_name, package_verion = package.split("==")
installed = pkg_resources.get_distribution(package_name)
# Extract the library name and version number
name, version = installed.project_name, installed.version
# Compare the version number to see if it matches the one in requirements.txt
if package != f'{name}=={version}':
logger.error(f'{name} version {version} is installed but does not match the requirements')
if package != f"{name}=={version}":
logger.error(
f"{name} version {version} is installed but does not match the requirements"
)
sys.exit()
@bot.event
async def on_ready():
bot_status = discord.Status.online
bot_activity = discord.Activity(type=discord.ActivityType.playing, name = "bing.com")
await bot.change_presence(status = bot_status, activity = bot_activity)
for Filename in os.listdir('./cogs'):
if Filename.endswith('.py'):
await bot.load_extension(f'cogs.{Filename[:-3]}')
logger.info(f'{bot.user} is now running!')
bot_activity = discord.Activity(type=discord.ActivityType.playing, name="bing.com")
await bot.change_presence(status=bot_status, activity=bot_activity)
for Filename in os.listdir("./cogs"):
if Filename.endswith(".py"):
await bot.load_extension(f"cogs.{Filename[:-3]}")
logger.info(f"{bot.user} is now running!")
print("Bot is Up and Ready!")
try:
synced = await bot.tree.sync()
@ -51,39 +56,44 @@ async def on_ready():
except Exception as e:
print(e)
# Load command
@commands.is_owner()
@commands.is_owner()
@bot.command()
async def load(ctx, extension):
await bot.load_extension(f'cogs.{extension}')
await ctx.author.send(f'> **Loaded {extension} done.**')
await bot.load_extension(f"cogs.{extension}")
await ctx.author.send(f"> **Loaded {extension} done.**")
# Unload command
@commands.is_owner()
@bot.command()
async def unload(ctx, extension):
await bot.unload_extension(f'cogs.{extension}')
await ctx.author.send(f'> **Un-Loaded {extension} done.**')
await bot.unload_extension(f"cogs.{extension}")
await ctx.author.send(f"> **Un-Loaded {extension} done.**")
# Empty discord_bot.log file
@commands.is_owner()
@bot.command()
async def clean(ctx):
open('discord_bot.log', 'w').close()
await ctx.author.send(f'> **Successfully emptied the file!**')
open("discord_bot.log", "w").close()
await ctx.author.send(f"> **Successfully emptied the file!**")
# Get discord_bot.log file
@commands.is_owner()
@bot.command()
async def getLog(ctx):
try:
with open('discord_bot.log', 'rb') as f:
with open("discord_bot.log", "rb") as f:
file = discord.File(f)
await ctx.author.send(file=file)
await ctx.author.send("> **Send successfully!**")
except:
await ctx.author.send("> **Send failed!**")
# Upload new Bing cookies and restart the bot
@commands.is_owner()
@bot.command()
@ -92,11 +102,11 @@ async def upload(ctx):
for attachment in ctx.message.attachments:
if str(attachment)[-4:] == ".txt":
content = await attachment.read()
with open("cookies.json", "w", encoding = "utf-8") as f:
json.dump(json.loads(content), f, indent = 2)
with open("cookies.json", "w", encoding="utf-8") as f:
json.dump(json.loads(content), f, indent=2)
if not isinstance(ctx.channel, discord.abc.PrivateChannel):
await ctx.message.delete()
await ctx.author.send(f'> **Upload new cookies successfully!**')
await ctx.author.send(f"> **Upload new cookies successfully!**")
logger.warning("\x1b[31mCookies has been setup successfully\x1b[0m")
restart_bot()
else:
@ -104,12 +114,7 @@ async def upload(ctx):
else:
await ctx.author.send("> **Didn't get any file.**")
if __name__ == '__main__':
if __name__ == "__main__":
check_verion()
bot.run(os.getenv("DISCORD_BOT_TOKEN"))

@ -19,6 +19,7 @@ users_chatbot = {}
users_image_generator = {}
user_conversation_style = {}
async def init_chatbot(user_id):
with open("./cookies.json", encoding="utf-8") as file:
cookie_json = json.load(file)
@ -26,7 +27,7 @@ async def init_chatbot(user_id):
if cookie.get("name") == "_U":
auth_cookie = cookie.get("value")
break
auth_cookie = os.environ.get("AUTH_COOKIE")
auth_cookie_SRCHHPGUSR = os.environ.get("AUTH_COOKIE_SRCHHPGUSR")
# auth_cookie_SRCHHPGUSR = os.environ.get("AUTH_COOKIE_SRCHHPGUSR")
@ -34,6 +35,7 @@ async def init_chatbot(user_id):
users_image_generator[user_id] = ImageGenAsync(auth_cookie, quiet=True)
user_conversation_style[user_id] = "balanced"
class UserChatbot:
def __init__(self, cookies):
self.chatbot = Chatbot(cookies=cookies)
@ -47,6 +49,7 @@ class UserChatbot:
async def reset(self):
await self.chatbot.reset()
class EdgeGPT(Cog_Extension):
# Chat with Bing
@app_commands.command(name="bing", description="Have a chat with Bing")
@ -57,7 +60,7 @@ class EdgeGPT(Cog_Extension):
await set_using_send(interaction.user.id, False)
using = await get_using_send(interaction.user.id)
if not using:
await interaction.response.defer(ephemeral=False, thinking=True)
await interaction.response.defer(ephemeral=False, thinking=True)
username = str(interaction.user)
usermessage = message
channel = str(interaction.channel)
@ -65,11 +68,17 @@ class EdgeGPT(Cog_Extension):
if user_id not in users_chatbot:
await init_chatbot(interaction.user.id)
conversation_style = user_conversation_style[user_id]
logger.info(f"\x1b[31m{username}\x1b[0m : '{usermessage}' ({channel}) [Style: {conversation_style}]")
await users_chatbot[user_id].send_message(interaction, usermessage, conversation_style)
logger.info(
f"\x1b[31m{username}\x1b[0m : '{usermessage}' ({channel}) [Style: {conversation_style}]"
)
await users_chatbot[user_id].send_message(
interaction, usermessage, conversation_style
)
else:
await interaction.response.defer(ephemeral=True, thinking=True)
await interaction.followup.send("> **Please wait for your last conversation to finish.**")
await interaction.followup.send(
"> **Please wait for your last conversation to finish.**"
)
# Reset Bing conversation
@app_commands.command(name="reset", description="Reset Bing conversation")
@ -81,25 +90,49 @@ class EdgeGPT(Cog_Extension):
await interaction.followup.send("> **Info: Reset finish.**")
logger.warning("\x1b[31mBing has been successfully reset\x1b[0m")
except:
await interaction.followup.send(f"> **You don't have any conversation yet.**")
await interaction.followup.send(
f"> **You don't have any conversation yet.**"
)
logger.exception("Bing reset failed.")
# Switch conversation style
@app_commands.command(name="switch_style", description="Switch conversation style")
@app_commands.choices(style=[app_commands.Choice(name="Creative", value="creative"), app_commands.Choice(name="Balanced", value="balanced"), app_commands.Choice(name="Precise", value="precise")])
async def switch_style(self, interaction: discord.Interaction, style: app_commands.Choice[str]):
@app_commands.choices(
style=[
app_commands.Choice(name="Creative", value="creative"),
app_commands.Choice(name="Balanced", value="balanced"),
app_commands.Choice(name="Precise", value="precise"),
]
)
async def switch_style(
self, interaction: discord.Interaction, style: app_commands.Choice[str]
):
await interaction.response.defer(ephemeral=True, thinking=True)
user_id = interaction.user.id
if user_id not in users_chatbot:
await init_chatbot(user_id)
user_conversation_style[user_id] = style.value
await interaction.followup.send(f"> **Info: successfull switch conversation style to {style.value}.**")
logger.warning(f"\x1b[31mConversation style has been successfully switch to {style.value}\x1b[0m")
await interaction.followup.send(
f"> **Info: successfull switch conversation style to {style.value}.**"
)
logger.warning(
f"\x1b[31mConversation style has been successfully switch to {style.value}\x1b[0m"
)
# Set and delete personal Bing Cookies
@app_commands.command(name="bing_cookies", description="Set or delete Bing Cookies")
@app_commands.choices(choice=[app_commands.Choice(name="set", value="set"), app_commands.Choice(name="delete", value="delete")])
async def cookies_setting(self, interaction: discord.Interaction, choice: app_commands.Choice[str], cookies_file: Optional[discord.Attachment]=None):
@app_commands.choices(
choice=[
app_commands.Choice(name="set", value="set"),
app_commands.Choice(name="delete", value="delete"),
]
)
async def cookies_setting(
self,
interaction: discord.Interaction,
choice: app_commands.Choice[str],
cookies_file: Optional[discord.Attachment] = None,
):
await interaction.response.defer(ephemeral=True, thinking=True)
user_id = interaction.user.id
if choice.value == "set":
@ -111,11 +144,15 @@ class EdgeGPT(Cog_Extension):
break
users_image_generator[user_id] = ImageGenAsync(auth_cookie, quiet=True)
users_chatbot[user_id] = UserChatbot(cookies=content)
user_conversation_style[user_id] = "balanced"
user_conversation_style[user_id] = "balanced"
await interaction.followup.send("> **Upload successful!**")
logger.warning(f"\x1b[31m{interaction.user} set Bing Cookies successful\x1b[0m")
logger.warning(
f"\x1b[31m{interaction.user} set Bing Cookies successful\x1b[0m"
)
except:
await interaction.followup.send("> **Please upload your Bing Cookies.**")
await interaction.followup.send(
"> **Please upload your Bing Cookies.**"
)
else:
try:
del users_chatbot[user_id]
@ -124,10 +161,14 @@ class EdgeGPT(Cog_Extension):
await interaction.followup.send("> **Delete finish.**")
logger.warning(f"\x1b[31m{interaction.user} delete Cookies\x1b[0m")
except:
await interaction.followup.send("> **You don't have any Bing Cookies.**")
await interaction.followup.send(
"> **You don't have any Bing Cookies.**"
)
# Create images
@app_commands.command(name="create_image", description="generate image by Bing image creator")
@app_commands.command(
name="create_image", description="generate image by Bing image creator"
)
async def create_image(self, interaction: discord.Interaction, *, prompt: str):
user_id = interaction.user.id
if interaction.user.id not in users_chatbot:
@ -137,12 +178,19 @@ class EdgeGPT(Cog_Extension):
except:
await set_using_create(user_id, False)
using = await get_using_create(user_id)
if not using:
logger.info(f"\x1b[31m{interaction.user}\x1b[0m : '{prompt}' ({interaction.channel}) [BingImageCreator]")
await users_chatbot[user_id].create_image(interaction, prompt, users_image_generator[user_id] )
if not using:
logger.info(
f"\x1b[31m{interaction.user}\x1b[0m : '{prompt}' ({interaction.channel}) [BingImageCreator]"
)
await users_chatbot[user_id].create_image(
interaction, prompt, users_image_generator[user_id]
)
else:
await interaction.response.defer(ephemeral=True, thinking=True)
await interaction.followup.send("> **Please wait for your last image to create finish.**")
await interaction.followup.send(
"> **Please wait for your last image to create finish.**"
)
async def setup(bot):
await bot.add_cog(EdgeGPT(bot))

@ -25,64 +25,101 @@ with open("./cookies.json", encoding="utf-8") as file:
cookies = json.load(file)
chatbot = Chatbot(cookies=cookies)
# To add suggest responses
class MyView(discord.ui.View):
def __init__(self, chatbot: Chatbot, suggest_responses:list):
def __init__(self, chatbot: Chatbot, suggest_responses: list):
super().__init__(timeout=120)
# Add buttons
for label in suggest_responses:
button = discord.ui.Button(label=label)
# Button event
async def callback(interaction: discord.Interaction, button: discord.ui.Button):
await interaction.response.defer(ephemeral=False, thinking=True)
# When click the button, all buttons will disable.
for child in self.children:
child.disabled = True
await interaction.followup.edit_message(message_id=interaction.message.id, view=self)
username = str(interaction.user)
usermessage = button.label
channel = str(interaction.channel)
logger.info(f"\x1b[31m{username}\x1b[0m : '{usermessage}' ({channel}) [Style: {conversation_style}] [button]")
task = asyncio.create_task(send_message(chatbot, interaction, usermessage))
await asyncio.gather(task)
async def callback(
interaction: discord.Interaction, button: discord.ui.Button
):
await interaction.response.defer(ephemeral=False, thinking=True)
# When click the button, all buttons will disable.
for child in self.children:
child.disabled = True
await interaction.followup.edit_message(
message_id=interaction.message.id, view=self
)
username = str(interaction.user)
usermessage = button.label
channel = str(interaction.channel)
logger.info(
f"\x1b[31m{username}\x1b[0m : '{usermessage}' ({channel}) [Style: {conversation_style}] [button]"
)
task = asyncio.create_task(
send_message(chatbot, interaction, usermessage)
)
await asyncio.gather(task)
self.add_item(button)
self.children[-1].callback = partial(callback, button=button)
# Show Dropdown
class DropdownView(discord.ui.View):
def __init__(self):
super().__init__(timeout=180)
options = [
discord.SelectOption(label="Creative", description="Switch conversation style to Creative", emoji='🎨'),
discord.SelectOption(label="Balanced", description="Switch conversation style to Balanced", emoji='⚖️'),
discord.SelectOption(label="Precise", description="Switch conversation style to Precise", emoji='🔎'),
discord.SelectOption(label="Reset", description="Reset conversation", emoji="🔄")
discord.SelectOption(
label="Creative",
description="Switch conversation style to Creative",
emoji="🎨",
),
discord.SelectOption(
label="Balanced",
description="Switch conversation style to Balanced",
emoji="⚖️",
),
discord.SelectOption(
label="Precise",
description="Switch conversation style to Precise",
emoji="🔎",
),
discord.SelectOption(
label="Reset", description="Reset conversation", emoji="🔄"
),
]
dropdown = discord.ui.Select(
placeholder="Choose setting",
min_values=1,
max_values=1,
options=options
placeholder="Choose setting", min_values=1, max_values=1, options=options
)
dropdown.callback = self.dropdown_callback
self.add_item(dropdown)
# Dropdown event
async def dropdown_callback(self, interaction: discord.Interaction):
await interaction.response.defer(ephemeral=False, thinking=True)
if interaction.data['values'][0] == "Creative":
if interaction.data["values"][0] == "Creative":
await set_conversation_style("creative")
await interaction.followup.send(f"> **Info: successfull switch conversation style to *{interaction.data['values'][0]}*.**")
logger.warning(f"\x1b[31mConversation style has been successfully switch to {interaction.data['values'][0]}\x1b[0m")
elif interaction.data['values'][0] == "Balanced":
await interaction.followup.send(
f"> **Info: successfull switch conversation style to *{interaction.data['values'][0]}*.**"
)
logger.warning(
f"\x1b[31mConversation style has been successfully switch to {interaction.data['values'][0]}\x1b[0m"
)
elif interaction.data["values"][0] == "Balanced":
await set_conversation_style("balanced")
await interaction.followup.send(f"> **Info: successfull switch conversation style to *{interaction.data['values'][0]}*.**")
logger.warning(f"\x1b[31mConversation style has been successfully switch to {interaction.data['values'][0]}\x1b[0m")
elif interaction.data['values'][0] == "Precise":
await interaction.followup.send(
f"> **Info: successfull switch conversation style to *{interaction.data['values'][0]}*.**"
)
logger.warning(
f"\x1b[31mConversation style has been successfully switch to {interaction.data['values'][0]}\x1b[0m"
)
elif interaction.data["values"][0] == "Precise":
await set_conversation_style("precise")
await interaction.followup.send(f"> **Info: successfull switch conversation style to *{interaction.data['values'][0]}*.**")
logger.warning(f"\x1b[31mConversation style has been successfully switch to {interaction.data['values'][0]}\x1b[0m")
await interaction.followup.send(
f"> **Info: successfull switch conversation style to *{interaction.data['values'][0]}*.**"
)
logger.warning(
f"\x1b[31mConversation style has been successfully switch to {interaction.data['values'][0]}\x1b[0m"
)
else:
await chatbot.reset()
await interaction.followup.send(f"> **Info: Reset finish.**")
@ -90,46 +127,66 @@ class DropdownView(discord.ui.View):
# disable dropdown after select
for dropdown in self.children:
dropdown.disabled = True
await interaction.followup.edit_message(message_id=interaction.message.id, view=self)
await interaction.followup.edit_message(
message_id=interaction.message.id, view=self
)
# Set conversation style
async def set_conversation_style(style: str):
global conversation_style
conversation_style = style
async def set_chatbot(cookies):
global chatbot
chatbot = Chatbot(cookies=cookies)
async def send_message(chatbot: Chatbot, message, user_message: str):
async with sem:
if isinstance(message, discord.message.Message):
await message.channel.typing()
reply = ''
text = ''
link_embed = ''
reply = ""
text = ""
link_embed = ""
images_embed = []
all_url = []
try:
# Change conversation style
# Change conversation style
if conversation_style == "creative":
reply = await chatbot.ask(prompt=user_message, conversation_style=ConversationStyle.creative, simplify_response=True)
reply = await chatbot.ask(
prompt=user_message,
conversation_style=ConversationStyle.creative,
simplify_response=True,
)
elif conversation_style == "precise":
reply = await chatbot.ask(prompt=user_message, conversation_style=ConversationStyle.precise, simplify_response=True)
reply = await chatbot.ask(
prompt=user_message,
conversation_style=ConversationStyle.precise,
simplify_response=True,
)
else:
reply = await chatbot.ask(prompt=user_message, conversation_style=ConversationStyle.balanced, simplify_response=True)
reply = await chatbot.ask(
prompt=user_message,
conversation_style=ConversationStyle.balanced,
simplify_response=True,
)
# Get reply text
text = f"{reply['text']}"
text = re.sub(r'\[\^(\d+)\^\]', lambda match: '', text)
text = re.sub(r"\[\^(\d+)\^\]", lambda match: "", text)
# Get the URL, if available
try:
if len(reply['sources']) != 0:
for i, url in enumerate(reply['sources'], start=1):
if len(url['providerDisplayName']) == 0:
if len(reply["sources"]) != 0:
for i, url in enumerate(reply["sources"], start=1):
if len(url["providerDisplayName"]) == 0:
all_url.append(f"{i}. {url['seeMoreUrl']}")
else:
all_url.append(f"{i}. [{url['providerDisplayName']}]({url['seeMoreUrl']})")
all_url.append(
f"{i}. [{url['providerDisplayName']}]({url['seeMoreUrl']})"
)
link_text = "\n".join(all_url)
link_embed = discord.Embed(description=link_text)
except:
@ -147,45 +204,80 @@ async def send_message(chatbot: Chatbot, message, user_message: str):
while len(response) > 2000:
temp = response[:2000]
response = response[2000:]
if isinstance(message, discord.interactions.Interaction):
if isinstance(message, discord.interactions.Interaction):
await message.followup.send(temp)
else:
else:
await message.channel.send(temp)
# Get the image, if available
try:
if len(link_embed) == 0:
all_image = re.findall("https?://[\w\./]+", str(reply["sources_text"]))
[images_embed.append(discord.Embed(url="https://www.bing.com/").set_image(url=image_link)) for image_link in all_image]
all_image = re.findall(
"https?://[\w\./]+", str(reply["sources_text"])
)
[
images_embed.append(
discord.Embed(url="https://www.bing.com/").set_image(
url=image_link
)
)
for image_link in all_image
]
except:
pass
if USE_SUGGEST_RESPONSES:
suggest_responses = reply["suggestions"]
if images_embed:
if isinstance(message, discord.interactions.Interaction):
await message.followup.send(response, view=MyView(chatbot, suggest_responses), embeds=images_embed, wait=True)
await message.followup.send(
response,
view=MyView(chatbot, suggest_responses),
embeds=images_embed,
wait=True,
)
else:
await message.channel.send(response, view=MyView(chatbot, suggest_responses), embeds=images_embed)
await message.channel.send(
response,
view=MyView(chatbot, suggest_responses),
embeds=images_embed,
)
elif link_embed:
if isinstance(message, discord.interactions.Interaction):
await message.followup.send(response, view=MyView(chatbot, suggest_responses), embed=link_embed, wait=True)
await message.followup.send(
response,
view=MyView(chatbot, suggest_responses),
embed=link_embed,
wait=True,
)
else:
await message.channel.send(response, view=MyView(chatbot, suggest_responses), embed=link_embed)
await message.channel.send(
response,
view=MyView(chatbot, suggest_responses),
embed=link_embed,
)
else:
if isinstance(message, discord.interactions.Interaction):
await message.followup.send(response, view=MyView(chatbot, suggest_responses), wait=True)
await message.followup.send(
response, view=MyView(chatbot, suggest_responses), wait=True
)
else:
await message.channel.send(response, view=MyView(chatbot, suggest_responses))
await message.channel.send(
response, view=MyView(chatbot, suggest_responses)
)
else:
if images_embed:
if isinstance(message, discord.interactions.Interaction):
await message.followup.send(response, embeds=images_embed, wait=True)
await message.followup.send(
response, embeds=images_embed, wait=True
)
else:
await message.channel.send(response, embeds=images_embed)
elif link_embed:
if isinstance(message, discord.interactions.Interaction):
await message.followup.send(response, embed=link_embed, wait=True)
await message.followup.send(
response, embed=link_embed, wait=True
)
else:
await message.channel.send(response, embed=link_embed)
else:
@ -200,6 +292,7 @@ async def send_message(chatbot: Chatbot, message, user_message: str):
await message.channel.send(f">>> **Error: {e}**")
logger.exception(f"Error while sending message: {e}")
class Event(Cog_Extension):
@commands.Cog.listener()
async def on_message(self, message: discord.Message):
@ -207,17 +300,22 @@ class Event(Cog_Extension):
return
if self.bot.user in message.mentions:
if not MENTION_CHANNEL_ID or message.channel.id == MENTION_CHANNEL_ID:
content = re.sub(r'<@.*?>', '', message.content).strip()
content = re.sub(r"<@.*?>", "", message.content).strip()
if len(content) > 0:
username = str(message.author)
channel = str(message.channel)
logger.info(f"\x1b[31m{username}\x1b[0m : '{content}' ({channel}) [Style: {conversation_style}]")
logger.info(
f"\x1b[31m{username}\x1b[0m : '{content}' ({channel}) [Style: {conversation_style}]"
)
task = asyncio.create_task(send_message(chatbot, message, content))
await asyncio.gather(task)
else:
await message.channel.send(view=DropdownView())
elif MENTION_CHANNEL_ID is not None:
await message.channel.send(f"> **Can only be mentioned at <#{self.bot.get_channel(MENTION_CHANNEL_ID).id}>**")
await message.channel.send(
f"> **Can only be mentioned at <#{self.bot.get_channel(MENTION_CHANNEL_ID).id}>**"
)
async def setup(bot):
await bot.add_cog(Event(bot))

@ -2,16 +2,35 @@ import discord
from core.classes import Cog_Extension
from discord import app_commands
class Help(Cog_Extension):
@app_commands.command(name = "help", description = "Show how to use")
@app_commands.command(name="help", description="Show how to use")
async def help(self, interaction: discord.Interaction):
embed=discord.Embed(title="Help", description="[see more](https://github.com/FuseFairy/DiscordBot-EdgeGPT/blob/main/README.md)\n\n**COMMANDS -**")
embed.add_field(name="/bing_cookies", value="Set and delete your Bing Cookies.", inline=False)
embed = discord.Embed(
title="Help",
description="[see more](https://github.com/FuseFairy/DiscordBot-EdgeGPT/blob/main/README.md)\n\n**COMMANDS -**",
)
embed.add_field(
name="/bing_cookies",
value="Set and delete your Bing Cookies.",
inline=False,
)
embed.add_field(name="/bing", value="Chat with Bing.", inline=False)
embed.add_field(name="/reset", value="Reset your Bing conversation.", inline=False)
embed.add_field(name="/switch_style", value="Switch your Bing conversation style.", inline=False)
embed.add_field(name="/create_image", value="Generate image by Bing Image Creator.", inline=False)
embed.add_field(
name="/reset", value="Reset your Bing conversation.", inline=False
)
embed.add_field(
name="/switch_style",
value="Switch your Bing conversation style.",
inline=False,
)
embed.add_field(
name="/create_image",
value="Generate image by Bing Image Creator.",
inline=False,
)
await interaction.response.send_message(embed=embed)
async def setup(bot):
await bot.add_cog(Help(bot))
await bot.add_cog(Help(bot))

@ -1,7 +1,7 @@
version: '3'
services:
discord_edgegpt:
BingBot:
build: .
environment:
- DISCORD_BOT_TOKEN=${DISCORD_BOT_TOKEN}

@ -1,5 +1,6 @@
from discord.ext import commands
class Cog_Extension(commands.Cog):
def __init__(self, bot):
self.bot = bot

@ -5,10 +5,14 @@ from src import log
logger = log.setup_logger(__name__)
using_func = {}
async def get_using_create(user_id):
return using_func[user_id]
async def set_using_create(user_id, status: bool):
using_func[user_id] = status
using_func[user_id] = status
async def create_image(interaction: discord.Interaction, prompt: str, image_generator):
await interaction.response.defer(ephemeral=False, thinking=True)
@ -16,10 +20,15 @@ async def create_image(interaction: discord.Interaction, prompt: str, image_gene
try:
embeds = []
prompts = f"> **{prompt}** - <@{str(interaction.user.id)}> (***BingImageCreator***)\n\n"
# Fetches image links
# Fetches image links
images = await image_generator.get_images(prompt)
# Add embed to list of embeds
[embeds.append(discord.Embed(url="https://www.bing.com/").set_image(url=image_link)) for image_link in images]
[
embeds.append(
discord.Embed(url="https://www.bing.com/").set_image(url=image_link)
)
for image_link in images
]
await interaction.followup.send(prompts, embeds=embeds, wait=True)
except asyncio.TimeoutError:
await interaction.followup.send("> **Error: Request timed out.**")
@ -28,4 +37,4 @@ async def create_image(interaction: discord.Interaction, prompt: str, image_gene
await interaction.followup.send(f"> **Error: {e}**")
logger.exception(f"Error while create image: {e}")
finally:
using_func[interaction.user.id] = False
using_func[interaction.user.id] = False

@ -4,18 +4,17 @@ import logging.handlers
class CustomFormatter(logging.Formatter):
LEVEL_COLORS = [
(logging.DEBUG, '\x1b[40;1m'),
(logging.INFO, '\x1b[34;1m'),
(logging.WARNING, '\x1b[33;1m'),
(logging.ERROR, '\x1b[31m'),
(logging.CRITICAL, '\x1b[41m'),
(logging.DEBUG, "\x1b[40;1m"),
(logging.INFO, "\x1b[34;1m"),
(logging.WARNING, "\x1b[33;1m"),
(logging.ERROR, "\x1b[31m"),
(logging.CRITICAL, "\x1b[41m"),
]
FORMATS = {
level: logging.Formatter(
f'\x1b[30;1m%(asctime)s\x1b[0m {color}%(levelname)-8s\x1b[0m \x1b[35m%(name)s\x1b[0m -> %(message)s',
'%Y-%m-%d %H:%M:%S'
f"\x1b[30;1m%(asctime)s\x1b[0m {color}%(levelname)-8s\x1b[0m \x1b[35m%(name)s\x1b[0m -> %(message)s",
"%Y-%m-%d %H:%M:%S",
)
for level, color in LEVEL_COLORS
}
@ -28,7 +27,7 @@ class CustomFormatter(logging.Formatter):
# Override the traceback to always print in red
if record.exc_info:
text = formatter.formatException(record.exc_info)
record.exc_text = f'\x1b[31m{text}\x1b[0m'
record.exc_text = f"\x1b[31m{text}\x1b[0m"
output = formatter.format(record)
@ -37,9 +36,9 @@ class CustomFormatter(logging.Formatter):
return output
def setup_logger(module_name:str) -> logging.Logger:
def setup_logger(module_name: str) -> logging.Logger:
# create logger
library, _, _ = module_name.partition('.py')
library, _, _ = module_name.partition(".py")
logger = logging.getLogger(library)
logger.setLevel(logging.INFO)
@ -50,12 +49,12 @@ def setup_logger(module_name:str) -> logging.Logger:
console_handler.setFormatter(CustomFormatter())
# specify that the log file path is the same as `main.py` file path
grandparent_dir = os.path.abspath(__file__ + "/../../")
log_name='discord_bot.log'
log_name = "discord_bot.log"
log_path = os.path.join(grandparent_dir, log_name)
# create local log handler
log_handler = logging.handlers.RotatingFileHandler(
filename=log_path,
encoding='utf-8',
encoding="utf-8",
maxBytes=32 * 1024 * 1024, # 32 MiB
backupCount=2, # Rotate through 5 files
)

@ -8,79 +8,127 @@ USE_SUGGEST_RESPONSES = True
logger = log.setup_logger(__name__)
using_func = {}
# To add suggest responses
class MyView(discord.ui.View):
def __init__(self, interaction: discord.Interaction, chatbot: Chatbot, conversation_style:str, suggest_responses:list):
def __init__(
self,
interaction: discord.Interaction,
chatbot: Chatbot,
conversation_style: str,
suggest_responses: list,
):
super().__init__(timeout=120)
self.button_author =interaction.user.id
self.button_author = interaction.user.id
# Add buttons
for label in suggest_responses:
button = discord.ui.Button(label=label)
# Button event
async def callback(interaction: discord.Interaction, button_author: int, button: discord.ui.Button):
async def callback(
interaction: discord.Interaction,
button_author: int,
button: discord.ui.Button,
):
if interaction.user.id != button_author:
await interaction.response.defer(ephemeral=True, thinking=True)
await interaction.followup.send("You don't have permission to press this button.")
await interaction.followup.send(
"You don't have permission to press this button."
)
elif not using_func[interaction.user.id]:
await interaction.response.defer(ephemeral=False, thinking=True)
# When click the button, all buttons will disable.
for child in self.children:
child.disabled = True
await interaction.followup.edit_message(message_id=interaction.message.id, view=self)
await interaction.followup.edit_message(
message_id=interaction.message.id, view=self
)
username = str(interaction.user)
usermessage = button.label
channel = str(interaction.channel)
logger.info(f"\x1b[31m{username}\x1b[0m : '{usermessage}' ({channel}) [Style: {conversation_style}] [button]")
await send_message(chatbot, interaction, usermessage, conversation_style)
logger.info(
f"\x1b[31m{username}\x1b[0m : '{usermessage}' ({channel}) [Style: {conversation_style}] [button]"
)
await send_message(
chatbot, interaction, usermessage, conversation_style
)
else:
await interaction.response.defer(ephemeral=True, thinking=True)
await interaction.followup.send("Please wait for your last conversation to finish.")
await interaction.followup.send(
"Please wait for your last conversation to finish."
)
self.add_item(button)
self.children[-1].callback = partial(callback, button_author=self.button_author, button=button)
self.children[-1].callback = partial(
callback, button_author=self.button_author, button=button
)
async def get_using_send(user_id):
return using_func[user_id]
async def set_using_send(user_id, status: bool):
using_func[user_id] = status
async def send_message(chatbot: Chatbot, interaction: discord.Interaction, user_message: str, conversation_style: str):
async def send_message(
chatbot: Chatbot,
interaction: discord.Interaction,
user_message: str,
conversation_style: str,
):
using_func[interaction.user.id] = True
reply = ''
text = ''
link_embed = ''
reply = ""
text = ""
link_embed = ""
images_embed = []
all_url = []
try:
# Change conversation style
if conversation_style == "creative":
reply = await chatbot.ask(prompt=user_message, conversation_style=ConversationStyle.creative, simplify_response=True)
reply = await chatbot.ask(
prompt=user_message,
conversation_style=ConversationStyle.creative,
simplify_response=True,
)
elif conversation_style == "precise":
reply = await chatbot.ask(prompt=user_message, conversation_style=ConversationStyle.precise, simplify_response=True)
reply = await chatbot.ask(
prompt=user_message,
conversation_style=ConversationStyle.precise,
simplify_response=True,
)
else:
reply = await chatbot.ask(prompt=user_message, conversation_style=ConversationStyle.balanced, simplify_response=True)
reply = await chatbot.ask(
prompt=user_message,
conversation_style=ConversationStyle.balanced,
simplify_response=True,
)
# Get reply text
text = f"{reply['text']}"
text = re.sub(r'\[\^(\d+)\^\]', lambda match: '', text)
text = re.sub(r"\[\^(\d+)\^\]", lambda match: "", text)
# Get the URL, if available
try:
if len(reply['sources']) != 0:
for i, url in enumerate(reply['sources'], start=1):
if len(url['providerDisplayName']) == 0:
if len(reply["sources"]) != 0:
for i, url in enumerate(reply["sources"], start=1):
if len(url["providerDisplayName"]) == 0:
all_url.append(f"{i}. {url['seeMoreUrl']}")
else:
all_url.append(f"{i}. [{url['providerDisplayName']}]({url['seeMoreUrl']})")
all_url.append(
f"{i}. [{url['providerDisplayName']}]({url['seeMoreUrl']})"
)
link_text = "\n".join(all_url)
link_embed = discord.Embed(description=link_text)
except:
pass
# Set the final message
user_message = user_message.replace("\n", "")
ask = f"> **{user_message}** - <@{str(interaction.user.id)}> (***style: {conversation_style}***)\n\n"
response = f"{ask}{text}"
# Discord limit about 2000 characters for a message
while len(response) > 2000:
temp = response[:2000]
@ -91,27 +139,56 @@ async def send_message(chatbot: Chatbot, interaction: discord.Interaction, user_
try:
if len(link_embed) == 0:
all_image = re.findall("https?://[\w\./]+", str(reply["sources_text"]))
[images_embed.append(discord.Embed(url="https://www.bing.com/").set_image(url=image_link)) for image_link in all_image]
[
images_embed.append(
discord.Embed(url="https://www.bing.com/").set_image(
url=image_link
)
)
for image_link in all_image
]
except:
pass
# Add all suggest responses in list
if USE_SUGGEST_RESPONSES:
suggest_responses = reply["suggestions"]
if images_embed:
await interaction.followup.send(response, view=MyView(interaction, chatbot, conversation_style, suggest_responses), embeds=images_embed, wait=True)
await interaction.followup.send(
response,
view=MyView(
interaction, chatbot, conversation_style, suggest_responses
),
embeds=images_embed,
wait=True,
)
elif link_embed:
await interaction.followup.send(response, view=MyView(interaction, chatbot, conversation_style, suggest_responses), embed=link_embed, wait=True)
await interaction.followup.send(
response,
view=MyView(
interaction, chatbot, conversation_style, suggest_responses
),
embed=link_embed,
wait=True,
)
else:
await interaction.followup.send(response, view=MyView(interaction, chatbot, conversation_style, suggest_responses), wait=True)
await interaction.followup.send(
response,
view=MyView(
interaction, chatbot, conversation_style, suggest_responses
),
wait=True,
)
else:
if images_embed:
await interaction.followup.send(response, embeds=images_embed, wait=True)
await interaction.followup.send(
response, embeds=images_embed, wait=True
)
elif link_embed:
await interaction.followup.send(response, embed=link_embed, wait=True)
else:
await interaction.followup.send(response, wait=True)
except Exception as e:
await interaction.followup.send(f">>> **Error: {e}**")
logger.exception(f"Error while sending message: {e}")
await interaction.followup.send(f">>> **Error: {e}**")
logger.exception(f"Error while sending message: {e}")
finally:
using_func[interaction.user.id] = False

@ -6,13 +6,15 @@ import gradio as gr
from BingImageCreator import ImageGen
from swarms.models.bing_chat import BingChat
# from swarms.models.bingchat import BingChat
# from swarms.models.bingchat import BingChat
dotenv.load_dotenv(".env")
# Initialize the EdgeGPTModel
cookie = os.environ.get("BING_COOKIE")
auth = os.environ.get("AUTH_COOKIE")
model = BingChat(cookies_path="./cookies.json", bing_cookie="BING_COOKIE",auth_cookie="AUTH_COOKIE")
model = BingChat(
cookies_path="./cookies.json", bing_cookie="BING_COOKIE", auth_cookie="AUTH_COOKIE"
)
response = model("Generate")
@ -23,34 +25,39 @@ latest_caption = ""
standard_suffix = ""
storyboard = []
def generate_images_with_bingchat(caption):
img_path = model.create_img(caption)
img_urls = model.images(caption)
return img_urls
def generate_single_caption(text):
prompt = f"A comic about {text}."
response = model(text)
return response
def interpret_text_with_gpt(text, suffix):
return generate_single_caption(f"{text} {suffix}")
def create_standard_suffix(original_prompt):
return f"In the style of {original_prompt}"
def gradio_interface(text=None, next_button_clicked=False):
global accumulated_story, latest_caption, standard_suffix, storyboard
if not standard_suffix:
standard_suffix = create_standard_suffix(text)
if next_button_clicked:
new_caption = generate_single_caption(latest_caption + " " + standard_suffix)
new_urls = generate_images_with_bingchat(new_caption)
latest_caption = new_caption
storyboard.append((new_urls, new_caption))
elif text:
caption = generate_single_caption(text + " " + standard_suffix)
comic_panel_urls = generate_images_with_bingchat(caption)
@ -60,18 +67,25 @@ def gradio_interface(text=None, next_button_clicked=False):
storyboard_html = ""
for urls, cap in storyboard:
for url in urls:
storyboard_html += f'<img src="{url}" alt="{cap}" width="300"/><br>{cap}<br>'
storyboard_html += (
f'<img src="{url}" alt="{cap}" width="300"/><br>{cap}<br>'
)
return storyboard_html
if __name__ == "__main__":
iface = gr.Interface(
fn=gradio_interface,
inputs=[
gr.inputs.Textbox(default="Type your story concept here", optional=True, label="Story Concept"),
gr.inputs.Checkbox(label="Generate Next Part")
gr.inputs.Textbox(
default="Type your story concept here",
optional=True,
label="Story Concept",
),
gr.inputs.Checkbox(label="Generate Next Part"),
],
outputs=[gr.outputs.HTML()],
live=False # Submit button will appear
live=False, # Submit button will appear
)
iface.launch()

@ -5,6 +5,7 @@ import os
from dotenv import load_dotenv
from invoke import Executor
class BotCommands(commands.Cog):
def __init__(self, bot):
self.bot = bot
@ -46,7 +47,7 @@ class BotCommands(commands.Cog):
"""starts listening to voice in the voice channel that the bot is in."""
if ctx.voice_client:
# create a wavesink to record the audio
sink = discord.sinks.wavesink('audio.wav')
sink = discord.sinks.wavesink("audio.wav")
# start recording
ctx.voice_client.start_recording(sink)
await ctx.send("started listening and recording.")
@ -68,7 +69,11 @@ class BotCommands(commands.Cog):
print("done generating images!")
# list all files in the save_directory
all_files = [os.path.join(root, file) for root, _, files in os.walk(os.environ("SAVE_DIRECTORY")) for file in files]
all_files = [
os.path.join(root, file)
for root, _, files in os.walk(os.environ("SAVE_DIRECTORY"))
for file in files
]
# sort files by their creation time (latest first)
sorted_files = sorted(all_files, key=os.path.getctime, reverse=True)
@ -82,7 +87,9 @@ class BotCommands(commands.Cog):
# await ctx.send(files=[storage_service.upload(filepath) for filepath in latest_files])
except asyncio.timeouterror:
await ctx.send("the request took too long! it might have been censored or you're out of boosts. please try entering the prompt again.")
await ctx.send(
"the request took too long! it might have been censored or you're out of boosts. please try entering the prompt again."
)
except Exception as e:
await ctx.send(f"an error occurred: {e}")
@ -107,10 +114,11 @@ class BotCommands(commands.Cog):
else:
await ctx.send(f"an error occurred: {error}")
class Bot:
def __init__(self, llm, command_prefix="!"):
load_dotenv()
intents = discord.Intents.default()
intents.messages = True
intents.guilds = True

@ -1,19 +0,0 @@
import os
from swarms.models.bing_chat import BingChat
from dotenv import load_dotenv
load_dotenv()
# Initialize the EdgeGPTModel
edgegpt = BingChat(cookies_path="./cookies.json")
cookie = os.environ.get("BING_COOKIE")
auth = os.environ.get("AUTH_COOKIE")
# Use the worker to process a task
# task = "hi"
img_task = "Sunset over mountains"
# response = edgegpt(task)
response = edgegpt.create_img(auth_cookie=cookie,auth_cookie_SRCHHPGUSR=auth,prompt=img_task)
print(response)
Loading…
Cancel
Save