Infini / scheduler_jobs.py
yangtb24's picture
Upload 7 files
931e053 verified
import os
import threading
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
import pytz
import config
import data_manager
# Timezone object built from the name configured in config.SHANGHAI_TZ_STR;
# used both for log timestamps and as the scheduler's timezone.
shanghai_tz = pytz.timezone(config.SHANGHAI_TZ_STR)

# Single module-level scheduler shared by every job registered in this file.
scheduler = BackgroundScheduler(
    timezone=shanghai_tz,
    daemon=True,
)
def initial_login_all_accounts_job():
    """Run the start-up login for every account and wait for completion.

    One thread per email returned by ``data_manager.get_all_account_emails()``
    is started, each calling ``data_manager.login_and_store_token(email)``.
    The function returns only after every thread has been joined.
    """
    print("程序启动,开始为所有账户执行初始登录...")

    def _start_login(account_email):
        # Create and immediately start the worker for a single account.
        worker = threading.Thread(
            target=data_manager.login_and_store_token,
            args=(account_email,),
        )
        worker.start()
        return worker

    login_workers = [
        _start_login(account_email)
        for account_email in data_manager.get_all_account_emails()
    ]

    # Block until every login attempt has finished.
    for worker in login_workers:
        worker.join()

    print("所有账户初始登录尝试完成。")
def scheduled_login_all_accounts_job():
    """Scheduled job: log every account in again, then refresh all profiles.

    Starts one thread per account email, each running
    ``data_manager.login_and_store_token``, joins them all, and finally
    calls ``scheduled_fetch_all_profiles_job()`` in the current thread.
    Start and end messages are printed with a ``shanghai_tz`` timestamp.
    """
    print(f"[{datetime.now(shanghai_tz)}] 定时任务:开始为所有账户重新登录...")

    login_workers = []
    for account_email in data_manager.get_all_account_emails():
        worker = threading.Thread(
            target=data_manager.login_and_store_token,
            args=(account_email,),
        )
        login_workers.append(worker)
        worker.start()

    # All login attempts must finish before the profile refresh begins.
    for worker in login_workers:
        worker.join()

    print(f"[{datetime.now(shanghai_tz)}] 定时任务:所有账户重新登录尝试完成。")

    # Follow the re-login with an immediate profile fetch for every account.
    scheduled_fetch_all_profiles_job()
def scheduled_fetch_all_profiles_job():
    """Scheduled job: fetch the profile of every account in parallel.

    For each email from ``data_manager.get_all_account_emails()`` a thread
    running ``data_manager.fetch_and_store_profile(email)`` is started; the
    function returns once all of those threads have been joined. Start and
    end messages are printed with a ``shanghai_tz`` timestamp.
    """
    print(f"[{datetime.now(shanghai_tz)}] 定时任务:开始为所有账户获取 Profile...")

    fetch_workers = []
    account_emails = data_manager.get_all_account_emails()
    for account_email in account_emails:
        fetch_worker = threading.Thread(
            target=data_manager.fetch_and_store_profile,
            args=(account_email,),
        )
        fetch_workers.append(fetch_worker)
        fetch_worker.start()

    # Wait for every fetch attempt before reporting completion.
    for fetch_worker in fetch_workers:
        fetch_worker.join()

    print(f"[{datetime.now(shanghai_tz)}] 定时任务:所有账户获取 Profile 尝试完成。")
def manual_refresh_all_data_job():
    """Kick off a profile refresh for all accounts without blocking the caller.

    Prints a timestamped message, then runs
    ``scheduled_fetch_all_profiles_job`` on a newly started thread. The
    thread is not joined, so this function returns immediately.
    """
    triggered_at = datetime.now(shanghai_tz)
    print(f"[{triggered_at}] 手动触发数据刷新...")

    refresh_worker = threading.Thread(target=scheduled_fetch_all_profiles_job)
    refresh_worker.start()
def initialize_scheduler(is_gunicorn=False):
    """Load accounts, run the initial jobs, and register the periodic jobs.

    Steps, in order:
      1. ``data_manager.parse_accounts()``; a falsy result aborts with False.
      2. A blocking initial login followed by a blocking profile fetch.
      3. Any existing jobs with the target ids are removed, then the login
         job (every 3 days) and profile job (every 30 minutes) are added,
         each with its first run one full interval from now.
      4. The scheduler is started unless it is already running.

    Args:
        is_gunicorn: Selects the ``_gunicorn``-suffixed job ids and the
            'Gunicorn' label in log messages; otherwise 'Local' is used.

    Returns:
        True when the scheduler is running and its jobs were listed, False
        when account loading failed or starting/listing raised an exception.
    """
    if not data_manager.parse_accounts():
        print("由于账户加载失败,定时任务无法初始化。请检查 ACCOUNTS 环境变量。")
        return False

    # Run both jobs once, synchronously, before any schedule is registered.
    initial_login_all_accounts_job()
    scheduled_fetch_all_profiles_job()

    mode_label = 'Gunicorn' if is_gunicorn else 'Local'
    if is_gunicorn:
        login_job_id = 'job_login_all_gunicorn'
        profile_job_id = 'job_fetch_profiles_gunicorn'
    else:
        login_job_id = 'job_login_all'
        profile_job_id = 'job_fetch_profiles'

    # Drop any previously registered job with the same id before re-adding.
    for job_id in (login_job_id, profile_job_id):
        if scheduler.get_job(job_id):
            scheduler.remove_job(job_id)

    # The jobs above just ran, so the first scheduled run is one interval out.
    first_login_run = datetime.now(shanghai_tz) + timedelta(days=3)
    first_profile_run = datetime.now(shanghai_tz) + timedelta(minutes=30)

    scheduler.add_job(
        scheduled_login_all_accounts_job,
        'interval',
        days=3,
        id=login_job_id,
        next_run_time=first_login_run,
    )
    scheduler.add_job(
        scheduled_fetch_all_profiles_job,
        'interval',
        minutes=30,
        id=profile_job_id,
        next_run_time=first_profile_run,
    )

    try:
        if scheduler.running:
            print(f"定时任务已在运行中 ({mode_label})。")
        else:
            scheduler.start()
            print(f"定时任务已启动 ({mode_label})。")
        print(f"APScheduler jobs: {scheduler.get_jobs()}")
        return True
    except Exception as e:
        print(f"启动 APScheduler 失败 ({mode_label}): {e}")
        return False