notdiamond2api / auth_utils.py
ffreemt
Update files
dabcedb
import requests
import json
import base64
import re
import logging
class AuthManager:
"""
AuthManager类用于管理身份验证过程,包括获取API密钥、用户信息和处理刷新令牌等操作。
Attributes:
session (requests.Session): 一个session对象,用于保持与服务器之间的连接。
api_key (str): API密钥,用于身份验证。
user_info (dict): 用户信息字典。
refresh_token (str): 用于刷新的令牌。
user_id (str): 用户唯一标识符。
next_action (str): 下一步操作的标识符。
logger (logging.Logger): 日志记录器。
Methods:
log_values(): 记录关键信息到日志。
get_next_action(): 获取下一步的操作。
fetch_apikey(): 获取并存储API密钥。
login(email, password): 使用email和密码进行登录。
refresh_user_token(): 刷新用户的身份验证令牌。
base64_encode(data): 以JSON格式编码然后转换为Base64。
get_cookie_value(): 获取cookie值用于会话验证。
"""
email = ""
password = ""
def __init__(self, email, password):
self.email = email
self.password = password
self.session = requests.Session()
self.api_key = ""
self.user_info = {}
self.refresh_token = ""
self.user_id = ""
self.next_action = ""
self.logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
self.login()
self.fetch_apikey()
self.get_next_action()
self.log_values()
def log_values(self):
"""
记录刷新令牌、用户ID和下一步操作到日志中。
"""
self.logger.info(f"\033[92mRefresh Token: {self.refresh_token}\033[0m")
self.logger.info(f"\033[94mUser ID: {self.user_id}\033[0m")
self.logger.info(f"\033[96mNext Action: {self.next_action}\033[0m")
def get_next_action(self):
"""
尝试从服务器获取下一步操作标识符,并更新实例变量。
Returns:
str: 下一步的操作标识符,如果未找到则返回空字符串。
"""
try:
url = "https://chat.notdiamond.ai/"
headers = {
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36',
'cookie': self.get_cookie_value()
}
response = self.session.get(url, headers=headers)
response.raise_for_status()
script_tags = re.findall(r'<script[^>]*>(.*?)<\/script>', response.text, re.DOTALL)
matches = []
for script_content in script_tags:
if "static/chunks/app/(chat)/page-" in script_content:
matches.extend(re.findall(r'static/chunks/[a-zA-Z0-9]+-[a-zA-Z0-9]+\.js', script_content))
if not matches:
self.logger.warning("未找到匹配的脚本标签")
else:
full_script_urls = [f"https://chat.notdiamond.ai/_next/{match}" for match in matches]
for full_url in full_script_urls:
try:
script_response = self.session.get(full_url, headers=headers)
script_response.raise_for_status()
match = re.search(r'v=\(0,s.\$\)\("([^"]+)"\)', script_response.text)
if match:
desired_value = match.group(1)
self.next_action = desired_value
return self.next_action
except requests.RequestException as e:
self.logger.error(f"请求脚本URL时发生错误 {full_url}: {e}")
return ""
return ""
except requests.RequestException as e:
return ""
def fetch_apikey(self):
"""
获取API密钥并将其存储在本地文件中,以备后用。
Returns:
str: 获取的API密钥。
"""
if self.api_key:
return self.api_key
try:
url = "https://chat.notdiamond.ai/login"
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/128.0.0.0 Safari/537.36'
}
response = self.session.get(url, headers=headers)
response.raise_for_status()
match = re.search(r'<script src="(/_next/static/chunks/app/layout-[^"]+\.js)"', response.text)
if not match:
self.logger.warning("未找到匹配的脚本标签")
return ""
js_url = f"https://chat.notdiamond.ai{match.group(1)}"
response = self.session.get(js_url, headers=headers)
response.raise_for_status()
match = re.search(r'\("https://spuckhogycrxcbomznwo\.supabase\.co","([^"]+)"\)', response.text)
if match:
self.api_key = match.group(1)
return self.api_key
else:
self.logger.error("未能匹配API key")
return ""
except requests.RequestException as e:
self.logger.error(f"请求JS文件时发生错误: {e}")
return ""
def login(self):
"""
使用类成员中的电子邮件和密码进行用户登录,并获取用户信息。
"""
url = "https://spuckhogycrxcbomznwo.supabase.co/auth/v1/token?grant_type=password"
headers = {
'apikey': self.fetch_apikey(),
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36',
'Content-Type': 'application/json'
}
data = {
"email": self.email,
"password": self.password,
"gotrue_meta_security": {}
}
try:
response = self.session.post(url, headers=headers, json=data)
response.raise_for_status()
self.user_info = response.json()
self.refresh_token = self.user_info.get('refresh_token', '')
self.user_id = self.user_info.get('user', {}).get('id', '')
except requests.RequestException as e:
self.logger.error(f"\033[91m登录请求错误: {e}\033[0m")
return ""
def refresh_user_token(self):
"""
使用刷新令牌来请求一个新的访问令牌并更新实例变量。
"""
url = "https://spuckhogycrxcbomznwo.supabase.co/auth/v1/token?grant_type=refresh_token"
headers = {
'apikey': self.fetch_apikey(),
'content-type': 'application/json;charset=UTF-8',
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36'
}
data = {
"refresh_token": self.refresh_token
}
try:
response = self.session.post(url, headers=headers, json=data)
response.raise_for_status()
self.user_info = response.json()
self.refresh_token = self.user_info.get('refresh_token', '')
self.user_id = self.user_info.get('user', {}).get('id', '')
except requests.RequestException as e:
self.logger.error(f"刷新令牌请求错误: {e}")
return ""
def base64_encode(self, data):
"""
将输入数据转换为JSON字符串并编码为Base64格式。
Parameters:
data (any): 需要编码的数据。
Returns:
str: 编码后的Base64字符串。
"""
json_str = json.dumps(data, ensure_ascii=False)
return base64.b64encode(json_str.encode('utf-8')).decode('utf-8')
def get_cookie_value(self):
"""
生成并返回用于会话验证的cookie值。
Returns:
str: 用于验证的cookie字符串。
"""
encoded_user_info = self.base64_encode(self.user_info)
return f"sb-spuckhogycrxcbomznwo-auth-token=base64-{encoded_user_info}"