| from __future__ import annotations
|
|
|
| import re
|
| import traceback
|
|
|
| import discord
|
| from discord.ext import commands
|
|
|
| from bot.emojis import (
|
| CUSTOM_EMOJIS, _UI_ALIASES, FALLBACK_EMOJIS, _EMOJI_BOT,
|
| _EMOJI_ID_RE, _normalize_key
|
| )
|
|
|
|
|
class Developer(commands.Cog):
    """Maintenance commands: extension management, slash-command sync,
    shutdown, and a health check for the configured custom emojis."""

    # Rendering caps for the emoji health-check embed.
    _MAX_REASON_FIELDS = 10
    _MAX_NAMES_PER_FIELD = 15
    _MAX_FIELD_VALUE_LEN = 1000

    # Emoji used in user-facing text, written as ASCII-only escapes so the
    # source file cannot be corrupted by a wrong-encoding round trip.
    _ICON_WAIT = "\N{HOURGLASS WITH FLOWING SAND}"
    _ICON_SCAN = "\N{LEFT-POINTING MAGNIFYING GLASS}"
    _ICON_OK = "\N{WHITE HEAVY CHECK MARK}"
    _ICON_BAD = "\N{CROSS MARK}"
    _ARROW = "\N{RIGHTWARDS ARROW}"

    def __init__(self, bot: commands.Bot) -> None:
        """Store the bot instance the commands operate on."""
        self.bot = bot

    # ------------------------------------------------------------------
    # Extension / process management (restricted to the bot owner)
    # ------------------------------------------------------------------

    @commands.command(name="load", help="Load a cog extension by dotted path.")
    @commands.is_owner()
    async def load(self, ctx: commands.Context, extension: str) -> None:
        """Load ``extension`` into the bot and confirm in the channel."""
        await self.bot.load_extension(extension)
        await ctx.reply(f"Loaded {extension}")

    @commands.command(name="unload", help="Unload a cog extension by dotted path.")
    @commands.is_owner()
    async def unload(self, ctx: commands.Context, extension: str) -> None:
        """Unload ``extension`` from the bot and confirm in the channel."""
        await self.bot.unload_extension(extension)
        await ctx.reply(f"Unloaded {extension}")

    @commands.command(name="reload", help="Reload a cog extension by dotted path.")
    @commands.is_owner()
    async def reload(self, ctx: commands.Context, extension: str) -> None:
        """Reload ``extension`` and confirm in the channel."""
        await self.bot.reload_extension(extension)
        await ctx.reply(f"Reloaded {extension}")

    @commands.command(name="sync", help="Sync global slash commands to Discord.")
    @commands.is_owner()
    async def sync(self, ctx: commands.Context) -> None:
        """Sync the bot's command tree and report how many commands were synced."""
        synced = await self.bot.tree.sync()
        await ctx.reply(f"Synced {len(synced)} app commands")

    @commands.command(name="shutdown", help="Gracefully shut down the bot process.")
    @commands.is_owner()
    async def shutdown(self, ctx: commands.Context) -> None:
        """Acknowledge the request, then close the bot."""
        # Reply first: once the bot is closed no further message can be sent.
        await ctx.reply("Shutting down...")
        await self.bot.close()

    # ------------------------------------------------------------------
    # Emoji health check
    # ------------------------------------------------------------------

    @commands.hybrid_command(name="emoji_scan", description="Show broken/unresolved custom emojis")
    async def emoji_scan(self, ctx: commands.Context) -> None:
        """Scan all configured custom emojis and show which ones are broken."""
        if not self.bot.is_ready() or not self.bot.user:
            await ctx.send(f"{self._ICON_WAIT} Bot is not ready yet.", ephemeral=True)
            return

        # Only membership and size of the bot's emoji cache are needed.
        cached_ids = {emoji.id for emoji in self.bot.emojis}
        total, broken = self._scan_emojis(cached_ids)
        embed = self._build_scan_embed(total, broken, len(cached_ids))

        interaction = ctx.interaction
        if interaction is None:
            await ctx.send(embed=embed, ephemeral=True)
            return

        try:
            if interaction.response.is_done():
                await interaction.followup.send(embed=embed, ephemeral=True)
            else:
                await interaction.response.send_message(embed=embed, ephemeral=True)
        except discord.InteractionResponded:
            # The response was consumed between the check and the send;
            # a follow-up is the only remaining way to deliver the embed.
            await interaction.followup.send(embed=embed, ephemeral=True)

    def _scan_emojis(self, cached_ids: set[int]) -> tuple[int, list[tuple[str, str, str]]]:
        """Check every configured emoji and UI alias against ``cached_ids``.

        Returns ``(total, broken)`` where ``total`` is the number of entries
        checked (configured emojis plus distinct normalized aliases) and
        ``broken`` is a list of ``(key, value, reason)`` tuples, unique by key,
        in the order the problems were found.
        """
        broken: list[tuple[str, str, str]] = []
        total = 0

        for key, value in CUSTOM_EMOJIS.items():
            total += 1
            emoji_id = self._extract_id(value)
            if not emoji_id:
                broken.append((key, value, "no valid emoji ID found"))
            elif emoji_id not in cached_ids:
                broken.append((key, value, "emoji not in bot cache"))

        # Each normalized alias is counted and checked once, even when
        # several UI keys share it.
        checked_aliases: set[str] = set()
        for ui_key, aliases in _UI_ALIASES.items():
            for alias in aliases:
                norm = _normalize_key(alias)
                if norm in checked_aliases:
                    continue
                checked_aliases.add(norm)
                total += 1

                label = f"ui:{ui_key}{self._ARROW}{alias}"
                alias_value = CUSTOM_EMOJIS.get(norm, "")
                if not alias_value:
                    broken.append((label, "(none)", "alias has no configured emoji"))
                    continue
                alias_id = self._extract_id(alias_value)
                if alias_id and alias_id not in cached_ids:
                    broken.append((label, alias_value, "aliased emoji not in cache"))

        # Keep the first report for each key.
        unique: dict[str, tuple[str, str, str]] = {}
        for entry in broken:
            unique.setdefault(entry[0], entry)
        return total, list(unique.values())

    def _build_scan_embed(
        self,
        total: int,
        broken: list[tuple[str, str, str]],
        cache_size: int,
    ) -> discord.Embed:
        """Render the scan result as an embed, grouping broken keys by reason."""
        # Everything checked that was not reported broken is working. The
        # previous formula subtracted broken configured emojis twice.
        working_count = total - len(broken)

        embed = discord.Embed(
            title=f"{self._ICON_SCAN} Bot Emoji Health Check",
            description=(
                f"**Total configured:** `{total}`\n"
                f"**{self._ICON_OK} Working:** `{working_count}`\n"
                f"**{self._ICON_BAD} Broken:** `{len(broken)}`\n"
                f"**Bot cache size:** `{cache_size}`"
            ),
            color=discord.Color.red() if broken else discord.Color.green(),
        )

        if broken:
            by_reason: dict[str, list[str]] = {}
            for key, _value, reason in broken:
                by_reason.setdefault(reason, []).append(key)

            for reason, keys in list(by_reason.items())[: self._MAX_REASON_FIELDS]:
                names = ", ".join(f"`{k}`" for k in keys[: self._MAX_NAMES_PER_FIELD])
                hidden = len(keys) - self._MAX_NAMES_PER_FIELD
                if hidden > 0:
                    names += f" (+{hidden} more)"
                embed.add_field(
                    name=f"{self._ICON_BAD} {reason} ({len(keys)})",
                    value=names[: self._MAX_FIELD_VALUE_LEN],
                    inline=False,
                )

        embed.set_footer(text="Run this after the bot joins new servers to refresh emoji cache")
        return embed

    @staticmethod
    def _extract_id(value: str) -> int | None:
        """Extract numeric emoji ID from a custom emoji tag.

        Returns ``None`` when ``value`` is empty or holds neither a match for
        ``_EMOJI_ID_RE`` nor a bare run of digits.
        """
        if not value:
            return None
        match = _EMOJI_ID_RE.search(value)
        if match:
            # NOTE(review): assumes _EMOJI_ID_RE matches only the digits of
            # the ID, since the whole match is passed to int() - confirm
            # against the pattern in bot.emojis.
            return int(match.group(0))
        stripped = value.strip()
        if stripped.isdigit():
            return int(stripped)
        return None

    # ------------------------------------------------------------------
    # Error reporting
    # ------------------------------------------------------------------

    async def _safe_reply(self, ctx: commands.Context, message: str) -> None:
        """Reply to ``ctx``, falling back to a plain channel message.

        The fallback is used when the reply raises ``discord.NotFound`` or
        ``discord.InteractionResponded``, or an ``HTTPException`` whose code
        is 10062 or 40060; any other ``HTTPException`` is re-raised.
        """
        try:
            await ctx.reply(message)
        except (discord.NotFound, discord.InteractionResponded):
            if ctx.channel:
                await ctx.channel.send(message)
        except discord.HTTPException as exc:
            if exc.code not in {10062, 40060}:
                raise
            if ctx.channel:
                await ctx.channel.send(message)

    @commands.Cog.listener()
    async def on_command_error(self, ctx: commands.Context, error: Exception) -> None:
        """Send the traceback of a failed command back to the bot owner.

        Errors triggered by anyone else, and check failures, are ignored here.
        """
        if not await self.bot.is_owner(ctx.author):
            return
        if isinstance(error, commands.CheckFailure):
            return
        tb = "".join(traceback.format_exception(type(error), error, error.__traceback__))
        # Keep only the tail so the code block stays within a single message.
        await self._safe_reply(ctx, f"```py\n{tb[-1800:]}\n```")
|
|
|
|
|
async def setup(bot: commands.Bot) -> None:
    """Create a ``Developer`` cog for ``bot`` and register it via ``bot.add_cog``."""
    cog = Developer(bot)
    await bot.add_cog(cog)
|
|
|