import socket |
import asyncio |
import aiofiles |
import psutil |
import requests |
from os.path import join |
from tld import get_tld |
from loguru import logger |
from playwright.async_api import async_playwright, Page |
from playwright_stealth import stealth_async |
from option_mysql import get_aiomysql_instance |
from __init__ import ( |
gotify_host, |
gotify_port, |
gotify_token, |
gotify_img, |
priority, |
browser_headless, |
browser_proxy, |
ignore_https_errors, |
user_agent, |
) |
async def init_page(storage_state=None, browser_type: str = "chromium") -> Page: |
if browser_type not in ("chromium", "firefox", "webkit"): |
raise TypeError("unspported browser") |
async with async_playwright() as p: |
if browser_type == "chromium": |
browser = await p.chromium.launch(headless=browser_headless) |
elif browser_type == "firefox": |
browser = await p.firefox.launch(headless=browser_headless) |
else: |
logger.error("初始化浏览器失败") |
exit(-1) |
context = await browser.new_context( |
proxy=browser_proxy, |
ignore_https_errors=ignore_https_errors, |
user_agent=user_agent, |
) |
context = await browser.new_context(storage_state=storage_state) |
page = await context.new_page() |
await stealth_async(page) |
await goto_github(page) |
await dump_cookie(page) |
await browser.close() |
async def goto_github(page): |
await page.goto("https://github.com") |
async def dump_cookie(page): |
url = "https://huggingface.co/login" |
save_cookie = join(DIR_PATH, f"{get_domain(url)}.json") |
await page.goto(url) |
logger.debug(DIR_PATH) |
await page.screenshot(path=join(DIR_PATH, "screenshot.png"), full_page=True) |
while input("input c to continue: ") != "c": |
continue |
await page.context.storage_state(path=save_cookie) |
def get_local_ip(): |
net_card_info = [] |
info = psutil.net_if_addrs() |
for k, v in info.items(): |
for item in v: |
if item[0] == 2 and not item[1] == "": |
net_card_info.append(item[1]) |
local_ip = net_card_info[0] |
return local_ip |
def get_domain(url: str) -> str: |
result = get_tld(url, as_object=True) |
domain = result.domain |
return domain |
def get_pi_ip(host: str) -> str: |
domain_ip = socket.gethostbyname(host) |
return domain_ip |
def push_msg( |
title: str = "无标题", message: str = "无内容", img_url: str = gotify_img |
) -> dict: |
gotify_ip = get_pi_ip(gotify_host) |
url = f"http://{gotify_ip}:{gotify_port}/message?token={gotify_token}" |
data = { |
"title": title, |
"message": message, |
"priority": priority, |
"extras": { |
"client::display": {"contentType": "text/plaintext"}, |
"client::notification": {"bigImageUrl": img_url}, |
}, |
} |
resp = requests.post(url=url, json=data, timeout=20) |
return {"code": resp.status_code, "res": resp.content} |
async def get_cookie_from_pi() -> dict: |
sql = f"SELECT cookie_name, cookie_value FROM cookie" |
pool = await get_aiomysql_instance() |
data = await pool.query(sql) |
for d in data: |
log_file = join(DIR_PATH, f"cookie/{d['cookie_name']}.json") |
async with aiofiles.open(log_file, mode="w") as handle: |
await handle.write(d["cookie_value"]) |
return data[0]["cookie_name"] |
async def read_logfile(log_file: str) -> str: |
async with aiofiles.open(log_file, mode="r", encoding="utf-8") as f: |
data = await f.read() |
return data |
async def get_streevoice_today_song(page): |
await page.goto("https://streetvoice.com/") |
song = await page.locator("#player-collapse h4").get_by_role("link").text_content() |
logger.info(f"街声今日歌曲:{song}") |
await page.go_back() |
return song |
async def main(): |
res = await read_logfile("log/bilibili.log") |
logger.debug(f"utils test: {res}") |
if __name__ == "__main__": |
logger.debug(f"DEBUG Mode: {DEBUG}") |
logger.debug(f"DIR_PATH: {DIR_PATH}") |