# -*- coding:utf-8 -*-
# @Software :PyCharm
# @Project :LOL-DeepWinPredictor
# @Path :/Data_CrawlProcess
# @FileName :LPL.py
# @Time :2025/4/21 23:36
# @Author :Viper373
# @GitHub :https://github.com/Viper373
# @Home :https://viper3.top
# @Blog :https://blog.viper3.top

import asyncio
import json
import os
import re
from functools import partial
from typing import Any, Dict, List, Optional, Tuple

import aiohttp
import orjson
import requests
from pymongo import ASCENDING, errors

from Data_CrawlProcess import env
from Data_CrawlProcess.Process import Process
from tool_utils.log_utils import RichLogger
from tool_utils.mongo_utils import MongoUtils
from tool_utils.progress_utils import RichProgressUtils

process = Process()
rich_logger = RichLogger()
mongo_utils = MongoUtils()


class LPL:
    """
    Crawler for LPL match data.

    Collects season IDs, per-season bMatchId lists (lpl.qq.com) and per-match
    details (open.tjstats.com) and stores them in MongoDB via ``mongo_utils``.
    """

    # Finds a ``SEASONS = [`` assignment (not e.g. ``MY_SEASONS``); the
    # lookahead leaves the match end exactly on the ``[`` so the list literal
    # can be decoded from that position.
    _SEASONS_ASSIGN_RE = re.compile(r'\bSEASONS\s*=\s*(?=\[)')

    def __init__(self, rich_progress=None, request_timeout: float = 30):
        """
        Initialise endpoints, request headers, cookies and proxies.

        :param rich_progress: RichProgressUtils instance used as the global progress display
        :param request_timeout: timeout in seconds for the blocking ``requests`` calls
        :return: None
        """
        self.url = "https://lpl.qq.com/web201612/data/LOL_MATCH2_MATCH_HOMEPAGE_BMATCH_LIST_{}.js"
        self.match_url = "https://open.tjstats.com/match-auth-app/open/v1/compound/matchDetail?matchId={}"
        self.seasons_url = "https://lol.qq.com/act/AutoCMS/publish/LOLWeb/EventdataTab/EventdataTab.js"
        self.headers = {
            'sec-ch-ua': '"Microsoft Edge";v="125", "Chromium";v="125", "Not.A/Brand";v="24"',
            'Accept': 'application/json, text/javascript, */*; q=0.01',
            'Referer': 'https://lpl.qq.com/web202301/schedule.html',
            'X-Requested-With': 'XMLHttpRequest',
            'sec-ch-ua-mobile': '?0',
            'User-Agent': env.UA,  # user agent
            'Authorization': env.AUTHORIZATION,  # auth token for the tjstats API
            'sec-ch-ua-platform': '"Windows"',
        }
        self.seasonsIds_headers = {
            'accept': '*/*',
            'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5,ko;q=0.4,fr;q=0.3',
            'cache-control': 'no-cache',
            'pragma': 'no-cache',
            'priority': 'u=2',
            'referer': 'https://lpl.qq.com/web202301/schedule.html',
            'sec-ch-ua': '"Microsoft Edge";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'sec-fetch-dest': 'script',
            'sec-fetch-mode': 'no-cors',
            'sec-fetch-site': 'same-origin',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36 Edg/135.0.0.0',
        }
        self.cookies = {
            'tj_rp_did': '60013b4b18ce11efa938fe48ef6d951e',
        }
        self.proxies = env.PROXIES  # proxies passed to requests
        self.rich_progress = rich_progress
        self.request_timeout = request_timeout

    # ------------------------------------------------------------------ #
    # Shared helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    async def _pipeline_insert(collection, records: List[dict], rich_progress,
                               fetch_label: str, store_label: str, queue_size: int) -> None:
        """
        Push ``records`` through a bounded queue into ``collection``.

        A producer enqueues each record and a consumer inserts it, each
        advancing its own progress bar. ``DuplicateKeyError`` is ignored, so
        records already present under the caller's unique index are skipped.

        :param collection: pymongo collection to insert into
        :param records: documents to insert (pymongo adds ``_id`` to each dict)
        :param rich_progress: RichProgressUtils instance
        :param fetch_label: label of the producer progress bar
        :param store_label: label of the consumer progress bar
        :param queue_size: maximum number of queued records
        :return: None
        """
        queue = asyncio.Queue(maxsize=queue_size)
        total = len(records)
        fetch_task_id = rich_progress.add_task(fetch_label, total=total)
        store_task_id = rich_progress.add_task(store_label, total=total)

        async def producer():
            for record in records:
                await queue.put(record)
                rich_progress.advance(fetch_task_id)
            await queue.put(None)  # sentinel: no more records

        async def consumer():
            while True:
                item = await queue.get()
                if item is None:
                    break
                try:
                    collection.insert_one(item)
                except errors.DuplicateKeyError:
                    pass
                rich_progress.advance(store_task_id)
            rich_progress.update(store_task_id, completed=total)

        await asyncio.gather(producer(), consumer())

    @classmethod
    def _find_seasons_literal(cls, text: str) -> Optional[Tuple[int, int, Any]]:
        """
        Locate the first ``SEASONS = [...]`` assignment whose list decodes as JSON.

        :param text: file content to search
        :return: ``(start, end, value)`` where ``text[start:end]`` is the whole
            assignment and ``value`` the decoded list, or None if none is found
        """
        decoder = json.JSONDecoder()
        for match in cls._SEASONS_ASSIGN_RE.finditer(text):
            try:
                # raw_decode parses exactly one JSON value and reports where it
                # ends, so brackets inside strings cannot truncate the span.
                value, length = decoder.raw_decode(text[match.end():])
            except ValueError:
                continue
            if isinstance(value, list):
                return match.start(), match.end() + length, value
        return None

    @classmethod
    def _read_env_seasons(cls) -> List[dict]:
        """
        Read the SEASONS list stored in ``env.ENV_SEASONS``.

        :return: the dict entries of that list, or [] if the file or a
            decodable SEASONS list is missing
        """
        if not os.path.exists(env.ENV_SEASONS):
            return []
        with open(env.ENV_SEASONS, 'r', encoding='utf-8') as f:
            located = cls._find_seasons_literal(f.read())
        if located is None:
            return []
        return [season for season in located[2] if isinstance(season, dict)]

    @classmethod
    def _write_env_seasons(cls, merged_seasons: List[dict]) -> None:
        """
        Write ``merged_seasons`` into ``env.ENV_SEASONS`` as ``SEASONS = <JSON>``.

        An existing decodable SEASONS list is replaced in place (exact span, the
        rest of the file is untouched); otherwise the assignment is appended,
        or the file is created.

        NOTE(review): the literal is JSON, so a None field is written as
        ``null``, which is not a Python name and would make importing env.py
        raise NameError -- confirm scraped fields are never None.

        :param merged_seasons: season dicts to persist
        :return: None
        """
        literal = "SEASONS = " + orjson.dumps(merged_seasons, option=orjson.OPT_INDENT_2).decode('utf-8')
        if os.path.exists(env.ENV_SEASONS):
            with open(env.ENV_SEASONS, 'r', encoding='utf-8') as f:
                existing_content = f.read()
            located = cls._find_seasons_literal(existing_content)
            if located is not None:
                start, end, _ = located
                new_content = existing_content[:start] + literal + existing_content[end:]
            else:
                new_content = existing_content + '\n' + literal + '\n'
        else:
            new_content = literal + '\n'
        with open(env.ENV_SEASONS, 'w', encoding='utf-8') as f:
            f.write(new_content)

    @staticmethod
    def _build_match_record(match: Dict[str, Any], hero_win_rates: Dict[str, float]) -> Dict[str, Any]:
        """
        Flatten one matchDetail payload into a single row of team/hero features.

        ``matchWin`` is 1 when the payload's ``matchWin`` equals ``teamAId``.
        For each player, keys ``{A|B}{slot}playerLocation/heroId/heroName/heroWinRate``
        are set. The first team in ``teamInfos`` is prefixed "A" and every later
        team "B". A location of "BOT" is renamed "ADC". The win rate is looked up
        as ``f"{heroId}{location[:3].upper()}"``, defaulting to 0.50.

        NOTE(review): every game in ``matchInfos`` writes the same keys, so only
        the last game's heroes survive in the row -- confirm this is intended.

        :param match: ``data`` object of the matchDetail response
        :param hero_win_rates: hero/position win-rate lookup
        :return: the flattened row
        :raises KeyError: if an expected field is missing
        """
        team_a_id = match['teamAId']
        record: Dict[str, Any] = {
            "teamAId": team_a_id,
            "teamAName": match['teamAName'],
            "teamBId": match['teamBId'],
            "teamBName": match['teamBName'],
            "matchWin": 1 if match["matchWin"] == team_a_id else 0,
        }
        for game in match["matchInfos"]:
            for team_index, team in enumerate(game["teamInfos"]):
                prefix = "A" if team_index == 0 else "B"
                for slot, player in enumerate(team["playerInfos"], start=1):
                    location = "ADC" if player["playerLocation"] == "BOT" else player["playerLocation"]
                    hero_id = player["heroId"]
                    win_rate_key = f"{hero_id}{location[:3].upper()}"
                    record.update({
                        f"{prefix}{slot}playerLocation": location,
                        f"{prefix}{slot}heroId": hero_id,
                        f"{prefix}{slot}heroName": f"{player['heroTitle']}-{player['heroName']}",
                        f"{prefix}{slot}heroWinRate": hero_win_rates.get(win_rate_key, 0.50),
                    })
        return record

    # ------------------------------------------------------------------ #
    # Crawl steps
    # ------------------------------------------------------------------ #
    async def auto_seasonIds(self, col_name: str, rich_progress) -> None:
        """
        Scrape season metadata from the LPL site, merge it with env.py's SEASONS and persist it.

        Downloads ``self.seasons_url``, extracts the JSON array the script
        returns, and flattens every ``domestic``/``abroad`` game into
        ``{"name", "id", "type", "url"}``. The result is merged with the SEASONS
        list already in ``env.ENV_SEASONS``, de-duplicated by ``id`` (freshly
        scraped entries win). The merged list is written back to that file and
        inserted into MongoDB.

        :param col_name: MongoDB collection name
        :param rich_progress: RichProgressUtils instance; a new one is created when falsy
        :return: None (returns early, after logging, if download or parsing fails)
        """
        rich_progress = rich_progress or RichProgressUtils()
        collection = mongo_utils.use_collection(col_name)
        try:
            loop = asyncio.get_running_loop()
            request_func = partial(
                requests.get,
                url=self.seasons_url,
                headers=self.seasonsIds_headers,
                proxies=self.proxies,
                timeout=self.request_timeout,
            )
            response = await loop.run_in_executor(None, request_func)
            response.raise_for_status()
            response.encoding = 'utf-8'
            match = re.search(r'return\s+(\[.*?\])[\s;]*}\)', response.text)
            if not match:
                rich_logger.error("[LPL] auto_seasonIds: 未找到JSON数据")
                return
            data = orjson.loads(match.group(1))
            seasons = [
                {
                    "name": game.get("gameName"),
                    "id": game.get("iGameId"),
                    "type": game.get("sGameType"),
                    "url": game.get("url"),
                }
                for season in data
                for region in ('domestic', 'abroad')
                for game in (season.get(region) or [])
            ]
            rich_logger.info(f"[LPL] auto_seasonIds: 官网获取到 {len(seasons)} 个赛季数据")
        except Exception as e:
            rich_logger.error(f"[LPL] auto_seasonIds: 获取官网赛季数据失败: {e}")
            return

        # Merge: scraped seasons first so they take precedence over stale entries.
        seen_ids = set()
        merged_seasons = []
        for season in seasons + self._read_env_seasons():
            season_id = season.get('id')
            if season_id in seen_ids:
                continue
            seen_ids.add(season_id)
            merged_seasons.append(season)
        rich_logger.info(f"[LPL] auto_seasonIds: 合并后共有 {len(merged_seasons)} 个赛季数据(已自动去重)")

        self._write_env_seasons(merged_seasons)
        rich_logger.info("[LPL] auto_seasonIds: 赛季数据已成功更新到env.py文件!")

        collection.create_index([("id", ASCENDING)], unique=True)
        await self._pipeline_insert(
            collection, merged_seasons, rich_progress,
            "[LPL] auto_seasonIds生产", "[LPL] auto_seasonIds入库", queue_size=20,
        )
        rich_logger.info(f"[LPL] auto_seasonIds: 赛季数据已全部入库,共{len(merged_seasons)}条")

    @staticmethod
    async def get_seasonIds(col_name: str, seasons: Dict[str, str], rich_progress) -> None:
        """
        Store ``{"season_name", "season_id"}`` documents for every entry in ``seasons``.

        :param col_name: MongoDB collection name
        :param seasons: mapping of season name to season ID
        :param rich_progress: RichProgressUtils instance; a new one is created when falsy
        :return: None
        """
        rich_progress = rich_progress or RichProgressUtils()
        collection = mongo_utils.use_collection(col_name)
        collection.create_index([("season_id", ASCENDING)], unique=True)
        records = [
            {"season_name": season_name, "season_id": season_id}
            for season_name, season_id in seasons.items()
        ]
        await LPL._pipeline_insert(
            collection, records, rich_progress,
            "[LPL] seasonIDs生产", "[LPL] seasonIDs入库", queue_size=20,
        )
        rich_logger.info(f"爬取完成丨共计[{len(seasons)}]LPL_season")

    async def get_bMatchIds(self, col_name: str, seasons: Dict[str, str], rich_progress) -> None:
        """
        Download the bMatch list of every season concurrently and store it in MongoDB.

        Each season's list is fetched from ``self.url``. When the response
        ``status`` is "0", every entry of ``msg`` is reduced to GameName,
        bMatchName, MatchDate and bMatchId. Per-season failures are logged and
        skipped.

        :param col_name: MongoDB collection name
        :param seasons: mapping of season name to season ID
        :param rich_progress: RichProgressUtils instance; falls back to the one
            given to ``__init__``, then to a new instance
        :return: None
        """
        rich_progress = rich_progress or self.rich_progress or RichProgressUtils()
        collection = mongo_utils.use_collection(col_name)
        collection.create_index([("bMatchId", ASCENDING)], unique=True)
        loop = asyncio.get_running_loop()
        all_records: List[dict] = []

        async def fetch_season_data(url: str) -> None:
            try:
                request_func = partial(
                    requests.get,
                    url=url,
                    headers=self.headers,
                    cookies=self.cookies,
                    proxies=self.proxies,
                    timeout=self.request_timeout,
                )
                # requests is blocking, so run it in the default thread pool.
                response = await loop.run_in_executor(None, request_func)
                if response.status_code != 200:
                    rich_logger.error(f"获取bMatchId失败,状态码: {response.status_code}")
                    return
                payload = orjson.loads(response.text)
                if payload.get('status') != "0":
                    return
                for match in payload.get('msg') or []:
                    all_records.append({
                        'GameName': match.get('GameName'),
                        'bMatchName': match.get('bMatchName'),
                        'MatchDate': match.get('MatchDate'),
                        'bMatchId': match.get('bMatchId'),
                    })
            except Exception as error:
                rich_logger.error(f"爬取[LPL]bMatchId错误: {error}")

        await asyncio.gather(*(fetch_season_data(self.url.format(season_id)) for season_id in seasons.values()))
        await self._pipeline_insert(
            collection, all_records, rich_progress,
            "[LPL] bMatchId生产", "[LPL] bMatchId入库", queue_size=50,
        )
        rich_logger.info("爬取完成丨LPL_bMatchId已全部入库")

    async def get_match_data(self, bmatch_ids: List[str], col_name: str, rich_progress=None) -> None:
        """
        Fetch match details concurrently and insert one flattened row per match.

        A fetcher (at most 50 concurrent requests) feeds a bounded queue that a
        single processor drains. Downloads and database writes therefore
        overlap, and each has its own progress bar. Per-match failures are
        logged and skipped.

        :param bmatch_ids: match IDs to fetch
        :param col_name: MongoDB collection name for the flattened rows
        :param rich_progress: RichProgressUtils instance; falls back to the one
            given to ``__init__``, then to a new instance
        :return: None
        """
        rich_progress = rich_progress or self.rich_progress or RichProgressUtils()
        collection = mongo_utils.use_collection(col_name)
        session_timeout = aiohttp.ClientTimeout(total=30)
        total_count = len(bmatch_ids)
        fetch_task_id = rich_progress.add_task("[LPL] match_data爬取", total=total_count)
        process_task_id = rich_progress.add_task("[LPL] process_data入库", total=total_count)
        hero_win_rates = process.read_win_rate()
        process_queue = asyncio.Queue(maxsize=500)

        async def fetcher():
            sem = asyncio.Semaphore(50)
            try:
                async with aiohttp.ClientSession(timeout=session_timeout) as session:

                    async def fetch_one(bid):
                        url = self.match_url.format(bid)
                        async with sem:
                            try:
                                async with session.get(url, headers=self.headers, cookies=self.cookies) as resp:
                                    data = await resp.json()
                                if data.get('success') and data.get('data'):
                                    await process_queue.put(data['data'])
                            except Exception as e:
                                rich_logger.error(f"[LPL] match_data爬取失败: {bid} {e}")
                        rich_progress.advance(fetch_task_id)

                    await asyncio.gather(*(fetch_one(bid) for bid in bmatch_ids))
            finally:
                # Always send the sentinel so the processor terminates even if
                # the fetch side fails unexpectedly.
                await process_queue.put(None)

        async def processor():
            while True:
                match = await process_queue.get()
                if match is None:
                    break
                try:
                    collection.insert_one(self._build_match_record(match, hero_win_rates))
                except Exception as e:
                    if isinstance(match, dict):
                        match_id = match.get('bMatchId', 'unknown')
                        # Decode before slicing: slicing UTF-8 bytes can split a
                        # multibyte character and make decode() raise here.
                        snippet = orjson.dumps(match).decode('utf-8')[:200]
                    else:
                        match_id = 'unknown'
                        snippet = str(match)[:200]
                    rich_logger.error(f"[LPL] process_data单条处理失败: {str(e)} | bMatchId: {match_id} | 数据片段: {snippet} ...")
                rich_progress.advance(process_task_id)
            rich_progress.update(process_task_id, completed=total_count)

        await asyncio.gather(fetcher(), processor())
        rich_logger.info("[LPL] match_data已全部入库")
        return None

    async def main(self, lpl_db: str, col_season: str, col_bmatch: str, col_match: str, seasons: dict) -> None:
        """
        Run the full LPL pipeline: season IDs, then bMatchIds, then match data.

        :param lpl_db: MongoDB database name for LPL data
        :param col_season: collection name for season IDs
        :param col_bmatch: collection name for bMatchIds
        :param col_match: collection name for flattened match rows
        :param seasons: mapping of season name to season ID
        :return: None
        """
        mongo_utils.use_db(lpl_db)
        await self.get_seasonIds(col_season, seasons, self.rich_progress)
        await self.get_bMatchIds(col_bmatch, seasons, self.rich_progress)
        bmatch_ids = [
            item['bMatchId']
            for item in mongo_utils.use_collection(col_bmatch).find({}, {'bMatchId': 1, '_id': 0})
        ]
        await self.get_match_data(bmatch_ids, col_match, self.rich_progress)
        rich_logger.info("[LPL] main流程执行完毕")