editfiltrbot / main.py
Foydalanuvchi
Phase 3 Optimization & Final Deployment
9b345f9
import socket
import os
# --- Hugging Face Ultra-Robust DNS Patch Start ---
# 1. Proxylarni o'chirish (HF dagi noto'g'ri sozlangan proxylarni chetlab o'tish)
os.environ.pop('HTTP_PROXY', None)
os.environ.pop('HTTPS_PROXY', None)
os.environ.pop('http_proxy', None)
os.environ.pop('https_proxy', None)
# 2. DNS Monkey Patch
TELEGRAM_IPS = ['149.154.167.220', '149.154.167.189', '149.154.167.50']
_original_getaddrinfo = socket.getaddrinfo
def _patched_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
# httpx va boshqalar hostni bytes shaklida yuborishi mumkin
is_telegram = False
if isinstance(host, str) and host == "api.telegram.org":
is_telegram = True
elif isinstance(host, bytes) and host == b"api.telegram.org":
is_telegram = True
if is_telegram:
# Barcha oilalarni (IPv6 bo'lsa ham) IPv4 ga majburlaymiz
return [(socket.AF_INET, socket.SOCK_STREAM, 6, '', (ip, port)) for ip in TELEGRAM_IPS]
return _original_getaddrinfo(host, port, family, type, proto, flags)
socket.getaddrinfo = _patched_getaddrinfo
# --- Hugging Face Ultra-Robust DNS Patch End ---
import logging
import asyncio
import collections
import gc
import time
import threading
from datetime import datetime, timedelta
from dotenv import load_dotenv
from http.server import BaseHTTPRequestHandler, HTTPServer
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, constants
from telegram.ext import (
ApplicationBuilder,
CommandHandler,
MessageHandler,
filters,
ContextTypes,
CallbackQueryHandler
)
from filters import (
apply_retro_filter, upscale_image, apply_face_restore, apply_auto_enhance,
process_video_retro, process_video_upscale, process_video_slowmo,
process_video_bw, process_video_color_correct, process_video_remove_audio,
process_video_trim, process_video_face_fix, process_video_auto_enhance,
process_video_fps_boost, apply_nudenet_filter, process_video_nnsfw,
process_video_subtitle, process_video_stabilize,
apply_glitch_filter, apply_mirror_filter, apply_watermark, apply_bg_remove,
apply_style_transfer, apply_quality_boost,
process_video_glitch, process_video_mirror, process_video_watermark,
process_video_subtitle_translate
)
# Admin sozlamalari
ADMIN_ID = int(os.environ.get("ADMIN_ID", 6309900880))
BROADCAST_MODE = {} # {admin_id: True}
from database import db
from concurrent.futures import ThreadPoolExecutor
from apscheduler.schedulers.asyncio import AsyncIOScheduler
# muhit o'zgaruvchilari
load_dotenv()
TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
# Loglarni sozlash
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# Global media storage
media_storage = {}
processing_semaphore = asyncio.Semaphore(2)
executor = ThreadPoolExecutor(max_workers=4)
base_dir = os.path.dirname(os.path.abspath(__file__))
# Rate Limit (Spam himoyasi)
user_active_tasks = {} # {user_id: timestamp}
RATE_LIMIT_SECONDS = 5 # Minimum kutish vaqti (soniya)
def check_rate_limit(user_id):
"""Foydalanuvchi spam qilayotganini tekshiradi. True = ruxsat, False = bloklangan."""
now = time.time()
last_time = user_active_tasks.get(user_id)
if last_time and (now - last_time) < RATE_LIMIT_SECONDS:
return False
user_active_tasks[user_id] = now
return True
# Scheduler for cleanup
scheduler = AsyncIOScheduler()
async def post_init(application):
"""Bot ishga tushgandan so'ng bajariladigan amallar."""
scheduler.add_job(cleanup_old_files, 'interval', minutes=30)
scheduler.start()
logger.info("Scheduler va tozalash tizimi ishga tushdi.")
async def cleanup_old_files():
"""Eski fayllarni, vaqtinchalik fayllarni va xotirani tozalash."""
now = datetime.now()
# 1. Eskirgan media_storage yozuvlarini tozalash
to_delete = [k for k, v in media_storage.items() if (now - v.get('timestamp', now)) > timedelta(hours=1)]
for k in to_delete:
media_storage.pop(k, None)
# 2. Eskirgan user_active_tasks ni tozalash
stale_users = [uid for uid, ts in user_active_tasks.items() if (time.time() - ts) > 300]
for uid in stale_users:
user_active_tasks.pop(uid, None)
# 3. in_/out_/tmp_ fayllarni tozalash
for f in os.listdir(base_dir):
fpath = os.path.join(base_dir, f)
if (f.startswith("in_") or f.startswith("out_") or f.startswith("tmp_")):
try:
file_age = now - datetime.fromtimestamp(os.path.getmtime(fpath))
if file_age > timedelta(minutes=30):
os.remove(fpath)
logger.info(f"Tozalandi: {f}")
except: pass
gc.collect()
logger.info(f"Tozalash yakunlandi. media_storage: {len(media_storage)}, active_tasks: {len(user_active_tasks)}")
# --- UI Helpers ---
def get_main_menu_keyboard(user_id=None):
"""Asosiy menyu tugmalari. Admin uchun qo'shimcha tugma qo'shiladi."""
keyboard = [
[
InlineKeyboardButton("📜 Tarix", callback_data="nav|history"),
InlineKeyboardButton("⚙️ Sozlamalar", callback_data="nav|settings")
],
[
InlineKeyboardButton("ℹ️ Yordam", callback_data="nav|help")
]
]
# Faqat admin uchun tugma
if user_id == ADMIN_ID:
keyboard.append([InlineKeyboardButton("👑 Admin Panel", callback_data="admin_main")])
return InlineKeyboardMarkup(keyboard)
async def show_main_menu(update_or_query, context):
"""Asosiy menyuni ko'rsatish."""
text = (
"✨ **Mukammal Filtr Botga xush kelibsiz!**\n\n"
"Menga rasm yoki video yuboring, so'ngra mo''jizani ko'ring. 🎨✨\n\n"
"💡 *Pastdagi tugmalar orqali botni boshqarishingiz mumkin:*"
)
if isinstance(update_or_query, Update):
user_id = update_or_query.effective_user.id
await update_or_query.message.reply_text(text, reply_markup=get_main_menu_keyboard(user_id), parse_mode=constants.ParseMode.MARKDOWN)
else:
user_id = update_or_query.from_user.id
await update_or_query.edit_message_text(text, reply_markup=get_main_menu_keyboard(user_id), parse_mode=constants.ParseMode.MARKDOWN)
# --- Handlers ---
async def settings_command(update_or_query, context: ContextTypes.DEFAULT_TYPE):
"""Sozlamalar menyusi."""
if isinstance(update_or_query, Update):
user_id = update_or_query.effective_user.id
else:
user_id = update_or_query.from_user.id
current_filter = db.get_user_settings(user_id)
text = (
"⚙️ **Sozlamalar**\n\n"
f"Joriy standart filtr: **{current_filter.upper()}**\n"
"Yangi standart filtrni tanlang:"
)
keyboard = [
[
InlineKeyboardButton("🎞 Retro", callback_data="set|retro"),
InlineKeyboardButton("💎 Ultra HD", callback_data="set|upscale")
],
[
InlineKeyboardButton("🤖 Face Fix", callback_data="set|face_fix"),
InlineKeyboardButton("✨ Auto-Enhance", callback_data="set|auto_enhance")
],
[InlineKeyboardButton("🔙 Orqaga", callback_data="nav|main")]
]
if isinstance(update_or_query, Update):
await update_or_query.message.reply_text(text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode=constants.ParseMode.MARKDOWN)
else:
await update_or_query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode=constants.ParseMode.MARKDOWN)
async def settings_callback_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Filtrni tanlash callbacki."""
query = update.callback_query
await query.answer()
filter_type = query.data.split("|")[1]
db.set_default_filter(query.from_user.id, filter_type)
name_map = {"retro": "RETRO", "upscale": "ULTRA HD", "face_fix": "FACE FIX", "auto_enhance": "AUTO-ENHANCE"}
display_name = name_map.get(filter_type, filter_type.upper())
await query.edit_message_text(
f"✅ Standart filtr **{display_name}** ga o'zgartirildi!",
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🔙 Orqaga", callback_data="nav|settings")]]),
parse_mode=constants.ParseMode.MARKDOWN
)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/start buyrug'i."""
user = update.effective_user
db.add_user(user.id, user.username, user.first_name)
await show_main_menu(update, context)
async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Rasmlarni qabul qilish va Kategoriyalangan Menyuni ko'rsatish."""
try:
photo = update.message.photo[-1]
file_id = photo.file_id
short_id = str(update.message.message_id)
media_storage[short_id] = {"file_id": file_id, "type": "photo", "timestamp": datetime.now()}
# 1. Haqiqiy Xiralikni tekshirish (Laplacian Variance)
is_blurry = False
try:
file = await context.bot.get_file(file_id)
input_path = os.path.join(base_dir, f"tmp_chk_{short_id}.jpg")
await file.download_to_drive(input_path)
import cv2
img_chk = cv2.imread(input_path, cv2.IMREAD_GRAYSCALE)
if img_chk is not None:
variance = cv2.Laplacian(img_chk, cv2.CV_64F).var()
if variance < 100: # Odatda 100 dan pasti xira hisoblanadi
is_blurry = True
if os.path.exists(input_path):
os.remove(input_path)
except Exception as filter_err:
logger.warning(f"Xiralikni tekshirish xatosi: {filter_err}")
text = "🖼 **Rasm qabul qilindi!**\nO'zingizga kerakli bo'limni tanlang 👇"
if is_blurry:
text = "🔍 **Tahlil:** Rasm xiraroq ko'rinmoqda. 'AI Asboblar' bo'limidan sifatni oshirish tavsiya etiladi!\n\n" + text
# KATEGORIYALANGAN MENYU (Asosiy)
keyboard = [
[
InlineKeyboardButton("🪄 Effektlar", callback_data=f"cat_fx|p|{short_id}"),
InlineKeyboardButton("🤖 AI Asboblar", callback_data=f"cat_ai|p|{short_id}")
],
[
InlineKeyboardButton("🎨 Fon (Tahrirlash)", callback_data=f"cat_bg|p|{short_id}"),
InlineKeyboardButton("🖌️ Badiiy Uslublar", callback_data=f"cat_st|p|{short_id}")
],
[
InlineKeyboardButton("🔙 Bekor qilish", callback_data="nav|main")
]
]
await update.message.reply_text(text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode=constants.ParseMode.MARKDOWN)
except Exception as e:
logger.error(f"Error in handle_photo: {e}")
await update.message.reply_text("❌ Xatolik yuz berdi.")
async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Matn xabarlarini qabul qilish (broadcast uchun)."""
# Agar admin xabar tarqatish rejimida bo'lsa
if update.effective_user.id == ADMIN_ID and BROADCAST_MODE.get(ADMIN_ID):
BROADCAST_MODE[ADMIN_ID] = False
text_to_send = update.message.text
users = db.get_all_users()
count = 0
failed = 0
status_msg = await update.message.reply_text(f"🚀 Xabar tarqatish boshlandi (0/{len(users)})...")
for u_id in users:
try:
await context.bot.send_message(u_id, text_to_send)
count += 1
if count % 10 == 0:
await status_msg.edit_text(f"🚀 Xabar tarqatish davom etmoqda ({count}/{len(users)})...")
except Exception:
failed += 1
await status_msg.edit_text(f"✅ Xabar tarqatish yakunlandi.\n✉️ Yuborildi: {count} ta\n❌ Xato: {failed} ta")
return
# Oddiy foydalanuvchilar uchun — hech narsa qilmaymiz
# (rasm yoki video yuboring degan javob keraksiz, chunki ortiqcha xabar beradi)
return
async def handle_video(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Videolarni qabul qilish va Kategoriyalangan Menyuni ko'rsatish."""
try:
message = update.message
video = message.video or message.animation or message.document
if message.document and not (message.document.mime_type or "").startswith('video/'):
return
file_id = getattr(video, 'file_id', None)
if not file_id: return
if getattr(video, 'file_size', 0) > 50 * 1024 * 1024:
await message.reply_text("⚠️ Video juda katta (max 50MB).")
return
short_id = str(message.message_id)
media_storage[short_id] = {"file_id": file_id, "type": "video", "timestamp": datetime.now()}
# KATEGORIYALANGAN MENYU (Video uchun)
keyboard = [
[
InlineKeyboardButton("🪄 Effektlar", callback_data=f"cv_fx|v|{short_id}"),
InlineKeyboardButton("🤖 AI Kadrlar", callback_data=f"cv_ai|v|{short_id}")
],
[
InlineKeyboardButton("🛠 Uzunlik va Ovoz", callback_data=f"cv_ed|v|{short_id}"),
InlineKeyboardButton("📝 Subtitr va Tarjima", callback_data=f"cv_sb|v|{short_id}")
],
[
InlineKeyboardButton("🔙 Bekor qilish", callback_data="nav|main")
]
]
await update.message.reply_text("🎥 **Video qabul qilindi!**\nQanday mo'jiza yaratamiz? 👇", reply_markup=InlineKeyboardMarkup(keyboard), parse_mode=constants.ParseMode.MARKDOWN)
except Exception as e:
logger.error(f"Error in handle_video: {e}")
await update.message.reply_text("❌ Xatolik yuz berdi.")
async def navigation_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Navigatsiya tugmalari (Menu, History, Settings)."""
query = update.callback_query
await query.answer()
data = query.data.split("|")[1]
if data == "main":
await show_main_menu(query, context)
elif data == "history":
history = db.get_user_history(query.from_user.id)
if not history:
text = "📭 Tarix bo'sh."
else:
text = "📜 **Sizning oxirgi 5 ta amalingiz:**\n\n"
for i, (m_type, f_type, dt) in enumerate(history, 1):
dt_obj = datetime.fromisoformat(dt).strftime("%H:%M %d.%m")
text += f"{i}. {'🖼' if m_type == 'photo' else '🎥'} {f_type.upper()} - {dt_obj}\n"
keyboard = [[InlineKeyboardButton("🔙 Orqaga", callback_data="nav|main")]]
await query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode=constants.ParseMode.MARKDOWN)
elif data == "settings":
await settings_command(query, context)
elif data == "help":
text = (
"ℹ️ **Yordam**\n\n"
"1. Rasm yoki Video yuboring.\n"
"2. Tugmalar orqali filtrni tanlang.\n"
"3. Progress bar orqali jarayonni kuzating.\n"
"4. Natijani yuklab oling!"
)
keyboard = [[InlineKeyboardButton("🔙 Orqaga", callback_data="nav|main")]]
await query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode=constants.ParseMode.MARKDOWN)
async def admin_panel(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Faqat admin ko'ra oladigan boshqaruv paneli."""
if update.effective_user.id != ADMIN_ID:
return
keyboard = [
[
InlineKeyboardButton("📊 Statistika", callback_data="admin_stats"),
InlineKeyboardButton("📂 Bazani yuklab olish", callback_data="admin_db")
],
[
InlineKeyboardButton("📢 Xabar tarqatish", callback_data="admin_broadcast")
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text("👑 **Admin Panel**\n\nBotni boshqarish uchun kerakli bo'limni tanlang:",
reply_markup=reply_markup, parse_mode=constants.ParseMode.MARKDOWN)
async def handle_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()
if query.data.startswith("admin_"):
if update.effective_user.id != ADMIN_ID:
return
if query.data == "admin_stats":
stats = db.get_stats()
text = (f"📊 **Bot Statistikasi**\n\n"
f"👥 Jami foydalanuvchilar: {stats['total_users']}\n"
f"🔄 Jami tahrirlangan media: {stats['total_processed']}\n"
f"📈 Oxirgi 24 soatdagi faollik: {stats['daily_active']}")
await query.edit_message_text(text, parse_mode=constants.ParseMode.MARKDOWN,
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ Orqaga", callback_data="admin_back")]]))
elif query.data == "admin_db":
try:
with open(db.db_path, 'rb') as db_file:
await query.message.reply_document(document=db_file, caption="📂 Bot ma'lumotlar bazasi zaxira nusxasi.")
except Exception as e:
logger.error(f"Admin DB yuklash xatosi: {e}")
await query.message.reply_text("❌ Bazani yuklashda xatolik.")
elif query.data == "admin_broadcast":
BROADCAST_MODE[ADMIN_ID] = True
await query.edit_message_text("📝 **Xabar tarqatish rejimi**\n\nBarcha foydalanuvchilarga yubormoqchi bo'lgan matningizni kiriting:",
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("❌ Bekor qilish", callback_data="admin_back")]]))
elif query.data == "admin_back" or query.data == "admin_main":
BROADCAST_MODE[ADMIN_ID] = False
keyboard = [
[InlineKeyboardButton("📊 Statistika", callback_data="admin_stats"),
InlineKeyboardButton("📂 Bazani yuklab olish", callback_data="admin_db")],
[InlineKeyboardButton("📢 Xabar tarqatish", callback_data="admin_broadcast")],
[InlineKeyboardButton("🔙 Asosiy Menyu", callback_data="nav|main")]
]
await query.edit_message_text("👑 **Admin Panel**\n\nBo'limni tanlang:",
reply_markup=InlineKeyboardMarkup(keyboard), parse_mode=constants.ParseMode.MARKDOWN)
return
# KATEGORIYA MENYULARINI QAYTA ISHLASH (Sub-menyular)
if query.data.startswith("cat_") or query.data.startswith("cv_"):
data_parts = query.data.split("|")
cat_type = data_parts[0]
m_type = data_parts[1]
short_id = data_parts[2]
keyboard = []
text = "Tanlang 👇"
# --- RASM KATEGORIYALARI ---
if cat_type == "cat_fx":
text = "🪄 **Rasm Effektlari:**"
keyboard = [
[InlineKeyboardButton("📸 Retro Kamera", callback_data=f"r|p|{short_id}"), InlineKeyboardButton("🪄 Glitch", callback_data=f"gl|p|{short_id}")],
[InlineKeyboardButton("🪞 Oyna", callback_data=f"mr|p|{short_id}"), InlineKeyboardButton("📱 Suv belgisi", callback_data=f"wm|p|{short_id}")],
[InlineKeyboardButton("🔙 Orqaga", callback_data="nav|main")]
]
elif cat_type == "cat_ai":
text = "🤖 **AI Asboblar:**"
keyboard = [
[InlineKeyboardButton("✨ Sifat (Ultra HD)", callback_data=f"u|p|{short_id}"), InlineKeyboardButton("👤 Yuz (Face Fix)", callback_data=f"f|p|{short_id}")],
[InlineKeyboardButton("🪄 Avto-Tahrir", callback_data=f"a|p|{short_id}"), InlineKeyboardButton("💎 Sifat+ (Pro)", callback_data=f"qb|p|{short_id}")],
[InlineKeyboardButton("🛡️ Media Qalqon", callback_data=f"n|p|{short_id}")],
[InlineKeyboardButton("🔙 Orqaga", callback_data="nav|main")]
]
elif cat_type == "cat_bg":
text = "🎨 **Fonni Tahrirlash (AI):**"
keyboard = [
[InlineKeyboardButton("🎨 Shaffof", callback_data=f"bg_t|p|{short_id}"), InlineKeyboardButton("⬜ Oq Fon", callback_data=f"bg_w|p|{short_id}")],
[InlineKeyboardButton("⬛ Qora Fon", callback_data=f"bg_k|p|{short_id}"), InlineKeyboardButton("🌫️ Blur", callback_data=f"bg_b|p|{short_id}")],
[InlineKeyboardButton("🌈 Gradient", callback_data=f"bg_c|p|{short_id}")],
[InlineKeyboardButton("🔙 Orqaga", callback_data="nav|main")]
]
elif cat_type == "cat_st":
text = "🖌️ **Badiiy Uslublar (AI):**"
keyboard = [
[InlineKeyboardButton("🖌️ Anime", callback_data=f"st_anime|p|{short_id}"), InlineKeyboardButton("🖌️ Qalam", callback_data=f"st_sketch|p|{short_id}")],
[InlineKeyboardButton("🖌️ Moybo'yoq", callback_data=f"st_oil|p|{short_id}"), InlineKeyboardButton("🖌️ Multfilm", callback_data=f"st_cart|p|{short_id}")],
[InlineKeyboardButton("🔙 Orqaga", callback_data="nav|main")]
]
# --- VIDEO KATEGORIYALARI ---
elif cat_type == "cv_fx":
text = "🪄 **Video Effektlari:**"
keyboard = [
[InlineKeyboardButton("🎞️ Retro", callback_data=f"r|v|{short_id}"), InlineKeyboardButton("⚫ Oq-Qora", callback_data=f"bw|v|{short_id}")],
[InlineKeyboardButton("🪄 Glitch", callback_data=f"gl|v|{short_id}"), InlineKeyboardButton("🪞 Oyna", callback_data=f"mr|v|{short_id}")],
[InlineKeyboardButton("🎨 Rang Tahrir", callback_data=f"cc|v|{short_id}"), InlineKeyboardButton("📱 Suv belgisi", callback_data=f"wm|v|{short_id}")],
[InlineKeyboardButton("🔙 Orqaga", callback_data="nav|main")]
]
elif cat_type == "cv_ai":
text = "🤖 **Video AI Kadrlar:**"
keyboard = [
[InlineKeyboardButton("✨ Sifatni Oshirish", callback_data=f"u|v|{short_id}"), InlineKeyboardButton("👤 Yuzni Tiniqlash", callback_data=f"vf|v|{short_id}")],
[InlineKeyboardButton("🪄 Avto-Tahrir", callback_data=f"va|v|{short_id}"), InlineKeyboardButton("🛡️ Media Qalqon", callback_data=f"n|v|{short_id}")],
[InlineKeyboardButton("📹 Stabilizatsiya", callback_data=f"stb|v|{short_id}")],
[InlineKeyboardButton("🚀 30 FPS", callback_data=f"fps30|v|{short_id}"), InlineKeyboardButton("🚀 60 FPS", callback_data=f"fps60|v|{short_id}")],
[InlineKeyboardButton("🚀 120 FPS", callback_data=f"fps120|v|{short_id}")],
[InlineKeyboardButton("🔙 Orqaga", callback_data="nav|main")]
]
elif cat_type == "cv_ed":
text = "🛠 **Uzunlik va Ovoz:**"
keyboard = [
[InlineKeyboardButton("🐢 Sekinlashtirish", callback_data=f"s|v|{short_id}"), InlineKeyboardButton("✂️ Kesish", callback_data=f"t|v|{short_id}")],
[InlineKeyboardButton("🔇 Ovozni O'chirish", callback_data=f"ra|v|{short_id}")],
[InlineKeyboardButton("🔙 Orqaga", callback_data="nav|main")]
]
elif cat_type == "cv_sb":
text = "📝 **Subtitr va Tarjima (AI):**"
keyboard = [
[InlineKeyboardButton("📝 Avto-Taglavha (Asl)", callback_data=f"sub|v|{short_id}")],
[InlineKeyboardButton("🌐 Sub+O'zbek", callback_data=f"stuz|v|{short_id}"), InlineKeyboardButton("🌐 Sub+Rus", callback_data=f"stru|v|{short_id}")],
[InlineKeyboardButton("🌐 Sub+English", callback_data=f"sten|v|{short_id}")],
[InlineKeyboardButton("🔙 Orqaga", callback_data="nav|main")]
]
await query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode=constants.ParseMode.MARKDOWN)
return
# [Avvalgi button_handler mantiqi davom etadi...]
try:
data = query.data.split("|")
action, m_type, short_id = data[0], data[1], data[2]
if short_id not in media_storage:
await query.edit_message_text("⚠️ Media ma'lumotlari xotiradan tozalangan. Iltimos, aserni qayta yuboring.")
return
# Spam himoyasi: foydalanuvchi juda tez-tez so'rov yuborsa bloklash
user_id = query.from_user.id
if not check_rate_limit(user_id):
await query.edit_message_text("⏳ Sizda tugallanmagan jarayon bor yoki juda tez so'rov yuboryapsiz. Biroz kuting!")
return
async with processing_semaphore:
media_info = media_storage[short_id]
file_id = media_info["file_id"]
msg = await query.edit_message_text("⏳ Jarayon boshlanmoqda...")
file = await context.bot.get_file(file_id, read_timeout=300)
ext = "mp4" if m_type == "v" else "jpg"
input_path = os.path.join(base_dir, f"in_{short_id}.{ext}")
output_path = os.path.join(base_dir, f"out_{short_id}.{ext}")
await file.download_to_drive(input_path, read_timeout=300)
last_percent = [0]
async def progress(p):
# Faqat 10% dan oshganda xabarni yangilash (Telegram API ni qiynamaslik uchun)
if p - last_percent[0] >= 10 or p == 99:
last_percent[0] = p
bar = "█" * (p // 10) + "░" * (10 - p // 10)
try: await query.edit_message_text(f"⚙️ Qayta ishlanmoqda...\n[{bar}] {p}%")
except: pass
loop = asyncio.get_event_loop()
callback = lambda p: asyncio.run_coroutine_threadsafe(progress(p), loop)
success_path = None
logger.info(f"Processing started: {action} on {m_type} for {short_id}")
if m_type == "p":
func = None
# Style Transfer uchun alohida mantiq
if action.startswith("st_"):
try: await query.edit_message_text("🖌️ AI uslubiga o'girilmoqda...\n(Birinchi ishlashda 20 soniyagacha vaqt olishi mumkin)")
except: pass
style = action.replace("st_", "").replace("cart", "cartoon")
success_path = await loop.run_in_executor(
executor, apply_style_transfer, input_path, output_path, style
)
elif action == "u":
try: await query.edit_message_text("💎 AI Ultra HD sifatga oshirilmoqda...\n(Bu jarayon bir oz vaqt olishi mumkin)")
except: pass
success_path = await loop.run_in_executor(executor, upscale_image, input_path, output_path)
elif action.startswith("bg_"):
bg_mode_map = {
"bg_t": "transparent", "bg_w": "white",
"bg_k": "black", "bg_b": "blur", "bg_c": "gradient"
}
bg_mode = bg_mode_map.get(action, "transparent")
mode_names = {"bg_t": "🎨 Shaffof fon", "bg_w": "⬜ Oq fon", "bg_k": "⬛ Qora fon", "bg_b": "🌫️ Blur fon", "bg_c": "🌈 Gradient fon"}
try: await query.edit_message_text(f"✂️ {mode_names.get(action, 'Fon')} tayyorlanmoqda...\n(AI tahlil + Edge Refinement)")
except: pass
success_path = await loop.run_in_executor(
executor, apply_bg_remove, input_path, output_path, bg_mode
)
else:
func = {
"r": apply_retro_filter, "u": upscale_image,
"f": apply_face_restore, "a": apply_auto_enhance,
"n": apply_nudenet_filter,
"gl": apply_glitch_filter, "mr": apply_mirror_filter,
"wm": apply_watermark,
"qb": apply_quality_boost,
}.get(action)
if not func:
logger.error(f"Unknown action: {action}")
await context.bot.send_message(query.message.chat_id, "❌ Noma'lum amal.")
return
success_path = await loop.run_in_executor(executor, func, input_path, output_path)
else:
# FPS Boost uchun alohida mantiq
if action in ("fps30", "fps60", "fps120"):
fps_map = {"fps30": 30, "fps60": 60, "fps120": 120}
target = fps_map[action]
success_path = await loop.run_in_executor(
executor, process_video_fps_boost, input_path, output_path, target, callback
)
else:
func = None
# Subtitle Tarjima uchun alohida mantiq
if action in ("stuz", "stru", "sten"):
try: await query.edit_message_text("🌐 Ovoz aniqlanib tarjima qilinmoqda...\n(Bu jarayon video uzunligiga qarab bir oz vaqt oladi)")
except: pass
lang_map = {"stuz": "uz", "stru": "ru", "sten": "en"}
target = lang_map[action]
success_path = await loop.run_in_executor(
executor, process_video_subtitle_translate, input_path, output_path, target, callback
)
else:
video_funcs = {
"r": process_video_retro, "u": process_video_upscale,
"s": process_video_slowmo, "bw": process_video_bw,
"cc": process_video_color_correct, "ra": process_video_remove_audio,
"t": process_video_trim, "vf": process_video_face_fix,
"va": process_video_auto_enhance, "n": process_video_nnsfw,
"sub": process_video_subtitle, "gl": process_video_glitch,
"mr": process_video_mirror, "wm": process_video_watermark,
"stb": process_video_stabilize
}
func = video_funcs.get(action)
if not func:
logger.error(f"Unknown video action: {action}")
await context.bot.send_message(query.message.chat_id, "❌ Noma'lum video amali.")
return
success_path = await loop.run_in_executor(executor, func, input_path, output_path, callback)
logger.info(f"Processing finished. Success path: {success_path}")
if success_path and os.path.exists(success_path):
# Extentsionni tiklash (rembg kabi png yaratuvchilar uchun)
final_ext = str(success_path).split('.')[-1]
f_name = {
"r": "retro", "u": "4k", "f": "face fix", "a": "auto enhance",
"s": "slow motion", "bw": "oq-qora", "cc": "rang fix",
"ra": "ovoz olib tashlash", "t": "kesish", "vf": "video face fix",
"va": "video auto enhance", "fps30": "30 FPS boost",
"fps60": "60 FPS boost", "fps120": "120 FPS boost",
"n": "media shield",
"sub": "auto-subtitle",
"gl": "glitch", "mr": "mirror",
"wm": "watermark", "bg": "fon o'chirish",
"st_anime": "AI anime", "st_sketch": "AI sketch",
"st_oil": "AI oil paint", "st_cart": "AI cartoon",
"stuz": "sub+o'zbek", "stru": "sub+rus", "sten": "sub+english",
"qb": "sifat+ pro", "stb": "stabilizatsiya",
"bg_t": "shaffof fon", "bg_w": "oq fon",
"bg_k": "qora fon", "bg_b": "blur fon", "bg_c": "gradient fon"
}.get(action, f"filter_{action}")
db.log_history(query.from_user.id, "photo" if m_type == "p" else "video", f_name, file_id)
with open(success_path, 'rb') as f:
caption = f"✅ <b>{f_name.upper()}</b> muvaffaqiyatli qo'llanildi!\n✨ @editfiltrbot orqali maxsus tayyorlandi!"
try: await query.edit_message_text("✅ Tayyor! Yuklanmoqda...")
except: pass
if m_type == "p":
if final_ext in ["png", "webp"]:
await context.bot.send_document(query.message.chat_id, f, caption=caption, parse_mode=constants.ParseMode.HTML, read_timeout=300, write_timeout=300)
else:
await context.bot.send_photo(query.message.chat_id, f, caption=caption, parse_mode=constants.ParseMode.HTML, read_timeout=300, write_timeout=300)
else:
await context.bot.send_video(query.message.chat_id, f, caption=caption, parse_mode=constants.ParseMode.HTML, read_timeout=300, write_timeout=300)
else:
logger.error(f"Processing failed for {short_id}. Action: {action}")
await context.bot.send_message(query.message.chat_id, "❌ Afsuski, ishlov berishda xatolik yuz berdi yoki natija topilmadi.")
await query.delete_message()
except Exception as e:
logger.error(f"Global button_handler error: {e}")
try: await context.bot.send_message(query.message.chat_id, f"❌ Tizimli xatolik: {e}")
except: pass
finally:
# Rate limit tozalash
try:
user_active_tasks.pop(query.from_user.id, None)
except: pass
# Fayllarni tozalash
for cleanup_path in [locals().get('input_path'), locals().get('output_path')]:
if cleanup_path and os.path.exists(cleanup_path):
try: os.remove(cleanup_path)
except: pass
# media_storage dan eskirgan yozuvlarni tozalash
if short_id in media_storage:
media_storage.pop(short_id, None)
# Xotirani majburiy tozalash (RAM to'lishini oldini oladi)
gc.collect()
class HealthCheckHandler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(b"Bot ishlamoqda!")
def log_message(self, format, *args):
pass
def run_dummy_server():
try:
server = HTTPServer(("0.0.0.0", 7860), HealthCheckHandler)
server.serve_forever()
except Exception as e:
logger.error(f"Dummy server xatosi: {e}")
def wait_for_internet(host="api.telegram.org", port=443, timeout=60):
"""Internet ulanishini, DNS va IP orqali ulanishni kutish."""
start_time = time.time()
logger.info(f"Tarmoq kutilmoqda: {host}...")
# Sinab ko'rish uchun manzillar ro'yxati
targets = [(host, port)] + [(ip, port) for ip in TELEGRAM_IPS]
while time.time() - start_time < timeout:
for target_host, target_port in targets:
try:
socket.create_connection((target_host, target_port), timeout=5)
logger.info(f"Tarmoq tayyor! {target_host} ga ulanish muvaffaqiyatli.")
return True
except Exception:
continue
time.sleep(2)
logger.warning("Tarmoq ulanishida muammo bo'lishi mumkin, lekin davom etamiz...")
return False
if __name__ == '__main__':
if not TOKEN:
logger.error("TELEGRAM_BOT_TOKEN topilmadi! Iltimos, Secrets bo'limini tekshiring.")
else:
# 1. Dummy serverni fonda ishga tushirish
threading.Thread(target=run_dummy_server, daemon=True).start()
# 2. Tarmoqni kutish
wait_for_internet()
# 3. Botni qurish (IPv4 ni majburiy qilish uchun request sozlamalari bilan)
from telegram.request import HTTPXRequest
# Hugging Face da IPv6 xatolarini chetlab o'tish uchun IPv4 ni majburlash
request = HTTPXRequest(connection_pool_size=8, read_timeout=30, write_timeout=30, connect_timeout=30)
app = ApplicationBuilder().token(TOKEN).request(request).post_init(post_init).build()
app.add_handler(CommandHandler("start", start))
app.add_handler(CommandHandler("admin", admin_panel))
app.add_handler(CallbackQueryHandler(navigation_handler, pattern=r"^nav\|"))
app.add_handler(CallbackQueryHandler(settings_callback_handler, pattern=r"^set\|"))
app.add_handler(CallbackQueryHandler(handle_callback)) # This now handles all callbacks, including admin and regular buttons
app.add_handler(MessageHandler(filters.PHOTO, handle_photo))
app.add_handler(MessageHandler(filters.VIDEO | filters.ANIMATION | filters.Document.ALL, handle_video))
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text)) # Handle general text messages
logger.info("Bot polling rejimi ishga tushmoqda...")
app.run_polling()