# 用于刷新idToken的定时任务 import asyncio import os import time from typing import Optional from datetime import datetime from core.utils import sign_in_with_idp, handle_firebase_response, refresh_token_via_rest class TokenManager: def __init__(self): self.id_token: Optional[str] = None self.last_refresh_time: Optional[float] = None self.refresh_interval = 30 * 60 # 30分钟,单位:秒 self.is_running = False async def get_token(self) -> str: """ 获取当前的 idToken,如果不存在或已过期则刷新 """ if not self.id_token or self._should_refresh(): await self.refresh_token() return self.id_token def _should_refresh(self) -> bool: """ 检查是否需要刷新 token """ if not self.last_refresh_time: return True return time.time() - self.last_refresh_time >= self.refresh_interval async def refresh_token(self): """ 刷新 idToken """ try: if os.getenv("REFRESH_TOKEN", "") == "" or os.getenv("REFRESH_TOKEN", "") == "None": response = await sign_in_with_idp() result = await handle_firebase_response(response) # idToken 实际就是bearer token else: result = await refresh_token_via_rest(os.getenv("REFRESH_TOKEN")) if result is not None: self.id_token = result # 修改配置中的 TOKEN # print(f"Before Token is {os.getenv('TOKEN', '')}") os.environ["TOKEN"] = self.id_token # print(f"Now Token is {os.getenv('TOKEN', '')}") self.last_refresh_time = time.time() print(f"Token refreshed at {datetime.now()}") else: print(f"Failed to refresh token: {result['error']}") except Exception as e: print(f"Error refreshing token: {str(e)}") async def start_auto_refresh(self): """ 启动自动刷新任务 """ if self.is_running: return self.is_running = True while self.is_running: try: await self.refresh_token() # 等待到下次刷新时间 await asyncio.sleep(self.refresh_interval) except Exception as e: print(f"Auto refresh error: {str(e)}") # 发生错误时等待短暂时间后重试 await asyncio.sleep(60) async def stop_auto_refresh(self): """ 停止自动刷新任务 """ self.is_running = False # 使用示例 # async def main(): # # 创建 TokenManager 实例 # token_manager = TokenManager() # # try: # # 启动自动刷新任务 # refresh_task = asyncio.create_task(token_manager.start_auto_refresh()) # # # 模拟应用运行 # while True: # # 获取当前 token # token = await token_manager.get_token() # print(f"Current token: {token[:20]}...") # # # 等待一段时间再次获取 # await asyncio.sleep(300) # 每5分钟打印一次当前token # # except KeyboardInterrupt: # # 处理 Ctrl+C # await token_manager.stop_auto_refresh() # await refresh_task # # # 运行示例 # if __name__ == "__main__": # asyncio.run(main())