File size: 13,350 Bytes
b202865 bc96bf6 83d2a24 7174173 83d2a24 517e772 b202865 bc96bf6 b202865 bc96bf6 7174173 83d2a24 10cf6e6 7174173 b202865 10cf6e6 83d2a24 10cf6e6 bc96bf6 b202865 10cf6e6 bc96bf6 b202865 10cf6e6 83d2a24 10cf6e6 83d2a24 10cf6e6 83d2a24 10cf6e6 83d2a24 10cf6e6 bc96bf6 10cf6e6 83d2a24 10cf6e6 83d2a24 10cf6e6 b202865 10cf6e6 83d2a24 10cf6e6 83d2a24 10cf6e6 b202865 bc96bf6 83d2a24 bc96bf6 83d2a24 bc96bf6 83d2a24 bc96bf6 b202865 10cf6e6 bc96bf6 b202865 83d2a24 10cf6e6 83d2a24 10cf6e6 83d2a24 bc96bf6 b202865 bc96bf6 10cf6e6 83d2a24 bc96bf6 83d2a24 bc96bf6 83d2a24 bc96bf6 83d2a24 bc96bf6 83d2a24 bc96bf6 83d2a24 bc96bf6 10cf6e6 bc96bf6 10cf6e6 bc96bf6 b202865 bc96bf6 83d2a24 bc96bf6 10cf6e6 bc96bf6 b202865 bc96bf6 83d2a24 bc96bf6 7174173 bc96bf6 83d2a24 7174173 b202865 bc96bf6 b202865 bc96bf6 83d2a24 bc96bf6 83d2a24 bc96bf6 b202865 bc96bf6 83d2a24 bc96bf6 7174173 83d2a24 7174173 83d2a24 e44770e bc96bf6 b202865 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 |
import logging, os, re, asyncio, requests, aiohttp
from pyrogram.errors import InputUserDeactivated, UserNotParticipant, FloodWait, UserIsBlocked, PeerIdInvalid
from pyrogram.types import Message, InlineKeyboardButton
from pyrogram import filters, enums
from info import AUTH_CHANNEL, LONG_IMDB_DESCRIPTION, MAX_LIST_ELM, SHORT_URL, SHORT_API
from imdb import Cinemagoer
from typing import Union, List
from datetime import datetime, timedelta
from database.users_chats_db import db
from bs4 import BeautifulSoup
# Module-level logger; its level is forced to DEBUG here regardless of the
# application's global logging configuration.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Matches inline button markup of the form
#   [label](buttonurl:target)  /  [label](buttonalert:target)
# with an optional ":same" suffix before the closing parenthesis.
# Groups: 1 = whole markup, 2 = label, 3 = "buttonurl" | "buttonalert",
#         4 = target text, 5 = ":same" when present.
BTN_URL_REGEX = re.compile(r"(\[([^\[]+?)\]\((buttonurl|buttonalert):(?:/{0,2})(.+?)(:same)?\))")
# NOTE(review): not referenced anywhere in the visible part of this file.
BANNED = {}
# Typographic ("smart") double quotes, recognised by split_quotes().
SMART_OPEN = '“'
SMART_CLOSE = '”'
# Characters that may open a quoted key in split_quotes().
START_CHAR = ('\'', '"', SMART_OPEN)
# In-memory, process-wide scratch state shared between handlers.
class temp:
    """Namespace of mutable class attributes used as a temporary store.

    Nothing here is persisted; every attribute starts from the default
    below each time the module is imported.
    """

    # presumably ids of banned users / chats -- TODO confirm against callers
    BANNED_USERS = []
    BANNED_CHATS = []
    # NOTE(review): progress counter / cancel flag; not used in this file
    CURRENT = 0
    CANCEL = False
    # NOTE(review): purpose not visible from this file
    MELCOW = {}
    # presumably the bot's username / display name -- TODO confirm
    U_NAME = None
    B_NAME = None
    # Per-group settings cache read and written by get_settings() and
    # save_group_settings(); keyed by group id.
    SETTINGS = {}
    # NOTE(review): the button / spell-check stores below are not used in
    # this file; names suggest group (GP) vs private-message (PM) scope.
    GP_BUTTONS = {}
    PM_BUTTONS = {}
    PM_SPELL = {}
    GP_SPELL = {}
async def is_subscribed(bot, query):
    """Return True when the sender of *query* is a non-banned member of AUTH_CHANNEL.

    Args:
        bot: client object exposing an awaitable ``get_chat_member``.
        query: update object whose ``from_user`` identifies the sender.

    Returns:
        ``True`` if ``get_chat_member`` succeeds and the member status is not
        ``BANNED``; ``False`` in every other case (not a participant, lookup
        error, banned, or no sender on the update).
    """
    sender = getattr(query, "from_user", None)
    if sender is None:
        # Without this guard the error handler below would itself raise
        # while formatting its message with ``query.from_user.id``.
        return False
    user_id = sender.id

    try:
        member = await bot.get_chat_member(AUTH_CHANNEL, user_id)
    except UserNotParticipant:
        logger.debug(f"User {user_id} is not subscribed to AUTH_CHANNEL")
    except Exception as e:
        # Best-effort check: any other failure is logged and treated as
        # "not subscribed" rather than propagated.
        logger.error(f"Error checking subscription for user {user_id}: {e}")
    else:
        if member.status != enums.ChatMemberStatus.BANNED:
            logger.debug(f"User {user_id} is subscribed to AUTH_CHANNEL")
            return True
    return False
def _split_title_year(query, file=None):
    """Split a normalised search string into ``(title, year)``.

    A trailing four-digit year (1000-2999) is cut off the end of *query*.
    If that would leave no title at all (the query is only a year, e.g.
    "2012"), the query is kept whole. When no year was taken from the query
    and *file* is given, the first year-like number in *file* is used.
    ``year`` is ``None`` when nothing was found.
    """
    trailing = re.search(r'[1-2]\d{3}$', query)
    if trailing:
        title = query[:trailing.start()].strip()
        if title:
            return title, trailing.group()
    if file is not None:
        in_file = re.search(r'[1-2]\d{3}', file)
        if in_file:
            return query, in_file.group()
    return query, None


async def get_poster(query, bulk=False, id=False, file=None):
    """Look a title up on IMDb (via Cinemagoer) and return its details.

    Args:
        query: search text, or the IMDb movie id when *id* is truthy.
        bulk: when truthy (and *id* is falsy) return the filtered list of
            search results instead of the details of the first one.
        id: when truthy, skip searching and treat *query* as the movie id.
        file: optional file name scanned for a release year when *query*
            itself does not end in one.

    Returns:
        ``None`` when the search or the detail lookup fails or finds
        nothing; the list of search results when *bulk* is set; otherwise a
        dict of display-ready fields (title, cast, plot, poster URL, ...).
    """
    imdb = Cinemagoer()
    # The Cinemagoer calls are blocking; run them in the default executor so
    # the event loop keeps serving other updates meanwhile.
    loop = asyncio.get_running_loop()

    if not id:
        query = query.strip().lower()
        title, year = _split_title_year(query, file)
        try:
            movieid = await loop.run_in_executor(
                None, lambda: imdb.search_movie(title, results=10)
            )
        except Exception as e:
            logger.error(f"Error searching movie: {e}")
            return None
        if not movieid:
            logger.debug(f"No movie found for query: {query}")
            return None

        # Prefer results from the requested year, then results that are
        # movies / TV series; each filter is dropped if it removes everything.
        filtered = movieid
        if year:
            filtered = [m for m in movieid if str(m.get('year')) == str(year)] or movieid
        movieid = [m for m in filtered if m.get('kind') in ('movie', 'tv series')] or filtered
        if bulk:
            return movieid
        movieid = movieid[0].movieID
    else:
        movieid = query

    try:
        movie = await loop.run_in_executor(None, imdb.get_movie, movieid)
    except Exception as e:
        # Same policy as the search step: log and report "nothing found".
        logger.error(f"Error fetching movie {movieid}: {e}")
        return None

    date = movie.get("original air date") or movie.get("year") or "N/A"

    if LONG_IMDB_DESCRIPTION:
        plot = movie.get('plot outline')
    else:
        plot = movie.get('plot')
        if plot:
            # 'plot' is indexed here, i.e. treated as a sequence of texts.
            plot = plot[0]
    if plot and len(plot) > 800:
        plot = plot[:800] + "..."

    return {
        'title': movie.get('title'),
        'votes': movie.get('votes'),
        "aka": list_to_str(movie.get("akas")),
        "seasons": movie.get("number of seasons"),
        "box_office": movie.get('box office'),
        'localized_title': movie.get('localized title'),
        'kind': movie.get("kind"),
        "imdb_id": f"tt{movie.get('imdbID')}",
        "cast": list_to_str(movie.get("cast")),
        "runtime": list_to_str(movie.get("runtimes")),
        "countries": list_to_str(movie.get("countries")),
        "certificates": list_to_str(movie.get("certificates")),
        "languages": list_to_str(movie.get("languages")),
        "director": list_to_str(movie.get("director")),
        "writer": list_to_str(movie.get("writer")),
        "producer": list_to_str(movie.get("producer")),
        "composer": list_to_str(movie.get("composer")),
        "cinematographer": list_to_str(movie.get("cinematographer")),
        "music_team": list_to_str(movie.get("music department")),
        "distributors": list_to_str(movie.get("distributors")),
        'release_date': date,
        'year': movie.get('year'),
        'genres': list_to_str(movie.get("genres")),
        'poster': movie.get('full-size cover url'),
        'plot': plot,
        'rating': str(movie.get("rating")),
        'url': f'https://www.imdb.com/title/tt{movieid}'
    }
def list_to_str(k):
    """Render a sequence as a comma-separated string for display.

    Args:
        k: sequence of values (or a falsy value such as ``None`` / ``[]``).

    Returns:
        ``"N/A"`` for a falsy *k*, ``str(k[0])`` for a single element,
        otherwise the elements joined with ``", "``. When ``MAX_LIST_ELM``
        is truthy only the first ``int(MAX_LIST_ELM)`` elements are kept.
    """
    if not k:
        return "N/A"
    if len(k) == 1:
        return str(k[0])
    if MAX_LIST_ELM:
        k = k[:int(MAX_LIST_ELM)]
    # Join with the separator between items only, so the result carries no
    # doubled spaces and no trailing ", ".
    return ', '.join(str(elem) for elem in k)
# Project metadata: source repository, version label, licence and copyright.
__repo__ = "https://github.com/MrMKN/PROFESSOR-BOT"
__version__ = "PROFESSOR-BOT ᴠ4.5.0"
__license__ = "GNU GENERAL PUBLIC LICENSE V2"
__copyright__ = "Copyright (C) 2023-present MrMKN <https://github.com/MrMKN>"
async def search_gagala(text):
    """Run a Google web search for *text* and return the ``<h3>`` texts.

    Args:
        text: free-form search string.

    Returns:
        List of the text content of every ``<h3>`` element in the result
        page, in document order.

    Raises:
        requests.RequestException: on connection failure, timeout, or a
            non-2xx response (via ``raise_for_status``).
    """
    usr_agent = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/61.0.3163.100 Safari/537.36'
    }

    def _fetch():
        # ``params`` lets requests URL-encode the query (spaces become "+",
        # and characters such as "&" or "#" no longer truncate it); the
        # timeout keeps a stalled connection from hanging forever.
        return requests.get(
            'https://www.google.com/search',
            params={'q': text},
            headers=usr_agent,
            timeout=10,
        )

    # requests is blocking, so the call is pushed to the default executor
    # instead of being run on the event loop.
    response = await asyncio.get_running_loop().run_in_executor(None, _fetch)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')
    return [heading.getText() for heading in soup.find_all('h3')]
async def get_settings(group_id):
    """Return the settings for *group_id*, using ``temp.SETTINGS`` as a cache.

    A truthy cached entry is returned as-is. Otherwise the settings are
    loaded with ``db.get_settings`` and stored in the cache before being
    returned.
    """
    cached = temp.SETTINGS.get(group_id)
    if cached:
        return cached
    loaded = await db.get_settings(group_id)
    temp.SETTINGS[group_id] = loaded
    return loaded
async def save_group_settings(group_id, key, value):
    """Set one settings entry for a group in both the cache and the database.

    The group's current settings are obtained through ``get_settings``,
    ``key`` is assigned ``value`` on that same object, the object is put
    back into ``temp.SETTINGS`` and then written with ``db.update_settings``.
    """
    group_settings = await get_settings(group_id)
    group_settings[key] = value
    temp.SETTINGS[group_id] = group_settings
    await db.update_settings(group_id, group_settings)
def get_size(size):
    """Format a byte count with a 1024-based unit and two decimals.

    Args:
        size: number of bytes (anything accepted by ``float``).

    Returns:
        A string such as ``"1.50 KB"``. Values beyond the largest unit are
        expressed in that unit ("EB") rather than failing.
    """
    units = ("Bytes", "KB", "MB", "GB", "TB", "PB", "EB")
    size = float(size)
    index = 0
    # Stop at the last unit; stepping past it would index out of range.
    while size >= 1024.0 and index < len(units) - 1:
        index += 1
        size /= 1024.0
    return "%.2f %s" % (size, units[index])
def get_file_id(msg: Message):
    """Return the first media object attached to *msg*, tagged with its kind.

    The media attributes are probed in a fixed order; the first truthy one
    gets a ``message_type`` attribute holding the attribute name it was
    found under and is returned. ``None`` is returned when ``msg.media`` is
    falsy or none of the probed attributes is set.
    """
    if msg.media:
        media_kinds = (
            "photo", "animation", "audio", "document",
            "video", "video_note", "voice", "sticker",
        )
        for kind in media_kinds:
            media = getattr(msg, kind)
            if media:
                media.message_type = kind
                return media
    return None
def extract_user(message: Message) -> Union[int, str]:
    """Work out which user a command message refers to.

    Resolution order:
      1. the author of the message being replied to;
      2. the command's first argument -- a text-mention entity in second
         position if there is one, otherwise the raw argument (converted to
         ``int`` when it is numeric);
      3. the sender of *message* itself.

    Returns:
        A ``(user_id, first_name)`` tuple. NOTE: the return annotation is
        kept unchanged for interface stability, but the value is always
        this 2-tuple; ``user_id`` is an ``int`` or, for a non-numeric
        argument, the argument string (also used as ``first_name``).
    """
    replied = message.reply_to_message
    if replied:
        return (replied.from_user.id, replied.from_user.first_name)

    # ``command`` / ``entities`` are treated as empty when unset so that
    # ``len()`` below cannot fail on ``None``.
    command = message.command or []
    if len(command) > 1:
        entities = message.entities or []
        if len(entities) > 1 and entities[1].type == enums.MessageEntityType.TEXT_MENTION:
            mentioned = entities[1].user
            user_id = mentioned.id
            user_first_name = mentioned.first_name
        else:
            user_id = command[1]
            user_first_name = user_id
        try:
            user_id = int(user_id)
        except ValueError:
            # Non-numeric argument (e.g. a username): keep it as a string.
            pass
        return (user_id, user_first_name)

    return (message.from_user.id, message.from_user.first_name)
def split_quotes(text: str) -> List:
    """Split *text* into a leading (optionally quoted) key and the remainder.

    If *text* does not start with one of ``START_CHAR``, or the opening
    quote is never closed, the result is ``text.split(None, 1)``.
    Otherwise the key is the text between the quotes (stripped, with
    backslash escapes removed) and the rest is everything after the closing
    quote (stripped). An empty key is replaced by the opening quote
    doubled. Empty parts are dropped from the returned list.
    """
    if not text.startswith(START_CHAR):
        return text.split(None, 1)

    opener = text[0]
    closing = None
    pos = 1  # index 0 is the opening quote itself
    while pos < len(text):
        ch = text[pos]
        if ch == "\\":
            # Skip the backslash together with the character it escapes.
            pos += 2
            continue
        if ch == opener or (opener == SMART_OPEN and ch == SMART_CLOSE):
            closing = pos
            break
        pos += 1

    if closing is None:
        # Unterminated quote: fall back to a plain whitespace split.
        return text.split(None, 1)

    key = remove_escapes(text[1:closing].strip())
    rest = text[closing + 1:].strip()
    if not key:
        key = opener + opener
    return [part for part in (key, rest) if part]
def parser(text, keyword, cb_data):
    """Extract inline-button markup from *text*.

    Every unescaped ``[label](buttonurl:target)`` or
    ``[label](buttonalert:target)`` occurrence (see ``BTN_URL_REGEX``) is
    removed from the text and turned into an ``InlineKeyboardButton``. A
    ``:same`` suffix puts the button on the previous row when one exists;
    otherwise a new row is started. Markup preceded by an odd number of
    backslashes is treated as escaped and left in the text.

    Args:
        text: raw note / filter text.
        keyword: embedded in the callback data of alert buttons.
        cb_data: prefix of the callback data of alert buttons.

    Returns:
        ``(note_data, buttons, alerts)`` -- the text without button markup,
        the button rows, and the alert texts. The *n*-th alert button
        carries ``f"{cb_data}:{n}:{keyword}"`` as callback data, *n* being
        its index in ``alerts``.
    """
    if "buttonalert" in text:
        # Alert payloads keep newlines / tabs as literal escape sequences.
        text = text.replace("\n", "\\n").replace("\t", "\\t")

    buttons = []
    alerts = []
    note_data = ""
    prev = 0
    i = 0
    for match in BTN_URL_REGEX.finditer(text):
        # Count the backslashes immediately before the markup, including one
        # at index 0.
        n_escapes = 0
        to_check = match.start(1) - 1
        while to_check >= 0 and text[to_check] == "\\":
            n_escapes += 1
            to_check -= 1

        if n_escapes % 2:
            # Odd -> escaped: keep everything up to and including the
            # character before the backslash run, and resume from the last
            # backslash so the markup itself stays in the text.
            note_data += text[prev:to_check + 1]
            prev = match.start(1) - 1
            continue

        # Even -> not escaped: cut the markup out and create a button.
        note_data += text[prev:match.start(1)]
        prev = match.end(1)
        if match.group(3) == "buttonalert":
            button = InlineKeyboardButton(match.group(2), callback_data=f"{cb_data}:{i}:{keyword}")
            i += 1
            alerts.append(match.group(4))
        else:
            button = InlineKeyboardButton(match.group(2), url=match.group(4).replace(" ", ""))

        if match.group(5) and buttons:
            buttons[-1].append(button)
        else:
            buttons.append([button])

    note_data += text[prev:]
    return note_data, buttons, alerts
def remove_escapes(text: str) -> str:
    """Strip one level of backslash escaping from *text*.

    Each backslash is dropped and the character after it is kept verbatim
    (so an escaped backslash yields a single backslash). A lone backslash
    at the very end is dropped.
    """
    kept = []
    pending_escape = False
    for ch in text:
        if pending_escape or ch != "\\":
            kept.append(ch)
            pending_escape = False
        else:
            pending_escape = True
    return "".join(kept)
def humanbytes(size):
    """Format a byte count using binary (1024-based) prefixes.

    Args:
        size: number of bytes; a falsy value (``0``, ``None``) gives ``""``.

    Returns:
        The value rounded to two decimals, a space, the prefix and ``"B"``,
        e.g. ``"2.0 KiB"``. Values too large for the biggest prefix are
        expressed in that prefix ("Ti") instead of raising.
    """
    if not size:
        return ""
    power = 2 ** 10
    prefixes = (' ', 'Ki', 'Mi', 'Gi', 'Ti')
    n = 0
    # Stop at the largest known prefix; going further has no label to use.
    while size > power and n < len(prefixes) - 1:
        size /= power
        n += 1
    return str(round(size, 2)) + " " + prefixes[n] + 'B'
def get_time(seconds):
    """Render a duration in seconds as a compact days/hours/minutes/seconds string.

    Units whose value would be zero are omitted, so a duration below one
    second yields an empty string.
    """
    pieces = []
    for label, span in (('ᴅ', 86400), ('ʜ', 3600), ('ᴍ', 60), ('ꜱ', 1)):
        if seconds >= span:
            count, seconds = divmod(seconds, span)
            pieces.append(f'{int(count)}{label}')
    return ''.join(pieces)
async def get_shortlink(link):
    """Shorten *link* through the configured shortener API.

    Sends a GET to ``{SHORT_URL}/api`` with ``SHORT_API`` and the link as
    query parameters. Returns the ``shortenedUrl`` field when the JSON
    reply has ``status == "success"``; in every other case (error status,
    HTTP error, any exception) the problem is logged and the original
    *link* is returned unchanged.
    """
    endpoint = f'{SHORT_URL}/api'
    query = {'api': SHORT_API, 'url': link}
    try:
        async with aiohttp.ClientSession() as session:
            # NOTE(review): ssl=False is passed through as in the original;
            # per aiohttp's documentation this skips certificate checks.
            async with session.get(endpoint, params=query, raise_for_status=True, ssl=False) as response:
                payload = await response.json()
                if payload["status"] != "success":
                    logger.error(f"Error: {payload['message']}")
                    return link
                return payload['shortenedUrl']
    except Exception as e:
        logger.error(f"Error getting shortlink: {e}")
        return link
def extract_time(time_val):
    """Turn a duration such as ``"30m"`` into an absolute ``datetime``.

    The last character selects the unit (``s``, ``m``, ``h`` or ``d``) and
    the characters before it must all be digits. Returns
    ``datetime.now()`` plus that duration, or ``None`` when the unit or the
    number is not valid.
    """
    unit_keywords = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}
    if not time_val.endswith(tuple(unit_keywords)):
        return None
    amount, unit = time_val[:-1], time_val[-1]
    if not amount.isdigit():
        return None
    return datetime.now() + timedelta(**{unit_keywords[unit]: int(amount)})
async def admin_check(message: Message) -> bool:
    """Tell whether the sender of *message* is an admin of its chat.

    Returns ``False`` when the message has no sender or the chat is not a
    group / supergroup, and ``True`` for two hard-coded sender ids.
    Otherwise the sender's membership is fetched and the result is ``True``
    only for ``OWNER`` or ``ADMINISTRATOR`` status. Any error during that
    lookup is logged and treated as "not an admin".
    """
    sender = message.from_user
    if not sender:
        return False
    if message.chat.type not in (enums.ChatType.GROUP, enums.ChatType.SUPERGROUP):
        return False
    # presumably Telegram's service / anonymous-admin accounts -- TODO confirm
    if sender.id in (777000, 1087968824):
        return True

    client = message._client
    chat_id = message.chat.id
    user_id = sender.id
    try:
        member = await client.get_chat_member(chat_id=chat_id, user_id=user_id)
        is_admin = member.status in (
            enums.ChatMemberStatus.OWNER,
            enums.ChatMemberStatus.ADMINISTRATOR,
        )
        if is_admin:
            logger.debug(f"User {user_id} is an admin in chat {chat_id}")
        else:
            logger.debug(f"User {user_id} is not an admin in chat {chat_id}")
        return is_admin
    except Exception as e:
        logger.error(f"Error checking admin status for user {user_id} in chat {chat_id}: {e}")
        return False
async def admin_filter(filt, client, message):
    """Filter callback: pass only messages whose sender is a chat admin.

    ``filt`` and ``client`` are accepted to match the three-argument
    callback signature but are not used; the decision is delegated to
    ``admin_check(message)``.
    """
    return await admin_check(message)