commit
29b70f28e0
@ -0,0 +1,4 @@
|
||||
DISCORD_BOT_TOKEN=
|
||||
MENTION_CHANNEL_ID=
|
||||
AUTH_COOKIE=
|
||||
AUTH_COOKIE_SRCHHPGUSR=
|
@ -0,0 +1,6 @@
|
||||
FROM python:3.9.16
|
||||
WORKDIR /bot
|
||||
COPY requirements.txt /bot/
|
||||
RUN pip install -r requirements.txt
|
||||
COPY . /bot
|
||||
CMD python bot.py
|
@ -0,0 +1,120 @@
|
||||
import discord
|
||||
import os
|
||||
import src.log
|
||||
import sys
|
||||
import pkg_resources
|
||||
import json
|
||||
from discord.ext import commands
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
bot = commands.Bot(command_prefix="!", intents=discord.Intents.all())
|
||||
|
||||
# init loggger
|
||||
logger = src.log.setup_logger(__name__)
|
||||
|
||||
|
||||
def restart_bot():
|
||||
# Replace current process with new instance of bot.py
|
||||
os.execl(sys.executable, sys.executable, "bot.py")
|
||||
|
||||
|
||||
def check_verion() -> None:
|
||||
# Read the requirements.txt file and add each line to a list
|
||||
with open("requirements.txt") as f:
|
||||
required = f.read().splitlines()
|
||||
|
||||
# For each library listed in requirements.txt, check if the corresponding version is installed
|
||||
for package in required:
|
||||
# Use the pkg_resources library to get information about the installed version of the library
|
||||
package_name, package_verion = package.split("==")
|
||||
installed = pkg_resources.get_distribution(package_name)
|
||||
# Extract the library name and version number
|
||||
name, version = installed.project_name, installed.version
|
||||
# Compare the version number to see if it matches the one in requirements.txt
|
||||
if package != f"{name}=={version}":
|
||||
logger.error(
|
||||
f"{name} version {version} is installed but does not match the requirements"
|
||||
)
|
||||
sys.exit()
|
||||
|
||||
|
||||
@bot.event
|
||||
async def on_ready():
|
||||
bot_status = discord.Status.online
|
||||
bot_activity = discord.Activity(type=discord.ActivityType.playing, name="bing.com")
|
||||
await bot.change_presence(status=bot_status, activity=bot_activity)
|
||||
for Filename in os.listdir("./cogs"):
|
||||
if Filename.endswith(".py"):
|
||||
await bot.load_extension(f"cogs.{Filename[:-3]}")
|
||||
logger.info(f"{bot.user} is now running!")
|
||||
print("Bot is Up and Ready!")
|
||||
try:
|
||||
synced = await bot.tree.sync()
|
||||
print(f"Synced {len(synced)} commands")
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
|
||||
# Load command
|
||||
@commands.is_owner()
|
||||
@bot.command()
|
||||
async def load(ctx, extension):
|
||||
await bot.load_extension(f"cogs.{extension}")
|
||||
await ctx.author.send(f"> **Loaded {extension} done.**")
|
||||
|
||||
|
||||
# Unload command
|
||||
@commands.is_owner()
|
||||
@bot.command()
|
||||
async def unload(ctx, extension):
|
||||
await bot.unload_extension(f"cogs.{extension}")
|
||||
await ctx.author.send(f"> **Un-Loaded {extension} done.**")
|
||||
|
||||
|
||||
# Empty discord_bot.log file
|
||||
@commands.is_owner()
|
||||
@bot.command()
|
||||
async def clean(ctx):
|
||||
open("discord_bot.log", "w").close()
|
||||
await ctx.author.send(f"> **Successfully emptied the file!**")
|
||||
|
||||
|
||||
# Get discord_bot.log file
|
||||
@commands.is_owner()
|
||||
@bot.command()
|
||||
async def getLog(ctx):
|
||||
try:
|
||||
with open("discord_bot.log", "rb") as f:
|
||||
file = discord.File(f)
|
||||
await ctx.author.send(file=file)
|
||||
await ctx.author.send("> **Send successfully!**")
|
||||
except:
|
||||
await ctx.author.send("> **Send failed!**")
|
||||
|
||||
|
||||
# Upload new Bing cookies and restart the bot
|
||||
@commands.is_owner()
|
||||
@bot.command()
|
||||
async def upload(ctx):
|
||||
if ctx.message.attachments:
|
||||
for attachment in ctx.message.attachments:
|
||||
if str(attachment)[-4:] == ".txt":
|
||||
content = await attachment.read()
|
||||
with open("cookies.json", "w", encoding="utf-8") as f:
|
||||
json.dump(json.loads(content), f, indent=2)
|
||||
if not isinstance(ctx.channel, discord.abc.PrivateChannel):
|
||||
await ctx.message.delete()
|
||||
await ctx.author.send(f"> **Upload new cookies successfully!**")
|
||||
logger.warning("\x1b[31mCookies has been setup successfully\x1b[0m")
|
||||
restart_bot()
|
||||
else:
|
||||
await ctx.author.send("> **Didn't get any txt file.**")
|
||||
else:
|
||||
await ctx.author.send("> **Didn't get any file.**")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
check_verion()
|
||||
bot.run(os.getenv("DISCORD_BOT_TOKEN"))
|
@ -0,0 +1,148 @@
|
||||
import os
|
||||
import discord
|
||||
import json
|
||||
from typing import Optional
|
||||
from EdgeGPT.ImageGen import ImageGenAsync, ImageGen
|
||||
from EdgeGPT.EdgeGPT import Chatbot
|
||||
from discord import app_commands
|
||||
from core.classes import Cog_Extension
|
||||
from src import log
|
||||
from src.imageCreate import create_image, get_using_create, set_using_create
|
||||
from src.response import send_message, get_using_send, set_using_send
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
logger = log.setup_logger(__name__)
|
||||
|
||||
users_chatbot = {}
|
||||
users_image_generator = {}
|
||||
user_conversation_style = {}
|
||||
|
||||
async def init_chatbot(user_id):
|
||||
with open("./cookies.json", encoding="utf-8") as file:
|
||||
cookie_json = json.load(file)
|
||||
for cookie in cookie_json:
|
||||
if cookie.get("name") == "_U":
|
||||
auth_cookie = cookie.get("value")
|
||||
break
|
||||
|
||||
auth_cookie = os.environ.get("AUTH_COOKIE")
|
||||
auth_cookie_SRCHHPGUSR = os.environ.get("AUTH_COOKIE_SRCHHPGUSR")
|
||||
# auth_cookie_SRCHHPGUSR = os.environ.get("AUTH_COOKIE_SRCHHPGUSR")
|
||||
users_chatbot[user_id] = UserChatbot(cookies=cookie_json)
|
||||
users_image_generator[user_id] = ImageGenAsync(auth_cookie, quiet=True)
|
||||
user_conversation_style[user_id] = "balanced"
|
||||
|
||||
class UserChatbot:
|
||||
def __init__(self, cookies):
|
||||
self.chatbot = Chatbot(cookies=cookies)
|
||||
|
||||
async def send_message(self, interaction, message, conversation_style):
|
||||
await send_message(self.chatbot, interaction, message, conversation_style)
|
||||
|
||||
async def create_image(self, interaction, prompt: str, image_generator):
|
||||
await create_image(interaction, prompt, image_generator)
|
||||
|
||||
async def reset(self):
|
||||
await self.chatbot.reset()
|
||||
|
||||
class EdgeGPT(Cog_Extension):
|
||||
# Chat with Bing
|
||||
@app_commands.command(name="bing", description="Have a chat with Bing")
|
||||
async def bing(self, interaction: discord.Interaction, *, message: str):
|
||||
try:
|
||||
using = await get_using_send(interaction.user.id)
|
||||
except:
|
||||
await set_using_send(interaction.user.id, False)
|
||||
using = await get_using_send(interaction.user.id)
|
||||
if not using:
|
||||
await interaction.response.defer(ephemeral=False, thinking=True)
|
||||
username = str(interaction.user)
|
||||
usermessage = message
|
||||
channel = str(interaction.channel)
|
||||
user_id = interaction.user.id
|
||||
if user_id not in users_chatbot:
|
||||
await init_chatbot(interaction.user.id)
|
||||
conversation_style = user_conversation_style[user_id]
|
||||
logger.info(f"\x1b[31m{username}\x1b[0m : '{usermessage}' ({channel}) [Style: {conversation_style}]")
|
||||
await users_chatbot[user_id].send_message(interaction, usermessage, conversation_style)
|
||||
else:
|
||||
await interaction.response.defer(ephemeral=True, thinking=True)
|
||||
await interaction.followup.send("> **Please wait for your last conversation to finish.**")
|
||||
|
||||
# Reset Bing conversation
|
||||
@app_commands.command(name="reset", description="Reset Bing conversation")
|
||||
async def reset(self, interaction: discord.Interaction):
|
||||
await interaction.response.defer(ephemeral=True, thinking=True)
|
||||
user_id = interaction.user.id
|
||||
try:
|
||||
await users_chatbot[user_id].reset()
|
||||
await interaction.followup.send("> **Info: Reset finish.**")
|
||||
logger.warning("\x1b[31mBing has been successfully reset\x1b[0m")
|
||||
except:
|
||||
await interaction.followup.send(f"> **You don't have any conversation yet.**")
|
||||
logger.exception("Bing reset failed.")
|
||||
|
||||
# Switch conversation style
|
||||
@app_commands.command(name="switch_style", description="Switch conversation style")
|
||||
@app_commands.choices(style=[app_commands.Choice(name="Creative", value="creative"), app_commands.Choice(name="Balanced", value="balanced"), app_commands.Choice(name="Precise", value="precise")])
|
||||
async def switch_style(self, interaction: discord.Interaction, style: app_commands.Choice[str]):
|
||||
await interaction.response.defer(ephemeral=True, thinking=True)
|
||||
user_id = interaction.user.id
|
||||
if user_id not in users_chatbot:
|
||||
await init_chatbot(user_id)
|
||||
user_conversation_style[user_id] = style.value
|
||||
await interaction.followup.send(f"> **Info: successfull switch conversation style to {style.value}.**")
|
||||
logger.warning(f"\x1b[31mConversation style has been successfully switch to {style.value}\x1b[0m")
|
||||
|
||||
# Set and delete personal Bing Cookies
|
||||
@app_commands.command(name="bing_cookies", description="Set or delete Bing Cookies")
|
||||
@app_commands.choices(choice=[app_commands.Choice(name="set", value="set"), app_commands.Choice(name="delete", value="delete")])
|
||||
async def cookies_setting(self, interaction: discord.Interaction, choice: app_commands.Choice[str], cookies_file: Optional[discord.Attachment]=None):
|
||||
await interaction.response.defer(ephemeral=True, thinking=True)
|
||||
user_id = interaction.user.id
|
||||
if choice.value == "set":
|
||||
try:
|
||||
content = json.loads(await cookies_file.read())
|
||||
for cookie in content:
|
||||
if cookie.get("name") == "_U":
|
||||
auth_cookie = cookie.get("value")
|
||||
break
|
||||
users_image_generator[user_id] = ImageGenAsync(auth_cookie, quiet=True)
|
||||
users_chatbot[user_id] = UserChatbot(cookies=content)
|
||||
user_conversation_style[user_id] = "balanced"
|
||||
await interaction.followup.send("> **Upload successful!**")
|
||||
logger.warning(f"\x1b[31m{interaction.user} set Bing Cookies successful\x1b[0m")
|
||||
except:
|
||||
await interaction.followup.send("> **Please upload your Bing Cookies.**")
|
||||
else:
|
||||
try:
|
||||
del users_chatbot[user_id]
|
||||
del users_image_generator[user_id]
|
||||
del user_conversation_style[user_id]
|
||||
await interaction.followup.send("> **Delete finish.**")
|
||||
logger.warning(f"\x1b[31m{interaction.user} delete Cookies\x1b[0m")
|
||||
except:
|
||||
await interaction.followup.send("> **You don't have any Bing Cookies.**")
|
||||
|
||||
# Create images
|
||||
@app_commands.command(name="create_image", description="generate image by Bing image creator")
|
||||
async def create_image(self, interaction: discord.Interaction, *, prompt: str):
|
||||
user_id = interaction.user.id
|
||||
if interaction.user.id not in users_chatbot:
|
||||
await init_chatbot(user_id)
|
||||
try:
|
||||
using = await get_using_create(user_id)
|
||||
except:
|
||||
await set_using_create(user_id, False)
|
||||
using = await get_using_create(user_id)
|
||||
if not using:
|
||||
logger.info(f"\x1b[31m{interaction.user}\x1b[0m : '{prompt}' ({interaction.channel}) [BingImageCreator]")
|
||||
await users_chatbot[user_id].create_image(interaction, prompt, users_image_generator[user_id] )
|
||||
else:
|
||||
await interaction.response.defer(ephemeral=True, thinking=True)
|
||||
await interaction.followup.send("> **Please wait for your last image to create finish.**")
|
||||
|
||||
async def setup(bot):
|
||||
await bot.add_cog(EdgeGPT(bot))
|
@ -0,0 +1,321 @@
|
||||
import discord
|
||||
import re
|
||||
import os
|
||||
import json
|
||||
import asyncio
|
||||
from EdgeGPT.EdgeGPT import Chatbot, ConversationStyle
|
||||
from dotenv import load_dotenv
|
||||
from discord.ext import commands
|
||||
from core.classes import Cog_Extension
|
||||
from functools import partial
|
||||
from src import log
|
||||
|
||||
load_dotenv()
|
||||
|
||||
USE_SUGGEST_RESPONSES = True
|
||||
try:
|
||||
MENTION_CHANNEL_ID = int(os.getenv("MENTION_CHANNEL_ID"))
|
||||
except:
|
||||
MENTION_CHANNEL_ID = None
|
||||
logger = log.setup_logger(__name__)
|
||||
sem = asyncio.Semaphore(1)
|
||||
conversation_style = "balanced"
|
||||
|
||||
with open("./cookies.json", encoding="utf-8") as file:
|
||||
cookies = json.load(file)
|
||||
chatbot = Chatbot(cookies=cookies)
|
||||
|
||||
|
||||
# To add suggest responses
|
||||
class MyView(discord.ui.View):
|
||||
def __init__(self, chatbot: Chatbot, suggest_responses: list):
|
||||
super().__init__(timeout=120)
|
||||
# Add buttons
|
||||
for label in suggest_responses:
|
||||
button = discord.ui.Button(label=label)
|
||||
|
||||
# Button event
|
||||
async def callback(
|
||||
interaction: discord.Interaction, button: discord.ui.Button
|
||||
):
|
||||
await interaction.response.defer(ephemeral=False, thinking=True)
|
||||
# When click the button, all buttons will disable.
|
||||
for child in self.children:
|
||||
child.disabled = True
|
||||
await interaction.followup.edit_message(
|
||||
message_id=interaction.message.id, view=self
|
||||
)
|
||||
username = str(interaction.user)
|
||||
usermessage = button.label
|
||||
channel = str(interaction.channel)
|
||||
logger.info(
|
||||
f"\x1b[31m{username}\x1b[0m : '{usermessage}' ({channel}) [Style: {conversation_style}] [button]"
|
||||
)
|
||||
task = asyncio.create_task(
|
||||
send_message(chatbot, interaction, usermessage)
|
||||
)
|
||||
await asyncio.gather(task)
|
||||
|
||||
self.add_item(button)
|
||||
self.children[-1].callback = partial(callback, button=button)
|
||||
|
||||
|
||||
# Show Dropdown
|
||||
class DropdownView(discord.ui.View):
|
||||
def __init__(self):
|
||||
super().__init__(timeout=180)
|
||||
|
||||
options = [
|
||||
discord.SelectOption(
|
||||
label="Creative",
|
||||
description="Switch conversation style to Creative",
|
||||
emoji="🎨",
|
||||
),
|
||||
discord.SelectOption(
|
||||
label="Balanced",
|
||||
description="Switch conversation style to Balanced",
|
||||
emoji="⚖️",
|
||||
),
|
||||
discord.SelectOption(
|
||||
label="Precise",
|
||||
description="Switch conversation style to Precise",
|
||||
emoji="🔎",
|
||||
),
|
||||
discord.SelectOption(
|
||||
label="Reset", description="Reset conversation", emoji="🔄"
|
||||
),
|
||||
]
|
||||
|
||||
dropdown = discord.ui.Select(
|
||||
placeholder="Choose setting", min_values=1, max_values=1, options=options
|
||||
)
|
||||
|
||||
dropdown.callback = self.dropdown_callback
|
||||
self.add_item(dropdown)
|
||||
|
||||
# Dropdown event
|
||||
async def dropdown_callback(self, interaction: discord.Interaction):
|
||||
await interaction.response.defer(ephemeral=False, thinking=True)
|
||||
if interaction.data["values"][0] == "Creative":
|
||||
await set_conversation_style("creative")
|
||||
await interaction.followup.send(
|
||||
f"> **Info: successfull switch conversation style to *{interaction.data['values'][0]}*.**"
|
||||
)
|
||||
logger.warning(
|
||||
f"\x1b[31mConversation style has been successfully switch to {interaction.data['values'][0]}\x1b[0m"
|
||||
)
|
||||
elif interaction.data["values"][0] == "Balanced":
|
||||
await set_conversation_style("balanced")
|
||||
await interaction.followup.send(
|
||||
f"> **Info: successfull switch conversation style to *{interaction.data['values'][0]}*.**"
|
||||
)
|
||||
logger.warning(
|
||||
f"\x1b[31mConversation style has been successfully switch to {interaction.data['values'][0]}\x1b[0m"
|
||||
)
|
||||
elif interaction.data["values"][0] == "Precise":
|
||||
await set_conversation_style("precise")
|
||||
await interaction.followup.send(
|
||||
f"> **Info: successfull switch conversation style to *{interaction.data['values'][0]}*.**"
|
||||
)
|
||||
logger.warning(
|
||||
f"\x1b[31mConversation style has been successfully switch to {interaction.data['values'][0]}\x1b[0m"
|
||||
)
|
||||
else:
|
||||
await chatbot.reset()
|
||||
await interaction.followup.send(f"> **Info: Reset finish.**")
|
||||
logger.warning("\x1b[31mBing has been successfully reset\x1b[0m")
|
||||
# disable dropdown after select
|
||||
for dropdown in self.children:
|
||||
dropdown.disabled = True
|
||||
await interaction.followup.edit_message(
|
||||
message_id=interaction.message.id, view=self
|
||||
)
|
||||
|
||||
|
||||
# Set conversation style
|
||||
async def set_conversation_style(style: str):
|
||||
global conversation_style
|
||||
conversation_style = style
|
||||
|
||||
|
||||
async def set_chatbot(cookies):
|
||||
global chatbot
|
||||
chatbot = Chatbot(cookies=cookies)
|
||||
|
||||
|
||||
async def send_message(chatbot: Chatbot, message, user_message: str):
|
||||
async with sem:
|
||||
if isinstance(message, discord.message.Message):
|
||||
await message.channel.typing()
|
||||
reply = ""
|
||||
text = ""
|
||||
link_embed = ""
|
||||
images_embed = []
|
||||
all_url = []
|
||||
try:
|
||||
# Change conversation style
|
||||
if conversation_style == "creative":
|
||||
reply = await chatbot.ask(
|
||||
prompt=user_message,
|
||||
conversation_style=ConversationStyle.creative,
|
||||
simplify_response=True,
|
||||
)
|
||||
elif conversation_style == "precise":
|
||||
reply = await chatbot.ask(
|
||||
prompt=user_message,
|
||||
conversation_style=ConversationStyle.precise,
|
||||
simplify_response=True,
|
||||
)
|
||||
else:
|
||||
reply = await chatbot.ask(
|
||||
prompt=user_message,
|
||||
conversation_style=ConversationStyle.balanced,
|
||||
simplify_response=True,
|
||||
)
|
||||
|
||||
# Get reply text
|
||||
text = f"{reply['text']}"
|
||||
text = re.sub(r"\[\^(\d+)\^\]", lambda match: "", text)
|
||||
|
||||
# Get the URL, if available
|
||||
try:
|
||||
if len(reply["sources"]) != 0:
|
||||
for i, url in enumerate(reply["sources"], start=1):
|
||||
if len(url["providerDisplayName"]) == 0:
|
||||
all_url.append(f"{i}. {url['seeMoreUrl']}")
|
||||
else:
|
||||
all_url.append(
|
||||
f"{i}. [{url['providerDisplayName']}]({url['seeMoreUrl']})"
|
||||
)
|
||||
link_text = "\n".join(all_url)
|
||||
link_embed = discord.Embed(description=link_text)
|
||||
except:
|
||||
pass
|
||||
|
||||
# Set the final message
|
||||
if isinstance(message, discord.interactions.Interaction):
|
||||
user_message = user_message.replace("\n", "")
|
||||
ask = f"> **{user_message}**\t(***style: {conversation_style}***)\n\n"
|
||||
response = f"{ask}{text}"
|
||||
else:
|
||||
response = f"{text}\t(***style: {conversation_style}***)"
|
||||
|
||||
# Discord limit about 2000 characters for a message
|
||||
while len(response) > 2000:
|
||||
temp = response[:2000]
|
||||
response = response[2000:]
|
||||
if isinstance(message, discord.interactions.Interaction):
|
||||
await message.followup.send(temp)
|
||||
else:
|
||||
await message.channel.send(temp)
|
||||
|
||||
# Get the image, if available
|
||||
try:
|
||||
if len(link_embed) == 0:
|
||||
all_image = re.findall(
|
||||
"https?://[\w\./]+", str(reply["sources_text"])
|
||||
)
|
||||
[
|
||||
images_embed.append(
|
||||
discord.Embed(url="https://www.bing.com/").set_image(
|
||||
url=image_link
|
||||
)
|
||||
)
|
||||
for image_link in all_image
|
||||
]
|
||||
except:
|
||||
pass
|
||||
|
||||
if USE_SUGGEST_RESPONSES:
|
||||
suggest_responses = reply["suggestions"]
|
||||
if images_embed:
|
||||
if isinstance(message, discord.interactions.Interaction):
|
||||
await message.followup.send(
|
||||
response,
|
||||
view=MyView(chatbot, suggest_responses),
|
||||
embeds=images_embed,
|
||||
wait=True,
|
||||
)
|
||||
else:
|
||||
await message.channel.send(
|
||||
response,
|
||||
view=MyView(chatbot, suggest_responses),
|
||||
embeds=images_embed,
|
||||
)
|
||||
elif link_embed:
|
||||
if isinstance(message, discord.interactions.Interaction):
|
||||
await message.followup.send(
|
||||
response,
|
||||
view=MyView(chatbot, suggest_responses),
|
||||
embed=link_embed,
|
||||
wait=True,
|
||||
)
|
||||
else:
|
||||
await message.channel.send(
|
||||
response,
|
||||
view=MyView(chatbot, suggest_responses),
|
||||
embed=link_embed,
|
||||
)
|
||||
else:
|
||||
if isinstance(message, discord.interactions.Interaction):
|
||||
await message.followup.send(
|
||||
response, view=MyView(chatbot, suggest_responses), wait=True
|
||||
)
|
||||
else:
|
||||
await message.channel.send(
|
||||
response, view=MyView(chatbot, suggest_responses)
|
||||
)
|
||||
else:
|
||||
if images_embed:
|
||||
if isinstance(message, discord.interactions.Interaction):
|
||||
await message.followup.send(
|
||||
response, embeds=images_embed, wait=True
|
||||
)
|
||||
else:
|
||||
await message.channel.send(response, embeds=images_embed)
|
||||
elif link_embed:
|
||||
if isinstance(message, discord.interactions.Interaction):
|
||||
await message.followup.send(
|
||||
response, embed=link_embed, wait=True
|
||||
)
|
||||
else:
|
||||
await message.channel.send(response, embed=link_embed)
|
||||
else:
|
||||
if isinstance(message, discord.interactions.Interaction):
|
||||
await message.followup.send(response, wait=True)
|
||||
else:
|
||||
await message.channel.send(response)
|
||||
except Exception as e:
|
||||
if isinstance(message, discord.interactions.Interaction):
|
||||
await message.followup.send(f">>> **Error: {e}**")
|
||||
else:
|
||||
await message.channel.send(f">>> **Error: {e}**")
|
||||
logger.exception(f"Error while sending message: {e}")
|
||||
|
||||
|
||||
class Event(Cog_Extension):
|
||||
@commands.Cog.listener()
|
||||
async def on_message(self, message: discord.Message):
|
||||
if message.author == self.bot.user:
|
||||
return
|
||||
if self.bot.user in message.mentions:
|
||||
if not MENTION_CHANNEL_ID or message.channel.id == MENTION_CHANNEL_ID:
|
||||
content = re.sub(r"<@.*?>", "", message.content).strip()
|
||||
if len(content) > 0:
|
||||
username = str(message.author)
|
||||
channel = str(message.channel)
|
||||
logger.info(
|
||||
f"\x1b[31m{username}\x1b[0m : '{content}' ({channel}) [Style: {conversation_style}]"
|
||||
)
|
||||
task = asyncio.create_task(send_message(chatbot, message, content))
|
||||
await asyncio.gather(task)
|
||||
else:
|
||||
await message.channel.send(view=DropdownView())
|
||||
elif MENTION_CHANNEL_ID is not None:
|
||||
await message.channel.send(
|
||||
f"> **Can only be mentioned at <#{self.bot.get_channel(MENTION_CHANNEL_ID).id}>**"
|
||||
)
|
||||
|
||||
|
||||
async def setup(bot):
|
||||
await bot.add_cog(Event(bot))
|
@ -0,0 +1,35 @@
|
||||
import discord
|
||||
from core.classes import Cog_Extension
|
||||
from discord import app_commands
|
||||
|
||||
|
||||
class Help(Cog_Extension):
|
||||
@app_commands.command(name="help", description="Show how to use")
|
||||
async def help(self, interaction: discord.Interaction):
|
||||
embed = discord.Embed(
|
||||
title="Help",
|
||||
)
|
||||
embed.add_field(
|
||||
name="/bing_cookies",
|
||||
value="Set and delete your Bing Cookies.",
|
||||
inline=False,
|
||||
)
|
||||
embed.add_field(name="/bing", value="Chat with Bing.", inline=False)
|
||||
embed.add_field(
|
||||
name="/reset", value="Reset your Bing conversation.", inline=False
|
||||
)
|
||||
embed.add_field(
|
||||
name="/switch_style",
|
||||
value="Switch your Bing conversation style.",
|
||||
inline=False,
|
||||
)
|
||||
embed.add_field(
|
||||
name="/create_image",
|
||||
value="Generate image by Bing Image Creator.",
|
||||
inline=False,
|
||||
)
|
||||
await interaction.response.send_message(embed=embed)
|
||||
|
||||
|
||||
async def setup(bot):
|
||||
await bot.add_cog(Help(bot))
|
@ -0,0 +1,11 @@
|
||||
version: '3'
|
||||
|
||||
services:
|
||||
spartan:
|
||||
container_name: Spartan
|
||||
build: .
|
||||
environment:
|
||||
- DISCORD_BOT_TOKEN=${DISCORD_BOT_TOKEN}
|
||||
volumes:
|
||||
- ./cookies.json:/bot/cookies.json
|
||||
- ./config.yml:/bot/config.yml
|
@ -0,0 +1,6 @@
|
||||
[
|
||||
{
|
||||
"name": "cookie1",
|
||||
"value": "1lEXeWRSIPUsQ0S3tdAc3v7BexGK2qBlzsXz8j52w_HNBoOsegjiwRySQHmfoWduHVUxSXo6cETPP2qNrYWAz6k7wn43WGO9i7ll9_Wl7M6HA2c9twbKByfAtAB5fr26wPawQ6y1GCdakD_Kr4xdD20fvkytnmOmZu7Ktnb9mUVE605AAbJcIA9SOlRN5410ZPOnZA1cIzr4WtAFWNfQKPG6Sxk_zO5zvXQfYTyMNmOI"
|
||||
}
|
||||
]
|
@ -0,0 +1,6 @@
|
||||
from discord.ext import commands
|
||||
|
||||
|
||||
class Cog_Extension(commands.Cog):
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
@ -0,0 +1,4 @@
|
||||
discord.py==2.3.2
|
||||
python-dotenv==1.0.0
|
||||
PyYAML==6.0.1
|
||||
bing-chat==1.9.3
|
@ -0,0 +1,40 @@
|
||||
import discord
|
||||
import asyncio
|
||||
from src import log
|
||||
|
||||
logger = log.setup_logger(__name__)
|
||||
using_func = {}
|
||||
|
||||
|
||||
async def get_using_create(user_id):
|
||||
return using_func[user_id]
|
||||
|
||||
|
||||
async def set_using_create(user_id, status: bool):
|
||||
using_func[user_id] = status
|
||||
|
||||
|
||||
async def create_image(interaction: discord.Interaction, prompt: str, image_generator):
|
||||
await interaction.response.defer(ephemeral=False, thinking=True)
|
||||
using_func[interaction.user.id] = True
|
||||
try:
|
||||
embeds = []
|
||||
prompts = f"> **{prompt}** - <@{str(interaction.user.id)}> (***BingImageCreator***)\n\n"
|
||||
# Fetches image links
|
||||
images = await image_generator.get_images(prompt)
|
||||
# Add embed to list of embeds
|
||||
[
|
||||
embeds.append(
|
||||
discord.Embed(url="https://www.bing.com/").set_image(url=image_link)
|
||||
)
|
||||
for image_link in images
|
||||
]
|
||||
await interaction.followup.send(prompts, embeds=embeds, wait=True)
|
||||
except asyncio.TimeoutError:
|
||||
await interaction.followup.send("> **Error: Request timed out.**")
|
||||
logger.exception("Error while create image: Request timed out.")
|
||||
except Exception as e:
|
||||
await interaction.followup.send(f"> **Error: {e}**")
|
||||
logger.exception(f"Error while create image: {e}")
|
||||
finally:
|
||||
using_func[interaction.user.id] = False
|
@ -0,0 +1,66 @@
|
||||
import os
|
||||
import logging
|
||||
import logging.handlers
|
||||
|
||||
|
||||
class CustomFormatter(logging.Formatter):
|
||||
LEVEL_COLORS = [
|
||||
(logging.DEBUG, "\x1b[40;1m"),
|
||||
(logging.INFO, "\x1b[34;1m"),
|
||||
(logging.WARNING, "\x1b[33;1m"),
|
||||
(logging.ERROR, "\x1b[31m"),
|
||||
(logging.CRITICAL, "\x1b[41m"),
|
||||
]
|
||||
FORMATS = {
|
||||
level: logging.Formatter(
|
||||
f"\x1b[30;1m%(asctime)s\x1b[0m {color}%(levelname)-8s\x1b[0m \x1b[35m%(name)s\x1b[0m -> %(message)s",
|
||||
"%Y-%m-%d %H:%M:%S",
|
||||
)
|
||||
for level, color in LEVEL_COLORS
|
||||
}
|
||||
|
||||
def format(self, record):
|
||||
formatter = self.FORMATS.get(record.levelno)
|
||||
if formatter is None:
|
||||
formatter = self.FORMATS[logging.DEBUG]
|
||||
|
||||
# Override the traceback to always print in red
|
||||
if record.exc_info:
|
||||
text = formatter.formatException(record.exc_info)
|
||||
record.exc_text = f"\x1b[31m{text}\x1b[0m"
|
||||
|
||||
output = formatter.format(record)
|
||||
|
||||
# Remove the cache layer
|
||||
record.exc_text = None
|
||||
return output
|
||||
|
||||
|
||||
def setup_logger(module_name: str) -> logging.Logger:
|
||||
# create logger
|
||||
library, _, _ = module_name.partition(".py")
|
||||
logger = logging.getLogger(library)
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
if not logger.handlers:
|
||||
# create console handler
|
||||
console_handler = logging.StreamHandler()
|
||||
console_handler.setLevel(logging.INFO)
|
||||
console_handler.setFormatter(CustomFormatter())
|
||||
# specify that the log file path is the same as `main.py` file path
|
||||
grandparent_dir = os.path.abspath(__file__ + "/../../")
|
||||
log_name = "discord_bot.log"
|
||||
log_path = os.path.join(grandparent_dir, log_name)
|
||||
# create local log handler
|
||||
log_handler = logging.handlers.RotatingFileHandler(
|
||||
filename=log_path,
|
||||
encoding="utf-8",
|
||||
maxBytes=32 * 1024 * 1024, # 32 MiB
|
||||
backupCount=2, # Rotate through 5 files
|
||||
)
|
||||
log_handler.setFormatter(CustomFormatter())
|
||||
# Add handlers to logger
|
||||
logger.addHandler(log_handler)
|
||||
logger.addHandler(console_handler)
|
||||
|
||||
return logger
|
@ -0,0 +1,194 @@
|
||||
import discord
|
||||
import re
|
||||
from EdgeGPT.EdgeGPT import Chatbot, ConversationStyle
|
||||
from src import log
|
||||
from functools import partial
|
||||
|
||||
USE_SUGGEST_RESPONSES = True
|
||||
logger = log.setup_logger(__name__)
|
||||
using_func = {}
|
||||
|
||||
|
||||
# To add suggest responses
|
||||
class MyView(discord.ui.View):
|
||||
def __init__(
|
||||
self,
|
||||
interaction: discord.Interaction,
|
||||
chatbot: Chatbot,
|
||||
conversation_style: str,
|
||||
suggest_responses: list,
|
||||
):
|
||||
super().__init__(timeout=120)
|
||||
self.button_author = interaction.user.id
|
||||
# Add buttons
|
||||
for label in suggest_responses:
|
||||
button = discord.ui.Button(label=label)
|
||||
|
||||
# Button event
|
||||
async def callback(
|
||||
interaction: discord.Interaction,
|
||||
button_author: int,
|
||||
button: discord.ui.Button,
|
||||
):
|
||||
if interaction.user.id != button_author:
|
||||
await interaction.response.defer(ephemeral=True, thinking=True)
|
||||
await interaction.followup.send(
|
||||
"You don't have permission to press this button."
|
||||
)
|
||||
elif not using_func[interaction.user.id]:
|
||||
await interaction.response.defer(ephemeral=False, thinking=True)
|
||||
# When click the button, all buttons will disable.
|
||||
for child in self.children:
|
||||
child.disabled = True
|
||||
await interaction.followup.edit_message(
|
||||
message_id=interaction.message.id, view=self
|
||||
)
|
||||
username = str(interaction.user)
|
||||
usermessage = button.label
|
||||
channel = str(interaction.channel)
|
||||
logger.info(
|
||||
f"\x1b[31m{username}\x1b[0m : '{usermessage}' ({channel}) [Style: {conversation_style}] [button]"
|
||||
)
|
||||
await send_message(
|
||||
chatbot, interaction, usermessage, conversation_style
|
||||
)
|
||||
else:
|
||||
await interaction.response.defer(ephemeral=True, thinking=True)
|
||||
await interaction.followup.send(
|
||||
"Please wait for your last conversation to finish."
|
||||
)
|
||||
|
||||
self.add_item(button)
|
||||
self.children[-1].callback = partial(
|
||||
callback, button_author=self.button_author, button=button
|
||||
)
|
||||
|
||||
|
||||
async def get_using_send(user_id):
|
||||
return using_func[user_id]
|
||||
|
||||
|
||||
async def set_using_send(user_id, status: bool):
|
||||
using_func[user_id] = status
|
||||
|
||||
|
||||
async def send_message(
|
||||
chatbot: Chatbot,
|
||||
interaction: discord.Interaction,
|
||||
user_message: str,
|
||||
conversation_style: str,
|
||||
):
|
||||
using_func[interaction.user.id] = True
|
||||
reply = ""
|
||||
text = ""
|
||||
link_embed = ""
|
||||
images_embed = []
|
||||
all_url = []
|
||||
try:
|
||||
# Change conversation style
|
||||
if conversation_style == "creative":
|
||||
reply = await chatbot.ask(
|
||||
prompt=user_message,
|
||||
conversation_style=ConversationStyle.creative,
|
||||
simplify_response=True,
|
||||
)
|
||||
elif conversation_style == "precise":
|
||||
reply = await chatbot.ask(
|
||||
prompt=user_message,
|
||||
conversation_style=ConversationStyle.precise,
|
||||
simplify_response=True,
|
||||
)
|
||||
else:
|
||||
reply = await chatbot.ask(
|
||||
prompt=user_message,
|
||||
conversation_style=ConversationStyle.balanced,
|
||||
simplify_response=True,
|
||||
)
|
||||
|
||||
# Get reply text
|
||||
text = f"{reply['text']}"
|
||||
text = re.sub(r"\[\^(\d+)\^\]", lambda match: "", text)
|
||||
|
||||
# Get the URL, if available
|
||||
try:
|
||||
if len(reply["sources"]) != 0:
|
||||
for i, url in enumerate(reply["sources"], start=1):
|
||||
if len(url["providerDisplayName"]) == 0:
|
||||
all_url.append(f"{i}. {url['seeMoreUrl']}")
|
||||
else:
|
||||
all_url.append(
|
||||
f"{i}. [{url['providerDisplayName']}]({url['seeMoreUrl']})"
|
||||
)
|
||||
link_text = "\n".join(all_url)
|
||||
link_embed = discord.Embed(description=link_text)
|
||||
except:
|
||||
pass
|
||||
|
||||
# Set the final message
|
||||
user_message = user_message.replace("\n", "")
|
||||
ask = f"> **{user_message}** - <@{str(interaction.user.id)}> (***style: {conversation_style}***)\n\n"
|
||||
response = f"{ask}{text}"
|
||||
|
||||
# Discord limit about 2000 characters for a message
|
||||
while len(response) > 2000:
|
||||
temp = response[:2000]
|
||||
response = response[2000:]
|
||||
await interaction.followup.send(temp)
|
||||
|
||||
# Get the image, if available
|
||||
try:
|
||||
if len(link_embed) == 0:
|
||||
all_image = re.findall("https?://[\w\./]+", str(reply["sources_text"]))
|
||||
[
|
||||
images_embed.append(
|
||||
discord.Embed(url="https://www.bing.com/").set_image(
|
||||
url=image_link
|
||||
)
|
||||
)
|
||||
for image_link in all_image
|
||||
]
|
||||
except:
|
||||
pass
|
||||
# Add all suggest responses in list
|
||||
if USE_SUGGEST_RESPONSES:
|
||||
suggest_responses = reply["suggestions"]
|
||||
if images_embed:
|
||||
await interaction.followup.send(
|
||||
response,
|
||||
view=MyView(
|
||||
interaction, chatbot, conversation_style, suggest_responses
|
||||
),
|
||||
embeds=images_embed,
|
||||
wait=True,
|
||||
)
|
||||
elif link_embed:
|
||||
await interaction.followup.send(
|
||||
response,
|
||||
view=MyView(
|
||||
interaction, chatbot, conversation_style, suggest_responses
|
||||
),
|
||||
embed=link_embed,
|
||||
wait=True,
|
||||
)
|
||||
else:
|
||||
await interaction.followup.send(
|
||||
response,
|
||||
view=MyView(
|
||||
interaction, chatbot, conversation_style, suggest_responses
|
||||
),
|
||||
wait=True,
|
||||
)
|
||||
else:
|
||||
if images_embed:
|
||||
await interaction.followup.send(
|
||||
response, embeds=images_embed, wait=True
|
||||
)
|
||||
elif link_embed:
|
||||
await interaction.followup.send(response, embed=link_embed, wait=True)
|
||||
else:
|
||||
await interaction.followup.send(response, wait=True)
|
||||
except Exception as e:
|
||||
await interaction.followup.send(f">>> **Error: {e}**")
|
||||
logger.exception(f"Error while sending message: {e}")
|
||||
finally:
|
||||
using_func[interaction.user.id] = False
|
@ -0,0 +1,197 @@
|
||||
import asyncio
|
||||
import argparse
|
||||
from collections import Counter
|
||||
import json
|
||||
import pathlib
|
||||
import re
|
||||
|
||||
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
import gradio as gr
|
||||
from gradio import utils
|
||||
import requests
|
||||
|
||||
from typing import Dict, List
|
||||
|
||||
from utils import *
|
||||
|
||||
|
||||
lock = asyncio.Lock()
|
||||
|
||||
bot = commands.Bot("", intents=discord.Intents(messages=True, guilds=True))
|
||||
|
||||
|
||||
GUILD_SPACES_FILE = "guild_spaces.pkl"
|
||||
|
||||
|
||||
if pathlib.Path(GUILD_SPACES_FILE).exists():
|
||||
guild_spaces = read_pickle_file(GUILD_SPACES_FILE)
|
||||
assert isinstance(guild_spaces, dict), f"{GUILD_SPACES_FILE} in invalid format."
|
||||
guild_blocks = {}
|
||||
delete_keys = []
|
||||
for k, v in guild_spaces.items():
|
||||
try:
|
||||
guild_blocks[k] = gr.Interface.load(v, src="spaces")
|
||||
except ValueError:
|
||||
delete_keys.append(k)
|
||||
for k in delete_keys:
|
||||
del guild_spaces[k]
|
||||
else:
|
||||
guild_spaces: Dict[int, str] = {}
|
||||
guild_blocks: Dict[int, gr.Blocks] = {}
|
||||
|
||||
|
||||
HASHED_USERS_FILE = "users.pkl"
|
||||
|
||||
if pathlib.Path(HASHED_USERS_FILE).exists():
|
||||
hashed_users = read_pickle_file(HASHED_USERS_FILE)
|
||||
assert isinstance(hashed_users, list), f"{HASHED_USERS_FILE} in invalid format."
|
||||
else:
|
||||
hashed_users: List[str] = []
|
||||
|
||||
|
||||
@bot.event
|
||||
async def on_ready():
|
||||
print(f"Logged in as {bot.user}")
|
||||
print(f"Running in {len(bot.guilds)} servers...")
|
||||
|
||||
|
||||
async def run_prediction(space: gr.Blocks, *inputs):
|
||||
inputs = list(inputs)
|
||||
fn_index = 0
|
||||
processed_inputs = space.serialize_data(fn_index=fn_index, inputs=inputs)
|
||||
batch = space.dependencies[fn_index]["batch"]
|
||||
|
||||
if batch:
|
||||
processed_inputs = [[inp] for inp in processed_inputs]
|
||||
|
||||
outputs = await space.process_api(
|
||||
fn_index=fn_index, inputs=processed_inputs, request=None, state={}
|
||||
)
|
||||
outputs = outputs["data"]
|
||||
|
||||
if batch:
|
||||
outputs = [out[0] for out in outputs]
|
||||
|
||||
processed_outputs = space.deserialize_data(fn_index, outputs)
|
||||
processed_outputs = utils.resolve_singleton(processed_outputs)
|
||||
|
||||
return processed_outputs
|
||||
|
||||
|
||||
async def display_stats(message: discord.Message):
|
||||
await message.channel.send(
|
||||
f"Running in {len(bot.guilds)} servers\n"
|
||||
f"Total # of users: {len(hashed_users)}\n"
|
||||
f"------------------"
|
||||
)
|
||||
await message.channel.send(f"Most popular spaces:")
|
||||
# display the top 10 most frequently occurring strings and their counts
|
||||
spaces = guild_spaces.values()
|
||||
counts = Counter(spaces)
|
||||
for space, count in counts.most_common(10):
|
||||
await message.channel.send(f"- {space}: {count}")
|
||||
|
||||
|
||||
async def load_space(guild: discord.Guild, message: discord.Message, content: str):
|
||||
iframe_url = (
|
||||
requests.get(f"https://huggingface.co/api/spaces/{content}/host")
|
||||
.json()
|
||||
.get("host")
|
||||
)
|
||||
if iframe_url is None:
|
||||
return await message.channel.send(
|
||||
f"Space: {content} not found. If you'd like to make a prediction, enclose the inputs in quotation marks."
|
||||
)
|
||||
else:
|
||||
await message.channel.send(
|
||||
f"Loading Space: https://huggingface.co/spaces/{content}..."
|
||||
)
|
||||
interface = gr.Interface.load(content, src="spaces")
|
||||
guild_spaces[guild.id] = content
|
||||
guild_blocks[guild.id] = interface
|
||||
asyncio.create_task(update_pickle_file(guild_spaces, GUILD_SPACES_FILE))
|
||||
if len(content) > 32 - len(f"{bot.name} []"): # type: ignore
|
||||
nickname = content[: 32 - len(f"{bot.name} []") - 3] + "..." # type: ignore
|
||||
else:
|
||||
nickname = content
|
||||
nickname = f"{bot.name} [{nickname}]" # type: ignore
|
||||
await guild.me.edit(nick=nickname)
|
||||
await message.channel.send(
|
||||
"Ready to make predictions! Type in your inputs and enclose them in quotation marks."
|
||||
)
|
||||
|
||||
|
||||
async def disconnect_space(bot: commands.Bot, guild: discord.Guild):
|
||||
guild_spaces.pop(guild.id, None)
|
||||
guild_blocks.pop(guild.id, None)
|
||||
asyncio.create_task(update_pickle_file(guild_spaces, GUILD_SPACES_FILE))
|
||||
await guild.me.edit(nick=bot.name) # type: ignore
|
||||
|
||||
|
||||
async def make_prediction(guild: discord.Guild, message: discord.Message, content: str):
|
||||
if guild.id in guild_spaces:
|
||||
params = re.split(r' (?=")', content)
|
||||
params = [p.strip("'\"") for p in params]
|
||||
space = guild_blocks[guild.id]
|
||||
predictions = await run_prediction(space, *params)
|
||||
if isinstance(predictions, (tuple, list)):
|
||||
for p in predictions:
|
||||
await send_file_or_text(message.channel, p)
|
||||
else:
|
||||
await send_file_or_text(message.channel, predictions)
|
||||
return
|
||||
else:
|
||||
await message.channel.send(
|
||||
"No Space is currently running. Please type in the name of a Hugging Face Space name first, e.g. abidlabs/en2fr"
|
||||
)
|
||||
await guild.me.edit(nick=bot.name) # type: ignore
|
||||
|
||||
|
||||
@bot.event
|
||||
async def on_message(message: discord.Message):
|
||||
if message.author == bot.user:
|
||||
return
|
||||
h = hash_user_id(message.author.id)
|
||||
if h not in hashed_users:
|
||||
hashed_users.append(h)
|
||||
asyncio.create_task(update_pickle_file(hashed_users, HASHED_USERS_FILE))
|
||||
else:
|
||||
if message.content:
|
||||
content = remove_tags(message.content)
|
||||
guild = message.channel.guild
|
||||
assert guild, "Message not sent in a guild."
|
||||
|
||||
if content.strip() == "exit":
|
||||
await disconnect_space(bot, guild)
|
||||
elif content.strip() == "stats":
|
||||
await display_stats(message)
|
||||
elif content.startswith('"') or content.startswith("'"):
|
||||
await make_prediction(guild, message, content)
|
||||
else:
|
||||
await load_space(guild, message, content)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
"--token",
|
||||
type=str,
|
||||
help="API key for the Discord bot. You can set this to your Discord token if you'd like to make your own clone of the Gradio Bot.",
|
||||
required=False,
|
||||
default="",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.token.strip():
|
||||
discord_token = args.token
|
||||
bot.env = "staging" # type: ignore
|
||||
bot.name = "StagingBot" # type: ignore
|
||||
else:
|
||||
with open("secrets.json") as fp:
|
||||
discord_token = json.load(fp)["discord_token"]
|
||||
bot.env = "prod" # type: ignore
|
||||
bot.name = "GradioBot" # type: ignore
|
||||
|
||||
bot.run(discord_token)
|
@ -0,0 +1,41 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import pickle
|
||||
import hashlib
|
||||
import pathlib
|
||||
from typing import Dict, List
|
||||
|
||||
import discord
|
||||
|
||||
lock = asyncio.Lock()
|
||||
|
||||
|
||||
async def update_pickle_file(data: Dict | List, file_path: str):
|
||||
async with lock:
|
||||
with open(file_path, "wb") as fp:
|
||||
pickle.dump(data, fp)
|
||||
|
||||
|
||||
def read_pickle_file(file_path: str):
|
||||
with open(file_path, "rb") as fp:
|
||||
return pickle.load(fp)
|
||||
|
||||
|
||||
async def send_file_or_text(channel, file_or_text: str):
|
||||
# if the file exists, send as a file
|
||||
if pathlib.Path(str(file_or_text)).exists():
|
||||
with open(file_or_text, "rb") as f:
|
||||
return await channel.send(file=discord.File(f))
|
||||
else:
|
||||
return await channel.send(file_or_text)
|
||||
|
||||
|
||||
def remove_tags(content: str) -> str:
|
||||
content = content.replace("<@1040198143695933501>", "")
|
||||
content = content.replace("<@1057338428938788884>", "")
|
||||
return content.strip()
|
||||
|
||||
|
||||
def hash_user_id(user_id: int) -> str:
|
||||
return hashlib.sha256(str(user_id).encode("utf-8")).hexdigest()
|
@ -0,0 +1,2 @@
|
||||
OPENAI_API_KEY="YOUR_API_KEY"
|
||||
DALLE_COOKIE="YOUR_COOKIE"
|
@ -0,0 +1,71 @@
|
||||
MythGen: A Dynamic New Art Form
|
||||
Overview
|
||||
|
||||

|
||||
|
||||
|
||||
MythGen is an Iterative Multimedia Generator that allows users to create their own comic stories based on textual prompts. The system integrates state-of-the-art language and image models to provide a seamless and creative experience.
|
||||
Features
|
||||
|
||||
Initial Prompting: Kick-start your story with an initial text prompt.
|
||||
Artistic Style Suffix: Maintain a consistent artistic style throughout your comic.
|
||||
Image Generation: Generate captivating comic panels based on textual captions.
|
||||
Caption Generation: Produce engaging captions for each comic panel.
|
||||
Interactive Story Building: Select your favorite panels and captions to build your story iteratively.
|
||||
Storyboard: View the sequence of your selected panels and their associated captions.
|
||||
State Management: Keep track of the current state of your comic generation process.
|
||||
User-Friendly Interface: Easy-to-use interface built on Gradio.
|
||||
|
||||
Prerequisites
|
||||
OpenAI API Key
|
||||
|
||||
You will need an OpenAI API key to access GPT-3 for generating captions. Follow these steps to obtain one:
|
||||
|
||||
Visit OpenAI's Developer Dashboard.
|
||||
Sign up for an API key and follow the verification process.
|
||||
Once verified, you will be provided with an API key.
|
||||
|
||||
Bing Image Creator Cookie
|
||||
|
||||
You should obtain your cookie to run this program. Follow these steps to obtain your cookie:
|
||||
|
||||
Go to Bing Image Creator in your browser and log in to your account.
|
||||
Press Ctrl+Shift+J to open developer tools.
|
||||
Navigate to the Application section.
|
||||
Click on the Cookies section.
|
||||
Find the variable _U and copy its value.
|
||||
|
||||
How to Use
|
||||
|
||||
Initial Prompt: Start by inputting your initial comic concept.
|
||||
Select a Panel: Choose your favorite panel and caption from the generated options.
|
||||
Iterate: Use the "Next Part" button to generate the next part of your comic based on your latest selection.
|
||||
View Storyboard: See your selected comic panels and captions in a storyboard for a comprehensive view of your comic.
|
||||
Finalize: Continue this process until you've created your full comic story.
|
||||
|
||||
Installation
|
||||
|
||||
bash
|
||||
|
||||
pip install -r requirements.txt
|
||||
|
||||
Running MythGen
|
||||
|
||||
bash
|
||||
|
||||
python main.py
|
||||
|
||||
This will launch the Gradio interface where you can interact with MythGen.
|
||||
Dependencies
|
||||
|
||||
Python 3.x
|
||||
Gradio
|
||||
OpenAI's GPT-3
|
||||
DALL-E
|
||||
|
||||
Contributing
|
||||
|
||||
We welcome contributions! Please read the CONTRIBUTING.md for guidelines on how to contribute to this project.
|
||||
License
|
||||
|
||||
This project is licensed under the MIT License. See LICENSE.md for details.
|
@ -0,0 +1,6 @@
|
||||
[
|
||||
{
|
||||
"name": "cookie1",
|
||||
"value": "1lEXeWRSIPUsQ0S3tdAc3v7BexGK2qBlzsXz8j52w_HNBoOsegjiwRySQHmfoWduHVUxSXo6cETPP2qNrYWAz6k7wn43WGO9i7ll9_Wl7M6HA2c9twbKByfAtAB5fr26wPawQ6y1GCdakD_Kr4xdD20fvkytnmOmZu7Ktnb9mUVE605AAbJcIA9SOlRN5410ZPOnZA1cIzr4WtAFWNfQKPG6Sxk_zO5zvXQfYTyMNmOI"
|
||||
}
|
||||
]
|
@ -0,0 +1,8 @@
|
||||
dalle3==0.0.7
|
||||
Flask==2.3.2
|
||||
gradio==3.48.0
|
||||
openai==0.28.1
|
||||
Pillow==10.1.0
|
||||
python-dotenv==1.0.0
|
||||
Requests==2.31.0
|
||||
swarms==1.8.2
|
@ -1,135 +1,77 @@
|
||||
import openai
|
||||
import os
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
import interpreter
|
||||
import dotenv
|
||||
import whisper
|
||||
import logging
|
||||
import gradio as gr
|
||||
from BingImageCreator import ImageGen
|
||||
from swarms.models.bing_chat import BingChat
|
||||
|
||||
# from swarms.models.bingchat import BingChat
|
||||
dotenv.load_dotenv(".env")
|
||||
|
||||
bot_id = os.getenv("BOT_ID")
|
||||
bot_token = os.getenv("DISCORD_TOKEN")
|
||||
|
||||
interpreter.api_key = os.getenv("OPENAI_API_KEY")
|
||||
# interpreter.api_base = os.getenv("API_BASE")
|
||||
# interpreter.auto_run = True
|
||||
|
||||
|
||||
def split_text(text, chunk_size=1500):
|
||||
#########################################################################
|
||||
return [text[i : i + chunk_size] for i in range(0, len(text), chunk_size)]
|
||||
|
||||
|
||||
# discord initial
|
||||
intents = discord.Intents.all()
|
||||
intents.message_content = True
|
||||
client = commands.Bot(command_prefix="$", intents=intents)
|
||||
|
||||
message_chunks = []
|
||||
send_image = False
|
||||
|
||||
model = whisper.load_model("base")
|
||||
|
||||
|
||||
def transcribe(audio):
|
||||
# load audio and pad/trim it to fit 30 seconds
|
||||
audio = whisper.load_audio(audio)
|
||||
audio = whisper.pad_or_trim(audio)
|
||||
|
||||
# make log-Mel spectrogram and move to the same device as the model
|
||||
mel = whisper.log_mel_spectrogram(audio).to(model.device)
|
||||
|
||||
# detect the spoken language
|
||||
_, probs = model.detect_language(mel)
|
||||
|
||||
# decode the audio
|
||||
options = whisper.DecodingOptions()
|
||||
result = whisper.decode(model, mel, options)
|
||||
return result.text
|
||||
|
||||
|
||||
@client.event
|
||||
async def on_message(message):
|
||||
await client.process_commands(message)
|
||||
bot_mention = f"<@{bot_id}>"
|
||||
# if ("<@1158923910855798804>" in message.content) or (message.author == client.user or message.content[0] == '$'):
|
||||
# return
|
||||
response = []
|
||||
for chunk in interpreter.chat(message.content, display=False, stream=False):
|
||||
# await message.channel.send(chunk)
|
||||
if "message" in chunk:
|
||||
response.append(chunk["message"])
|
||||
last_response = response[-1]
|
||||
|
||||
max_message_length = 2000 # Discord's max message length is 2000 characters
|
||||
# Splitting the message into chunks of 2000 characters
|
||||
response_chunks = [
|
||||
last_response[i : i + max_message_length]
|
||||
for i in range(0, len(last_response), max_message_length)
|
||||
]
|
||||
# Sending each chunk as a separate message
|
||||
for chunk in response_chunks:
|
||||
await message.channel.send(chunk)
|
||||
|
||||
|
||||
@client.command()
|
||||
async def join(ctx):
|
||||
if ctx.author.voice:
|
||||
channel = ctx.message.author.voice.channel
|
||||
print("joining..")
|
||||
await channel.connect()
|
||||
print("joined.")
|
||||
else:
|
||||
print("not in a voice channel!")
|
||||
|
||||
|
||||
@client.command()
|
||||
async def leave(ctx):
|
||||
if ctx.voice_client:
|
||||
await ctx.voice_client.disconnect()
|
||||
else:
|
||||
print("not in a voice channel!")
|
||||
|
||||
|
||||
@client.command()
|
||||
async def listen(ctx):
|
||||
if ctx.voice_client:
|
||||
print("trying to listen..")
|
||||
ctx.voice_client.start_recording(discord.sinks.WaveSink(), callback, ctx)
|
||||
print("listening..")
|
||||
else:
|
||||
print("not in a voice channel!")
|
||||
|
||||
|
||||
async def callback(sink: discord.sinks, ctx):
|
||||
print("in callback..")
|
||||
for user_id, audio in sink.audio_data.items():
|
||||
if user_id == ctx.author.id:
|
||||
print("saving audio..")
|
||||
audio: discord.sinks.core.AudioData = audio
|
||||
print(user_id)
|
||||
filename = "audio.wav"
|
||||
with open(filename, "wb") as f:
|
||||
f.write(audio.file.getvalue())
|
||||
print("audio saved.")
|
||||
transcription = transcribe(filename)
|
||||
print(transcription)
|
||||
response = []
|
||||
for chunk in interpreter.chat(transcription, display=False, stream=True):
|
||||
# await message.channel.send(chunk)
|
||||
if "message" in chunk:
|
||||
response.append(chunk["message"])
|
||||
await ctx.message.channel.send(" ".join(response))
|
||||
|
||||
|
||||
@client.command()
|
||||
async def stop(ctx):
|
||||
ctx.voice_client.stop_recording()
|
||||
|
||||
|
||||
@client.event
|
||||
async def on_ready():
|
||||
print(f"We have logged in as {client.user}")
|
||||
|
||||
|
||||
client.run(bot_token)
|
||||
# Initialize the EdgeGPTModel
|
||||
model = BingChat()
|
||||
|
||||
response = model("Generate")
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
accumulated_story = ""
|
||||
latest_caption = ""
|
||||
standard_suffix = ""
|
||||
storyboard = []
|
||||
|
||||
caption = "Create comic about opensourcerer a robot wizard"
|
||||
|
||||
def generate_images_with_bingchat(caption):
|
||||
img_path = model.create_img(caption)
|
||||
img_urls = model.images(caption)
|
||||
return img_urls
|
||||
|
||||
def generate_single_caption(text):
|
||||
prompt = f"A comic about {text}."
|
||||
response = model(text)
|
||||
return response
|
||||
|
||||
def interpret_text_with_gpt(text, suffix):
|
||||
return generate_single_caption(f"{text} {suffix}")
|
||||
|
||||
def create_standard_suffix(original_prompt):
|
||||
return f"In the style of {original_prompt}"
|
||||
|
||||
def gradio_interface(text=None, next_button_clicked=False):
|
||||
global accumulated_story, latest_caption, standard_suffix, storyboard
|
||||
|
||||
if not standard_suffix:
|
||||
standard_suffix = create_standard_suffix(text)
|
||||
|
||||
if next_button_clicked:
|
||||
new_caption = generate_single_caption(latest_caption + " " + standard_suffix)
|
||||
new_urls = generate_images_with_bingchat(new_caption)
|
||||
latest_caption = new_caption
|
||||
storyboard.append((new_urls, new_caption))
|
||||
|
||||
elif text:
|
||||
caption = generate_single_caption(text + " " + standard_suffix)
|
||||
comic_panel_urls = generate_images_with_bingchat(caption)
|
||||
latest_caption = caption
|
||||
storyboard.append((comic_panel_urls, caption))
|
||||
|
||||
storyboard_html = ""
|
||||
for urls, cap in storyboard:
|
||||
for url in urls:
|
||||
storyboard_html += f'<img src="{url}" alt="{cap}" width="300"/><br>{cap}<br>'
|
||||
|
||||
return storyboard_html
|
||||
|
||||
if __name__ == "__main__":
|
||||
iface = gr.Interface(
|
||||
fn=gradio_interface,
|
||||
inputs=[
|
||||
gr.inputs.Textbox(default="Type your story concept here", optional=True, label="Story Concept"),
|
||||
gr.inputs.Checkbox(label="Generate Next Part")
|
||||
],
|
||||
outputs=[gr.outputs.HTML()],
|
||||
live=False # Submit button will appear
|
||||
)
|
||||
iface.launch()
|
||||
|
@ -1,14 +0,0 @@
|
||||
/*
|
||||
A ping command that replies with "Pong!" when bot is running.
|
||||
*/
|
||||
|
||||
const { SlashCommandBuilder } = require("discord.js");
|
||||
|
||||
module.exports = {
|
||||
data: new SlashCommandBuilder()
|
||||
.setName("ping")
|
||||
.setDescription("Replies with Pong!"),
|
||||
async execute(interaction) {
|
||||
await interaction.reply("Pong!");
|
||||
},
|
||||
};
|
@ -1,10 +0,0 @@
|
||||
const { SlashCommandBuilder } = require('discord.js');
|
||||
|
||||
module.exports = {
|
||||
data: new SlashCommandBuilder()
|
||||
.setName("server")
|
||||
.setDescription("Replies with server name and member count."),
|
||||
async execute(interaction) {
|
||||
await interaction.reply(`Server name: ${interaction.guild.name}\nTotal members: ${interaction.guild.memberCount}`);
|
||||
},
|
||||
};
|
@ -0,0 +1,6 @@
|
||||
from swarms.models.bing_chat import BingChat
|
||||
# Initialize the EdgeGPTModel
|
||||
bing = BingChat(cookies_path="./cookies.json")
|
||||
task = "generate topics for PositiveMed.com,: 1. Monitor Health Trends: Scan Google Alerts, authoritative health websites, and social media for emerging health, wellness, and medical discussions. 2. Keyword Research: Utilize tools like SEMrush to identify keywords with moderate to high search volume and low competition. Focus on long-tail, conversational keywords. 3. Analyze Site Data: Review PositiveMed's analytics to pinpoint popular articles and areas lacking recent content. 4. Crowdsourcing: Gather topic suggestions from the brand's audience and internal team, ensuring alignment with PositiveMed's mission. 5. Topic Evaluation: Assess topics for audience relevance, uniqueness, brand fit, current relevance, and SEO potential. 6. Tone and Style: Ensure topics can be approached with an educational, empowering, and ethical tone, in line with the brand's voice. Use this framework to generate a list of potential topics that cater to PositiveMed's audience while staying true to its brand ethos. Find trending topics for slowing and reversing aging think step by step and o into as much detail as possible"
|
||||
response = bing(task)
|
||||
print(response)
|
@ -0,0 +1,15 @@
|
||||
from swarms.models.bing_chat import BingChat
|
||||
from swarms.workers.worker import Worker
|
||||
from swarms.tools.autogpt import EdgeGPTTool, tool
|
||||
from swarms.models import OpenAIChat
|
||||
import os
|
||||
|
||||
load_dotenv("../.env")
|
||||
auth_cookie = os.environ.get("AUTH_COOKIE")
|
||||
auth_cookie_SRCHHPGUSR = os.environ.get("AUTH_COOKIE_SRCHHPGUSR")
|
||||
|
||||
# Initialize the EdgeGPTModel
|
||||
bing = BingChat(cookies_path="./cookies.json", auth_cookie_SRCHHPGUSR)
|
||||
task = "generate topics for PositiveMed.com,: 1. Monitor Health Trends: Scan Google Alerts, authoritative health websites, and social media for emerging health, wellness, and medical discussions. 2. Keyword Research: Utilize tools like SEMrush to identify keywords with moderate to high search volume and low competition. Focus on long-tail, conversational keywords. 3. Analyze Site Data: Review PositiveMed's analytics to pinpoint popular articles and areas lacking recent content. 4. Crowdsourcing: Gather topic suggestions from the brand's audience and internal team, ensuring alignment with PositiveMed's mission. 5. Topic Evaluation: Assess topics for audience relevance, uniqueness, brand fit, current relevance, and SEO potential. 6. Tone and Style: Ensure topics can be approached with an educational, empowering, and ethical tone, in line with the brand's voice. Use this framework to generate a list of potential topics that cater to PositiveMed's audience while staying true to its brand ethos. Find trending topics for slowing and reversing aging think step by step and o into as much detail as possible"
|
||||
|
||||
bing(task)
|
@ -0,0 +1,15 @@
|
||||
import os
|
||||
from swarms.models.bing_chat import BingChat
|
||||
from apps.discord import Bot
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
# Initialize the EdgeGPTModel
|
||||
cookie = os.environ.get("BING_COOKIE")
|
||||
auth = os.environ.get("AUTH_COOKIE")
|
||||
bing = BingChat(cookies_path="./cookies.json")
|
||||
|
||||
bot = Bot(llm=bing)
|
||||
bot.generate_image(imggen=bing.create_img(auth_cookie=cookie, auth_cookie_SRCHHPGUSR=auth))
|
||||
bot.send_text(use_agent=False)
|
@ -1,32 +0,0 @@
|
||||
from swarms.models.bing_chat import BingChat
|
||||
from swarms.workers.worker import Worker
|
||||
from swarms.tools.autogpt import EdgeGPTTool, tool
|
||||
from swarms.models import OpenAIChat
|
||||
import os
|
||||
|
||||
api_key = os.getenv("OPENAI_API_KEY")
|
||||
|
||||
# Initialize the EdgeGPTModel
|
||||
edgegpt = BingChat(cookies_path="./cookies.txt")
|
||||
|
||||
|
||||
@tool
|
||||
def edgegpt(task: str = None):
|
||||
"""A tool to run infrence on the EdgeGPT Model"""
|
||||
return EdgeGPTTool.run(task)
|
||||
|
||||
|
||||
# Initialize the language model,
|
||||
# This model can be swapped out with Anthropic, ETC, Huggingface Models like Mistral, ETC
|
||||
llm = OpenAIChat(
|
||||
openai_api_key=api_key,
|
||||
temperature=0.5,
|
||||
)
|
||||
|
||||
# Initialize the Worker with the custom tool
|
||||
worker = Worker(llm=llm, ai_name="EdgeGPT Worker", external_tools=[edgegpt])
|
||||
|
||||
# Use the worker to process a task
|
||||
task = "Hello, my name is ChatGPT"
|
||||
response = worker.run(task)
|
||||
print(response)
|
@ -0,0 +1,29 @@
|
||||
import os
|
||||
import sys
|
||||
from dotenv import load_dotenv
|
||||
from swarms.models.revgptV4 import RevChatGPTModelv4
|
||||
from swarms.models.revgptV1 import RevChatGPTModelv1
|
||||
|
||||
root_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
|
||||
sys.path.append(root_dir)
|
||||
|
||||
load_dotenv()
|
||||
|
||||
config = {
|
||||
"model": os.getenv("REVGPT_MODEL"),
|
||||
"plugin_ids": [os.getenv("REVGPT_PLUGIN_IDS")],
|
||||
"disable_history": os.getenv("REVGPT_DISABLE_HISTORY") == "True",
|
||||
"PUID": os.getenv("REVGPT_PUID"),
|
||||
"unverified_plugin_domains": [os.getenv("REVGPT_UNVERIFIED_PLUGIN_DOMAINS")],
|
||||
}
|
||||
|
||||
# For v1 model
|
||||
model = RevChatGPTModelv1(access_token=os.getenv("ACCESS_TOKEN"), **config)
|
||||
# model = RevChatGPTModelv4(access_token=os.getenv("ACCESS_TOKEN"), **config)
|
||||
|
||||
# For v3 model
|
||||
# model = RevChatGPTModel(access_token=os.getenv("OPENAI_API_KEY"), **config)
|
||||
|
||||
task = "Write a cli snake game"
|
||||
response = model.run(task)
|
||||
print(response)
|
@ -0,0 +1,55 @@
|
||||
from vllm import LLM, SamplingParams
|
||||
import openai
|
||||
import ray
|
||||
import uvicorn
|
||||
from vllm.entrypoints import api_server as vllm_api_server
|
||||
from vllm.entrypoints.openai import api_server as openai_api_server
|
||||
from skypilot import SkyPilot
|
||||
|
||||
class VLLMModel:
|
||||
def __init__(self, model_name="facebook/opt-125m", tensor_parallel_size=1):
|
||||
self.model_name = model_name
|
||||
self.tensor_parallel_size = tensor_parallel_size
|
||||
self.model = LLM(model_name, tensor_parallel_size=tensor_parallel_size)
|
||||
self.temperature = 1.0
|
||||
self.max_tokens = None
|
||||
self.sampling_params = SamplingParams(temperature=self.temperature)
|
||||
|
||||
def generate_text(self, prompt: str) -> str:
|
||||
output = self.model.generate([prompt], self.sampling_params)
|
||||
return output[0].outputs[0].text
|
||||
|
||||
def set_temperature(self, value: float):
|
||||
self.temperature = value
|
||||
self.sampling_params = SamplingParams(temperature=self.temperature)
|
||||
|
||||
def set_max_tokens(self, value: int):
|
||||
self.max_tokens = value
|
||||
self.sampling_params = SamplingParams(temperature=self.temperature, max_tokens=self.max_tokens)
|
||||
|
||||
def offline_batched_inference(self, prompts: list) -> list:
|
||||
outputs = self.model.generate(prompts, self.sampling_params)
|
||||
return [output.outputs[0].text for output in outputs]
|
||||
|
||||
def start_api_server(self):
|
||||
uvicorn.run(vllm_api_server.app, host="0.0.0.0", port=8000)
|
||||
|
||||
def start_openai_compatible_server(self):
|
||||
uvicorn.run(openai_api_server.app, host="0.0.0.0", port=8000)
|
||||
|
||||
def query_openai_compatible_server(self, prompt: str):
|
||||
openai.api_key = "EMPTY"
|
||||
openai.api_base = "http://localhost:8000/v1"
|
||||
completion = openai.Completion.create(model=self.model_name, prompt=prompt)
|
||||
return completion
|
||||
|
||||
def distributed_inference(self, prompt: str):
|
||||
ray.init()
|
||||
self.model = LLM(self.model_name, tensor_parallel_size=self.tensor_parallel_size)
|
||||
output = self.model.generate(prompt, self.sampling_params)
|
||||
ray.shutdown()
|
||||
return output[0].outputs[0].text
|
||||
|
||||
def run_on_cloud_with_skypilot(self, yaml_file):
|
||||
sky = SkyPilot()
|
||||
sky.launch(yaml_file)
|
Loading…
Reference in new issue