|
import asyncio |
|
import os |
|
import threading |
|
from threading import Event |
|
from typing import Optional |
|
|
|
import datetime |
|
import requests |
|
import discord |
|
import gradio as gr |
|
import gradio_client as grc |
|
from discord import Permissions |
|
from discord.ext import commands |
|
from discord.utils import oauth_url |
|
from gradio_client.utils import QueueError |
|
|
|
event = Event() |
|
|
|
DISCORD_TOKEN = os.getenv("DISCORD_TOKEN") |
|
HF_TOKEN = os.getenv("HF_TOKEN") |
|
|
|
async def wait(job): |
|
while not job.done(): |
|
await asyncio.sleep(0.2) |
|
|
|
def get_client(session: Optional[str] = None) -> grc.Client: |
|
client = grc.Client("https://wop-xxx-opengpt.hf.space/", hf_token=HF_TOKEN) |
|
if session: |
|
client.session_hash = session |
|
return client |
|
|
|
def truncate_response(response: str) -> str: |
|
ending = "...\nTruncating response to 2000 characters due to discord api limits." |
|
if len(response) > 2000: |
|
return response[: 2000 - len(ending)] + ending |
|
else: |
|
return response |
|
|
|
intents = discord.Intents.all() |
|
bot = commands.Bot(command_prefix="$", intents=intents, help_command=None) |
|
|
|
@bot.command() |
|
async def uptime(ctx): |
|
"""Displays the uptime of the bot.""" |
|
delta = datetime.datetime.utcnow() - bot.start_time |
|
hours, remainder = divmod(int(delta.total_seconds()), 3600) |
|
minutes, seconds = divmod(remainder, 60) |
|
days, hours = divmod(hours, 24) |
|
|
|
|
|
embed = discord.Embed(title="Bot Uptime", color=discord.Color.green()) |
|
embed.add_field(name="Uptime", value=f"{days} days, {hours} hours, {minutes} minutes, {seconds} seconds", inline=False) |
|
embed.set_footer(text="Created by Cosmos") |
|
|
|
await ctx.send(embed=embed) |
|
|
|
session_count = 0 |
|
|
|
@bot.command() |
|
async def ai(ctx, *, input_text: str): |
|
"""Ask our AI model a question. (Session resets every 10 messages!)""" |
|
global session_count |
|
|
|
|
|
session_count += 1 |
|
|
|
try: |
|
|
|
if session_count % 10 == 0: |
|
|
|
client = get_client(session=str(datetime.datetime.now())) |
|
else: |
|
|
|
client = get_client() |
|
|
|
|
|
result = client.predict( |
|
input_text, |
|
0.9, |
|
2000, |
|
0.9, |
|
1.2, |
|
api_name="/chat" |
|
) |
|
|
|
|
|
embed = discord.Embed(title="AI Response", description=result, color=discord.Color.green()) |
|
embed.add_field(name="Info", value="Session resets every `10` messages!\nModel Name: `Mistral 7B`", inline=False) |
|
embed.set_footer(text="Created by Cosmos") |
|
|
|
|
|
await ctx.send(embed=embed) |
|
except Exception as e: |
|
await ctx.send(f"An error occurred: {str(e)}") |
|
|
|
|
|
@bot.command() |
|
async def verse(ctx): |
|
"""Returns a random Bible verse.""" |
|
|
|
bible_api_url = "https://labs.bible.org/api/?passage=random&type=json" |
|
response = requests.get(bible_api_url) |
|
if response.status_code == 200: |
|
verse = response.json()[0] |
|
passage = f"**{verse['bookname']} {verse['chapter']}:{verse['verse']}** \n{verse['text']}" |
|
else: |
|
passage = "Unable to fetch Bible verse" |
|
|
|
|
|
embed = discord.Embed(title="Random Bible Verse", description=passage, color=discord.Color.blue()) |
|
embed.set_footer(text="Created by Cosmos") |
|
await ctx.send(embed=embed) |
|
|
|
@bot.command() |
|
@commands.has_permissions(kick_members=True) |
|
async def kick(ctx, member: discord.Member, *, reason: Optional[str] = "No reason provided"): |
|
"""Kicks a member.""" |
|
try: |
|
await member.kick(reason=reason) |
|
await ctx.send(f"{member.mention} has been kicked. Reason: {reason}") |
|
except discord.Forbidden: |
|
await ctx.send("I don't have permission to kick members.") |
|
except discord.HTTPException: |
|
await ctx.send("An error occurred while trying to kick the member. Please try again later.") |
|
|
|
@bot.command() |
|
@commands.has_permissions(ban_members=True) |
|
async def ban(ctx, member: discord.Member, *, reason: Optional[str] = "No reason provided"): |
|
"""Bans a member.""" |
|
try: |
|
await member.ban(reason=reason) |
|
await ctx.send(f"{member.mention} has been banned. Reason: {reason}") |
|
except discord.Forbidden: |
|
await ctx.send("I don't have permission to ban members.") |
|
except discord.HTTPException: |
|
await ctx.send("An error occurred while trying to ban the member. Please try again later.") |
|
|
|
|
|
@bot.command() |
|
async def cmds(ctx): |
|
"""Returns a list of commands and bot information.""" |
|
|
|
command_list = [f"{command.name}: {command.help}" for command in bot.commands] |
|
|
|
|
|
bot_info = f"Bot Name: {bot.user.name}\nBot ID: {bot.user.id}" |
|
|
|
|
|
embed = discord.Embed(title="Bot prefix: $", color=discord.Color.blue()) |
|
embed.add_field(name="Commands", value="\n".join(command_list), inline=False) |
|
embed.add_field(name="Bot Information", value=bot_info, inline=False) |
|
embed.set_footer(text="Created by Cosmos") |
|
|
|
await ctx.send(embed=embed) |
|
|
|
async def update_status(): |
|
await bot.wait_until_ready() |
|
while True: |
|
|
|
member_count = sum(guild.member_count for guild in bot.guilds) |
|
|
|
await bot.change_presence(activity=discord.Activity(type=discord.ActivityType.watching, name=f"{member_count} users")) |
|
|
|
await asyncio.sleep(60) |
|
|
|
@bot.event |
|
async def on_ready(): |
|
bot.start_time = datetime.datetime.utcnow() |
|
print(f"Logged in as {bot.user} (ID: {bot.user.id})") |
|
event.set() |
|
print("------") |
|
bot.loop.create_task(update_status()) |
|
|
|
@bot.event |
|
async def on_member_join(member): |
|
channel = discord.utils.get(member.guild.channels, name="👋wellcome-goodbye") |
|
|
|
|
|
bible_api_url = "https://labs.bible.org/api/?passage=random&type=json" |
|
response = requests.get(bible_api_url) |
|
if response.status_code == 200: |
|
verse = response.json()[0] |
|
passage = f"{verse['bookname']} {verse['chapter']}:{verse['verse']} - {verse['text']}" |
|
else: |
|
passage = "Unable to fetch Bible verse" |
|
|
|
|
|
embed = discord.Embed(title=f"Welcome to the server, {member.name}!", description=f"Hope you have a great stay here, <@{member.id}>!", color=discord.Color.blue()) |
|
embed.add_field(name="Random Bible Verse", value=passage, inline=False) |
|
embed.set_footer(text="Created by Cosmos") |
|
|
|
await channel.send(embed=embed) |
|
|
|
@bot.command() |
|
async def search(ctx, *, query: str): |
|
"""Search for a bible verse.""" |
|
bible_api_url = f"https://labs.bible.org/api/?passage={query}&type=json" |
|
response = requests.get(bible_api_url) |
|
|
|
if response.status_code == 200: |
|
verses = response.json() |
|
if verses: |
|
|
|
passage = "\n".join(f"**{verse['bookname']} {verse['chapter']}:{verse['verse']}** \n{verse['text']}" for verse in verses) |
|
passage = truncate_response(passage) |
|
embed = discord.Embed(title=f"Search Results for '{query}'", description=passage, color=discord.Color.blue()) |
|
else: |
|
embed = discord.Embed(title="Search Results", description="No results found.", color=discord.Color.red()) |
|
else: |
|
embed = discord.Embed(title="Search Results", description="Unable to fetch search results.", color=discord.Color.red()) |
|
|
|
embed.set_footer(text="Created by Cosmos") |
|
await ctx.send(embed=embed) |
|
|
|
@bot.command() |
|
@commands.has_permissions(manage_messages=True) |
|
async def purge(ctx, limit: int): |
|
"""Removes N amount of messages.""" |
|
try: |
|
await ctx.channel.purge(limit=limit + 1) |
|
await ctx.send(f"{limit} messages have been purged.") |
|
except discord.Forbidden: |
|
await ctx.send("I don't have permission to manage messages.") |
|
except discord.HTTPException: |
|
await ctx.send("An error occurred while trying to purge messages. Please try again later.") |
|
|
|
|
|
@bot.event |
|
async def on_command_error(ctx, error): |
|
if isinstance(error, commands.CommandNotFound): |
|
await ctx.send("Sorry, I couldn't find that command. Use `$cmds` to see the list of available commands.") |
|
elif isinstance(error, commands.MissingRequiredArgument): |
|
await ctx.send("Oops! Looks like you're missing some required arguments.") |
|
elif isinstance(error, commands.CheckFailure): |
|
await ctx.send("You do not have the permissions to execute this command.") |
|
else: |
|
|
|
print(f"An error occurred: {error}") |
|
|
|
|
|
@bot.event |
|
async def on_error(event_method, *args, **kwargs): |
|
|
|
print(f"An error occurred in {event_method}: {sys.exc_info()}") |
|
|
|
|
|
@bot.event |
|
async def on_command_error(ctx, error): |
|
if isinstance(error, commands.CommandInvokeError): |
|
original_error = error.original |
|
if isinstance(original_error, discord.Forbidden): |
|
await ctx.send("I don't have permissions to do that.") |
|
elif isinstance(original_error, discord.HTTPException): |
|
await ctx.send("An error occurred while processing the command. Please try again later.") |
|
else: |
|
|
|
print(f"Error: {original_error}") |
|
await ctx.send("An unexpected error occurred.") |
|
|
|
|
|
def run_bot(): |
|
if not DISCORD_TOKEN: |
|
print("DISCORD_TOKEN NOT SET") |
|
event.set() |
|
else: |
|
bot.run(DISCORD_TOKEN) |
|
|
|
threading.Thread(target=run_bot).start() |
|
|
|
event.wait() |
|
|
|
with gr.Blocks() as demo: |
|
gr.Markdown( |
|
f""" |
|
# Discord bot is online! |
|
""" |
|
) |
|
|
|
demo.launch() |