huggingbots / app.py
lunarflu's picture
lunarflu HF Staff
image paths -> dfif2
b9dc6ec
raw
history blame
19.7 kB
import discord
import os
import threading
import gradio as gr
import requests
import json
import random
import time
import re
from discord import Embed, Color
from discord.ext import commands
# restart #unstick
from gradio_client import Client
from PIL import Image
from ratelimiter import RateLimiter
#
from datetime import datetime
from pytz import timezone
#
import asyncio
# Time zone passed to convert_to_timezone() when rendering timestamps in log embeds.
zurich_tz = timezone("Europe/Zurich")
def convert_to_timezone(dt, tz):
    """Format ``dt`` as ``YYYY-MM-DD HH:MM:SS ZONE`` expressed in time zone ``tz``.

    Args:
        dt: The datetime to convert. A naive value (``tzinfo`` is ``None``) is
            interpreted as UTC, because callers in this file pass
            ``datetime.utcnow()``; without this, ``astimezone`` would assume
            the host's local time zone and shift the result on non-UTC hosts.
        tz: The ``tzinfo`` object to convert into.

    Returns:
        The formatted timestamp string.
    """
    if dt.tzinfo is None:
        # Imported locally: the module-level name ``timezone`` is pytz's factory.
        from datetime import timezone as _stdlib_timezone

        dt = dt.replace(tzinfo=_stdlib_timezone.utc)
    return dt.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S %Z")
# Access token read from the HF_TOKEN environment variable (None when unset).
DFIF_TOKEN = os.getenv('HF_TOKEN')
# Gradio clients for the two hosted Spaces; both are given the same token.
df = Client("huggingface-projects/IF", DFIF_TOKEN)
sdlu = Client("huggingface-projects/stable-diffusion-latent-upscaler", DFIF_TOKEN)
# Discord bot token read from the GRADIOTEST_TOKEN environment variable (None when unset).
DISCORD_TOKEN = os.environ.get("GRADIOTEST_TOKEN", None)
# Default intents with message content switched on; commands use the '!' prefix.
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(command_prefix='!', intents=intents)
# Shared limiter entered by the deepfloydif command around image generation.
rate_limiter = RateLimiter(max_calls=10, period=60) # 10 calls per minute
#buttons----------------------------------------------------------------------------------------------------------------------------------------------
#new
class ButtonView(discord.ui.View):
    """View with four buttons, one per generated image, that starts stage 2.

    Pressing a button invokes the ``dfif2`` command through the stored command
    context, passing the button's zero-based index together with the stage-1
    result path and the list of image paths given at construction.

    NOTE(review): the ``(button, interaction)`` callback parameter order
    depends on which Discord library provides ``discord.ui`` — confirm it
    matches the installed one.
    NOTE(review): no callback here responds to the interaction itself —
    confirm whether the reply sent by ``dfif2`` is enough for the client.
    """

    def __init__(self, ctx, image_paths, stage_1_result_path):
        super().__init__()
        self.stage_1_result_path = stage_1_result_path
        self.image_paths = image_paths
        self.ctx = ctx

    async def on_timeout(self):
        """Mark every component as disabled and stop the view."""
        # Only the in-memory components change; nothing here edits a message.
        for item in self.children:
            item.disabled = True
        self.stop()

    async def button_handler(self, index: int):
        """Invoke ``dfif2`` for the image at ``index``, then stop the view."""
        stage_2_command = self.ctx.bot.get_command('dfif2')
        await self.ctx.invoke(
            stage_2_command,
            index=index,
            stage_1_result_path=self.stage_1_result_path,
            image_paths=self.image_paths,
        )
        self.stop()

    @discord.ui.button(label='Image 1', style=discord.ButtonStyle.blurple)
    async def image1_button(self, button: discord.ui.Button, interaction: discord.Interaction):
        """Select the first image."""
        await self.button_handler(0)

    @discord.ui.button(label='Image 2', style=discord.ButtonStyle.blurple)
    async def image2_button(self, button: discord.ui.Button, interaction: discord.Interaction):
        """Select the second image."""
        await self.button_handler(1)

    @discord.ui.button(label='Image 3', style=discord.ButtonStyle.blurple)
    async def image3_button(self, button: discord.ui.Button, interaction: discord.Interaction):
        """Select the third image."""
        await self.button_handler(2)

    @discord.ui.button(label='Image 4', style=discord.ButtonStyle.blurple)
    async def image4_button(self, button: discord.ui.Button, interaction: discord.Interaction):
        """Select the fourth image."""
        await self.button_handler(3)
#new
def create_button_row(ctx, image_paths, stage_1_result_path):
    """Return a new ``ButtonView`` built from the given context and paths."""
    return ButtonView(ctx, image_paths, stage_1_result_path)
#----------------------------------------------------------------------------------------------------------------------------------------------
@bot.event
async def on_ready():
    """Print the logged-in user and store the log channel on the bot."""
    # 1100458786826747945 = bot-test, 1107006391547342910 = lunarbot server
    log_channel_id = 1100458786826747945
    print('Logged on as', bot.user)
    bot.log_channel = bot.get_channel(log_channel_id)
@bot.event
async def on_message_edit(before, after):
    """Post an embed to the log channel when a user's message text changes.

    Edits by the bot itself, and edit events whose text is unchanged, are
    ignored. Nothing is sent while ``bot.log_channel`` is unset or ``None``
    (it is assigned in ``on_ready``).

    Args:
        before: The message as it was before the edit.
        after: The message after the edit.
    """
    if before.author == bot.user:
        return
    if before.content == after.content:
        return
    log_channel = getattr(bot, "log_channel", None)
    if log_channel is None:
        return

    author = before.author
    embed = Embed(color=Color.orange())
    # display_avatar always resolves to an asset; ``avatar`` is None for
    # accounts without a custom avatar, which made ``.avatar.url`` raise.
    embed.set_author(name=f"{author} ID: {author.id}", icon_url=author.display_avatar.url)
    embed.title = "Message Edited"
    description = f"**Before:** {before.content or '*(empty message)*'}\n**After:** {after.content or '*(empty message)*'}"
    # Discord rejects embed descriptions longer than 4096 characters.
    embed.description = description[:4096]
    embed.add_field(name="Author Username", value=author.name, inline=True)
    embed.add_field(name="Channel", value=before.channel.mention, inline=True)
    embed.add_field(name="Message Created On", value=convert_to_timezone(before.created_at, zurich_tz), inline=True)
    embed.add_field(name="Message ID", value=before.id, inline=True)
    # jump_url also works for direct messages, where ``before.guild`` is None.
    embed.add_field(name="Message Jump URL", value=f"[Jump to message!]({before.jump_url})", inline=True)
    if before.attachments:
        attachment_urls = "\n".join(attachment.url for attachment in before.attachments)
        # Embed field values are limited to 1024 characters.
        embed.add_field(name="Attachments", value=attachment_urls[:1024], inline=False)
    # Use an aware "now": a naive utcnow() would be read as host-local time.
    embed.set_footer(text=f"{convert_to_timezone(datetime.now(zurich_tz), zurich_tz)}")
    await log_channel.send(embed=embed)
@bot.event
async def on_message_delete(message):
    """Post an embed to the log channel when a user's message is deleted.

    Deletions of the bot's own messages are ignored. Nothing is sent while
    ``bot.log_channel`` is unset or ``None`` (it is assigned in ``on_ready``).

    Args:
        message: The deleted message.
    """
    if message.author == bot.user:
        return
    log_channel = getattr(bot, "log_channel", None)
    if log_channel is None:
        return

    author = message.author
    embed = Embed(color=Color.red())
    # display_avatar always resolves to an asset; ``avatar`` is None for
    # accounts without a custom avatar, which made ``.avatar.url`` raise.
    embed.set_author(name=f"{author} ID: {author.id}", icon_url=author.display_avatar.url)
    embed.title = "Message Deleted"
    # Discord rejects embed descriptions longer than 4096 characters.
    embed.description = (message.content or "*(empty message)*")[:4096]
    embed.add_field(name="Author Username", value=author.name, inline=True)
    embed.add_field(name="Channel", value=message.channel.mention, inline=True)
    embed.add_field(name="Message Created On", value=convert_to_timezone(message.created_at, zurich_tz), inline=True)
    embed.add_field(name="Message ID", value=message.id, inline=True)
    # jump_url also works for direct messages, where ``message.guild`` is None.
    embed.add_field(name="Message Jump URL", value=f"[Jump to message!]({message.jump_url})", inline=True)
    if message.attachments:
        attachment_urls = "\n".join(attachment.url for attachment in message.attachments)
        # Embed field values are limited to 1024 characters.
        embed.add_field(name="Attachments", value=attachment_urls[:1024], inline=False)
    # Use an aware "now": a naive utcnow() would be read as host-local time.
    embed.set_footer(text=f"{convert_to_timezone(datetime.now(zurich_tz), zurich_tz)}")
    await log_channel.send(embed=embed)
@bot.event
async def on_member_ban(guild, user):
    """Log a ban to the log channel, with the audit-log reason when available.

    This replaces two handlers that shared the name ``on_member_ban``: the
    later registration overrode the earlier one, so every ban was reported as
    "kicked". A kick does not trigger this event.
    NOTE(review): logging kicks would need a separate handler (for example on
    member removal, checked against kick audit-log entries) — confirm whether
    that is wanted.

    Args:
        guild: The guild the user was banned from.
        user: The banned user.
    """
    log_channel = getattr(bot, "log_channel", None)
    if log_channel is None:
        return

    ban_entry = None
    try:
        # ``async for`` is used instead of ``.flatten()``, which not every
        # library version provides. A few entries are scanned so the match
        # is on this user, not merely on the newest ban.
        async for entry in guild.audit_logs(limit=5, action=discord.AuditLogAction.ban):
            if getattr(entry.target, "id", None) == user.id:
                ban_entry = entry
                break
    except discord.HTTPException as error:
        # Audit-log access can fail (e.g. missing permission); still log the ban.
        print(f"Error: {error}")

    embed = Embed(description=f'User {user.name} ({user.mention}) was banned', color=Color.red())
    if ban_entry is not None:
        embed.add_field(name='Reason', value=ban_entry.reason or 'No reason provided', inline=False)
    await log_channel.send(embed=embed)
#new
@bot.event
async def on_voice_state_update(member, before, after):
    """Report voice channel moves, mute changes and deafen changes.

    Each of the three aspects is compared between ``before`` and ``after``
    independently, so a single event can produce up to three log embeds.
    """

    async def post(text, colour):
        await bot.log_channel.send(embed=Embed(description=text, color=colour))

    # Moving members in voice chat
    if before.channel != after.channel:
        await post(f'{member} moved in voice chat from {before.channel} to {after.channel}', Color.blue())

    # Muting members in voice chat
    if before.mute != after.mute:
        mute_state = "muted" if after.mute else "unmuted"
        await post(f'{member} was {mute_state} in voice chat', Color.orange())

    # Deafening members in voice chat
    if before.deaf != after.deaf:
        deaf_state = "deafened" if after.deaf else "undeafened"
        await post(f'{member} was {deaf_state} in voice chat', Color.orange())
@bot.event
async def on_member_update(before, after):
    """Report nickname creation, removal or change to the log channel.

    Nothing is sent when the nickname is the same before and after.
    """
    old_nick = before.nick
    new_nick = after.nick

    if old_nick is None and new_nick:
        # User creates a nickname
        text = f'{before} created the nickname {new_nick}'
    elif old_nick is not None and not new_nick:
        # User removes their nickname
        text = f'{before} removed their nickname'
    elif old_nick != new_nick:
        # User changes their nickname
        text = f'{before} changed their nickname to {new_nick}'
    else:
        return

    await bot.log_channel.send(embed=Embed(description=text, color=Color.blue()))
@bot.event
async def on_guild_channel_create(channel):
    """Report a newly created channel to the log channel."""
    text = f'Channel #{channel.name} ({channel.mention}) was created'
    notice = Embed(description=text, color=Color.green())
    await bot.log_channel.send(embed=notice)
@bot.event
async def on_guild_channel_delete(channel):
    """Report a deleted channel to the log channel."""
    text = f'Channel #{channel.name} ({channel.mention}) was deleted'
    notice = Embed(description=text, color=Color.red())
    await bot.log_channel.send(embed=notice)
@bot.event
async def on_guild_role_create(role):
    """Report a newly created role to the log channel."""
    text = f'Role {role.mention} was created'
    notice = Embed(description=text, color=Color.green())
    await bot.log_channel.send(embed=notice)
@bot.event
async def on_guild_role_delete(role):
    """Report a deleted role to the log channel."""
    text = f'Role {role.name} ({role.mention}) was deleted'
    notice = Embed(description=text, color=Color.red())
    await bot.log_channel.send(embed=notice)
@bot.event
async def on_guild_role_update(before, after):
    """Log role renames and changes to a fixed set of moderation permissions.

    A rename produces an orange embed. Each watched permission whose value
    differs between ``before`` and ``after`` produces its own red embed, sent
    together with a mention of the alerts role when that role can be found in
    the guild. Nothing is sent while ``bot.log_channel`` is unset or ``None``.

    Args:
        before: The role before the update.
        after: The role after the update.
    """
    # Checked in this order; one alert is sent per changed permission.
    watched_permissions = (
        'kick_members',
        'ban_members',
        'administrator',
        'manage_channels',
        'manage_guild',
        'manage_messages',
        'manage_roles',
        'manage_webhooks',
        'manage_guild_expressions',
        'manage_threads',
        'moderate_members',
    )
    alert_role_id = 1108342563628404747  # alerts role- opt in

    log_channel = getattr(bot, "log_channel", None)
    if log_channel is None:
        return

    # Editing roles
    if before.name != after.name:
        embed = Embed(description=f'Role {before.mention} was renamed to {after.name}', color=Color.orange())
        await log_channel.send(embed=embed)

    for permission in watched_permissions:
        # getattr with a default: a flag the installed library does not define
        # is skipped instead of raising and aborting the remaining checks.
        was_enabled = getattr(before.permissions, permission, None)
        is_enabled = getattr(after.permissions, permission, None)
        if was_enabled == is_enabled:
            continue

        alert_role = after.guild.get_role(alert_role_id)
        # The role may not exist in this guild; send the alert without a ping
        # rather than failing on ``None.mention``.
        ping = alert_role.mention if alert_role is not None else None
        state = "enabled" if is_enabled else "disabled"
        embed = Embed(description=f'Role {after.mention} had its {permission} permission {state}', color=Color.red())
        await log_channel.send(content=ping, embed=embed)
def _combine_into_grid(image_paths, output_path):
    """Paste four images into a 2x2 grid and save it to ``output_path``.

    Offsets and the grid size are derived from the first image's dimensions.
    Every opened image is closed again, including when pasting or saving fails.
    """
    images = []
    try:
        for path in image_paths:
            images.append(Image.open(path))
        width, height = images[0].width, images[0].height
        grid = Image.new('RGB', (width * 2, height * 2))
        offsets = ((0, 0), (width, 0), (0, height), (width, height))
        for image, offset in zip(images, offsets):
            grid.paste(image, offset)
        grid.save(output_path)
    finally:
        for image in images:
            image.close()


@bot.command()
@commands.cooldown(1, 5, commands.BucketType.user)
async def deepfloydif(ctx, *, prompt: str):
    """Generate four stage-1 images for ``prompt`` and offer them for upscaling.

    The prompt is trimmed to 100 characters and stripped of everything except
    word characters and whitespace. The four resulting PNGs are combined into
    one 2x2 image, which is sent with a button view for selecting an image.
    Any failure is printed and answered with a generic error reply.

    Args:
        ctx: The command invocation context.
        prompt: The user's text prompt (the rest of the message).
    """
    try:
        prompt = prompt.strip()[:100]  # Limit the prompt length to 100 characters
        prompt = re.sub(r'[^\w\s]', '', prompt)  # Remove special characters
        with rate_limiter:
            number_of_images = 4
            # The global RNG is no longer reseeded with the current second:
            # doing so gave requests arriving in the same second the same seed.
            seed = random.randint(0, 2**32 - 1)
            # predict() blocks until the remote job finishes; run it in a
            # worker thread so the bot's event loop keeps running meanwhile.
            loop = asyncio.get_running_loop()
            stage_1_results, _stage_1_param_path, stage_1_result_path = await loop.run_in_executor(
                None,
                lambda: df.predict(prompt, "blur", seed, number_of_images, 7.0, 'smart100', 50, api_name="/generate64"),
            )
            # Sorted so the grid and the button order are deterministic;
            # os.listdir returns entries in arbitrary order.
            png_files = sorted(f for f in os.listdir(stage_1_results) if f.endswith('.png'))
            if len(png_files) < number_of_images:
                # Previously zero files produced no reply at all, and one to
                # three files failed with an IndexError.
                raise RuntimeError(
                    f"expected {number_of_images} PNG files in {stage_1_results}, found {len(png_files)}"
                )
            image_paths = [os.path.join(stage_1_results, name) for name in png_files[:number_of_images]]
            combined_image_path = os.path.join(stage_1_results, 'combined_image.png')
            _combine_into_grid(image_paths, combined_image_path)
            await ctx.reply('Here is the combined image. Select an option quickly!')
            with open(combined_image_path, 'rb') as f:
                await ctx.send(
                    file=discord.File(f, 'combined_image.png'),
                    view=create_button_row(ctx, image_paths, stage_1_result_path),
                )
    except Exception as e:
        print(f"Error: {e}")
        await ctx.reply('An error occurred while processing your request. Please wait 5 seconds before retrying.')
#new stage 2----------------------------------------------------------------------------------------------------------------------------------------------
# Stage 2
@bot.command()
@commands.cooldown(1, 5, commands.BucketType.user)
async def dfif2(ctx, index: int, stage_1_result_path, image_paths):
    """Run the stage-2 upscale for one stage-1 image and reply with the result.

    Any failure is printed and answered with a generic error reply.

    Args:
        ctx: The command invocation context.
        index: Zero-based position of the chosen image in ``image_paths``.
        stage_1_result_path: Result path returned by the stage-1 prediction.
        image_paths: Paths of the stage-1 images, in button order.
    """
    try:
        # A negative index would silently wrap around to another image.
        if not 0 <= index < len(image_paths):
            raise IndexError(f"image index {index} is out of range")
        # NOTE(review): the selected image's *path* is sent as the second
        # argument of '/upscale256' — confirm the endpoint does not expect
        # the numeric index instead.
        image_path = image_paths[index]
        seed_2 = 0
        guidance_scale_2 = 4
        custom_timesteps_2 = 'smart50'
        number_of_inference_steps_2 = 50
        # predict() blocks until the remote job finishes; run it in a worker
        # thread so the bot's event loop keeps running meanwhile.
        loop = asyncio.get_running_loop()
        result_path = await loop.run_in_executor(
            None,
            lambda: df.predict(
                stage_1_result_path,
                image_path,
                seed_2,
                guidance_scale_2,
                custom_timesteps_2,
                number_of_inference_steps_2,
                api_name='/upscale256',
            ),
        )
        with open(result_path, 'rb') as f:
            await ctx.reply('Here is the result of the second stage', file=discord.File(f, 'result.png'))
    except Exception as e:
        print(f"Error: {e}")
        await ctx.reply('An error occurred while processing stage 2 upscaling. Please try again later.')
def run_bot():
    """Run the Discord bot with ``DISCORD_TOKEN``."""
    bot.run(DISCORD_TOKEN)


# Start the bot on its own thread; the Gradio demo below is launched separately.
_bot_thread = threading.Thread(target=run_bot)
_bot_thread.start()
def greet(name):
    """Return the greeting ``Hello <name>!`` for the given string."""
    pieces = ("Hello ", name, "!")
    return "".join(pieces)
# Text-in/text-out Gradio interface around greet(); launched at import time.
demo = gr.Interface(fn=greet, inputs="text", outputs="text")
demo.launch()