| |
|
|
| import urllib.parse |
| import aiohttp |
| import aiofiles |
| from WebStreamer.vars import Var |
| from WebStreamer.utils.database import Database |
| from WebStreamer.utils.human_readable import humanbytes |
| db = Database(Var.DATABASE_URL, Var.SESSION_NAME) |
|
|
| async def render_page(db_id): |
| html: str = "" |
| file_data=await db.get_file(db_id) |
| src = urllib.parse.urljoin(Var.URL, f'dl/{file_data["_id"]}') |
| if str((file_data['mime_type']).split('/')[0].strip()) == 'video': |
| async with aiofiles.open('WebStreamer/template/req.html') as r: |
| heading = f"Watch {file_data['file_name']}" |
| tag = (file_data['mime_type']).split('/')[0].strip() |
| html = (await r.read()).replace('tag', tag) % (heading, file_data['file_name'], src) |
| elif str((file_data['mime_type']).split('/')[0].strip()) == 'audio': |
| async with aiofiles.open('WebStreamer/template/req.html') as r: |
| heading = f"Listen {file_data['file_name']}" |
| tag = (file_data['mime_type']).split('/')[0].strip() |
| html = (await r.read()).replace('tag', tag) % (heading, file_data['file_name'], src) |
| else: |
| async with aiofiles.open('WebStreamer/template/dl.html') as r: |
| async with aiohttp.ClientSession() as s: |
| async with s.get(src) as u: |
| heading = f"Download {file_data['file_name']}" |
| file_size = humanbytes(int(u.headers.get('Content-Length'))) |
| html = (await r.read()) % (heading, file_data['file_name'], src, file_size) |
| return html |
|
|