Professor / utils.py
azils3's picture
Update utils.py
83d2a24 verified
import logging, os, re, asyncio, requests, aiohttp
from pyrogram.errors import InputUserDeactivated, UserNotParticipant, FloodWait, UserIsBlocked, PeerIdInvalid
from pyrogram.types import Message, InlineKeyboardButton
from pyrogram import filters, enums
from info import AUTH_CHANNEL, LONG_IMDB_DESCRIPTION, MAX_LIST_ELM, SHORT_URL, SHORT_API
from imdb import Cinemagoer
from typing import Union, List
from datetime import datetime, timedelta
from database.users_chats_db import db
from bs4 import BeautifulSoup
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
BTN_URL_REGEX = re.compile(r"(\[([^\[]+?)\]\((buttonurl|buttonalert):(?:/{0,2})(.+?)(:same)?\))")
BANNED = {}
SMART_OPEN = '“'
SMART_CLOSE = '”'
START_CHAR = ('\'', '"', SMART_OPEN)
# temp db for banned
class temp(object):
BANNED_USERS = []
BANNED_CHATS = []
CURRENT = 0
CANCEL = False
MELCOW = {}
U_NAME = None
B_NAME = None
SETTINGS = {}
GP_BUTTONS = {}
PM_BUTTONS = {}
PM_SPELL = {}
GP_SPELL = {}
async def is_subscribed(bot, query):
try:
user = await bot.get_chat_member(AUTH_CHANNEL, query.from_user.id)
except UserNotParticipant:
logger.debug(f"User {query.from_user.id} is not subscribed to AUTH_CHANNEL")
pass
except Exception as e:
logger.error(f"Error checking subscription for user {query.from_user.id}: {e}")
print(e)
else:
if user.status != enums.ChatMemberStatus.BANNED:
logger.debug(f"User {query.from_user.id} is subscribed to AUTH_CHANNEL")
return True
return False
async def get_poster(query, bulk=False, id=False, file=None):
imdb = Cinemagoer()
if not id:
query = (query.strip()).lower()
title = query
year = re.findall(r'[1-2]\d{3}$', query, re.IGNORECASE)
if year:
year = list_to_str(year[:1])
title = (query.replace(year, "")).strip()
elif file is not None:
year = re.findall(r'[1-2]\d{3}', file, re.IGNORECASE)
if year:
year = list_to_str(year[:1])
else:
year = None
try:
movieid = imdb.search_movie(title.lower(), results=10)
except Exception as e:
logger.error(f"Error searching movie: {e}")
return None
if not movieid:
logger.debug(f"No movie found for query: {query}")
return None
if year:
filtered=list(filter(lambda k: str(k.get('year')) == str(year), movieid))
if not filtered:
filtered = movieid
else:
filtered = movieid
movieid=list(filter(lambda k: k.get('kind') in ['movie', 'tv series'], filtered))
if not movieid:
movieid = filtered
if bulk:
return movieid
movieid = movieid[0].movieID
else:
movieid = query
movie = imdb.get_movie(movieid)
if movie.get("original air date"):
date = movie["original air date"]
elif movie.get("year"):
date = movie.get("year")
else:
date = "N/A"
plot = ""
if not LONG_IMDB_DESCRIPTION:
plot = movie.get('plot')
if plot and len(plot) > 0:
plot = plot[0]
else:
plot = movie.get('plot outline')
if plot and len(plot) > 800:
plot = plot[0:800] + "..."
return {
'title': movie.get('title'),
'votes': movie.get('votes'),
"aka": list_to_str(movie.get("akas")),
"seasons": movie.get("number of seasons"),
"box_office": movie.get('box office'),
'localized_title': movie.get('localized title'),
'kind': movie.get("kind"),
"imdb_id": f"tt{movie.get('imdbID')}",
"cast": list_to_str(movie.get("cast")),
"runtime": list_to_str(movie.get("runtimes")),
"countries": list_to_str(movie.get("countries")),
"certificates": list_to_str(movie.get("certificates")),
"languages": list_to_str(movie.get("languages")),
"director": list_to_str(movie.get("director")),
"writer":list_to_str(movie.get("writer")),
"producer":list_to_str(movie.get("producer")),
"composer":list_to_str(movie.get("composer")) ,
"cinematographer":list_to_str(movie.get("cinematographer")),
"music_team": list_to_str(movie.get("music department")),
"distributors": list_to_str(movie.get("distributors")),
'release_date': date,
'year': movie.get('year'),
'genres': list_to_str(movie.get("genres")),
'poster': movie.get('full-size cover url'),
'plot': plot,
'rating': str(movie.get("rating")),
'url':f'https://www.imdb.com/title/tt{movieid}'
}
def list_to_str(k):
if not k: return "N/A"
elif len(k) == 1: return str(k[0])
elif MAX_LIST_ELM:
k = k[:int(MAX_LIST_ELM)]
return ' '.join(f'{elem}, ' for elem in k)
else:
return ' '.join(f'{elem}, ' for elem in k)
__repo__ = "https://github.com/MrMKN/PROFESSOR-BOT"
__version__ = "PROFESSOR-BOT ᴠ4.5.0"
__license__ = "GNU GENERAL PUBLIC LICENSE V2"
__copyright__ = "Copyright (C) 2023-present MrMKN <https://github.com/MrMKN>"
async def search_gagala(text):
usr_agent = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/61.0.3163.100 Safari/537.36'
}
text = text.replace(" ", '+')
url = f'https://www.google.com/search?q={text}'
response = requests.get(url, headers=usr_agent)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
titles = soup.find_all('h3')
return [title.getText() for title in titles]
async def get_settings(group_id):
settings = temp.SETTINGS.get(group_id)
if not settings:
settings = await db.get_settings(group_id)
temp.SETTINGS[group_id] = settings
return settings
async def save_group_settings(group_id, key, value):
current = await get_settings(group_id)
current[key] = value
temp.SETTINGS[group_id] = current
await db.update_settings(group_id, current)
def get_size(size):
units = ["Bytes", "KB", "MB", "GB", "TB", "PB", "EB"]
size = float(size)
i = 0
while size >= 1024.0 and i < len(units):
i += 1
size /= 1024.0
return "%.2f %s" % (size, units[i])
def get_file_id(msg: Message):
if not msg.media: return None
for message_type in ("photo", "animation", "audio", "document", "video", "video_note", "voice", "sticker"):
obj = getattr(msg, message_type)
if obj:
setattr(obj, "message_type", message_type)
return obj
def extract_user(message: Message) -> Union[int, str]:
user_id = None
user_first_name = None
if message.reply_to_message:
user_id = message.reply_to_message.from_user.id
user_first_name = message.reply_to_message.from_user.first_name
elif len(message.command) > 1:
if (len(message.entities) > 1 and message.entities[1].type == enums.MessageEntityType.TEXT_MENTION):
required_entity = message.entities[1]
user_id = required_entity.user.id
user_first_name = required_entity.user.first_name
else:
user_id = message.command[1]
user_first_name = user_id
try:
user_id = int(user_id)
except ValueError: pass
else:
user_id = message.from_user.id
user_first_name = message.from_user.first_name
return (user_id, user_first_name)
def split_quotes(text: str) -> List:
if not any(text.startswith(char) for char in START_CHAR):
return text.split(None, 1)
counter = 1 # ignore first char -> is some kind of quote
while counter < len(text):
if text[counter] == "\\":
counter += 1
elif text[counter] == text[0] or (text[0] == SMART_OPEN and text[counter] == SMART_CLOSE):
break
counter += 1
else:
return text.split(None, 1)
# 1 to avoid starting quote, and counter is exclusive so avoids ending
key = remove_escapes(text[1:counter].strip())
# index will be in range, or `else` would have been executed and returned
rest = text[counter + 1:].strip()
if not key:
key = text[0] + text[0]
return list(filter(None, [key, rest]))
def parser(text, keyword, cb_data):
if "buttonalert" in text: text = (text.replace("\n", "\\n").replace("\t", "\\t"))
buttons = []
note_data = ""
prev = 0
i = 0
alerts = []
for match in BTN_URL_REGEX.finditer(text):
n_escapes = 0
to_check = match.start(1) - 1
while to_check > 0 and text[to_check] == "\\":
n_escapes += 1
to_check -= 1
# if even, not escaped -> create button
if n_escapes % 2 == 0:
note_data += text[prev:match.start(1)]
prev = match.end(1)
if match.group(3) == "buttonalert":
# create a triple with button label, url, and newline status
if bool(match.group(5)) and buttons:
buttons[-1].append(InlineKeyboardButton(match.group(2), callback_data=f"{cb_data}:{i}:{keyword}"))
else:
buttons.append([InlineKeyboardButton(match.group(2), callback_data=f"{cb_data}:{i}:{keyword}")])
i += 1
alerts.append(match.group(4))
elif bool(match.group(5)) and buttons:
buttons[-1].append(InlineKeyboardButton(match.group(2), url=match.group(4).replace(" ", "")))
else:
buttons.append([InlineKeyboardButton(match.group(2), url=match.group(4).replace(" ", ""))])
else:
note_data += text[prev:to_check]
prev = match.start(1) - 1
else: note_data += text[prev:]
try: return note_data, buttons, alerts
except: return note_data, buttons, None
def remove_escapes(text: str) -> str:
res = ""
is_escaped = False
for counter in range(len(text)):
if is_escaped:
res += text[counter]
is_escaped = False
elif text[counter] == "\\":
is_escaped = True
else:
res += text[counter]
return res
def humanbytes(size):
if not size:
return ""
power = 2**10
n = 0
Dic_powerN = {0: ' ', 1: 'Ki', 2: 'Mi', 3: 'Gi', 4: 'Ti'}
while size > power:
size /= power
n += 1
return str(round(size, 2)) + " " + Dic_powerN[n] + 'B'
def get_time(seconds):
periods = [('ᴅ', 86400), ('ʜ', 3600), ('ᴍ', 60), ('ꜱ', 1)]
result = ''
for period_name, period_seconds in periods:
if seconds >= period_seconds:
period_value, seconds = divmod(seconds, period_seconds)
result += f'{int(period_value)}{period_name}'
return result
async def get_shortlink(link):
url = f'{SHORT_URL}/api'
params = {'api': SHORT_API, 'url': link}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params, raise_for_status=True, ssl=False) as response:
data = await response.json()
if data["status"] == "success":
return data['shortenedUrl']
else:
logger.error(f"Error: {data['message']}")
return link
except Exception as e:
logger.error(f"Error getting shortlink: {e}")
return link
def extract_time(time_val):
if any(time_val.endswith(unit) for unit in ("s", "m", "h", "d")):
unit = time_val[-1]
time_num = time_val[:-1] # type: str
if not time_num.isdigit():
return None
if unit == "s":
bantime = datetime.now() + timedelta(seconds=int(time_num))
elif unit == "m":
bantime = datetime.now() + timedelta(minutes=int(time_num))
elif unit == "h":
bantime = datetime.now() + timedelta(hours=int(time_num))
elif unit == "d":
bantime = datetime.now() + timedelta(days=int(time_num))
else:
return None
return bantime
else:
return None
async def admin_check(message: Message) -> bool:
if not message.from_user: return False
if message.chat.type not in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]: return False
if message.from_user.id in [777000, 1087968824]: return True
client = message._client
chat_id = message.chat.id
user_id = message.from_user.id
try:
check_status = await client.get_chat_member(chat_id=chat_id, user_id=user_id)
admin_strings = [enums.ChatMemberStatus.OWNER, enums.ChatMemberStatus.ADMINISTRATOR]
if check_status.status not in admin_strings:
logger.debug(f"User {user_id} is not an admin in chat {chat_id}")
return False
else:
logger.debug(f"User {user_id} is an admin in chat {chat_id}")
return True
except Exception as e:
logger.error(f"Error checking admin status for user {user_id} in chat {chat_id}: {e}")
return False
async def admin_filter(filt, client, message):
return await admin_check(message)