import os import time import json import random import string import sqlite3 import hashlib import requests # SQLite3 class class EazySQLite3(): def __init__(self, db: str = "sqlite.db"): self.connection = sqlite3.connect(db) #self.cursor = sqlite_connection.cursor() def query(self, query: str = ""): cursor = self.connection.cursor() try: cursor.execute(query) self.connection.commit() try: record = cursor.fetchall() cursor.close() return {"status": True, "details": record} except: try: cursor.close() except: pass finally: return {"status": True, "details": None} except Exception as e: return {"status": False, "details": str(e)} def close(self): self.connection.close() # Deleting audio def deleteAudio(path: str, waitInSeconds: int = 0): config = configFile() os.system("rm {}/{}".format(config['static-path'], path)) # Config file reading def configFile(): with open("/app/routes/config.json", "r") as file: config = json.loads(file.read()) return config # Signatures check!!! Wow!!! def checkSignature(signature: str): config = configFile() db = EazySQLite3(config['signatures-db']) query = db.query(f"SELECT * FROM `table` WHERE (`key` LIKE \"{signature}\" OR `key`=\"{signature}\") AND `endtime`>{time.time()}") #if len(query['details']) == 0: return False for row in query['details']: if signature == row[0]: newTTL = round(time.time())+86400 if newTTL > row[1]: db.query(f"UPDATE `table` SET `endtime`={newTTL} WHERE `key`=\"{signature}\"") return True return False # Hook for yt-dlp def thisIsHook(d): try: if d['total_bytes'] > 52428800: print("\nFILE IS BIG, ABORT!!!\n") raise Exception(f"Too long file (recieved {d['total_bytes']/1048576}MB, max is 50MB)") except: pass # Get variable from request def getFromRequest(request, thing: str): try: if request.method == 'POST': reqthing = request.form[thing] else: reqthing = request.args[thing] return reqthing except: return None # Recognizing things def req(access_token, meth, path, params, **kwargs): full_url = "https://api.wit.ai" + path headers = { "authorization": "Bearer " + access_token, "accept": "application/vnd.wit." + "20221114" + "+json", } headers.update(kwargs.pop("headers", {})) rsp = requests.request(meth, full_url, headers=headers, params=params, **kwargs) if rsp.status_code > 200: raise Exception( str(rsp.status_code) + " (" + rsp.reason + ")" ) try: r = rsp.json() except: r = rsp.text if "error" in r: raise Exception(json["error"]) return r def dictation(access_token, audio_file, headers=None, verbose=None): params = {} headers = headers or {} if verbose: params["verbose"] = True resp = req( access_token, "POST", "/dictation", params, data=audio_file, headers=headers, ) return resp def clean(text): if text not in ['', None]: return text.replace('?', '').replace('!', '').replace(';', '').replace('.', '').replace(',', '') def delivering(text): result = [] temp = None toPush = None tempLength = 0 for dirtLine in text.split('\n'): if '"text"' in dirtLine: line = dirtLine[11:-1] if temp == None: temp = line else: if temp in line: toPush = line else: temp = line result.append(toPush) return ' '.join(result) def devRaw(text): result = [] for line in text.split('\n'): #line[11:-1] if '"text"' in line: result.append(line[11:-1]) return result def randString(len: int = 16): return ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(len))