File size: 4,929 Bytes
65d3d39
 
 
8786c0f
65d3d39
 
 
 
 
 
 
 
 
8786c0f
65d3d39
 
 
 
6ed2c3e
365ad3d
65d3d39
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8786c0f
65d3d39
 
 
 
 
 
 
 
 
 
 
 
 
 
8786c0f
 
65d3d39
8786c0f
65d3d39
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8786c0f
65d3d39
 
 
 
 
e694d87
 
 
8786c0f
65d3d39
8786c0f
 
 
365ad3d
8786c0f
5b6bac4
8786c0f
6ed2c3e
8786c0f
 
 
65d3d39
 
8786c0f
65d3d39
8786c0f
 
65d3d39
 
8786c0f
 
65d3d39
 
 
8786c0f
 
 
65d3d39
 
 
20360a6
 
 
 
 
 
 
 
 
8786c0f
 
 
 
 
 
 
 
 
 
 
65d3d39
 
 
8786c0f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# coding:utf-8
import socket
import asyncio
import aiofiles
import psutil
import requests

from os.path import join
from tld import get_tld
from loguru import logger
from playwright.async_api import async_playwright, Page
from playwright_stealth import stealth_async

from option_mysql import get_aiomysql_instance
from __init__ import (
    gotify_host,
    gotify_port,
    gotify_token,
    gotify_img,
    priority,
    DIR_PATH,
    DEBUG,
    browser_headless,
    browser_proxy,
    ignore_https_errors,
    user_agent,
)


# Browser bootstrap (Playwright)
async def init_page(storage_state=None, browser_type: str = "chromium") -> None:
    """Launch a browser, open one stealth page and run the page tasks on it.

    All page work (``goto_github`` and ``dump_cookie``) is done inside the
    ``async_playwright`` block: once that block exits the connection is closed
    and the page can no longer be used, so nothing is returned.

    Args:
        storage_state: Saved storage state (e.g. a cookie file path) forwarded
            to ``browser.new_context``; ``None`` starts without saved state.
        browser_type: ``"chromium"``, ``"firefox"`` or ``"webkit"``.

    Raises:
        TypeError: If ``browser_type`` is not one of the supported engines.
    """
    if browser_type not in ("chromium", "firefox", "webkit"):
        raise TypeError("unspported browser")

    async with async_playwright() as p:
        # Every validated engine name is an attribute of the playwright object,
        # so "webkit" is launched like the others instead of aborting.
        browser = await getattr(p, browser_type).launch(headless=browser_headless)
        try:
            # A single context carries both the network/identity settings and
            # the saved cookies; creating a second context would drop the
            # proxy, HTTPS and user-agent options.
            context = await browser.new_context(
                proxy=browser_proxy,
                ignore_https_errors=ignore_https_errors,
                user_agent=user_agent,
                storage_state=storage_state,
            )
            page = await context.new_page()
            # Hide the usual webdriver fingerprints.
            await stealth_async(page)
            await goto_github(page)
            await dump_cookie(page)
        finally:
            # Close the browser even when one of the page tasks raises.
            await browser.close()


async def goto_github(page):
    """Navigate ``page`` to the GitHub home page."""
    github_home = "https://github.com"
    await page.goto(github_home)


async def dump_cookie(page, url: str = "https://huggingface.co/login"):
    """Open a login page, wait for a manual login, then save the storage state.

    The state is written to ``<DIR_PATH>/<domain>.json`` where ``<domain>`` is
    derived from ``url`` via ``get_domain``. A full-page screenshot is stored
    as ``<DIR_PATH>/screenshot.png`` before waiting.

    Args:
        page: Playwright page to drive.
        url: Login page to open; defaults to the Hugging Face login page.
    """
    save_cookie = join(DIR_PATH, f"{get_domain(url)}.json")
    await page.goto(url)
    logger.debug(DIR_PATH)
    await page.screenshot(path=join(DIR_PATH, "screenshot.png"), full_page=True)

    # Wait until the operator confirms the login form has been filled in.
    # input() blocks, so it runs in the default executor to keep the event
    # loop free while waiting.
    loop = asyncio.get_running_loop()
    while await loop.run_in_executor(None, input, "input c to continue: ") != "c":
        continue

    # Persist cookies / storage state to disk.
    await page.context.storage_state(path=save_cookie)


# Get the IP address inside the container
def get_local_ip():
    """Return the first IPv4 address reported by psutil that is not 127.0.0.1.

    Interfaces are scanned in the order ``psutil.net_if_addrs()`` yields them.

    Raises:
        IndexError: If no interface has an IPv4 address other than 127.0.0.1.
    """
    for addresses in psutil.net_if_addrs().values():
        for addr in addresses:
            # Only IPv4 entries are considered; the loopback address is skipped.
            if addr.family == socket.AF_INET and addr.address != "127.0.0.1":
                return addr.address
    # Same exception type as indexing an empty list, but with a usable message.
    raise IndexError("no non-loopback IPv4 address found")


# Extract the domain from a URL
def get_domain(url: str) -> str:
    """Return the ``domain`` attribute of the ``get_tld`` result for ``url``."""
    return get_tld(url, as_object=True).domain


# Resolve a host name to an IP address
def get_pi_ip(host: str) -> str:
    """Resolve ``host`` with ``socket.gethostbyname`` and return the address."""
    return socket.gethostbyname(host)


# Gotify push notification
def push_msg(
    title: str = "无标题", message: str = "无内容", img_url: str = gotify_img
) -> dict:
    """Post a message to the Gotify server and report the HTTP outcome.

    Args:
        title: Notification title.
        message: Notification body.
        img_url: Image URL sent as the notification's ``bigImageUrl``.

    Returns:
        A dict with ``code`` (HTTP status code) and ``res`` (raw response body).
    """
    # The host name is resolved first and the request is sent to the IP.
    gotify_ip = get_pi_ip(gotify_host)
    endpoint = f"http://{gotify_ip}:{gotify_port}/message?token={gotify_token}"

    extras = {
        "client::display": {"contentType": "text/plaintext"},
        "client::notification": {"bigImageUrl": img_url},
    }
    payload = {
        "title": title,
        "message": message,
        "priority": priority,
        "extras": extras,
    }

    response = requests.post(url=endpoint, json=payload, timeout=20)
    return dict(code=response.status_code, res=response.content)


async def get_cookie_from_pi() -> str:
    """Write every row of the ``cookie`` table to ``cookie/<cookie_name>.json``.

    Each row's ``cookie_value`` is written as UTF-8 text under ``DIR_PATH``.

    Returns:
        The ``cookie_name`` of the first row returned by the query.

    Raises:
        IndexError: Presumably when the query returns an empty sequence
            (depends on what ``pool.query`` returns — TODO confirm).
    """
    sql = "SELECT cookie_name, cookie_value FROM cookie"
    pool = await get_aiomysql_instance()
    rows = await pool.query(sql)
    for row in rows:
        cookie_file = join(DIR_PATH, f"cookie/{row['cookie_name']}.json")
        # Explicit encoding keeps the file independent of the platform default.
        async with aiofiles.open(cookie_file, mode="w", encoding="utf-8") as handle:
            await handle.write(row["cookie_value"])
    return rows[0]["cookie_name"]


async def read_logfile(log_file: str) -> str:
    """Read ``log_file`` as UTF-8 text and return its whole content."""
    async with aiofiles.open(log_file, mode="r", encoding="utf-8") as reader:
        return await reader.read()


# Fetch the daily song, used for the "Angel Anime" site check-in
async def get_streevoice_today_song(page):
    """Read the song title shown in the StreetVoice player and return it.

    The page is navigated to StreetVoice, the link text inside
    ``#player-collapse h4`` is read and logged, and the page then goes back
    one history entry.
    """
    await page.goto("https://streetvoice.com/")
    title_link = page.locator("#player-collapse h4").get_by_role("link")
    song = await title_link.text_content()
    logger.info(f"街声今日歌曲:{song}")
    await page.go_back()
    return song


async def main():
    """Ad-hoc manual check: read the bilibili log file and log its content."""
    # Other calls that have been tried from here (kept for reference):
    #   await init_page()
    #   res = await get_bilibili_live_rooms_from_pi()
    #   res = await get_cookie_from_pi()
    #   res = await get_aliyundrive_refresh_token()
    #   res = await update_aliyundrive_access_token("no use")
    #   res = await get_bilibili_live_rooms_from_pi()
    log_path = "log/bilibili.log"
    res = await read_logfile(log_path)
    logger.debug(f"utils test: {res}")


# Script entry point: when run directly, log the DEBUG flag and DIR_PATH.
if __name__ == "__main__":
    logger.debug(f"DEBUG Mode: {DEBUG}")
    logger.debug(f"DIR_PATH: {DIR_PATH}")
    # The call to main() is currently commented out, so nothing else runs.
    # asyncio.run(main())