File size: 3,140 Bytes
65d3d39 fa7ea4d 65d3d39 5b6bac4 65d3d39 ed9b045 65d3d39 71baa3b 5b6bac4 71baa3b 5b6bac4 1336d47 365ad3d 5b6bac4 1336d47 5b6bac4 71baa3b 5b6bac4 1336d47 365ad3d 1336d47 71baa3b 1336d47 365ad3d 1336d47 365ad3d 1336d47 365ad3d 1336d47 365ad3d 1336d47 71baa3b 65d3d39 5b6bac4 65d3d39 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
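"""
Run the imported job functions on cron schedules with APScheduler's
AsyncIOScheduler, hosted on an asyncio event loop in a background thread.

Based on the APScheduler asyncio example:
https://github.com/agronholm/apscheduler/blob/3.x/examples/schedulers/asyncio_.py
"""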
import asyncio
import threading
import time
import pytz
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from loguru import logger
from _playwright.bilibili import bilibili_sign, bilibili_live
from _playwright.v2ex import v2ex_sign
from _playwright.vits import vits_sign
from _playwright.tsdm import tsdm_sign
from _playwright.wuaipojie import wuaipojie_sign
from _requests.aliyundrive import aliyundrive_sign, validate_access_token
def tick():
    """Emit a debug-level log line carrying the current local time."""
    now = datetime.now()
    logger.debug(f"Tick! The time is: {now:%Y-%m-%d %H:%M:%S}")
def run_scheduler(loop):
    """Register all cron jobs on an ``AsyncIOScheduler`` and run ``loop`` forever.

    Installs ``loop`` as the calling thread's current event loop, adds every
    job to a scheduler configured for the Asia/Shanghai timezone, starts the
    scheduler, and then blocks in ``loop.run_forever()`` until the loop is
    stopped.

    Each job is given ``next_run_time`` equal to "now", so it fires once right
    after the scheduler starts in addition to its regular cron schedule.

    Args:
        loop: The asyncio event loop that hosts the scheduler. This call
            blocks, so it is meant to be executed in its own thread.
    """
    asyncio.set_event_loop(loop)

    tz = pytz.timezone("Asia/Shanghai")
    task = AsyncIOScheduler()
    task.configure(timezone=tz)

    # Jobs that may still run when they are discovered up to 12 hours late.
    late_ok = {"misfire_grace_time": 60 * 60 * 12}

    # (callable, cron trigger fields, extra job options), in registration order.
    jobs = (
        (tick, {"hour": "*/2"}, {}),
        (bilibili_sign, {"hour": 0, "minute": 1}, late_ok),
        (bilibili_live, {"hour": 5, "minute": 1}, late_ok),
        (v2ex_sign, {"hour": 0, "minute": 2}, late_ok),
        (tsdm_sign, {"hour": 12, "minute": 3}, late_ok),
        (wuaipojie_sign, {"hour": 12, "minute": 4}, late_ok),
        (vits_sign, {"hour": "0, 12", "minute": 6}, late_ok),
        (aliyundrive_sign, {"hour": 12, "minute": 7}, late_ok),
        (validate_access_token, {"hour": "*/2"}, late_ok),
    )

    for func, cron_fields, options in jobs:
        task.add_job(
            func,
            "cron",
            # Timezone-aware "now": APScheduler interprets a naive datetime in
            # the scheduler's timezone, so a naive datetime.now() taken on a
            # host outside Asia/Shanghai would be shifted by the UTC-offset
            # difference instead of meaning "run immediately".
            next_run_time=datetime.now(tz),
            coalesce=True,
            **cron_fields,
            **options,
        )

    task.start()
    try:
        # ``loop`` is the loop installed above, so run it directly.
        loop.run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass
def start_daemon_scheduler(loop):
    """Launch ``run_scheduler(loop)`` in a daemon thread named ``"schedule"``.

    Args:
        loop: The asyncio event loop handed to ``run_scheduler``.
    """
    worker = threading.Thread(
        name="schedule",
        target=run_scheduler,
        args=(loop,),
        daemon=True,
    )
    worker.start()
async def kill_daemon_scheduler():
    """Stop the event loop that is executing this coroutine.

    Stopping the loop makes its ``run_forever()`` call return. Uses the
    public ``asyncio.get_running_loop()`` rather than the private
    ``asyncio._get_running_loop()`` helper; inside a coroutine a running
    loop always exists, so the lookup cannot fail here.
    """
    asyncio.get_running_loop().stop()
if __name__ == "__main__":
    # Demo run: host the scheduler on a fresh loop in a daemon thread, ...
    schedule_loop = asyncio.new_event_loop()
    start_daemon_scheduler(schedule_loop)

    # ... give it a few seconds to work, ...
    time.sleep(3)

    # ... then ask that loop, from this thread, to stop itself.
    asyncio.run_coroutine_threadsafe(kill_daemon_scheduler(), schedule_loop)
|