# huggingbots / app.py
# lunarflu (HF Staff) · antispam · 2a9e01b · 13.4 kB
import discord
from discord.ui import Button
from discord.ext import commands #for buttons
import gradio_client
from gradio_client import Client
import gradio as gr
import os
import threading
#for deepfloydif
import requests
import json
import random
from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import time
import asyncio
# random + small llama #
# TODO:
# - alert
# - fix the error on the first command after bot startup
# - Stable Diffusion upscale
# - buttons for DeepFloyd IF (1, 2, 3, 4)
# - use application commands instead of message-content checks (more user-friendly)
# Token read from the environment and handed to both Space clients below.
DFIF_TOKEN = os.environ.get('DFIF_TOKEN')

# DeepFloyd IF client. The "DeepFloyd/IF" Space was not reliable at the time
# of writing, so "huggingface-projects/IF" is used instead.
df = Client("huggingface-projects/IF", DFIF_TOKEN)

# Stable Diffusion latent upscaler client.
sdlu = Client("huggingface-projects/stable-diffusion-latent-upscaler", DFIF_TOKEN)
#---------------------------------------------------------------------------------------------------------------------------------------------------
class ButtonView(discord.ui.View):
    """Four-button picker attached to the stage-1 image grid.

    Pressing "Image N" forwards the N-th entry of ``image_paths`` to the
    ``dfif2`` command via ``ctx.invoke`` and then stops the view.

    Args:
        ctx: Context-like object; must expose ``bot.get_command(name)`` and
            an awaitable ``invoke(command, **kwargs)``.
        image_paths: Paths of the generated images, in button order.
        stage_1_result_path: Stage-1 result handle passed through to ``dfif2``.
    """

    def __init__(self, ctx, image_paths, stage_1_result_path):
        super().__init__()
        self.ctx = ctx
        self.image_paths = image_paths
        self.stage_1_result_path = stage_1_result_path

    async def on_timeout(self):
        """Disable every button and stop the view once it times out."""
        for child in self.children:
            child.disabled = True
        self.stop()

    #-------------------------------------------
    async def invoke_dfif2(self, image_path):
        """Invoke the ``dfif2`` command for ``image_path``.

        The previous version first called ``self.get_context(message, ...)``;
        a View has no ``get_context`` and ``message`` was undefined, so every
        button press failed before reaching the command. The stored ``ctx``
        is all that is needed.
        """
        await self.ctx.invoke(
            self.ctx.bot.get_command('dfif2'),
            image_path=image_path,
            stage_1_result_path=self.stage_1_result_path,
        )

    async def _select(self, index):
        """Shared button handler: run stage 2 for image ``index``, then stop."""
        if 0 <= index < len(self.image_paths):
            await self.invoke_dfif2(self.image_paths[index])
        else:
            # Fewer images than buttons: report it instead of raising IndexError.
            print(f"ButtonView: no image at index {index}")
        self.stop()

    # NOTE(review): the (button, interaction) parameter order below is kept
    # exactly as written; the order the library passes them in depends on the
    # discord library/version in use. Neither argument is read, so the
    # callbacks work either way - confirm before starting to use them.
    @discord.ui.button(label='Image 1', style=discord.ButtonStyle.blurple)
    async def image1_button(self, button: discord.ui.Button, interaction: discord.Interaction):
        await self._select(0)

    @discord.ui.button(label='Image 2', style=discord.ButtonStyle.blurple)
    async def image2_button(self, button: discord.ui.Button, interaction: discord.Interaction):
        await self._select(1)

    @discord.ui.button(label='Image 3', style=discord.ButtonStyle.blurple)
    async def image3_button(self, button: discord.ui.Button, interaction: discord.Interaction):
        await self._select(2)

    @discord.ui.button(label='Image 4', style=discord.ButtonStyle.blurple)
    async def image4_button(self, button: discord.ui.Button, interaction: discord.Interaction):
        await self._select(3)
#---------------------------------------------------------------------------------------------------------------------------------------------------
#---------------------------------------------------------------------------------------------------------------------------------------------------
# Set up discord bot
class _MessageContext:
    """Minimal stand-in for a command context, built from a plain message.

    ``MyClient`` derives from ``discord.Client`` and therefore has no command
    framework, while ``ButtonView`` calls ``ctx.bot.get_command(...)`` and
    ``ctx.invoke(...)``, and ``dfif2`` calls ``ctx.reply(...)``. This adapter
    provides exactly those three members.
    """

    def __init__(self, bot, message):
        self.bot = bot
        self.message = message

    async def invoke(self, command, *args, **kwargs):
        """Await ``command`` with this context as its first argument."""
        return await command(self, *args, **kwargs)

    async def reply(self, *args, **kwargs):
        """Reply to the message this context was created from."""
        return await self.message.reply(*args, **kwargs)


class MyClient(discord.Client):
    """Discord client: antispam, ``!deepfloydif`` generation and audit logging."""

    # Kept under its original name for compatibility; presumably the id of the
    # home server - TODO confirm. It is not used by any method of this class.
    guild = 879548962464493619
    # If the bot has this role, it won't run.
    OFFLINE_ROLE_ID = 1103676632667017266  # @offline / under maintenance
    # The message author needs this role in order to use the bot.
    REQUIRED_ROLE_ID = 897376942817419265  # 900063512829755413 = @verified, 897376942817419265 = @huggingfolks
    # Channels where the bot will accept commands.
    ALLOWED_CHANNEL_IDS = [1100458786826747945]  # 1100458786826747945 = #bot-test
    LOG_CHANNEL_ID = 1036960509586587689  # 1100458786826747945 = bot test
    # Substrings that cause a message to be deleted as spam.
    SPAM_MARKERS = ("discord.gg/", "discordapp.com/invite/", "@everyone", "@here")

    # Resolved in on_ready; None until then so early events do not crash.
    log_channel = None

    async def on_ready(self):
        """Announce the login and resolve the log channel."""
        print('Logged on as', self.user)
        self.log_channel = self.get_channel(self.LOG_CHANNEL_ID)

    def get_command(self, name):
        """Return the coroutine method registered under ``name``, or ``None``.

        Only ``'dfif2'`` is known; this is the lookup ``ButtonView`` performs
        through ``ctx.bot.get_command``.
        """
        return {'dfif2': self.dfif2}.get(name)

    async def _send_log(self, **kwargs):
        """Send to the log channel if it has been resolved; otherwise do nothing."""
        if self.log_channel is not None:
            await self.log_channel.send(**kwargs)

    async def _delete_if_spam(self, message):
        """Delete ``message`` if it contains a spam marker.

        Returns:
            True when the message matched a marker (whether or not the
            deletion succeeded), False otherwise.
        """
        content = message.content
        if not any(marker in content for marker in self.SPAM_MARKERS):
            return False
        try:
            await message.delete()
        except discord.HTTPException as e:
            print(f"An error occurred while deleting the message: {e}")
        await asyncio.sleep(5)  # safety mechanism, can only delete slowly
        return True

    async def on_message(self, message):
        """Apply antispam, then the safety checks, then handle ``!deepfloydif``.

        The bot runs a command only if
        1) it does not have the @offline role,
        2) the author has the required role,
        3) the message is in an allowed channel.
        """
        # bot won't respond to itself, prevents feedback loop + API spam
        if message.author == self.user:
            return

        # A spam message is never treated as a command afterwards.
        if await self._delete_if_spam(message):
            return

        guild = message.guild
        if guild is None:
            return  # direct messages carry no guild roles to check

        bot_member = guild.get_member(self.user.id)
        if bot_member is None or any(role.id == self.OFFLINE_ROLE_ID for role in bot_member.roles):
            return

        # Authors that are not guild members (e.g. webhooks) have no roles.
        author_roles = getattr(message.author, 'roles', ())
        if not any(role.id == self.REQUIRED_ROLE_ID for role in author_roles):
            return

        if message.channel.id not in self.ALLOWED_CHANNEL_IDS:
            return

        if message.content.startswith('!deepfloydif'):  # change to application commands, more intuitive
            await self._deepfloydif(message)

    @staticmethod
    def _combine_2x2(image_paths, output_path):
        """Paste four equally sized images into one 2x2 grid saved at ``output_path``.

        [],[],[],[] -> [][]
                       [][]
        """
        with Image.open(image_paths[0]) as first:
            width, height = first.size
        grid = Image.new('RGB', (width * 2, height * 2))
        for position, path in enumerate(image_paths):
            # Each image is closed as soon as it has been pasted.
            with Image.open(path) as tile:
                grid.paste(tile, ((position % 2) * width, (position // 2) * height))
        grid.save(output_path)

    async def _deepfloydif(self, message):
        """Run stage 1 for the prompt in ``message`` and reply with a 2x2 grid."""
        #(prompt, negative_prompt, seed, number_of_images, guidance_scale,custom_timesteps_1, number_of_inference_steps, api_name="/generate64")
        #-> (stage_1_results, stage_1_param_path, stage_1_result_path)
        prompt = message.content[len('!deepfloydif'):].strip()
        negative_prompt = ''
        seed = 0
        number_of_images = 4
        guidance_scale = 7
        custom_timesteps_1 = 'smart50'
        number_of_inference_steps = 50

        # predict() blocks until generation finishes; run it off the event
        # loop so the gateway connection keeps being serviced meanwhile.
        loop = asyncio.get_running_loop()
        stage_1_results, _stage_1_param_path, stage_1_result_path = await loop.run_in_executor(
            None,
            lambda: df.predict(
                prompt,
                negative_prompt,
                seed,
                number_of_images,
                guidance_scale,
                custom_timesteps_1,
                number_of_inference_steps,
                api_name='/generate64',
            ),
        )

        # stage_1_results -> path to directory with png files, so we isolate those.
        # Sorted for a stable button order; a grid left by an earlier run is skipped.
        png_files = sorted(
            name for name in os.listdir(stage_1_results)
            if name.endswith('.png') and name != 'combined_image.png'
        )
        if len(png_files) < number_of_images:
            print(f"deepfloydif: expected {number_of_images} png files, found {len(png_files)}")
            return

        png_paths = [os.path.join(stage_1_results, name) for name in png_files[:number_of_images]]

        # merge images into larger, 2x2 image the way midjourney does it
        combined_image_path = os.path.join(stage_1_results, 'combined_image.png')
        self._combine_2x2(png_paths, combined_image_path)

        # Send the combined image file as a discord attachment with the button view
        view = ButtonView(_MessageContext(self, message), png_paths, stage_1_result_path)
        with open(combined_image_path, 'rb') as f:
            await message.reply('Here is the combined image', file=discord.File(f, 'combined_image.png'), view=view)

    #stage 2, can be stable diffusion too
    async def dfif2(self, ctx, image_path, stage_1_result_path):
        """Upscale a stage-1 result and reply with the image through ``ctx``.

        Args:
            ctx: Object with an awaitable ``reply(...)``.
            image_path: Path of the image the user picked.
            stage_1_result_path: Stage-1 result handle returned by ``/generate64``.
        """
        # NOTE(review): image_path is not used and index 0 is always upscaled,
        # regardless of which button was pressed - confirm the intended mapping.
        selected_index_for_stage_2 = 0
        seed_2 = 0
        guidance_scale_2 = 4
        custom_timesteps_2 = 'smart50'
        number_of_inference_steps_2 = 50

        # Same reasoning as stage 1: keep the blocking call off the event loop.
        loop = asyncio.get_running_loop()
        result_path = await loop.run_in_executor(
            None,
            lambda: df.predict(
                stage_1_result_path,
                selected_index_for_stage_2,
                seed_2,
                guidance_scale_2,
                custom_timesteps_2,
                number_of_inference_steps_2,
                api_name='/upscale256',
            ),
        )
        with open(result_path, 'rb') as f:
            await ctx.reply('Here is the result of the second stage', file=discord.File(f, 'result.png'))

    async def on_message_delete(self, message):
        """Post an embed describing a deleted guild message to the log channel."""
        if message.author == self.user:
            return
        # The jump URL and channel mention below need a guild; skip DMs.
        if message.guild is None or self.log_channel is None:
            return

        embed = discord.Embed(color=discord.Color.red())
        # display_avatar also works for users without a custom avatar.
        embed.set_author(name=f"{message.author} ID: {message.author.id}", icon_url=message.author.display_avatar.url)
        embed.title = "Message Deleted"
        embed.description = message.content or "*(empty message)*"
        embed.add_field(name="Author Username", value=message.author.name, inline=True)
        embed.add_field(name="Channel", value=message.channel.mention, inline=True)
        # The convert_to_timezone / zurich_tz helpers used here before are not
        # defined anywhere in this file; a Discord timestamp is shown in each
        # viewer's own timezone instead.
        embed.add_field(name="Message Created On", value=discord.utils.format_dt(message.created_at), inline=True)
        embed.add_field(name="Message ID", value=message.id, inline=True)
        embed.add_field(name="Message Jump URL", value=f"[Jump to message!](https://discord.com/channels/{message.guild.id}/{message.channel.id}/{message.id})", inline=True)
        if message.attachments:
            attachment_urls = "\n".join([attachment.url for attachment in message.attachments])
            # Embed field values are limited to 1024 characters.
            embed.add_field(name="Attachments", value=attachment_urls[:1024], inline=False)
        # Replaces the former footer text holding the time of logging.
        embed.timestamp = discord.utils.utcnow()
        await self.log_channel.send(embed=embed)

    async def on_guild_channel_create(self, channel):
        """Log channel creations."""
        embed = discord.Embed(description=f'Channel {channel.mention} was created', color=discord.Color.green())
        await self._send_log(embed=embed)

    async def on_guild_channel_delete(self, channel):
        """Log channel deletions."""
        embed = discord.Embed(description=f'Channel {channel.name} ({channel.mention}) was deleted', color=discord.Color.red())
        await self._send_log(embed=embed)

    async def on_guild_role_create(self, role):
        """Log role creations."""
        embed = discord.Embed(description=f'Role {role.mention} was created', color=discord.Color.green())
        await self._send_log(embed=embed)

    async def on_guild_role_delete(self, role):
        """Log role deletions."""
        embed = discord.Embed(description=f'Role {role.name} ({role.mention}) was deleted', color=discord.Color.red())
        await self._send_log(embed=embed)

    async def on_guild_role_update(self, before, after):
        """Log role renames and changes to the administrator permission."""
        if before.name != after.name:
            embed = discord.Embed(description=f'Role {before.mention} was renamed to {after.name}', color=discord.Color.orange())
            await self._send_log(embed=embed)
        if before.permissions.administrator != after.permissions.administrator:
            # Changes involving the administrator permission
            certain_role_id = 1106995261487710411  # Replace with the actual role ID
            certain_role = after.guild.get_role(certain_role_id)
            embed = discord.Embed(description=f'Role {after.mention} had its administrator permission {"enabled" if after.permissions.administrator else "disabled"}', color=discord.Color.red())
            # The role to ping may not exist in this guild; log without the ping then.
            ping = certain_role.mention if certain_role is not None else None
            await self._send_log(content=ping, embed=embed)
# Bot token from the environment; None when the variable is not set.
DISCORD_TOKEN = os.getenv("GRADIOTEST_TOKEN")

# Default intents plus message content, which on_message inspects.
intents = discord.Intents.default()
intents.message_content = True
client = MyClient(intents=intents)


def run_bot():
    """Start the Discord client with DISCORD_TOKEN."""
    client.run(DISCORD_TOKEN)


# The bot gets its own thread; the Gradio app is launched afterwards.
threading.Thread(target=run_bot).start()
def greet(name):
    """Return ``name`` wrapped as ``Hello <name>!``."""
    return "".join(("Hello ", name, "!"))
# Text-in / text-out Gradio interface around greet().
demo = gr.Interface(greet, "text", "text")
demo.launch()