|
import discord |
|
import os |
|
import threading |
|
import gradio as gr |
|
import requests |
|
import json |
|
import random |
|
import time |
|
import re |
|
from discord import Embed, Color |
|
from discord.ext import commands |
|
|
|
from gradio_client import Client |
|
from PIL import Image |
|
from ratelimiter import RateLimiter |
|
|
|
from datetime import datetime |
|
from pytz import timezone |
|
|
|
import asyncio |
|
|
|
zurich_tz = timezone("Europe/Zurich") |
|
|
|
def convert_to_timezone(dt, tz): |
|
return dt.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S %Z") |
|
|
|
DFIF_TOKEN = os.getenv('HF_TOKEN') |
|
df = Client("huggingface-projects/IF", DFIF_TOKEN) |
|
sdlu = Client("huggingface-projects/stable-diffusion-latent-upscaler", DFIF_TOKEN) |
|
|
|
DISCORD_TOKEN = os.environ.get("LUNARBOT_TOKEN", None) |
|
intents = discord.Intents.default() |
|
intents.message_content = True |
|
|
|
bot = commands.Bot(command_prefix='!', intents=intents) |
|
|
|
rate_limiter = RateLimiter(max_calls=10, period=60) |
|
|
|
|
|
|
|
class ButtonView(discord.ui.View): |
|
def __init__(self, ctx, image_paths, stage_1_result_path): |
|
super().__init__() |
|
self.ctx = ctx |
|
self.image_paths = image_paths |
|
self.stage_1_result_path = stage_1_result_path |
|
|
|
async def on_timeout(self): |
|
for child in self.children: |
|
child.disabled = True |
|
self.stop() |
|
|
|
@discord.ui.button(label='Image 1', style=discord.ButtonStyle.blurple) |
|
async def image1_button(self, button: discord.ui.Button, interaction: discord.Interaction): |
|
await self.ctx.invoke(self.ctx.bot.get_command('dfif2'), image_path=self.image_paths[0], stage_1_result_path=self.stage_1_result_path) |
|
self.stop() |
|
|
|
@discord.ui.button(label='Image 2', style=discord.ButtonStyle.blurple) |
|
async def image2_button(self, button: discord.ui.Button, interaction: discord.Interaction): |
|
await self.ctx.invoke(self.ctx.bot.get_command('dfif2'), image_path=self.image_paths[1], stage_1_result_path=self.stage_1_result_path) |
|
self.stop() |
|
|
|
@discord.ui.button(label='Image 3', style=discord.ButtonStyle.blurple) |
|
async def image3_button(self, button: discord.ui.Button, interaction: discord.Interaction): |
|
await self.ctx.invoke(self.ctx.bot.get_command('dfif2'), image_path=self.image_paths[2], stage_1_result_path=self.stage_1_result_path) |
|
self.stop() |
|
|
|
@discord.ui.button(label='Image 4', style=discord.ButtonStyle.blurple) |
|
async def image4_button(self, button: discord.ui.Button, interaction: discord.Interaction): |
|
await self.ctx.invoke(self.ctx.bot.get_command('dfif2'), image_path=self.image_paths[3], stage_1_result_path=self.stage_1_result_path) |
|
self.stop() |
|
|
|
|
|
def create_button_row(ctx, image_paths, stage_1_result_path): |
|
view = ButtonView(ctx, image_paths, stage_1_result_path) |
|
return view |
|
|
|
|
|
@bot.event |
|
async def on_ready(): |
|
print('Logged on as', bot.user) |
|
bot.log_channel = bot.get_channel(1107006391547342910) |
|
|
|
@bot.event |
|
async def on_message_edit(before, after): |
|
if before.author == bot.user: |
|
return |
|
|
|
if before.content != after.content: |
|
embed = Embed(color=Color.orange()) |
|
embed.set_author(name=f"{before.author} ID: {before.author.id}", icon_url=before.author.avatar.url) |
|
embed.title = "Message Edited" |
|
embed.description = f"**Before:** {before.content or '*(empty message)*'}\n**After:** {after.content or '*(empty message)*'}" |
|
embed.add_field(name="Author Username", value=before.author.name, inline=True) |
|
embed.add_field(name="Channel", value=before.channel.mention, inline=True) |
|
|
|
embed.add_field(name="Message Created On", value=convert_to_timezone(before.created_at, zurich_tz), inline=True) |
|
embed.add_field(name="Message ID", value=before.id, inline=True) |
|
embed.add_field(name="Message Jump URL", value=f"[Jump to message!](https://discord.com/channels/{before.guild.id}/{before.channel.id}/{before.id})", inline=True) |
|
|
|
if before.attachments: |
|
attachment_urls = "\n".join([attachment.url for attachment in before.attachments]) |
|
embed.add_field(name="Attachments", value=attachment_urls, inline=False) |
|
|
|
|
|
embed.set_footer(text=f"{convert_to_timezone(datetime.utcnow(), zurich_tz)}") |
|
|
|
await bot.log_channel.send(embed=embed) |
|
|
|
@bot.event |
|
async def on_message_delete(message): |
|
if message.author == bot.user: |
|
return |
|
|
|
embed = Embed(color=Color.red()) |
|
embed.set_author(name=f"{message.author} ID: {message.author.id}", icon_url=message.author.avatar.url) |
|
embed.title = "Message Deleted" |
|
embed.description = message.content or "*(empty message)*" |
|
embed.add_field(name="Author Username", value=message.author.name, inline=True) |
|
embed.add_field(name="Channel", value=message.channel.mention, inline=True) |
|
|
|
embed.add_field(name="Message Created On", value=convert_to_timezone(message.created_at, zurich_tz), inline=True) |
|
embed.add_field(name="Message ID", value=message.id, inline=True) |
|
embed.add_field(name="Message Jump URL", value=f"[Jump to message!](https://discord.com/channels/{message.guild.id}/{message.channel.id}/{message.id})", inline=True) |
|
|
|
if message.attachments: |
|
attachment_urls = "\n".join([attachment.url for attachment in message.attachments]) |
|
embed.add_field(name="Attachments", value=attachment_urls, inline=False) |
|
|
|
|
|
embed.set_footer(text=f"{convert_to_timezone(datetime.utcnow(), zurich_tz)}") |
|
|
|
await bot.log_channel.send(embed=embed) |
|
|
|
|
|
|
|
|
|
|
|
|
|
@bot.event |
|
async def on_voice_state_update(member, before, after): |
|
if before.channel != after.channel: |
|
|
|
embed = Embed(description=f'{member} moved in voice chat from {before.channel} to {after.channel}', color=Color.blue()) |
|
await bot.log_channel.send(embed=embed) |
|
|
|
if before.mute != after.mute: |
|
|
|
embed = Embed(description=f'{member} was {"muted" if after.mute else "unmuted"} in voice chat', color=Color.orange()) |
|
await bot.log_channel.send(embed=embed) |
|
|
|
if before.deaf != after.deaf: |
|
|
|
embed = Embed(description=f'{member} was {"deafened" if after.deaf else "undeafened"} in voice chat', color=Color.orange()) |
|
await bot.log_channel.send(embed=embed) |
|
|
|
@bot.event |
|
async def on_member_update(before, after): |
|
if before.nick != after.nick: |
|
|
|
embed = Embed(description=f'{before} changed their nickname to {after.nick}', color=Color.blue()) |
|
await bot.log_channel.send(embed=embed) |
|
|
|
@bot.event |
|
async def on_guild_channel_create(channel): |
|
|
|
embed = Embed(description=f'Channel {channel.mention} was created', color=Color.green()) |
|
await bot.log_channel.send(embed=embed) |
|
|
|
@bot.event |
|
async def on_guild_channel_delete(channel): |
|
|
|
embed = Embed(description=f'Channel {channel.name} ({channel.mention}) was deleted', color=Color.red()) |
|
await bot.log_channel.send(embed=embed) |
|
|
|
@bot.event |
|
async def on_guild_role_create(role): |
|
|
|
embed = Embed(description=f'Role {role.mention} was created', color=Color.green()) |
|
await bot.log_channel.send(embed=embed) |
|
|
|
@bot.event |
|
async def on_guild_role_delete(role): |
|
|
|
embed = Embed(description=f'Role {role.name} ({role.mention}) was deleted', color=Color.red()) |
|
await bot.log_channel.send(embed=embed) |
|
|
|
@bot.event |
|
async def on_guild_role_update(before, after): |
|
|
|
if before.name != after.name: |
|
embed = Embed(description=f'Role {before.mention} was renamed to {after.name}', color=Color.orange()) |
|
await bot.log_channel.send(embed=embed) |
|
|
|
if before.permissions.administrator != after.permissions.administrator: |
|
|
|
certain_role_id = 1106995261487710411 |
|
certain_role = after.guild.get_role(certain_role_id) |
|
embed = Embed(description=f'Role {after.mention} had its administrator permission {"enabled" if after.permissions.administrator else "disabled"}', color=Color.red()) |
|
await bot.log_channel.send(content=certain_role.mention, embed=embed) |
|
|
|
@bot.command() |
|
@commands.cooldown(1, 5, commands.BucketType.user) |
|
async def deepfloydif(ctx, *, prompt: str): |
|
try: |
|
prompt = prompt.strip()[:100] |
|
prompt = re.sub(r'[^\w\s]', '', prompt) |
|
|
|
with rate_limiter: |
|
number_of_images = 4 |
|
current_time = int(time.time()) |
|
random.seed(current_time) |
|
seed = random.randint(0, 2**32 - 1) |
|
stage_1_results, stage_1_param_path, stage_1_result_path = df.predict(prompt, "blur", seed, number_of_images, 7.0, 'smart100', 50, api_name="/generate64") |
|
png_files = [f for f in os.listdir(stage_1_results) if f.endswith('.png')] |
|
|
|
if png_files: |
|
first_png = png_files[0] |
|
second_png = png_files[1] |
|
third_png = png_files[2] |
|
fourth_png = png_files[3] |
|
|
|
first_png_path = os.path.join(stage_1_results, first_png) |
|
second_png_path = os.path.join(stage_1_results, second_png) |
|
third_png_path = os.path.join(stage_1_results, third_png) |
|
fourth_png_path = os.path.join(stage_1_results, fourth_png) |
|
|
|
img1 = Image.open(first_png_path) |
|
img2 = Image.open(second_png_path) |
|
img3 = Image.open(third_png_path) |
|
img4 = Image.open(fourth_png_path) |
|
|
|
combined_image = Image.new('RGB', (img1.width * 2, img1.height * 2)) |
|
|
|
combined_image.paste(img1, (0, 0)) |
|
combined_image.paste(img2, (img1.width, 0)) |
|
combined_image.paste(img3, (0, img1.height)) |
|
combined_image.paste(img4, (img1.width, img1.height)) |
|
|
|
combined_image_path = os.path.join(stage_1_results, 'combined_image.png') |
|
combined_image.save(combined_image_path) |
|
|
|
|
|
|
|
|
|
await ctx.reply('Here is the combined image. Select an option quickly!') |
|
with open(combined_image_path, 'rb') as f: |
|
await ctx.send(file=discord.File(f, 'combined_image.png'), view=create_button_row(ctx, [first_png_path, second_png_path, third_png_path, fourth_png_path], stage_1_result_path)) |
|
|
|
except Exception as e: |
|
print(f"Error: {e}") |
|
await ctx.reply('An error occurred while processing your request. Please wait 5 seconds before retrying.') |
|
|
|
|
|
|
|
@bot.command() |
|
@commands.cooldown(1, 5, commands.BucketType.user) |
|
async def dfif2(ctx, image_path, stage_1_result_path): |
|
try: |
|
selected_index_for_stage_2 = 0 |
|
seed_2 = 0 |
|
guidance_scale_2 = 4 |
|
custom_timesteps_2 = 'smart50' |
|
number_of_inference_steps_2 = 50 |
|
result_path = df.predict(stage_1_result_path, selected_index_for_stage_2, seed_2, guidance_scale_2, custom_timesteps_2, number_of_inference_steps_2, api_name='/upscale256') |
|
|
|
|
|
|
|
with open(result_path, 'rb') as f: |
|
await ctx.reply('Here is the result of the second stage', file=discord.File(f, 'result.png')) |
|
|
|
except Exception as e: |
|
print(f"Error: {e}") |
|
await ctx.reply('An error occurred while processing stage 2 upscaling. Please try again later.') |
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_bot(): |
|
bot.run(DISCORD_TOKEN) |
|
|
|
threading.Thread(target=run_bot).start() |
|
|
|
def greet(name): |
|
return "Hello " + name + "!" |
|
|
|
demo = gr.Interface(fn=greet, inputs="text", outputs="text") |
|
demo.launch() |