Spaces:
Running
Running
# This code is based on the following example:
# https://discordpy.readthedocs.io/en/stable/quickstart.html#a-minimal-bot
import os | |
import discord | |
from discord import app_commands | |
from discord.ext import commands | |
from threading import Thread | |
import json | |
from horde import HordeAPI | |
import inspect | |
# Maps a message-rule type to its predicate. Each predicate takes the rule's
# configured text and the incoming message, and returns True when the
# message's text satisfies the rule.
check_functions = {
    "equals": lambda content, message: message.content == content,
    "contains": lambda content, message: content in message.content,
    "startswith": lambda content, message: message.content.startswith(content),
    "endswith": lambda content, message: message.content.endswith(content),
}
# Define the bot and its application-command tree.
# The message_content intent is enabled because on_message and the
# check_functions predicates read message.content.
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(command_prefix='>', intents=intents)
# Alias for the bot's command tree; the generated slash commands register
# themselves on it.
tree = bot.tree
# Load the bot configuration. Other parts of this file read its "command"
# and "message" entries.
# An explicit UTF-8 encoding keeps decoding independent of the platform's
# default locale encoding.
with open("discord.json", "r", encoding="utf-8") as f:
    json_data = json.load(f)
# Auto-generate one slash-command callback per entry in json_data["command"].
# Source text is generated (rather than a generic **kwargs wrapper) so that
# each callback has an explicit, annotated parameter list.
COMMAND_NAME_PREFIX = "discord_bot_call_"


def _render_command_source(command):
    """Return Python source that defines and registers one slash command.

    ``command`` is one entry of ``json_data["command"]`` and must provide
    "name", "description", "function" and "parameters" (a list of entries
    with "name" and "type").

    The generated callback defers the interaction, awaits the configured
    function with the slash-command arguments passed by keyword, and sends
    the result as a follow-up unless it is None.
    """
    callback_name = f"{COMMAND_NAME_PREFIX}{command['name']}"
    parameters = command["parameters"]
    signature = "".join(
        f", {param['name']}: {param['type']}" for param in parameters
    )
    call_args = ", ".join(
        f"{param['name']}={param['name']}" for param in parameters
    )
    return "\n".join([
        f"async def {callback_name}(interaction: discord.Interaction{signature}):",
        "    await interaction.response.defer()",
        f"    result = await {command['function']}({call_args})",
        "    if result is not None:",
        "        await interaction.followup.send(result)",
        # !r emits properly escaped string literals, so quotes, backslashes
        # or newlines in the JSON text cannot break the generated code.
        "tree.add_command(app_commands.Command("
        f"name={command['name']!r}, "
        f"description={command['description']!r}, "
        f"callback={callback_name}))",
    ])


FILE_CONTENT = "\n\n".join(
    _render_command_source(command) for command in json_data["command"]
)
# SECURITY NOTE: this executes code assembled from discord.json. The command
# name, parameter names/types and function name are spliced in as raw Python,
# so the file must be treated as trusted code, never as user-supplied data.
exec(FILE_CONTENT)
async def greet(name: str):
    """Return a greeting for ``name``."""
    return f"Hello, {name}!"
async def get_kudos():
    """Return a message with the user's kudos balance, or an error message.

    Reads the user details through ``HordeAPI.getUserDetails()``. When the
    response has no "kudos" entry it is reported as an error using its
    "code" and "reason" entries.
    """
    async with HordeAPI.getUserDetails() as details:
        if "kudos" not in details:
            return f'Error: {details["code"]} {details["reason"]}'
        return f'The amount of Kudos this user has is {details["kudos"]}'
async def generate_status(id: str):
    """Describe the current state of a generation request as a string.

    ``id`` is passed to ``HordeAPI.generateCheck`` and, once the request is
    done, to ``HordeAPI.generateStatus``. (The parameter name shadows the
    builtin but is kept because callers pass it by keyword.)

    Returns, in order of precedence: an error message, an "impossible" or
    "faulted" notice, the first generated image ("img" entry), a processing
    progress message, or the queue position with the estimated wait time.
    """
    async with HordeAPI.generateCheck(id) as details:
        if "kudos" not in details:
            return f'Check Error: {details["code"]} {details["reason"]}'
        if not details["is_possible"]:
            return "This generation is impossible."
        if details["faulted"]:
            return "This generation is faulted."
        if details["done"]:
            async with HordeAPI.generateStatus(id) as generation_detail:
                if "generations" not in generation_detail:
                    return f'Status Error: {generation_detail["code"]} {generation_detail["reason"]}'
                generations = generation_detail["generations"]
                # Only the first image is reported; an empty list falls
                # through to the progress / queue messages below.
                if generations:
                    return generations[0]["img"]
        if int(details["processing"]) > 0:
            # Sum every counter. The previous multi-line "+ ..." form was
            # parsed as separate statements, so the total only ever held
            # the "finished" count.
            total = sum(
                int(details[key])
                for key in (
                    "finished",
                    "processing",
                    "queue_position",
                    "restarted",
                    "waiting",
                )
            )
            return f'Processing image: {details["processing"]}/{total}'
        return f'Position in queue: {details["queue_position"]}, wait time: {details["wait_time"]}s'
# Registered as a prefix command so the bot actually dispatches to it;
# with the bot's '>' command prefix it is invoked as ">ping".
@bot.command()
async def ping(ctx):
    """Reply 'pong' in the invoking context."""
    await ctx.send('pong')
# Registered as an event handler; an undecorated coroutine with this name
# is never called by the bot.
@bot.event
async def on_ready():
    """Sync the application commands and log the logged-in account."""
    # `tree` is the same object as `bot.tree`, so a single sync is enough;
    # a second call only repeats the same request.
    await tree.sync()
    print(f'We have logged in as {bot.user}')
# Registered as an event handler; an undecorated coroutine with this name
# is never called by the bot.
@bot.event
async def on_message(message):
    """Apply the configured message rules, then run prefix commands.

    Each rule in ``json_data["message"]`` has a "type" (a key of
    ``check_functions``) and a "content" to test against the message. A
    matching rule either calls the module-level coroutine named by its
    "function" entry with the message and sends its result, or sends its
    fixed "response" text. Rules with an unknown type never match.
    """
    # Ignore the bot's own messages so it cannot trigger itself.
    if message.author == bot.user:
        return
    for rule in json_data["message"]:
        rule_type = rule["type"]
        content = rule["content"]
        # Dispatch to the predicate for this rule type.
        matches = check_functions.get(rule_type)
        if matches is None or not matches(content, message):
            continue
        if "function" in rule:
            # The rule names a module-level coroutine: call it.
            handler = globals()[rule["function"]]
            result = await handler(message)
            # Mirror the slash-command path: a handler that returns None
            # has nothing to send.
            if result is not None:
                await message.channel.send(result)
        elif "response" in rule:
            # Otherwise send the predefined response text.
            await message.channel.send(rule["response"])
    # Overriding on_message bypasses command handling, so hand the message
    # to the command system explicitly.
    await bot.process_commands(message)
async def sendMessageToChannelHelper(data):
    """Send ``data`` as an embed to the channel named by ``CHANNEL_ID``.

    ``data`` is a mapping. The title is its "log_tag", else its "id", else a
    placeholder. The description is its "log_message", else its "model",
    else a placeholder. The embed is red for log entries whose "log_level"
    is "assert" or "error", and green otherwise. An "img" entry becomes the
    embed image; every other entry becomes an inline field.
    """
    channel = await bot.fetch_channel(os.environ.get("CHANNEL_ID"))
    # Build the embed header; log entries take precedence over id / model.
    title = data.get("log_tag", data.get("id", "Empty Title"))
    description = data.get("log_message", data.get("model", "Empty Description"))
    is_log_entry = "log_tag" in data or "log_message" in data
    # .get() avoids a KeyError for log entries that carry no "log_level".
    is_failure = data.get("log_level") in ("assert", "error")
    color = 0xff0000 if is_log_entry and is_failure else 0x00ff00
    embed = discord.Embed(title=title, description=description, color=color)
    # Add every entry of data to the embed.
    for field, value in data.items():
        if field == "img":
            embed.set_image(url=value)
        else:
            embed.add_field(name=field, value=value, inline=True)
    # Send the embed message.
    await channel.send(embed=embed)
def sendMessageToChannel(data):
    """Schedule sendMessageToChannelHelper(data) on the bot's event loop.

    Returns immediately; the message is sent when the loop runs the task.
    """
    # NOTE(review): create_task is not thread-safe. If this is ever called
    # from a thread other than the one running the bot's loop, confirm and
    # switch to asyncio.run_coroutine_threadsafe.
    bot.loop.create_task(sendMessageToChannelHelper(data))
def run():
    """Start the bot with the token from the ``TOKEN`` environment variable.

    Raises:
        Exception: if ``TOKEN`` is unset or empty.
        discord.HTTPException: re-raised for any HTTP failure other than
            status 429, which is reported on stdout instead.
    """
    try:
        token = os.environ.get("TOKEN")
        if not token:
            raise Exception("Please add your token to the Secrets pane.")
        bot.run(token)
    except discord.HTTPException as e:
        if e.status != 429:
            # Bare raise keeps the original traceback intact.
            raise
        print(
            "The Discord servers denied the connection for making too many requests"
        )
        print(
            "Get help from https://stackoverflow.com/questions/66724687/in-discord-py-how-to-solve-the-error-for-toomanyrequests"
        )
def discord_bot():
    """Entry point: announce startup on stdout, then run the bot."""
    print("Running discord_bot")
    run()