huggingbots / app.py
lunarflu's picture
lunarflu HF Staff
lunarbot -> gradiotest
c41bcc3
raw
history blame
12.8 kB
import discord
import os
import threading
import gradio as gr
import requests
import json
import random
import time
import re
from discord import Embed, Color
from discord.ext import commands
# running?
from gradio_client import Client
from PIL import Image
from ratelimiter import RateLimiter
#
from datetime import datetime
from pytz import timezone
#
import asyncio
# Time zone used when formatting the timestamps shown in the log embeds below.
zurich_tz = timezone("Europe/Zurich")
def convert_to_timezone(dt, tz):
    """Return *dt* rendered in time zone *tz* as ``YYYY-MM-DD HH:MM:SS ZONE``.

    Args:
        dt: The datetime to convert. A naive datetime (no ``tzinfo``) is
            treated as UTC, because callers in this file pass values such as
            ``datetime.utcnow()``.
        tz: A ``tzinfo`` object describing the target time zone.

    Returns:
        The formatted timestamp string, including the zone abbreviation.
    """
    # Imported locally: the module-level name ``timezone`` is pytz's factory.
    from datetime import timezone as _stdlib_timezone

    if dt.tzinfo is None:
        # ``astimezone`` would otherwise assume the *system local* zone for a
        # naive value, silently shifting UTC timestamps on non-UTC hosts.
        dt = dt.replace(tzinfo=_stdlib_timezone.utc)
    return dt.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S %Z")
# Token read from the HF_TOKEN environment variable (None when it is unset);
# it is passed as the second argument to both gradio_client Clients below.
DFIF_TOKEN = os.getenv('HF_TOKEN')
# Client used by the deepfloydif / dfif2 commands ("/generate64", "/upscale256").
df = Client("huggingface-projects/IF", DFIF_TOKEN)
# Client for the latent upscaler; not referenced by the code visible in this file.
sdlu = Client("huggingface-projects/stable-diffusion-latent-upscaler", DFIF_TOKEN)
# Discord bot token read from the LUNARBOT_TOKEN environment variable (None when unset).
DISCORD_TOKEN = os.environ.get("LUNARBOT_TOKEN", None)
# Default intents plus message content, which is switched on explicitly.
intents = discord.Intents.default()
intents.message_content = True
# Commands are invoked with the "!" prefix.
bot = commands.Bot(command_prefix='!', intents=intents)
# Shared limiter entered by the deepfloydif command around its generation call.
rate_limiter = RateLimiter(max_calls=10, period=60) # 10 calls per minute
#buttons----------------------------------------------------------------------------------------------------------------------------------------------
#new
class ButtonView(discord.ui.View):
    """Four-button view that lets a user pick which stage-1 image to upscale.

    Each button acknowledges the click, invokes the ``dfif2`` command through
    the stored command context with the matching entry of ``image_paths``, and
    then stops the view.

    Args:
        ctx: Command context of the request that produced the images.
        image_paths: Paths of the four stage-1 images, in button order.
        stage_1_result_path: Stage-1 result path forwarded unchanged to ``dfif2``.
    """

    def __init__(self, ctx, image_paths, stage_1_result_path):
        super().__init__()
        self.ctx = ctx
        self.image_paths = image_paths
        self.stage_1_result_path = stage_1_result_path

    async def on_timeout(self):
        """Mark every button as disabled and stop the view when it times out."""
        # NOTE(review): the message carrying this view is not edited here, so
        # the disabled state is presumably never shown to users - confirm.
        for child in self.children:
            child.disabled = True
        self.stop()

    async def _upscale_selection(self, index, *callback_args):
        """Acknowledge the click, then run ``dfif2`` for ``image_paths[index]``.

        ``callback_args`` are the two arguments the library handed to the
        button callback. discord.py 2.x passes ``(interaction, button)`` while
        Pycord passes ``(button, interaction)``, so the interaction is located
        by type instead of by position.
        """
        interaction = next(
            (arg for arg in callback_args if isinstance(arg, discord.Interaction)),
            None,
        )
        # Stage 2 takes far longer than Discord's acknowledgement window;
        # without deferring, the client reports "This interaction failed".
        if interaction is not None and not interaction.response.is_done():
            await interaction.response.defer()
        await self.ctx.invoke(
            self.ctx.bot.get_command('dfif2'),
            image_path=self.image_paths[index],
            stage_1_result_path=self.stage_1_result_path,
        )
        self.stop()

    @discord.ui.button(label='Image 1', style=discord.ButtonStyle.blurple)
    async def image1_button(self, button: discord.ui.Button, interaction: discord.Interaction):
        await self._upscale_selection(0, button, interaction)

    @discord.ui.button(label='Image 2', style=discord.ButtonStyle.blurple)
    async def image2_button(self, button: discord.ui.Button, interaction: discord.Interaction):
        await self._upscale_selection(1, button, interaction)

    @discord.ui.button(label='Image 3', style=discord.ButtonStyle.blurple)
    async def image3_button(self, button: discord.ui.Button, interaction: discord.Interaction):
        await self._upscale_selection(2, button, interaction)

    @discord.ui.button(label='Image 4', style=discord.ButtonStyle.blurple)
    async def image4_button(self, button: discord.ui.Button, interaction: discord.Interaction):
        await self._upscale_selection(3, button, interaction)
#new
def create_button_row(ctx, image_paths, stage_1_result_path):
    """Build and return the ``ButtonView`` for one set of stage-1 images.

    The three arguments are handed to ``ButtonView`` unchanged.
    """
    return ButtonView(ctx, image_paths, stage_1_result_path)
#----------------------------------------------------------------------------------------------------------------------------------------------
@bot.event
async def on_ready():
    """Announce the login on stdout and store the log channel on the bot."""
    # 1100458786826747945 = bot-test, 1107006391547342910 = lunarbot server
    log_channel_id = 1107006391547342910
    print('Logged on as', bot.user)
    # The event handlers in this file send their embeds to ``bot.log_channel``.
    bot.log_channel = bot.get_channel(log_channel_id)
@bot.event
async def on_message_edit(before, after):
    """Post a "Message Edited" embed to the log channel when content changes.

    Edits made by the bot itself, edits that leave the text unchanged, and
    messages outside a guild are ignored. Nothing is sent while the log
    channel has not been resolved yet.

    Args:
        before: The message as it was prior to the edit.
        after: The message after the edit.
    """
    if before.author == bot.user:
        return
    if before.content == after.content:
        return
    # ``log_channel`` is only assigned in on_ready and may be missing or None.
    log_channel = getattr(bot, 'log_channel', None)
    # The jump URL and channel mention below need a guild message.
    if log_channel is None or before.guild is None:
        return

    embed = Embed(color=Color.orange())
    # ``avatar`` is None for accounts without a custom avatar;
    # ``display_avatar`` always resolves to an asset.
    embed.set_author(name=f"{before.author} ID: {before.author.id}", icon_url=before.author.display_avatar.url)
    embed.title = "Message Edited"
    embed.description = f"**Before:** {before.content or '*(empty message)*'}\n**After:** {after.content or '*(empty message)*'}"
    embed.add_field(name="Author Username", value=before.author.name, inline=True)
    embed.add_field(name="Channel", value=before.channel.mention, inline=True)
    embed.add_field(name="Message Created On", value=convert_to_timezone(before.created_at, zurich_tz), inline=True)
    embed.add_field(name="Message ID", value=before.id, inline=True)
    embed.add_field(name="Message Jump URL", value=f"[Jump to message!](https://discord.com/channels/{before.guild.id}/{before.channel.id}/{before.id})", inline=True)
    if before.attachments:
        attachment_urls = "\n".join(attachment.url for attachment in before.attachments)
        embed.add_field(name="Attachments", value=attachment_urls, inline=False)
    # Use an aware "now": a naive utcnow() would be read as system-local time
    # by astimezone().
    embed.set_footer(text=f"{convert_to_timezone(datetime.now(zurich_tz), zurich_tz)}")
    await log_channel.send(embed=embed)
@bot.event
async def on_message_delete(message):
    """Post a "Message Deleted" embed to the log channel.

    Deletions of the bot's own messages and of messages outside a guild are
    ignored. Nothing is sent while the log channel has not been resolved yet.

    Args:
        message: The message that was deleted.
    """
    if message.author == bot.user:
        return
    # ``log_channel`` is only assigned in on_ready and may be missing or None.
    log_channel = getattr(bot, 'log_channel', None)
    # The jump URL and channel mention below need a guild message.
    if log_channel is None or message.guild is None:
        return

    embed = Embed(color=Color.red())
    # ``avatar`` is None for accounts without a custom avatar;
    # ``display_avatar`` always resolves to an asset.
    embed.set_author(name=f"{message.author} ID: {message.author.id}", icon_url=message.author.display_avatar.url)
    embed.title = "Message Deleted"
    embed.description = message.content or "*(empty message)*"
    embed.add_field(name="Author Username", value=message.author.name, inline=True)
    embed.add_field(name="Channel", value=message.channel.mention, inline=True)
    embed.add_field(name="Message Created On", value=convert_to_timezone(message.created_at, zurich_tz), inline=True)
    embed.add_field(name="Message ID", value=message.id, inline=True)
    embed.add_field(name="Message Jump URL", value=f"[Jump to message!](https://discord.com/channels/{message.guild.id}/{message.channel.id}/{message.id})", inline=True)
    if message.attachments:
        attachment_urls = "\n".join(attachment.url for attachment in message.attachments)
        embed.add_field(name="Attachments", value=attachment_urls, inline=False)
    # Use an aware "now": a naive utcnow() would be read as system-local time
    # by astimezone().
    embed.set_footer(text=f"{convert_to_timezone(datetime.now(zurich_tz), zurich_tz)}")
    await log_channel.send(embed=embed)
#new
@bot.event
async def on_voice_state_update(member, before, after):
    """Log voice channel changes, mute changes and deafen changes for *member*.

    One embed is sent to the log channel per aspect that differs between
    *before* and *after*, in the order channel, mute, deaf.
    """
    notices = []
    # Channel differs between the two voice states.
    if before.channel != after.channel:
        notices.append(Embed(description=f'{member} moved in voice chat from {before.channel} to {after.channel}', color=Color.blue()))
    # Mute flag flipped.
    if before.mute != after.mute:
        notices.append(Embed(description=f'{member} was {"muted" if after.mute else "unmuted"} in voice chat', color=Color.orange()))
    # Deaf flag flipped.
    if before.deaf != after.deaf:
        notices.append(Embed(description=f'{member} was {"deafened" if after.deaf else "undeafened"} in voice chat', color=Color.orange()))
    for notice in notices:
        await bot.log_channel.send(embed=notice)
@bot.event
async def on_member_update(before, after):
    """Log a nickname change to the log channel; other updates are ignored."""
    if before.nick == after.nick:
        return
    notice = Embed(
        description=f'{before} changed their nickname to {after.nick}',
        color=Color.blue(),
    )
    await bot.log_channel.send(embed=notice)
@bot.event
async def on_guild_channel_create(channel):
    """Send a green embed to the log channel naming the created channel."""
    notice = Embed(
        description=f'Channel {channel.mention} was created',
        color=Color.green(),
    )
    await bot.log_channel.send(embed=notice)
@bot.event
async def on_guild_channel_delete(channel):
    """Send a red embed to the log channel naming the deleted channel."""
    notice = Embed(
        description=f'Channel {channel.name} ({channel.mention}) was deleted',
        color=Color.red(),
    )
    await bot.log_channel.send(embed=notice)
@bot.event
async def on_guild_role_create(role):
    """Send a green embed to the log channel naming the created role."""
    notice = Embed(
        description=f'Role {role.mention} was created',
        color=Color.green(),
    )
    await bot.log_channel.send(embed=notice)
@bot.event
async def on_guild_role_delete(role):
    """Send a red embed to the log channel naming the deleted role."""
    notice = Embed(
        description=f'Role {role.name} ({role.mention}) was deleted',
        color=Color.red(),
    )
    await bot.log_channel.send(embed=notice)
@bot.event
async def on_guild_role_update(before, after):
    """Log role renames and changes to a role's administrator permission.

    A rename produces an orange embed. A change of the administrator
    permission produces a red embed that additionally mentions the role with
    ID 1106995261487710411 when that role exists in the guild.

    Args:
        before: The role prior to the update.
        after: The role after the update.
    """
    if before.name != after.name:
        embed = Embed(description=f'Role {before.mention} was renamed to {after.name}', color=Color.orange())
        await bot.log_channel.send(embed=embed)
    if before.permissions.administrator != after.permissions.administrator:
        # Changes involving the administrator permission
        certain_role_id = 1106995261487710411  # Replace with the actual role ID
        certain_role = after.guild.get_role(certain_role_id)
        # get_role returns None when the guild has no such role; still send
        # the alert rather than failing on ``None.mention``.
        ping = certain_role.mention if certain_role is not None else None
        embed = Embed(description=f'Role {after.mention} had its administrator permission {"enabled" if after.permissions.administrator else "disabled"}', color=Color.red())
        await bot.log_channel.send(content=ping, embed=embed)
def _build_preview_grid(image_paths, output_path):
    """Paste the four images at *image_paths* into a 2x2 grid saved to *output_path*.

    Cell size is taken from the first image; cells are filled left to right,
    top to bottom. Every opened image is closed again before returning.
    """
    images = []
    try:
        for path in image_paths:
            images.append(Image.open(path))
        cell_w, cell_h = images[0].width, images[0].height
        grid = Image.new('RGB', (cell_w * 2, cell_h * 2))
        for position, image in enumerate(images):
            grid.paste(image, ((position % 2) * cell_w, (position // 2) * cell_h))
        grid.save(output_path)
    finally:
        for image in images:
            image.close()


@bot.command()
@commands.cooldown(1, 5, commands.BucketType.user)
async def deepfloydif(ctx, *, prompt: str):
    """Generate four stage-1 images for *prompt* and offer them for upscaling.

    The prompt is trimmed, cut to 100 characters and stripped of everything
    except word characters and whitespace. The four resulting PNGs are
    combined into one 2x2 preview that is sent together with a ``ButtonView``.
    Any failure is printed and answered with a generic error reply.

    Args:
        ctx: Invocation context of the command.
        prompt: Free-text prompt supplied by the user.
    """
    try:
        prompt = prompt.strip()[:100]  # Limit the prompt length to 100 characters
        prompt = re.sub(r'[^\w\s]', '', prompt)  # Remove special characters
        number_of_images = 4

        def run_stage_1():
            # Runs on a worker thread: both the rate limiter and predict()
            # block, which would otherwise stall the bot's event loop.
            with rate_limiter:
                # The global RNG is no longer reseeded from the clock, so two
                # requests within the same second do not share a seed.
                seed = random.randint(0, 2**32 - 1)
                return df.predict(prompt, "blur", seed, number_of_images, 7.0, 'smart100', 50, api_name="/generate64")

        loop = asyncio.get_running_loop()
        stage_1_results, _stage_1_param_path, stage_1_result_path = await loop.run_in_executor(None, run_stage_1)

        # Sorted for a deterministic button order; the preview file itself is
        # excluded in case the directory already contains one.
        png_files = sorted(
            name for name in os.listdir(stage_1_results)
            if name.endswith('.png') and name != 'combined_image.png'
        )
        if len(png_files) < number_of_images:
            # Handled by the generic error reply below instead of ending silently.
            raise RuntimeError(f"expected {number_of_images} PNG files in {stage_1_results}, found {len(png_files)}")

        image_paths = [os.path.join(stage_1_results, name) for name in png_files[:number_of_images]]
        combined_image_path = os.path.join(stage_1_results, 'combined_image.png')
        await loop.run_in_executor(None, _build_preview_grid, image_paths, combined_image_path)

        await ctx.reply('Here is the combined image. Select an option quickly!')
        with open(combined_image_path, 'rb') as f:
            await ctx.send(file=discord.File(f, 'combined_image.png'), view=create_button_row(ctx, image_paths, stage_1_result_path))
    except Exception as e:
        print(f"Error: {e}")
        await ctx.reply('An error occurred while processing your request. Please wait 5 seconds before retrying.')
#new stage 2----------------------------------------------------------------------------------------------------------------------------------------------
# Stage 2
@bot.command()
@commands.cooldown(1, 5, commands.BucketType.user)
async def dfif2(ctx, image_path, stage_1_result_path, selected_index_for_stage_2: int = 0):
    """Run the stage-2 ("/upscale256") prediction and reply with the result.

    Args:
        ctx: Invocation context of the command.
        image_path: Path of the image the user picked. It is accepted for
            compatibility with existing callers but is not used by the call.
        stage_1_result_path: Stage-1 result path passed to the upscale call.
        selected_index_for_stage_2: Index passed to the upscale call. Defaults
            to 0, the value that was previously hard-coded.
            NOTE(review): presumably this selects which stage-1 image gets
            upscaled; callers that only pass ``image_path`` always get index 0.

    Any failure is printed and answered with a generic error reply.
    """
    try:
        seed_2 = 0
        guidance_scale_2 = 4
        custom_timesteps_2 = 'smart50'
        number_of_inference_steps_2 = 50

        def run_stage_2():
            # predict() blocks until the remote job finishes, so it is run on
            # a worker thread to keep the bot's event loop responsive.
            return df.predict(stage_1_result_path, selected_index_for_stage_2, seed_2, guidance_scale_2, custom_timesteps_2, number_of_inference_steps_2, api_name='/upscale256')

        result_path = await asyncio.get_running_loop().run_in_executor(None, run_stage_2)
        with open(result_path, 'rb') as f:
            await ctx.reply('Here is the result of the second stage', file=discord.File(f, 'result.png'))
    except Exception as e:
        print(f"Error: {e}")
        await ctx.reply('An error occurred while processing stage 2 upscaling. Please try again later.')
def run_bot():
    """Start the Discord bot using the module-level ``DISCORD_TOKEN``."""
    token = DISCORD_TOKEN
    bot.run(token)
# Start the bot on its own thread so the statements that follow in this module still run.
threading.Thread(target=run_bot).start()
def greet(name):
    """Return the greeting ``Hello <name>!`` for the given string."""
    return "".join(("Hello ", name, "!"))
# Text-in / text-out Gradio interface around greet(), launched when this module runs.
demo = gr.Interface(fn=greet, inputs="text", outputs="text")
demo.launch()