|
import asyncio |
|
import os |
|
import threading |
|
from threading import Event |
|
from typing import Optional |
|
|
|
import datetime |
|
import requests |
|
import discord |
|
import gradio as gr |
|
from gradio_client import Client |
|
from discord import Permissions |
|
from discord.ext import commands |
|
from discord.utils import oauth_url |
|
from gradio_client.utils import QueueError |
|
import base64 |
|
|
|
event = Event() |
|
|
|
DISCORD_TOKEN = os.getenv("DISCORD_TOKEN") |
|
HF_TOKEN = os.getenv("HF_TOKEN") |
|
API_URL = "https://api-inference.huggingface.co/models/stabilityai/stable-diffusion-xl-base-1.0" |
|
headers = {"Authorization": "Bearer " + HF_TOKEN} |
|
|
|
def ask(input_text): |
|
api_url = "https://somerandomapifor-disorc-cause-i-need-it.onrender.com/qa" |
|
params = {'question': input_text} |
|
|
|
response = requests.get(api_url, params=params) |
|
|
|
if response.status_code == 200: |
|
json_response = response.json() |
|
answer = json_response.get('answer') |
|
|
|
|
|
modified_answer = answer[:-4] if answer else "No answer found." |
|
|
|
return modified_answer |
|
else: |
|
return "Error: Unable to fetch response" |
|
|
|
async def wait(job): |
|
while not job.done(): |
|
await asyncio.sleep(0.2) |
|
|
|
intents = discord.Intents.all() |
|
bot = commands.Bot(command_prefix="$", intents=intents, help_command=None) |
|
|
|
@bot.command() |
|
async def uptime(ctx): |
|
"""Displays the uptime of the bot.""" |
|
delta = datetime.datetime.utcnow() - bot.start_time |
|
hours, remainder = divmod(int(delta.total_seconds()), 3600) |
|
minutes, seconds = divmod(remainder, 60) |
|
days, hours = divmod(hours, 24) |
|
|
|
|
|
embed = discord.Embed(title="Bot Uptime", color=discord.Color.green()) |
|
embed.add_field(name="Uptime", value=f"{days} days, {hours} hours, {minutes} minutes, {seconds} seconds", inline=False) |
|
embed.set_footer(text="Created by Cosmos") |
|
|
|
await ctx.send(embed=embed) |
|
|
|
@bot.command() |
|
async def ai(ctx, *, input_text: str): |
|
"""Ask our AI model a question.""" |
|
async with ctx.typing(): |
|
result = ask(input_text) |
|
|
|
|
|
await ctx.send(result) |
|
|
|
def query(prompt): |
|
payload = prompt |
|
response = requests.post(API_URL, headers=headers, json=payload) |
|
return response.content |
|
|
|
@bot.command() |
|
async def img(ctx, *, prompt: str): |
|
|
|
async with ctx.typing(): |
|
image_data = query(prompt) |
|
|
|
|
|
image_b64 = base64.b64encode(image_data).decode('utf-8') |
|
|
|
|
|
await ctx.send(file=discord.File(base64.b64decode(image_b64), filename="generated_image.png")) |
|
|
|
|
|
@bot.command() |
|
async def verse(ctx): |
|
"""Get a random Bible verse.""" |
|
|
|
bible_api_url = "https://labs.bible.org/api/?passage=random&type=json" |
|
response = requests.get(bible_api_url) |
|
if response.status_code == 200: |
|
verse = response.json()[0] |
|
passage = f"**{verse['bookname']} {verse['chapter']}:{verse['verse']}** \n{verse['text']}" |
|
else: |
|
passage = "Unable to fetch Bible verse" |
|
|
|
|
|
embed = discord.Embed(title="Random Bible Verse", description=passage, color=discord.Color.blue()) |
|
embed.set_footer(text="Created by Cosmos") |
|
await ctx.send(embed=embed) |
|
|
|
@bot.command() |
|
@commands.has_permissions(kick_members=True) |
|
async def kick(ctx, member: discord.Member, *, reason: Optional[str] = "No reason provided"): |
|
"""Kicks a member.""" |
|
try: |
|
await member.kick(reason=reason) |
|
await ctx.send(f"{member.mention} has been kicked. Reason: {reason}") |
|
except discord.Forbidden: |
|
await ctx.send("I don't have permission to kick members.") |
|
except discord.HTTPException: |
|
await ctx.send("An error occurred while trying to kick the member. Please try again later.") |
|
|
|
@bot.command() |
|
@commands.has_permissions(ban_members=True) |
|
async def ban(ctx, member: discord.Member, *, reason: Optional[str] = "No reason provided"): |
|
"""Bans a member.""" |
|
try: |
|
await member.ban(reason=reason) |
|
await ctx.send(f"{member.mention} has been banned. Reason: {reason}") |
|
except discord.Forbidden: |
|
await ctx.send("I don't have permission to ban members.") |
|
except discord.HTTPException: |
|
await ctx.send("An error occurred while trying to ban the member. Please try again later.") |
|
|
|
|
|
@bot.command() |
|
async def cmds(ctx): |
|
"""Returns a list of commands and bot information.""" |
|
|
|
command_list = [f"{command.name}: {command.help}" for command in bot.commands] |
|
|
|
|
|
bot_info = f"Bot Name: {bot.user.name}\nBot ID: {bot.user.id}" |
|
|
|
|
|
embed = discord.Embed(title="Bot prefix: $", color=discord.Color.blue()) |
|
embed.add_field(name="Commands", value="\n".join(command_list), inline=False) |
|
embed.add_field(name="Bot Information", value=bot_info, inline=False) |
|
embed.set_footer(text="Created by Cosmos") |
|
|
|
await ctx.send(embed=embed) |
|
|
|
async def update_status(): |
|
await bot.wait_until_ready() |
|
while True: |
|
|
|
member_count = sum(guild.member_count for guild in bot.guilds) |
|
|
|
await bot.change_presence(activity=discord.Activity(type=discord.ActivityType.watching, name=f"{member_count} users")) |
|
|
|
await asyncio.sleep(60) |
|
|
|
@bot.event |
|
async def on_ready(): |
|
bot.start_time = datetime.datetime.utcnow() |
|
print(f"Logged in as {bot.user} (ID: {bot.user.id})") |
|
event.set() |
|
print("------") |
|
bot.loop.create_task(update_status()) |
|
|
|
@bot.event |
|
async def on_member_join(member): |
|
|
|
channel_ids = [1210566384267034644, 1210987054658494477, 1210578060164866128] |
|
channels_info = [] |
|
for channel_id in channel_ids: |
|
channel = member.guild.get_channel(channel_id) |
|
if channel: |
|
channels_info.append(f"• {channel.name}: {channel.mention}") |
|
|
|
|
|
channels_message = "\n".join(channels_info) |
|
message_content = ( |
|
f"Welcome to the server, {member.name}!\n\n" |
|
f"Hope you have a great stay here, {member.mention}!\n\n" |
|
f"Here are some channels you might find interesting:\n{channels_message}\n\n" |
|
) |
|
|
|
|
|
bible_api_url = "https://labs.bible.org/api/?passage=random&type=json" |
|
response = requests.get(bible_api_url) |
|
if response.status_code == 200: |
|
verse = response.json()[0] |
|
passage = f"{verse['bookname']} {verse['chapter']}:{verse['verse']} - {verse['text']}" |
|
|
|
message_content += f"\n\nRandom Bible Verse:\n{passage}" |
|
else: |
|
passage = "Unable to fetch Bible verse" |
|
|
|
|
|
try: |
|
await member.send(message_content) |
|
except discord.HTTPException: |
|
print(f"Failed to send a DM to {member.name}") |
|
|
|
|
|
welcome_channel = discord.utils.get(member.guild.channels, name="👋wellcome-goodbye") |
|
if welcome_channel: |
|
embed = discord.Embed( |
|
title=f"Welcome to the server, {member.name}!", |
|
description=f"Hope you have a great stay here, {member.mention}!", |
|
color=discord.Color.blue() |
|
) |
|
embed.add_field(name="Random Bible Verse", value=passage, inline=False) |
|
embed.set_footer(text="Created by Cosmos") |
|
await welcome_channel.send(embed=embed) |
|
|
|
@bot.command() |
|
async def search(ctx, *, query: str): |
|
"""Search for a bible verse.""" |
|
bible_api_url = f"https://labs.bible.org/api/?passage={query}&type=json" |
|
response = requests.get(bible_api_url) |
|
|
|
if response.status_code == 200: |
|
verses = response.json() |
|
if verses: |
|
|
|
passage = "\n".join(f"**{verse['bookname']} {verse['chapter']}:{verse['verse']}** \n{verse['text']}" for verse in verses) |
|
passage = truncate_response(passage) |
|
embed = discord.Embed(title=f"Search Results for '{query}'", description=passage, color=discord.Color.blue()) |
|
else: |
|
embed = discord.Embed(title="Search Results", description="No results found.", color=discord.Color.red()) |
|
else: |
|
embed = discord.Embed(title="Search Results", description="Unable to fetch search results.", color=discord.Color.red()) |
|
|
|
embed.set_footer(text="Created by Cosmos") |
|
await ctx.send(embed=embed) |
|
|
|
@bot.command() |
|
@commands.has_permissions(manage_messages=True) |
|
async def purge(ctx, limit: int): |
|
"""Removes N amount of messages.""" |
|
try: |
|
await ctx.channel.purge(limit=limit + 1) |
|
await ctx.send(f"{limit} messages have been purged.") |
|
except discord.Forbidden: |
|
await ctx.send("I don't have permission to manage messages.") |
|
except discord.HTTPException: |
|
await ctx.send("An error occurred while trying to purge messages. Please try again later.") |
|
|
|
|
|
@bot.event |
|
async def on_command_error(ctx, error): |
|
if isinstance(error, commands.CommandNotFound): |
|
await ctx.send("Sorry, I couldn't find that command. Use `$cmds` to see the list of available commands.") |
|
elif isinstance(error, commands.MissingRequiredArgument): |
|
await ctx.send("Oops! Looks like you're missing some required arguments.") |
|
elif isinstance(error, commands.CheckFailure): |
|
await ctx.send("You do not have the permissions to execute this command.") |
|
else: |
|
|
|
print(f"An error occurred: {error}") |
|
|
|
|
|
@bot.event |
|
async def on_error(event_method, *args, **kwargs): |
|
|
|
print(f"An error occurred in {event_method}: {sys.exc_info()}") |
|
|
|
|
|
@bot.event |
|
async def on_command_error(ctx, error): |
|
if isinstance(error, commands.CommandInvokeError): |
|
original_error = error.original |
|
if isinstance(original_error, discord.Forbidden): |
|
await ctx.send("I don't have permissions to do that.") |
|
elif isinstance(original_error, discord.HTTPException): |
|
await ctx.send("An error occurred while processing the command. Please try again later.") |
|
else: |
|
|
|
print(f"Error: {original_error}") |
|
await ctx.send("An unexpected error occurred.") |
|
|
|
|
|
def run_bot(): |
|
if not DISCORD_TOKEN: |
|
print("DISCORD_TOKEN NOT SET") |
|
event.set() |
|
else: |
|
bot.run(DISCORD_TOKEN) |
|
|
|
threading.Thread(target=run_bot).start() |
|
|
|
event.wait() |
|
|
|
with gr.Blocks() as demo: |
|
gr.Markdown( |
|
f""" |
|
# Discord bot is online! |
|
""" |
|
) |
|
|
|
demo.launch() |