Spaces:
Sleeping
Sleeping
import base64 | |
from datetime import datetime, timedelta | |
from html import escape | |
from re import compile as compile_re | |
from typing import List | |
from pyrogram.enums import ChatType | |
from pyrogram.types import Message | |
from Powers import TIME_ZONE | |
from Powers.utils.parser import escape_markdown | |
BTN_URL_REGEX = compile_re( | |
r"(\[([^\[]+?)\]\(buttonurl:(?:/{0,2})(.+?)(:same)?\))") | |
async def extract_time(m: Message, time_val: str): | |
"""Extract time from message.""" | |
if any(time_val.endswith(unit) for unit in ("m", "h", "d")): | |
unit = time_val[-1] | |
time_num = time_val[:-1] # type: str | |
if not time_num.isdigit(): | |
await m.reply("Unspecified amount of time.") | |
return "" | |
initial_time = datetime.now(TIME_ZONE) | |
if unit == "m": | |
bantime = initial_time + timedelta(minutes=int(time_num)) | |
elif unit == "h": | |
bantime = initial_time + timedelta(hours=int(time_num)) | |
elif unit == "d": | |
bantime = initial_time + timedelta(days=int(time_num)) | |
else: | |
# how even...? | |
return "" | |
return bantime | |
await m.reply( | |
f"Invalid time type specified. Needed m, h, or s. got: {time_val[-1]}", | |
) | |
return "" | |
async def parse_button(text: str): | |
"""Parse button from text.""" | |
markdown_note = text | |
prev = 0 | |
note_data = "" | |
buttons = [] | |
for match in BTN_URL_REGEX.finditer(markdown_note): | |
n_escapes = 0 | |
to_check = match.start(1) - 1 | |
while to_check > 0 and markdown_note[to_check] == "\\": | |
n_escapes += 1 | |
to_check -= 1 | |
# if even, not escaped -> create button | |
if n_escapes % 2 == 0: | |
# create a thruple with button label, url, and newline status | |
buttons.append( | |
(match.group(2), match.group(3), bool(match.group(4)))) | |
note_data += markdown_note[prev: match.start(1)] | |
prev = match.end(1) | |
# if odd, escaped -> move along | |
else: | |
note_data += markdown_note[prev:to_check] | |
prev = match.start(1) - 1 | |
note_data += markdown_note[prev:] | |
return note_data, buttons | |
async def build_keyboard(buttons): | |
"""Build keyboards from provided buttons.""" | |
keyb = [] | |
for btn in buttons: | |
if btn[-1] and keyb: | |
keyb[-1].append((btn[0], btn[1], "url")) | |
else: | |
keyb.append([(btn[0], btn[1], "url")]) | |
return keyb | |
SMART_OPEN = "“" | |
SMART_CLOSE = "”" | |
START_CHAR = ("'", '"', SMART_OPEN) | |
async def escape_invalid_curly_brackets(text: str, valids: List[str]) -> str: | |
new_text = "" | |
idx = 0 | |
while idx < len(text): | |
if text[idx] == "{": | |
if idx + 1 < len(text) and text[idx + 1] == "{": | |
idx += 2 | |
new_text += "{{{{" | |
continue | |
success = False | |
for v in valids: | |
if text[idx:].startswith("{" + v + "}"): | |
success = True | |
break | |
if success: | |
new_text += text[idx: idx + len(v) + 2] | |
idx += len(v) + 2 | |
continue | |
new_text += "{{" | |
elif text[idx] == "}": | |
if idx + 1 < len(text) and text[idx + 1] == "}": | |
idx += 2 | |
new_text += "}}}}" | |
continue | |
new_text += "}}" | |
else: | |
new_text += text[idx] | |
idx += 1 | |
return new_text | |
async def escape_mentions_using_curly_brackets( | |
m: Message, | |
text: str, | |
parse_words: list, | |
) -> str: | |
if m.chat.type in [ChatType.SUPERGROUP, ChatType.GROUP, ChatType.CHANNEL]: | |
chat_name = escape(m.chat.title) | |
else: | |
chat_name = escape(m.from_user.first_name) | |
teks = await escape_invalid_curly_brackets(text, parse_words) | |
if teks: | |
teks = teks.format( | |
first=escape(m.from_user.first_name), | |
last=escape(m.from_user.last_name or m.from_user.first_name), | |
mention=m.from_user.mention, | |
username=( | |
"@" + (await escape_markdown(escape(m.from_user.username))) | |
if m.from_user.username | |
else m.from_user.mention | |
), | |
fullname=" ".join( | |
[ | |
escape(m.from_user.first_name), | |
escape(m.from_user.last_name), | |
] | |
if m.from_user.last_name | |
else [escape(m.from_user.first_name)], | |
), | |
chatname=chat_name, | |
id=m.from_user.id, | |
) | |
else: | |
teks = "" | |
return teks | |
async def split_quotes(text: str): | |
"""Split quotes in text.""" | |
if not any(text.startswith(char) for char in START_CHAR): | |
return text.split(None, 1) | |
counter = 1 # ignore first char -> is some kind of quote | |
while counter < len(text): | |
if text[counter] == "\\": | |
counter += 1 | |
elif text[counter] == text[0] or ( | |
text[0] == SMART_OPEN and text[counter] == SMART_CLOSE | |
): | |
break | |
counter += 1 | |
else: | |
return text.split(None, 1) | |
# 1 to avoid starting quote, and counter is exclusive so avoids ending | |
key = await remove_escapes(text[1:counter].strip()) | |
# index will be in range, or `else` would have been executed and returned | |
rest = text[counter + 1:].strip() | |
if not key: | |
key = text[0] + text[0] | |
return list(filter(None, [key, rest])) | |
async def remove_escapes(text: str) -> str: | |
"""Remove the escaped from message.""" | |
res = "" | |
is_escaped = False | |
for counter in range(len(text)): | |
if is_escaped: | |
res += text[counter] | |
is_escaped = False | |
elif text[counter] == "\\": | |
is_escaped = True | |
else: | |
res += text[counter] | |
return res | |
async def encode_decode(string: str, to_do="encode"): | |
""" | |
Function to encode or decode strings | |
string: string to be decoded or encoded | |
to_do: encode to encode the string or decode to decode the string | |
""" | |
if to_do.lower() == "encode": | |
encodee = string.encode("ascii") | |
base64_ = base64.b64encode(encodee) | |
return base64_.decode("ascii") | |
elif to_do.lower() == "decode": | |
decodee = string.encode("ascii") | |
base64_ = base64.b64decode(decodee) | |
return base64_.decode("ascii") | |
else: | |
return None | |