# -*- coding:utf-8 -*-
# @Software :PyCharm
# @Project :LOL-DeepWinPredictor
# @Path :/Data_CrawlProcess
# @FileName :LPL.py
# @Time :2025/4/21 23:36
# @Author :Viper373
# @GitHub :https://github.com/Viper373
# @Home :https://viper3.top
# @Blog :https://blog.viper3.top
import asyncio
import os
import re
from functools import partial
from typing import Dict, List
import aiohttp
import orjson
import requests
from pymongo import ASCENDING, errors
from Data_CrawlProcess import env
from Data_CrawlProcess.Process import Process
from tool_utils.log_utils import RichLogger
from tool_utils.mongo_utils import MongoUtils
from tool_utils.progress_utils import RichProgressUtils
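# Module-level helpers shared by all LPL methods: data processing (hero win rates), rich logging, and MongoDB access.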
process = Process()
rich_logger = RichLogger()
mongo_utils = MongoUtils()
class LPL:
def __init__(self, rich_progress=None):
"""
LPL类初始化方法。
:param rich_progress: RichProgressUtils实例(用于全局进度条)
:return: None
"""
self.url = "https://lpl.qq.com/web201612/data/LOL_MATCH2_MATCH_HOMEPAGE_BMATCH_LIST_{}.js"
self.match_url = "https://open.tjstats.com/match-auth-app/open/v1/compound/matchDetail?matchId={}"
self.seasons_url = "https://lol.qq.com/act/AutoCMS/publish/LOLWeb/EventdataTab/EventdataTab.js"
self.headers = {
'sec-ch-ua': '"Microsoft Edge";v="125", "Chromium";v="125", "Not.A/Brand";v="24"',
'Accept': 'application/json, text/javascript, */*; q=0.01',
'Referer': 'https://lpl.qq.com/web202301/schedule.html',
'X-Requested-With': 'XMLHttpRequest',
'sec-ch-ua-mobile': '?0',
            'User-Agent': env.UA,  # User-Agent string
            'Authorization': env.AUTHORIZATION,  # authentication token
'sec-ch-ua-platform': '"Windows"',
}
self.seasonsIds_headers = {
'accept': '*/*',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5,ko;q=0.4,fr;q=0.3',
'cache-control': 'no-cache',
'pragma': 'no-cache',
'priority': 'u=2',
'referer': 'https://lpl.qq.com/web202301/schedule.html',
'sec-ch-ua': '"Microsoft Edge";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'script',
'sec-fetch-mode': 'no-cors',
'sec-fetch-site': 'same-origin',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36 Edg/135.0.0.0',
}
self.cookies = {
'tj_rp_did': '60013b4b18ce11efa938fe48ef6d951e',
}
        self.proxies = env.PROXIES  # proxy configuration
self.rich_progress = rich_progress
async def auto_seasonIds(self, col_name: str, rich_progress) -> None:
"""
自动从LPL官网获取赛季数据,合并env.py中的SEASONS并写入MongoDB。
:param col_name: MongoDB集合名称
:param rich_progress: RichProgressUtils实例
:return: None
"""
rich_progress = rich_progress or RichProgressUtils()
collection = mongo_utils.use_collection(col_name)
try:
loop = asyncio.get_event_loop()
request_func = partial(
requests.get, url=self.seasons_url, headers=self.seasonsIds_headers, proxies=self.proxies
)
            response = await loop.run_in_executor(None, request_func)
response.encoding = 'utf-8'
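            # The endpoint returns a JS snippet; extract the JSON array from its "return [...]" statement.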
match = re.search(r'return\s+(\[.*?\])[\s;]*}\)', response.text)
if not match:
rich_logger.error("[LPL] auto_seasonIds: 未找到JSON数据")
return
json_str = match.group(1)
data = orjson.loads(json_str)
seasons = []
for season in data:
for game in season.get('domestic', []):
seasons.append({
"name": game.get("gameName"),
"id": game.get("iGameId"),
"type": game.get("sGameType"),
"url": game.get("url")
})
for game in season.get('abroad', []):
seasons.append({
"name": game.get("gameName"),
"id": game.get("iGameId"),
"type": game.get("sGameType"),
"url": game.get("url")
})
rich_logger.info(f"[LPL] auto_seasonIds: 官网获取到 {len(seasons)} 个赛季数据")
except Exception as e:
rich_logger.error(f"[LPL] auto_seasonIds: 获取官网赛季数据失败: {e}")
return
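        # Merge the scraped seasons with any SEASONS list already present in env.py, de-duplicating by id.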
def get_existing_seasons():
if not os.path.exists(env.ENV_SEASONS):
return []
with open(env.ENV_SEASONS, 'r', encoding='utf-8') as f:
content = f.read()
match = re.search(r'SEASONS\s*=\s*(\[.*?\])', content, re.DOTALL)
if not match:
return []
try:
return orjson.loads(match.group(1))
            except orjson.JSONDecodeError:
                return []
existing_seasons = get_existing_seasons()
all_ids = set()
merged_seasons = []
for s in seasons + existing_seasons:
if s['id'] not in all_ids:
merged_seasons.append(s)
all_ids.add(s['id'])
rich_logger.info(f"[LPL] auto_seasonIds: 合并后共有 {len(merged_seasons)} 个赛季数据(已自动去重)")
content = "SEASONS = " + orjson.dumps(merged_seasons, option=orjson.OPT_INDENT_2).decode('utf-8') + "\n"
if os.path.exists(env.ENV_SEASONS):
with open(env.ENV_SEASONS, 'r', encoding='utf-8') as f:
existing_content = f.read()
if 'SEASONS = [' in existing_content:
new_content = re.sub(r'SEASONS = \[.*?\]', content, existing_content, flags=re.DOTALL)
else:
new_content = existing_content + '\n' + content
else:
new_content = content
with open(env.ENV_SEASONS, 'w', encoding='utf-8') as f:
f.write(new_content)
rich_logger.info(f"[LPL] auto_seasonIds: 赛季数据已成功更新到env.py文件!")
# 4. 写入MongoDB
collection.create_index([("id", ASCENDING)], unique=True)
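        # Producer/consumer over a bounded queue: enqueue the merged seasons, insert each into MongoDB, and skip duplicates.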
queue = asyncio.Queue(maxsize=20)
        fetch_task_id = rich_progress.add_task("[LPL] auto_seasonIds produce", total=len(merged_seasons))
        store_task_id = rich_progress.add_task("[LPL] auto_seasonIds store", total=len(merged_seasons))
async def producer():
for season in merged_seasons:
await queue.put(season)
rich_progress.advance(fetch_task_id)
await queue.put(None)
async def consumer():
count = 0
while True:
item = await queue.get()
if item is None:
break
try:
collection.insert_one(item)
except errors.DuplicateKeyError:
pass
count += 1
rich_progress.advance(store_task_id)
rich_progress.update(store_task_id, completed=len(merged_seasons))
await asyncio.gather(producer(), consumer())
rich_logger.info(f"[LPL] auto_seasonIds: 赛季数据已全部入库,共{len(merged_seasons)}条")
@staticmethod
async def get_seasonIds(col_name: str, seasons: Dict[str, str], rich_progress) -> None:
"""
异步生产者-消费者:获取赛季ID并写入数据库。
:param col_name: MongoDB集合名称
:param seasons: 赛季ID映射字典{str: str}
:param rich_progress: RichProgressUtils实例
:return: None
"""
rich_progress = rich_progress or RichProgressUtils()
collection = mongo_utils.use_collection(col_name)
collection.create_index([("season_id", ASCENDING)], unique=True)
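        # Bounded-queue pipeline: queue up (season_name, season_id) pairs and insert them, ignoring duplicate IDs.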
queue = asyncio.Queue(maxsize=20)
        fetch_task_id = rich_progress.add_task("[LPL] seasonIDs produce", total=len(seasons))
        store_task_id = rich_progress.add_task("[LPL] seasonIDs store", total=len(seasons))
async def producer():
for season_name, season_id in seasons.items():
await queue.put({"season_name": season_name, "season_id": season_id})
rich_progress.advance(fetch_task_id)
await queue.put(None)
async def consumer():
count = 0
while True:
item = await queue.get()
if item is None:
break
try:
collection.insert_one(item)
except errors.DuplicateKeyError:
pass
count += 1
rich_progress.advance(store_task_id)
rich_progress.update(store_task_id, completed=len(seasons))
await asyncio.gather(producer(), consumer())
rich_logger.info(f"爬取完成丨共计[{len(seasons)}]LPL_season")
async def get_bMatchIds(self, col_name: str, seasons: Dict[str, str], rich_progress) -> None:
"""
异步生产者-消费者:获取bMatchId并写入数据库。
:param col_name: MongoDB集合名称
:param seasons: 赛季ID映射字典{str: str}
:param rich_progress: RichProgressUtils实例
:return: None
"""
rich_progress = rich_progress or self.rich_progress
collection = mongo_utils.use_collection(col_name)
collection.create_index([("bMatchId", ASCENDING)], unique=True)
queue = asyncio.Queue(maxsize=50)
all_records = []
        # Collect all bMatchIds first, then queue them for insertion
async def fetch_all_records():
async def fetch_season_data(_url: str):
try:
loop = asyncio.get_event_loop()
request_func = partial(
requests.get, url=_url,
headers=self.headers,
cookies=self.cookies,
proxies=self.proxies
)
response = await loop.run_in_executor(None, request_func)
if response.status_code != 200:
rich_logger.error(f"获取bMatchId失败,状态码: {response.status_code}")
return
seasons_data = orjson.loads(response.text)
if seasons_data.get('status') == "0":
msg = seasons_data.get('msg')
for match in msg:
record = {
'GameName': match.get('GameName'),
'bMatchName': match.get('bMatchName'),
'MatchDate': match.get('MatchDate'),
'bMatchId': match.get('bMatchId')
}
all_records.append(record)
except Exception as _error:
rich_logger.error(f"爬取[LPL]bMatchId错误: {_error}")
urls = [self.url.format(season_id) for season_id in seasons.values()]
await asyncio.gather(*(fetch_season_data(url) for url in urls))
await fetch_all_records()
        fetch_task_id = rich_progress.add_task("[LPL] bMatchId produce", total=len(all_records))
        store_task_id = rich_progress.add_task("[LPL] bMatchId store", total=len(all_records))
async def producer_queue():
for record in all_records:
await queue.put(record)
rich_progress.advance(fetch_task_id)
await queue.put(None)
async def consumer():
count = 0
while True:
item = await queue.get()
if item is None:
break
try:
collection.insert_one(item)
except errors.DuplicateKeyError:
pass
count += 1
rich_progress.advance(store_task_id)
rich_progress.update(store_task_id, completed=len(all_records))
await asyncio.gather(producer_queue(), consumer())
rich_logger.info(f"爬取完成丨LPL_bMatchId已全部入库")
async def get_match_data(self, bmatch_ids: List[str], col_name: str, rich_progress=None) -> None:
"""
生产者-消费者解耦:高并发爬取match_data并结构化入库,进度独立。
:param bmatch_ids: 比赛ID列表
:param col_name: MongoDB集合名称
:param rich_progress: RichProgressUtils实例
:return: None
"""
        rich_progress = rich_progress or self.rich_progress
        collection = mongo_utils.use_collection(col_name)
session_timeout = aiohttp.ClientTimeout(total=30)
        fetch_task_id = rich_progress.add_task("[LPL] match_data fetch", total=len(bmatch_ids))
        process_task_id = rich_progress.add_task("[LPL] process_data store", total=len(bmatch_ids))
hero_win_rates = process.read_win_rate()
process_queue = asyncio.Queue(maxsize=500)
total_count = len(bmatch_ids)
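        # The fetcher downloads match-detail JSON concurrently; the processor flattens each match into one feature row and inserts it.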
async def fetcher():
async with aiohttp.ClientSession(timeout=session_timeout) as session:
sem = asyncio.Semaphore(50)
async def fetch_one(bid):
url = self.match_url.format(bid)
async with sem:
try:
async with session.get(url, headers=self.headers, cookies=self.cookies) as resp:
data = await resp.json()
if data.get('success') and data.get('data'):
match = data['data']
await process_queue.put(match)
except Exception as e:
rich_logger.error(f"[LPL] match_data爬取失败: {bid} {e}")
self.rich_progress.advance(fetch_task_id)
await asyncio.gather(*(fetch_one(bid) for bid in bmatch_ids))
await process_queue.put(None)
async def processor():
count = 0
while True:
match = await process_queue.get()
if match is None:
break
try:
processed_data = {}
teamAId, teamAName, teamBId, teamBName = match['teamAId'], match['teamAName'], match['teamBId'], match['teamBName']
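                    # Label: 1 if team A won the series, 0 otherwise.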
matchWin = 1 if match["matchWin"] == teamAId else 0
processed_data.update({
"teamAId": teamAId,
"teamAName": teamAName,
"teamBId": teamBId,
"teamBName": teamBName,
"matchWin": matchWin,
})
matchInfos = match["matchInfos"]
for bo in matchInfos:
teamInfos = bo["teamInfos"]
team_po = "A"
for team in teamInfos:
playerInfos = team["playerInfos"]
count_ = 1
for player in playerInfos:
playerLocation = "ADC" if player["playerLocation"] == "BOT" else player["playerLocation"]
heroId, heroTitle, heroName = player["heroId"], player["heroTitle"], player["heroName"]
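                                # Win-rate lookup keyed by heroId + first three letters of the lane; unseen pairs default to 0.50.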
key = f"{heroId}{playerLocation[:3].upper()}"
heroWinRate = hero_win_rates.get(key, 0.50)
processed_data.update({
f"{team_po}{count_}playerLocation": playerLocation,
f"{team_po}{count_}heroId": heroId,
f"{team_po}{count_}heroName": f"{heroTitle}-{heroName}",
f"{team_po}{count_}heroWinRate": heroWinRate,
})
count_ += 1
team_po = "B"
if isinstance(processed_data, dict):
collection.insert_one(processed_data)
else:
rich_logger.error(f"[LPL] processed_data类型错误: {type(processed_data)} | bMatchId: {match.get('bMatchId', 'unknown')}")
except Exception as e:
                    match_id = match.get('bMatchId', 'unknown') if isinstance(match, dict) else 'unknown'
                    snippet = orjson.dumps(match).decode('utf-8')[:200] if isinstance(match, dict) else str(match)[:200]
                    rich_logger.error(f"[LPL] process_data failed for one record: {str(e)} | bMatchId: {match_id} | data snippet: {snippet} ...")
count += 1
rich_progress.advance(process_task_id)
rich_progress.update(process_task_id, completed=total_count)
await asyncio.gather(fetcher(), processor())
rich_logger.info(f"[LPL] match_data已全部入库")
return None
async def main(self, lpl_db: str, col_season: str, col_bmatch: str, col_match: str, seasons: dict) -> None:
"""
LPL主流程,自动串联所有流程:抓取seasonIds、bMatchIds、match_data。
:param lpl_db: LPL专用MongoDB数据库名称
:param col_season: 赛季ID集合名
:param col_bmatch: bMatchId集合名
:param col_match: 结构化比赛数据集合名
:param seasons: 赛季ID映射字典{str: str}
:return: None
"""
mongo_utils.use_db(lpl_db)
await self.get_seasonIds(col_season, seasons, self.rich_progress)
await self.get_bMatchIds(col_bmatch, seasons, self.rich_progress)
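        # Read every stored bMatchId back from MongoDB to drive the match-detail crawl.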
bmatch_ids = [item['bMatchId'] for item in mongo_utils.use_collection(col_bmatch).find({}, {'bMatchId': 1, '_id': 0})]
await self.get_match_data(bmatch_ids, col_match, self.rich_progress)
rich_logger.info("[LPL] main流程执行完毕")