import concurrent.futures
import contextlib
import os
import random
import re
import sqlite3
import threading
import time
from datetime import datetime
from typing import Optional, Tuple, List, Dict, Any

from loguru import logger

from jetbrains import CaptchaSolver
from 收发邮件 import EmailClient


# Values stored in the ``accounts.status`` column (0..3, in this order).
(
    STATUS_PENDING,         # 0: selected by get_pending_accounts for submission
    STATUS_SUBMITTED,       # 1: e-mail submitted; activation link not stored yet
    STATUS_FAILED,          # 2: submission failed
    STATUS_LINK_EXTRACTED,  # 3: activation link found and stored
) = range(4)


class DatabaseManager:
    """SQLite access layer for the account pipeline.

    One connection is opened with ``check_same_thread=False`` and shared by
    all worker threads; every method that touches it holds ``self.lock``.
    Write operations run through :meth:`_transaction`, which commits on
    success and rolls back on failure so that a half-finished write can never
    be persisted by a later, unrelated commit.
    """

    # Column order used by the account SELECTs and by the dicts built from them.
    _ACCOUNT_COLUMNS = (
        "id",
        "register_time",
        "username",
        "password",
        "security_email",
        "status",
        "activation_link",
    )

    def __init__(self, db_path: str = "unibo_jetbrains.db"):
        """Open the database and make sure the tables exist.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = db_path
        self.conn = None
        self.lock = threading.Lock()
        self._initialize_db()

    def _initialize_db(self) -> None:
        """Connect to the database and create the tables if they are missing."""
        self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
        cursor = self.conn.cursor()

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS accounts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                register_time TEXT NOT NULL,
                username TEXT NOT NULL UNIQUE,
                password TEXT NOT NULL,
                security_email TEXT,
                status INTEGER DEFAULT 0,
                activation_link TEXT,
                updated_at TEXT,
                notes TEXT,
                used INTEGER DEFAULT 0  -- 新增字段,0未使用,1已使用
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS operation_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT NOT NULL,
                operation TEXT NOT NULL,
                status TEXT NOT NULL,
                message TEXT,
                created_at TEXT NOT NULL
            )
        ''')

        self.conn.commit()
        logger.info(f"数据库初始化完成: {self.db_path}")

    def __del__(self):
        """Close the connection when the manager is garbage-collected."""
        # getattr: __del__ also runs on instances whose __init__ never got as
        # far as assigning ``conn``.
        conn = getattr(self, "conn", None)
        if conn:
            conn.close()

    @contextlib.contextmanager
    def _transaction(self):
        """Yield a cursor under the lock; commit on success, roll back on error."""
        with self.lock:
            cursor = self.conn.cursor()
            try:
                yield cursor
                self.conn.commit()
            except Exception:
                # Discard the partial write so the next commit on this shared
                # connection cannot persist it.
                self.conn.rollback()
                raise

    @staticmethod
    def _parse_account_line(line: str) -> Optional[Tuple[str, str, str, str, int, str]]:
        """Split one ``---``-separated import line into INSERT parameters.

        Returns:
            ``(register_time, username, password, security_email, status,
            activation_link)``, or ``None`` when the line has fewer than
            three fields.
        """
        parts = line.strip().split('---')
        if len(parts) < 3:
            return None

        register_time, username, password = parts[:3]
        security_email = parts[3] if len(parts) > 3 else ""
        status = STATUS_PENDING
        activation_link = ""

        # Field 5 carries the textual status written by export_results_to_file.
        if len(parts) > 4 and "success" in parts[4].lower():
            status = STATUS_SUBMITTED

        # NOTE(review): indentation was lost in the reviewed source; this check
        # is assumed to be independent of the status-text check above.
        if len(parts) > 5 and "http" in parts[5].lower():
            activation_link = parts[5]
            status = STATUS_LINK_EXTRACTED

        return register_time, username, password, security_email, status, activation_link

    def import_from_file(self, filepath: str) -> int:
        """Import accounts from a ``---``-separated text file.

        Lines with fewer than three fields are skipped; usernames that already
        exist are ignored (``INSERT OR IGNORE``). The import is one
        transaction: if reading or inserting fails, nothing from the file is
        kept.

        Args:
            filepath: Path of the file containing the account data.

        Returns:
            Number of newly inserted accounts, or 0 if the import failed.
        """
        count = 0
        try:
            with open(filepath, 'r', encoding='utf-8') as file, self._transaction() as cursor:
                for line in file:
                    record = self._parse_account_line(line)
                    if record is None:
                        continue

                    try:
                        cursor.execute('''
                            INSERT OR IGNORE INTO accounts
                            (register_time, username, password, security_email, status, activation_link, updated_at)
                            VALUES (?, ?, ?, ?, ?, ?, datetime('now', 'localtime'))
                        ''', record)
                    except sqlite3.IntegrityError:
                        logger.warning(f"账号已存在,跳过: {record[1]}")
                        continue

                    # rowcount is 0 when OR IGNORE skipped an existing username.
                    if cursor.rowcount > 0:
                        count += 1

            logger.info(f"成功从{filepath}导入了{count}个账号")
            return count
        except Exception as e:
            logger.error(f"从文件导入账号时出错: {str(e)}")
            return 0

    def _fetch_accounts(self, where_clause: str, params: tuple) -> List[Dict[str, Any]]:
        """Return accounts matching ``where_clause`` as dicts keyed by column name.

        ``where_clause`` is always one of the fixed SQL fragments defined in
        this class (never user input); ``params`` must end with the LIMIT value.
        """
        columns = ", ".join(self._ACCOUNT_COLUMNS)
        # ORDER BY id keeps batch selection deterministic across calls.
        query = f"SELECT {columns} FROM accounts WHERE {where_clause} ORDER BY id LIMIT ?"
        with self.lock:
            cursor = self.conn.cursor()
            cursor.execute(query, params)
            rows = cursor.fetchall()
        return [dict(zip(self._ACCOUNT_COLUMNS, row)) for row in rows]

    def get_pending_accounts(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return accounts that have not been submitted yet.

        Args:
            limit: Maximum number of accounts to return.

        Returns:
            List of account dicts (id, register_time, username, password,
            security_email, status, activation_link).
        """
        return self._fetch_accounts("status = ?", (STATUS_PENDING, limit))

    def get_submitted_accounts(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return submitted accounts that have no activation link stored yet.

        Args:
            limit: Maximum number of accounts to return.

        Returns:
            List of account dicts (id, register_time, username, password,
            security_email, status, activation_link).
        """
        return self._fetch_accounts(
            "status = ? AND (activation_link IS NULL OR activation_link = '')",
            (STATUS_SUBMITTED, limit),
        )

    def update_account_status(self, id: int, status: int, activation_link: Optional[str] = None,
                              notes: Optional[str] = None) -> bool:
        """Set an account's status and, optionally, its link and notes.

        ``activation_link`` and ``notes`` are only written when they are
        non-empty; existing values are otherwise left untouched.
        ``updated_at`` is always refreshed.

        Args:
            id: Primary key of the account row.
            status: New status value.
            activation_link: Activation link to store (optional).
            notes: Free-form remark to store (optional).

        Returns:
            True if a row was updated, False if no row matched or an error
            occurred.
        """
        assignments = ["status = ?"]
        params: List[Any] = [status]
        if activation_link:
            assignments.append("activation_link = ?")
            params.append(activation_link)
        if notes:
            assignments.append("notes = ?")
            params.append(notes)
        assignments.append("updated_at = datetime('now', 'localtime')")
        params.append(id)

        try:
            with self._transaction() as cursor:
                cursor.execute(
                    f"UPDATE accounts SET {', '.join(assignments)} WHERE id = ?",
                    params,
                )
            return cursor.rowcount > 0
        except Exception as e:
            logger.error(f"更新账号状态时出错: {str(e)}")
            return False

    def log_operation(self, username: str, operation: str, status: str, message: Optional[str] = None) -> None:
        """Append a row to ``operation_logs``; failures are logged, not raised.

        Args:
            username: Account the operation refers to.
            operation: Operation name.
            status: Outcome of the operation.
            message: Optional extra detail.
        """
        try:
            with self._transaction() as cursor:
                cursor.execute('''
                    INSERT INTO operation_logs
                    (username, operation, status, message, created_at)
                    VALUES (?, ?, ?, ?, datetime('now', 'localtime'))
                ''', (username, operation, status, message))
        except Exception as e:
            logger.error(f"记录操作日志时出错: {str(e)}")

    def export_results_to_file(self, filepath: str, status: Optional[int] = None) -> int:
        """Write accounts to a ``---``-separated text file.

        Each line is ``register_time---username---password---security_email``
        followed by the status text when there is one, and by the activation
        link when there is one. The rows are read first and the file is only
        opened afterwards, so a failing query does not truncate an existing
        file and the database lock is not held during file I/O.

        Args:
            filepath: Output file path.
            status: Only export accounts with this status (optional).

        Returns:
            Number of exported records, or 0 if the export failed.
        """
        query = ("SELECT register_time, username, password, security_email, status, activation_link "
                 "FROM accounts")
        params: tuple = ()
        if status is not None:
            query += " WHERE status = ?"
            params = (status,)

        status_labels = {
            STATUS_SUBMITTED: "success",
            STATUS_FAILED: "failed",
            STATUS_LINK_EXTRACTED: "successed",
        }

        try:
            with self.lock:
                cursor = self.conn.cursor()
                cursor.execute(query, params)
                rows = cursor.fetchall()

            lines = []
            for register_time, username, password, security_email, row_status, link in rows:
                # A NULL security_email is written as an empty field, matching
                # what import_from_file stores for a missing one.
                fields = [register_time, username, password, security_email or ""]
                label = status_labels.get(row_status, "")
                if link:
                    fields += [label, link]
                elif label:
                    fields.append(label)
                lines.append("---".join(map(str, fields)) + "\n")

            with open(filepath, 'w', encoding='utf-8') as file:
                file.writelines(lines)
            return len(lines)
        except Exception as e:
            logger.error(f"导出结果到文件时出错: {str(e)}")
            return 0


class JetbrainsSubmitter:
    """Submits account e-mail addresses to JetBrains via ``CaptchaSolver``."""

    # Name pools for random_name(); defined once instead of on every call.
    _FIRST_NAMES = (
        "Marco", "Giuseppe", "Antonio", "Giovanni", "Mario", "Luigi", "Paolo", "Francesco", "Roberto", "Stefano",
        "Alessandro", "Andrea", "Giorgio", "Bruno", "Carlo", "Enrico", "Fabio", "Davide", "Claudio", "Massimo",
        "Sofia", "Giulia", "Isabella", "Valentina", "Chiara", "Laura", "Maria", "Anna", "Francesca", "Elena",
        "Alessandra", "Martina", "Giovanna", "Rosa", "Angela", "Lucia", "Paola", "Silvia", "Monica", "Cristina",
    )
    _LAST_NAMES = (
        "Rossi", "Ferrari", "Russo", "Bianchi", "Romano", "Gallo", "Costa", "Fontana", "Conti", "Esposito",
        "Ricci", "Bruno", "De Luca", "Moretti", "Marino", "Greco", "Barbieri", "Lombardi", "Giordano", "Colombo",
        "Mancini", "Longo", "Leone", "Martinelli", "Santoro", "Mariani", "Vitale", "Ferraro", "Rinaldi", "Villa",
    )

    def __init__(self, db_manager: "DatabaseManager", proxy: Optional[str] = None):
        """Create a submitter.

        Args:
            db_manager: Database manager used to read and update accounts.
            proxy: Proxy address handed to ``CaptchaSolver`` unchanged.
        """
        self.db_manager = db_manager
        self.proxy = proxy
        logger.info("JetbrainsSubmitter 初始化完成")

    def random_name(self) -> Tuple[str, str]:
        """Pick a random Italian first name and surname.

        Returns:
            ``(first_name, last_name)``.
        """
        return random.choice(self._FIRST_NAMES), random.choice(self._LAST_NAMES)

    def submit_email(self, account: Dict[str, Any]) -> bool:
        """Submit one account's e-mail address and record the outcome.

        The account row is set to ``STATUS_SUBMITTED`` on success and to
        ``STATUS_FAILED`` (with a note) otherwise; an operation-log entry is
        written in every case. Exceptions are caught and reported as failure.

        Args:
            account: Account dict; ``'id'`` and ``'username'`` are read.

        Returns:
            True if the submission succeeded, False otherwise.
        """
        username = account['username']
        logger.info(f"正在尝试提交邮箱: {username} 到JetBrains...")

        try:
            firstname, lastname = self.random_name()

            # NOTE(review): indentation was lost in the reviewed source; the
            # result handling is assumed to run after the solver is closed.
            with CaptchaSolver(
                email=username,
                firstname=firstname,
                lastname=lastname,
                is_teacher=False,
                proxy=self.proxy
            ) as solver:
                success = solver.solve_audio_captcha()

            if success:
                logger.info(f"邮箱 {username} 提交成功")
                self.db_manager.update_account_status(account['id'], STATUS_SUBMITTED)
                self.db_manager.log_operation(username, "submit_email", "success")
                return True

            logger.warning(f"邮箱 {username} 提交失败 (CaptchaSolver返回False)")
            self.db_manager.update_account_status(account['id'], STATUS_FAILED, notes="提交失败请手动检查")
            self.db_manager.log_operation(username, "submit_email", "failed", "CaptchaSolver返回False")
            return False
        except Exception as e:
            error_msg = str(e)
            logger.error(f"提交邮箱 {username} 时发生错误: {error_msg}")
            # Notes and log messages are truncated to keep the rows small.
            self.db_manager.update_account_status(account['id'], STATUS_FAILED, notes=f"认证失败: {error_msg[:100]}")
            self.db_manager.log_operation(username, "submit_email", "error", error_msg[:200])
            return False

    def submit_batch(self, max_accounts: int = 5, max_workers: int = 3) -> Tuple[int, int]:
        """Submit up to ``max_accounts`` pending accounts concurrently.

        Args:
            max_accounts: Maximum number of accounts to process.
            max_workers: Maximum number of worker threads; values below 1 are
                treated as 1 (``ThreadPoolExecutor`` rejects them).

        Returns:
            ``(success_count, failure_count)``.
        """
        accounts = self.db_manager.get_pending_accounts(limit=max_accounts)
        if not accounts:
            logger.info("没有待处理的账号")
            return 0, 0

        success_count = 0
        error_count = 0
        logger.info(f"开始批量处理 {len(accounts)} 个账号,最大并发数: {max_workers}")

        with concurrent.futures.ThreadPoolExecutor(max_workers=max(1, max_workers)) as executor:
            future_to_account = {executor.submit(self.submit_email, account): account for account in accounts}

            for future in concurrent.futures.as_completed(future_to_account):
                account = future_to_account[future]
                try:
                    if future.result():
                        success_count += 1
                    else:
                        error_count += 1
                except Exception as e:
                    logger.error(f"执行任务时发生异常: {str(e)}")
                    error_count += 1
                    # Mark the account failed so it is not picked up as pending again.
                    self.db_manager.update_account_status(
                        account['id'], STATUS_FAILED, notes=f"执行异常: {str(e)[:100]}"
                    )

        logger.info(f"批量处理完成,成功: {success_count},失败: {error_count}")
        return success_count, error_count


class LinkExtractor:
    """Logs into account mailboxes and extracts the JetBrains activation link."""

    # URL prefixes accepted as activation links, followed by a run of
    # non-whitespace characters.
    _LINK_PATTERN = re.compile(
        r'https?://(?:account\.jetbrains\.com/login|www\.jetbrains\.com/shop/(?:account|eform)|jetbrains\.com/activate)[?\S]+'
    )
    # Characters that end a URL embedded in HTML markup or quoted text.
    _LINK_TERMINATORS = re.compile(r'[<>"\']')
    # Search expressions passed to EmailClient.read_emails, tried in order.
    _SEARCH_KEYWORDS = (
        'BODY "jetbrain"',
        'SUBJECT "JetBrains Account"',
        'SUBJECT "Activate JetBrains"',
        'BODY "jetbrains.com/activate"',
        'BODY "account.jetbrains.com"',
        'FROM "jetbrains.com"',
    )

    def __init__(self, db_manager: "DatabaseManager"):
        """Create an extractor.

        Args:
            db_manager: Database manager used to read and update accounts.
        """
        self.db_manager = db_manager
        logger.info("LinkExtractor 初始化完成")

    @classmethod
    def _extract_link_text(cls, text_content: str) -> Optional[str]:
        """Return the first activation URL in ``text_content``, cleaned up."""
        match = cls._LINK_PATTERN.search(text_content)
        if match is None:
            return None
        # The greedy match can run into markup (`">Click</a>`); cut it at the
        # first quote or angle bracket.
        link = cls._LINK_TERMINATORS.split(match.group(0), maxsplit=1)[0].strip()
        # HTML bodies encode "&" in attribute values as "&amp;". Only this one
        # entity is decoded: a general unescape would also rewrite legitimate
        # query parameters such as "&reg=".
        return link.replace("&amp;", "&")

    def _find_activation_link(self, text_content: str) -> Optional[str]:
        """Find a JetBrains activation link in an e-mail body.

        Args:
            text_content: HTML or plain-text body of the e-mail.

        Returns:
            The link, or None if the body is empty or contains no match.
        """
        if not text_content:
            return None

        link = self._extract_link_text(text_content)
        if link:
            logger.info(f"找到可能的激活链接: {link}")
            return link
        return None

    def _search_jetbrains_emails(self, client: Any) -> List[Any]:
        """Try each search keyword in turn and return the first non-empty result."""
        emails: List[Any] = []
        for keyword in self._SEARCH_KEYWORDS:
            logger.info(f"使用关键词 '{keyword}' 搜索...")
            try:
                emails.extend(client.read_emails(mailbox="INBOX", limit=5, keyword=keyword))
                if emails:
                    logger.info(f"使用关键词 '{keyword}' 找到 {len(emails)} 封邮件")
                    break
            except Exception as read_err:
                logger.warning(f"使用关键词 '{keyword}' 读取邮件时出错: {read_err}")
            # NOTE(review): indentation was lost in the reviewed source; the
            # pause is assumed to follow every unsuccessful keyword, not only
            # failed reads.
            time.sleep(1)
        return emails

    def extract_link(self, account: Dict[str, Any]) -> Optional[str]:
        """Log into the account's mailbox and extract the activation link.

        On success the account is set to ``STATUS_LINK_EXTRACTED`` with the
        link stored. In every other case it stays ``STATUS_SUBMITTED`` with a
        note describing why, so it is picked up again by a later run. An
        operation-log entry is written in every case.

        Args:
            account: Account dict; ``'id'``, ``'username'`` and ``'password'``
                are read.

        Returns:
            The extracted link, or None if none was found or an error occurred.
        """
        username = account['username']
        password = account['password']
        logger.info(f"正在尝试登录邮箱: {username} 并提取激活链接...")

        try:
            client = EmailClient(
                username=username,
                password=password,
                email_address=username
            )

            logger.info(f"为 {username} 执行OAuth认证...")
            client.authenticate_oauth()
            logger.info(f"为 {username} OAuth认证成功")

            logger.info(f"为 {username} 搜索JetBrains相关邮件...")
            emails = self._search_jetbrains_emails(client)

            if not emails:
                logger.warning(f"邮箱 {username} 中未找到相关的JetBrains邮件")
                self.db_manager.update_account_status(account['id'], STATUS_SUBMITTED, notes="没有找到包含关键字的邮件")
                self.db_manager.log_operation(username, "extract_link", "failed", "没有找到包含关键字的邮件")
                return None

            logger.info(f"在 {username} 中找到 {len(emails)} 封相关邮件,开始解析...")

            for email_content in emails:
                # Prefer the HTML part; fall back to plain text.
                link = self._find_activation_link(email_content.get('html'))
                if not link:
                    link = self._find_activation_link(email_content.get('text'))

                if link:
                    logger.success(f"为邮箱 {username} 成功提取到激活链接: {link}")
                    self.db_manager.update_account_status(account['id'], STATUS_LINK_EXTRACTED, activation_link=link)
                    self.db_manager.log_operation(username, "extract_link", "success", link)
                    return link

            logger.warning(f"在找到的邮件中未能为邮箱 {username} 提取到激活链接")
            self.db_manager.update_account_status(account['id'], STATUS_SUBMITTED, notes="没有找到激活链接")
            self.db_manager.log_operation(username, "extract_link", "failed", "在邮件中未找到链接")
            return None

        except Exception as e:
            error_msg = str(e)

            # Classify the failure from the message text for the account note.
            if "Authentication failed" in error_msg or "认证失败" in error_msg:
                logger.error(f"邮箱 {username} 认证失败: {error_msg}")
                error_type = "认证失败"
            elif "invalid_grant" in error_msg:
                logger.error(f"邮箱 {username} OAuth Token无效或过期: {error_msg}")
                error_type = "OAuth Token无效"
            else:
                logger.error(f"为邮箱 {username} 提取链接时发生未预料的错误: {error_msg}")
                error_type = "未知错误"

            self.db_manager.update_account_status(
                account['id'], STATUS_SUBMITTED, notes=f"{error_type}: {error_msg[:100]}"
            )
            self.db_manager.log_operation(username, "extract_link", "error", error_msg[:200])
            return None

    def extract_batch(self, max_accounts: int = 5, max_workers: int = 1) -> Tuple[int, int]:
        """Extract links for up to ``max_accounts`` submitted accounts.

        Args:
            max_accounts: Maximum number of accounts to process.
            max_workers: Maximum number of worker threads; values below 1 are
                treated as 1 (``ThreadPoolExecutor`` rejects them).

        Returns:
            ``(success_count, failure_count)``.
        """
        accounts = self.db_manager.get_submitted_accounts(limit=max_accounts)
        if not accounts:
            logger.info("没有待提取链接的账号")
            return 0, 0

        success_count = 0
        error_count = 0
        logger.info(f"开始批量提取链接,处理 {len(accounts)} 个账号,最大并发数: {max_workers}")

        with concurrent.futures.ThreadPoolExecutor(max_workers=max(1, max_workers)) as executor:
            future_to_account = {executor.submit(self.extract_link, account): account for account in accounts}

            for future in concurrent.futures.as_completed(future_to_account):
                account = future_to_account[future]
                try:
                    if future.result():
                        success_count += 1
                    else:
                        error_count += 1
                except Exception as e:
                    logger.error(f"执行提取任务时发生异常: {str(e)}")
                    error_count += 1
                    self.db_manager.update_account_status(
                        account['id'], STATUS_SUBMITTED, notes=f"执行异常: {str(e)[:100]}"
                    )

        logger.info(f"批量提取完成,成功: {success_count},失败: {error_count}")
        return success_count, error_count


class ProcessController:
    """Facade that wires the database, the submitter and the link extractor together."""

    def __init__(self, db_path: str = "unibo_jetbrains.db", proxy: str = ''):
        """Build the three collaborating components.

        Args:
            db_path: Path of the SQLite database file.
            proxy: Proxy address handed to the submitter.
        """
        database = DatabaseManager(db_path)
        self.db_manager = database
        self.submitter = JetbrainsSubmitter(database, proxy)
        self.extractor = LinkExtractor(database)
        logger.info("ProcessController 初始化完成")

    def import_data(self, filepath: str) -> int:
        """Import accounts from ``filepath``.

        Returns:
            Whatever the database manager reports as the number imported.
        """
        imported = self.db_manager.import_from_file(filepath)
        return imported

    def export_data(self, filepath: str, status: int = None) -> int:
        """Export accounts to ``filepath``, optionally filtered by ``status``.

        Returns:
            Whatever the database manager reports as the number exported.
        """
        exported = self.db_manager.export_results_to_file(filepath, status)
        return exported

    def run_submission_process(self, max_accounts: int = 5, max_workers: int = 3) -> Tuple[int, int]:
        """Run the e-mail submission step.

        Args:
            max_accounts: Maximum number of accounts to process.
            max_workers: Maximum number of worker threads.

        Returns:
            The submitter's ``(success_count, failure_count)``.
        """
        outcome = self.submitter.submit_batch(max_accounts, max_workers)
        return outcome

    def run_extraction_process(self, max_accounts: int = 5, max_workers: int = 1) -> Tuple[int, int]:
        """Run the link extraction step.

        Args:
            max_accounts: Maximum number of accounts to process.
            max_workers: Maximum number of worker threads.

        Returns:
            The extractor's ``(success_count, failure_count)``.
        """
        outcome = self.extractor.extract_batch(max_accounts, max_workers)
        return outcome

    def run_full_process(self, max_submission: int = 5, max_extraction: int = 5,
                         submission_workers: int = 3, extraction_workers: int = 1) -> Dict[str, int]:
        """Run submission, then extraction, and summarise both.

        Args:
            max_submission: Maximum accounts for the submission step.
            max_extraction: Maximum accounts for the extraction step.
            submission_workers: Worker threads for the submission step.
            extraction_workers: Worker threads for the extraction step.

        Returns:
            Dict with the four per-step counters plus ``total_processed``,
            their sum.
        """
        submitted = self.run_submission_process(max_submission, submission_workers)
        extracted = self.run_extraction_process(max_extraction, extraction_workers)

        summary = {
            'submission_success': submitted[0],
            'submission_fail': submitted[1],
            'extraction_success': extracted[0],
            'extraction_fail': extracted[1],
        }
        summary['total_processed'] = (
            summary['submission_success'] + summary['submission_fail']
            + summary['extraction_success'] + summary['extraction_fail']
        )
        return summary


if __name__ == "__main__":
    # Replace loguru's default sink: INFO and above go to stdout, DEBUG and
    # above go to a log file rotated at 500 KB.
    logger.remove()
    logger.add(lambda msg: print(msg, end=""), level="INFO")
    logger.add("unibo_process.log", rotation="500 KB", level="DEBUG")

    pipeline = ProcessController()

    # Step 1: load accounts from the text file into the database.
    print("\n=== 从文件导入数据 ===")
    imported_total = pipeline.import_data("user-4-21-success.txt")
    print(f"导入了 {imported_total} 个账号")

    # Step 2: submit up to three pending accounts, one at a time.
    print("\n=== 运行提交邮箱流程 ===")
    submit_ok, submit_bad = pipeline.run_submission_process(max_accounts=3, max_workers=1)
    print(f"提交结果: 成功 {submit_ok} 个, 失败 {submit_bad} 个")

    # Step 3: extract activation links for up to two submitted accounts.
    print("\n=== 运行提取链接流程 ===")
    extract_ok, extract_bad = pipeline.run_extraction_process(max_accounts=2, max_workers=1)
    print(f"提取结果: 成功 {extract_ok} 个, 失败 {extract_bad} 个")

    # Step 4: export only the accounts whose link was extracted.
    print("\n=== 导出链接提取成功的账号 ===")
    exported_total = pipeline.export_data("export_results.txt", STATUS_LINK_EXTRACTED)
    print(f"导出了 {exported_total} 个记录")