Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import discord | |
import os | |
import threading | |
import gradio as gr | |
import requests | |
import json | |
import random | |
import time | |
import re | |
from discord import Embed, Color | |
from discord.ext import commands | |
from gradio_client import Client | |
from PIL import Image | |
from ratelimiter import RateLimiter | |
from datetime import datetime, timedelta # for times | |
from pytz import timezone # for times | |
import asyncio # check if used | |
import logging | |
zurich_tz = timezone("Europe/Zurich") | |
def convert_to_timezone(dt, tz): | |
return dt.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S %Z") | |
DISCORD_TOKEN = os.environ.get("DISCORD_TOKEN", None) | |
intents = discord.Intents.all() | |
bot = commands.Bot(command_prefix='!', intents=intents, max_messages=100000) | |
logger = logging.getLogger(__name__) | |
logging.basicConfig(level=logging.DEBUG) | |
#rate_limiter = RateLimiter(max_calls=10, period=60) # needs testing | |
message_cache = {} | |
# stats stuff --------------------------------------------------------------------------------------------------------------------------------------------------------- | |
number_of_messages = 0 | |
user_cooldowns = {} | |
async def on_message(message): | |
try: | |
global number_of_messages | |
if message.author != bot.user: | |
lunarflu = bot.get_user(811235357663297546) #811235357663297546 | |
cakiki = bot.get_user(416019758492680203) | |
"""Backup""" | |
number_of_messages = number_of_messages + 1 | |
message_link = f"[#{message.channel.name}]({message.jump_url})" | |
msgcnt = message.content | |
backup_message = f"{number_of_messages} | {message_link} | {message.author}: {msgcnt}" | |
# check for attachments | |
if message.attachments: | |
for attachment in message.attachments: | |
attachment_url = attachment.url | |
backup_message += f"\nAttachment: {attachment_url}" | |
# check for embeds | |
if message.embeds: | |
for embed in message.embeds: | |
backup_message += f"\nEmbed Title: {embed.title}\nEmbed Description: {embed.description}" | |
dm_message = await lunarflu.send(backup_message) | |
"""Antispam""" | |
#Detecting certain unwanted strings | |
try: | |
forbidden_strings = ["@everyone", "@here", "discord.gg", "discord.com/invite", "discord.com", "discord-premium"] | |
if any(string.lower() in message.content.lower() for string in forbidden_strings): | |
ignored_role_ids = [897381378172264449, 897376942817419265] #admins, huggingfolks | |
if any(role.id in ignored_role_ids for role in message.author.roles): | |
if message.author != lunarflu: | |
return | |
dm_unwanted = await lunarflu.send(f" {lunarflu.mention} [experimental] SUSPICIOUS MESSAGE: {message_link} | {message.author}: {message.content}") | |
dm_unwanted = await cakiki.send(f" {cakiki.mention} [experimental] SUSPICIOUS MESSAGE: {message_link} | {message.author}: {message.content}") | |
except Exception as e: | |
print(f"Antispam->Detecting certain unwanted strings Error: {e}") | |
#Posting too fast | |
""" | |
cooldown_duration determines the time window within which the bot tracks a user's posting behavior. | |
This is useful for detecting "staggered" instances of spam, where 20-50 messages are sent 2-10s apart, | |
over timespans of typically a few minutes. | |
If a user hasn't posted anything for a duration longer than cooldown_duration, their record is cleared, | |
and they start fresh if they post again. | |
If a user posts within the cooldown_duration, their activity count is updated, | |
and their record persists until it exceeds the specified threshold or until the cooldown_duration window resets. | |
Increasing cooldown_duration = More robust at detecting "staggered" / "delayed" spam, but more false positives (fast chatters) | |
""" | |
# cooldown_duration = 3; false_positive_threshld = 3; -> 99% spam at spam_count of 10+ (could still be wrong, so we timeout) | |
cooldown_duration = 5 # messages per n seconds, was 1, now 3, could try 5 | |
false_positive_threshold = 5 # big = alert less (catch less spam), small = alert more (catch more spam) | |
timeout_threshold = 10 # number of messages before issuing a timeout (similar function to ban, easier to reverse) | |
timeout_duration = 168 # timeout duration in hours (1 week) | |
if message.author.id not in user_cooldowns: | |
user_cooldowns[message.author.id] = {'count': 1, 'timestamp': message.created_at} | |
else: | |
if (message.created_at - user_cooldowns[message.author.id]['timestamp']).total_seconds() > cooldown_duration: | |
var1 = message.created_at | |
var2 = user_cooldowns[message.author.id]['timestamp'] | |
print(f"seconds since last message by {message.author}: ({var1} - {var2}).seconds = {(var1 - var2).total_seconds()}") | |
# if we wait longer than cooldown_duration, count will reset | |
user_cooldowns[message.author.id] = {'count': 1, 'timestamp': message.created_at} | |
else: | |
user_cooldowns[message.author.id]['count'] += 1 | |
spam_count = user_cooldowns[message.author.id]['count'] | |
# tldr; if we post 2 messages with less than [cooldown_duration]seconds between them | |
if spam_count >= false_positive_threshold: # n in a row, helps avoid false positives for posting in threads | |
# warning for 5+ | |
channel = message.channel | |
if spam_count == false_positive_threshold: | |
if channel.id != 996580741121065091: # admin channel excluded due to how automod messages are categorized by the discord bot | |
await channel.send(f"{message.author.mention}, you may be posting too quickly! Please slow down a bit 🤗") | |
var1 = message.created_at | |
var2 = user_cooldowns[message.author.id]['timestamp'] | |
print(f"seconds since last message by {message.author}: {(var1 - var2).total_seconds()}") | |
print(f"spam_count: {spam_count}") | |
test_server = os.environ.get('TEST_SERVER') | |
if test_server == 'True': | |
alert = "<@&1106995261487710411>" # test @alerts role | |
if test_server == 'False': | |
alert = "<@&1108342563628404747>" # normal @alerts role | |
await bot.log_channel.send( | |
f"[EXPERIMENTAL ALERT] {message.author} may be posting too quickly! \n" | |
f"Spam count: {spam_count}\n" | |
f"Message content: {message.content}\n" | |
f"[Jump to message!](https://discord.com/channels/{message.guild.id}/{message.channel.id}/{message.id})\n" | |
f"{alert}" | |
) | |
await cakiki.send( | |
f"[EXPERIMENTAL ALERT] {message.author} may be posting too quickly! \n" | |
f"Spam count: {spam_count}\n" | |
f"Message content: {message.content}\n" | |
f"[Jump to message!](https://discord.com/channels/{message.guild.id}/{message.channel.id}/{message.id})\n" | |
) | |
# auto-ban | |
""" | |
if spam_count >= timeout_threshold: | |
try: | |
member = message.author | |
await member.send( | |
"You have been auto-banned for spamming. \n If this was an error, message <@811235357663297546> ." | |
) | |
await member.ban() | |
except Exception as e: | |
print(f"Error: {e}") | |
""" | |
user_cooldowns[message.author.id]['timestamp'] = message.created_at | |
await bot.process_commands(message) | |
except Exception as e: | |
print(f"on_message Error: {e}") | |
# moderation stuff----------------------------------------------------------------------------------------------------------------------------------------------------- | |
async def on_message_edit(before, after): | |
try: | |
if before.author == bot.user: | |
return | |
if before.content != after.content: | |
embed = Embed(color=Color.orange()) | |
embed.set_author(name=f"{before.author} ID: {before.author.id}", icon_url=before.author.avatar.url if before.author.avatar else bot.user.avatar.url) | |
embed.title = "Message Edited" | |
embed.description = f"**Before:** {before.content or '*(empty message)*'}\n**After:** {after.content or '*(empty message)*'}" | |
embed.add_field(name="Author Username", value=before.author.name, inline=True) | |
embed.add_field(name="Channel", value=before.channel.mention, inline=True) | |
#embed.add_field(name="Message Created On", value=before.created_at.strftime("%Y-%m-%d %H:%M:%S UTC"), inline=True) | |
embed.add_field(name="Message Created On", value=convert_to_timezone(before.created_at, zurich_tz), inline=True) | |
embed.add_field(name="Message ID", value=before.id, inline=True) | |
embed.add_field(name="Message Jump URL", value=f"[Jump to message!](https://discord.com/channels/{before.guild.id}/{before.channel.id}/{before.id})", inline=True) | |
if before.attachments: | |
attachment_urls = "\n".join([attachment.url for attachment in before.attachments]) | |
embed.add_field(name="Attachments", value=attachment_urls, inline=False) | |
#embed.set_footer(text=f"{datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}") | |
embed.set_footer(text=f"{convert_to_timezone(datetime.utcnow(), zurich_tz)}") | |
await bot.log_channel.send(embed=embed) | |
except Exception as e: | |
print(f"on_message_edit Error: {e}") | |
async def on_raw_message_delete(payload): | |
try: | |
message_id = payload.message_id | |
channel_id = payload.channel_id | |
message = message_cache.pop(message_id, None) | |
if message: | |
if message.author == bot.user: | |
return | |
embed = Embed(color=Color.red()) | |
embed.set_author(name=f"{message.author} ID: {message.author.id}", icon_url=message.author.avatar.url if message.author.avatar else bot.user.avatar.url) | |
embed.title = "Message Deleted" | |
embed.description = message.content or "*(empty message)*" | |
embed.add_field(name="Author Username", value=message.author.name, inline=True) | |
embed.add_field(name="Channel", value=message.channel.mention, inline=True) | |
#embed.add_field(name="Message Created On", value=message.created_at.strftime("%Y-%m-%d %H:%M:%S UTC"), inline=True) | |
embed.add_field(name="Message Created On", value=convert_to_timezone(message.created_at, zurich_tz), inline=True) | |
embed.add_field(name="Message ID", value=message.id, inline=True) | |
embed.add_field(name="Message Jump URL", value=f"[Jump to message!](https://discord.com/channels/{message.guild.id}/{message.channel.id}/{message.id})", inline=True) | |
if message.attachments: | |
attachment_urls = "\n".join([attachment.url for attachment in message.attachments]) | |
embed.add_field(name="Attachments", value=attachment_urls, inline=False) | |
#embed.set_footer(text=f"{datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}") | |
embed.set_footer(text=f"{convert_to_timezone(datetime.utcnow(), zurich_tz)}") | |
await bot.log_channel.send(embed=embed) | |
except Exception as e: | |
print(f"on_message_delete Error: {e}") | |
# nickname stuff --------------------------------------------------------------------------------------------------------------------------- | |
async def on_member_update(before, after): | |
try: | |
""" | |
if before.name != after.name: | |
async for entry in before.guild.audit_logs(limit=5): | |
print(f'{entry.user} did {entry.action} to {entry.target}') | |
""" | |
if before.nick != after.nick: | |
embed = Embed(color=Color.orange()) | |
embed.set_author(name=f"{after} ID: {after.id}", icon_url=after.avatar.url if after.avatar else bot.user.avatar.url) | |
embed.title = "Nickname Modified" | |
embed.add_field(name="Mention", value=after.mention, inline=True) | |
embed.add_field(name="Old", value=before.nick, inline=True) | |
embed.add_field(name="New", value=after.nick, inline=True) | |
embed.set_footer(text=f"{convert_to_timezone(datetime.utcnow(), zurich_tz)}") | |
await bot.log_channel.send(embed=embed) | |
except Exception as e: | |
print(f"on_member_update Error: {e}") | |
async def on_member_ban(guild, banned_user): | |
try: | |
await asyncio.sleep(1) | |
entry1 = await guild.fetch_ban(banned_user) | |
ban_reason = entry1.reason | |
print(f"ban_reason: {ban_reason}") | |
async for entry2 in guild.audit_logs(action=discord.AuditLogAction.ban, limit=1): | |
if ban_reason: | |
print(f'{entry2.user} banned {entry2.target} for {ban_reason}') | |
else: | |
print(f'{entry2.user} banned {entry2.target} (no reason specified)') | |
content = "<@&1108342563628404747>" # @alerts role | |
embed = Embed(color=Color.red()) | |
embed.set_author(name=f"{entry2.target} ID: {entry2.target.id}", icon_url=entry2.target.avatar.url if entry2.target.avatar else bot.user.avatar.url) | |
embed.title = "User Banned" | |
embed.add_field(name="User", value=entry2.target.mention, inline=True) | |
#nickname = entry2.target.nick if entry2.target.nick else "None" | |
#embed.add_field(name="Nickname", value=nicknmae, inline=True) | |
#embed.add_field(name="Account Created At", value=entry2.target.created_at, inline=True) | |
embed.add_field(name="Moderator", value=entry2.user.mention, inline=True) | |
embed.add_field(name="Nickname", value=entry2.user.nick, inline=True) | |
embed.add_field(name="Reason", value=ban_reason, inline=False) | |
embed.set_footer(text=f"{convert_to_timezone(datetime.utcnow(), zurich_tz)}") | |
#user = bot.get_user(811235357663297546) | |
dm_message = await banned_user.send(f"You've been banned from the Hugging Face Discord. To appeal, reach out to <@811235357663297546> via DM") | |
await bot.log_channel.send(content=content, embed=embed) | |
except Exception as e: | |
print(f"on_member_ban Error: {e}") | |
async def on_member_unban(guild, unbanned_user): | |
try: | |
await asyncio.sleep(5) | |
async for entry in guild.audit_logs(action=discord.AuditLogAction.unban, limit=1): | |
if unbanned_user == entry.target: # verify that unbanned user is in audit log | |
moderator = entry.user | |
created_and_age = f"{unbanned_user.created_at}" | |
content = "<@&1108342563628404747>" # @alerts role | |
embed = Embed(color=Color.red()) | |
embed.set_author(name=f"{unbanned_user} ID: {unbanned_user.id}", icon_url=unbanned_user.avatar.url if unbanned_user.avatar else bot.user.avatar.url) | |
embed.title = "User Unbanned" | |
embed.add_field(name="User", value=unbanned_user.mention, inline=True) | |
embed.add_field(name="Account Created At", value=created_and_age, inline=True) | |
embed.add_field(name="Moderator", value=moderator.mention, inline=True) | |
embed.add_field(name="Nickname", value=moderator.nick, inline=True) | |
embed.set_footer(text=f"{convert_to_timezone(datetime.utcnow(), zurich_tz)}") | |
#user = bot.get_user(811235357663297546) | |
#dm_message = await user.send(content=content, embed=embed) | |
await bot.log_channel.send(content=content, embed=embed) | |
except Exception as e: | |
print(f"on_member_unban Error: {e}") | |
# admin stuff----------------------------------------------------------------------------------------------------------------------- | |
async def on_member_join(member): | |
try: | |
await asyncio.sleep(5) | |
# initialize lvl1 on join | |
guild = bot.get_guild(879548962464493619) | |
lvl1 = guild.get_role(1171861537699397733) | |
member2 = guild.get_member(member.id) | |
await member2.add_roles(lvl1) | |
embed = Embed(color=Color.blue()) | |
avatar_url = member.avatar.url if member.avatar else bot.user.avatar.url | |
embed.set_author(name=f"{member} ID: {member.id}", icon_url=avatar_url) | |
embed.title = "User Joined" | |
embed.add_field(name="Mention", value=member.mention, inline=True) | |
embed.add_field(name="Nickname", value=member.nick, inline=True) | |
embed.add_field(name="Account Created At", value=member.created_at, inline=True) | |
embed.set_footer(text=f"{convert_to_timezone(datetime.utcnow(), zurich_tz)}") | |
await bot.log_channel.send(embed=embed) | |
except Exception as e: | |
print(f"on_member_join Error: {e}") | |
async def on_member_remove(member): | |
try: | |
embed = Embed(color=Color.blue()) | |
embed.set_author(name=f"{member} ID: {member.id}", icon_url=member.avatar.url if member.avatar else bot.user.avatar.url) | |
embed.title = "User Left" | |
embed.add_field(name="Mention", value=member.mention, inline=True) | |
embed.add_field(name="Nickname", value=member.nick, inline=True) | |
embed.add_field(name="Account Created At", value=member.created_at, inline=True) | |
embed.set_footer(text=f"{convert_to_timezone(datetime.utcnow(), zurich_tz)}") | |
await bot.log_channel.send(embed=embed) | |
except Exception as e: | |
print(f"on_member_remove Error: {e}") | |
async def on_guild_channel_create(channel): | |
try: | |
# creating channels | |
embed = Embed(description=f'Channel {channel.mention} was created', color=Color.green()) | |
await bot.log_channel.send(embed=embed) | |
except Exception as e: | |
print(f"on_guild_channel_create Error: {e}") | |
async def on_guild_channel_delete(channel): | |
try: | |
# deleting channels, should ping @alerts for this | |
embed = Embed(description=f'Channel {channel.name} ({channel.mention}) was deleted', color=Color.red()) | |
await bot.log_channel.send(embed=embed) | |
except Exception as e: | |
print(f"on_guild_channel_delete Error: {e}") | |
async def on_guild_role_create(role): | |
try: | |
# creating roles | |
embed = Embed(description=f'Role {role.mention} was created', color=Color.green()) | |
await bot.log_channel.send(embed=embed) | |
except Exception as e: | |
print(f"on_guild_role_create Error: {e}") | |
async def on_guild_role_delete(role): | |
try: | |
# deleting roles, should ping @alerts for this | |
embed = Embed(description=f'Role {role.name} ({role.mention}) was deleted', color=Color.red()) | |
await bot.log_channel.send(embed=embed) | |
except Exception as e: | |
print(f"on_guild_role_delete Error: {e}") | |
async def on_guild_role_update(before, after): | |
try: | |
# editing roles, could expand this | |
if before.name != after.name: | |
embed = Embed(description=f'Role {before.mention} was renamed to {after.name}', color=Color.orange()) | |
await bot.log_channel.send(embed=embed) | |
if before.permissions.administrator != after.permissions.administrator: | |
# changes involving the administrator permission / sensitive permissions (can help to prevent mistakes) | |
content = "<@&1108342563628404747>" # @alerts role | |
embed = Embed(description=f'Role {after.mention} had its administrator permission {"enabled" if after.permissions.administrator else "disabled"}', color=Color.red()) | |
await bot.log_channel.send(content=content, embed=embed) | |
except Exception as e: | |
print(f"on_guild_role_update Error: {e}") | |
async def on_voice_state_update(member, before, after): | |
try: | |
if before.mute != after.mute: | |
# muting members | |
embed = Embed(description=f'{member} was {"muted" if after.mute else "unmuted"} in voice chat', color=Color.orange()) | |
await bot.log_channel.send(embed=embed) | |
if before.deaf != after.deaf: | |
# deafening members | |
embed = Embed(description=f'{member} was {"deafened" if after.deaf else "undeafened"} in voice chat', color=Color.orange()) | |
await bot.log_channel.send(embed=embed) | |
except Exception as e: | |
print(f"on_voice_state_update Error: {e}") | |
# github test stuff ------------------------------------------------------------------------------------------------------------------- | |
""" | |
async def check_github(): | |
url = f'https://api.github.com/repos/{github_repo}/pulls' | |
response = requests.get(url) | |
pulls = response.json() | |
for pull in pulls: | |
# Check if the pull request was just opened | |
if pull['state'] == 'open' and pull['created_at'] == pull['updated_at']: | |
channel = client.get_channel(channel_id) | |
if channel: | |
await channel.send(f'New PR opened: {pull["title"]}') | |
""" | |
# bot stuff --------------------------------------------------------------------------------------------------------------------------- | |
async def on_ready(): | |
await asyncio.sleep(5) | |
print('Logged on as', bot.user) | |
await asyncio.sleep(5) | |
bot.log_channel = bot.get_channel(1036960509586587689) # admin-logs | |
await asyncio.sleep(5) | |
print(bot.log_channel) | |
guild = bot.get_guild(879548962464493619) | |
for channel in guild.text_channels: # helps with more accurate logging across restarts | |
try: | |
message_cache.update({m.id: m async for m in channel.history(limit=10000)}) | |
print(f"Finished caching messages for channel: {channel.name}") | |
except Exception as e: | |
print(f"An error occurred while fetching messages from {channel.name}: {e}") | |
await asyncio.sleep(0.1) | |
def run_bot(): | |
bot.run(DISCORD_TOKEN) | |
threading.Thread(target=run_bot).start() | |
def greet(name): | |
return "Hello " + name + "!" | |
demo = gr.Interface(fn=greet, inputs="text", outputs="text") | |
demo.launch() | |