# Viper3733's picture
# Upload/Update Data_CrawlProcess/Wanplus.py
# b983df9 verified
# -*- coding:utf-8 -*-
# @Software :PyCharm
# @Project :LOL-DeepWinPredictor
# @Path :/Data_CrawlProcess
# @FileName :Wanplus.py
# @Time :2025/4/21 23:36
# @Author :Viper373
# @GitHub :https://github.com/Viper373
# @Home :https://viper3.top
# @Blog :https://blog.viper3.top
import asyncio
import copy
from functools import partial
from typing import List, Optional
import aiohttp
import orjson
import requests
from bs4 import BeautifulSoup
from pymongo import ASCENDING, errors
from Data_CrawlProcess import env
from Data_CrawlProcess.Process import Process
from tool_utils.log_utils import RichLogger
from tool_utils.mongo_utils import MongoUtils
from tool_utils.progress_utils import RichProgressUtils
# Module-level helper instances used by the Wanplus class defined below.
process = Process()  # provides read_win_rate(), the source of the hero win-rate table
rich_logger = RichLogger()  # logger used for info / warning / error messages
# NOTE(review): the Wanplus class keeps its own progress instance and does not read this one — confirm it is still needed.
rich_progress = RichProgressUtils()
mongo_utils = MongoUtils()  # database / collection accessor used for all MongoDB reads and writes
class Wanplus:
    """
    Crawler for League of Legends match data published on wanplus.cn.

    The stages run in order: event ids -> team ids -> schedule ids -> BO ids ->
    per-game match data. Every stage stores its results in its own MongoDB collection.
    """

    # Seconds a blocking ``requests`` call may take before it is abandoned.
    REQUEST_TIMEOUT = 30

    # Columns of the team-stats table; the position in this tuple is the column index sent in the form.
    _STATS_COLUMNS = (
        'order',
        'teamname',
        'kda',
        'killsPergame',
        'deathsPergame',
        'damagetoheroPermin',
        'fstbloodpercentage',
        'avgDuration',
        'goldpermatch',
        'goldsPermin',
        'lasthitPermin',
        'dragonkillsPergame',
        'dragonkillspercentage',
        'baronkillsPergame',
        'baronkillspercentage',
        'wardsplacedpermin',
        'wardskilledpermin',
        'wardskilledrate',
        'towertakensPergame',
        'towerdeathsPergame',
    )
    # Columns that are sent with orderable='false'.
    _UNSORTABLE_COLUMNS = frozenset({'order', 'teamname'})

    def __init__(self, rich_progress=None):
        """
        Set up endpoints, request headers / cookies / payloads and shared helpers.

        :param rich_progress: RichProgressUtils instance used for the progress bars;
            a new instance is created when it is omitted (or falsy)
        :return: None
        """
        self.start_url = "https://wanplus.cn/ajax/stats/list"
        self.team_url = "https://www.wanplus.cn/ajax/team/recent"
        self.match_url = "https://www.wanplus.cn/schedule/{}.html"
        self.bo_url = "https://www.wanplus.cn/ajax/matchdetail/{}"
        # NOTE(review): the CSRF token and the session cookies below are hard-coded;
        # they presumably expire — consider loading them from configuration.
        self.headers = {
            'Accept': 'application/json, text/javascript, */*; q=0.01',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5,ko;q=0.4,fr;q=0.3',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
            'Origin': 'https://www.wanplus.cn',
            'Pragma': 'no-cache',
            'Referer': 'https://www.wanplus.cn/lol/teamstats',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36 Edg/135.0.0.0',
            'X-CSRF-Token': '1568910741',
            'X-Requested-With': 'XMLHttpRequest',
            'sec-ch-ua': '"Microsoft Edge";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
        }
        self.cookies = {
            'wanplus_sid': '30a98b3049e7fdb540466e4b8ba8d3aa',
            'wp_pvid': '9102809835',
            'wp_info': 'ssid=s4318647280',
            'Hm_lvt_23f0107a6dc01005e1cee339f3e738e9': '1745729309,1745824404',
            'HMACCOUNT': '499406815A7E1420',
            'wanplus_token': 'e383c73bd4ebd47dd22cd018bdcece59',
            'wanplus_storage': 'w%2F0j5rOmbyuiKxm9zzaVzbybAqKwrSXHI5Ew3gGl58bh4NW5lPnTHHQ2gts8TehRKLE9zwNsxT4hH5VdpJiAgc%2FHlnes0bhxtraCZFLeY%2FhwzmvP%2BvZn22cC3VuqsLZLc%2FQ3wUwI9ScitpOHv8IFUfsiHg',
            'wanplus_csrf': '_csrf_tk_1485024661',
            'isShown': '1',
            'gameType': '2',
            'Hm_lpvt_23f0107a6dc01005e1cee339f3e738e9': '1745953169',
        }
        # Form posted to start_url; 'eid' is filled in per event by get_teamids.
        self.data = self._build_stats_form(env.GTK)
        # Query string sent to team_url; 'teamId' is replaced per team by get_scheduleids.
        self.params = {
            'isAjax': '1',
            'teamId': '7320',
            'gameType': '2',
            'objTeamId': '0',
            'page': '99',
            'egid': '0',
            '_gtk': env.GTK,
        }
        self.proxies = env.PROXIES
        self.hero_win_rates = process.read_win_rate()
        # Maps the player slot key found in plList to a lane abbreviation.
        self.po_dict = {"1": "TOP", "2": "JUN", "3": "MID", "4": "ADC", "5": "SUP"}
        self.rich_progress = rich_progress or RichProgressUtils()

    @classmethod
    def _build_stats_form(cls, gtk):
        """
        Build the form payload posted to the stats list endpoint.

        Keys are inserted in the order the request expects: token and draw counter,
        six entries per table column, then ordering / paging / filter fields.

        :param gtk: value sent as the ``_gtk`` field
        :return: dict with the form fields ('eid' is None until a caller sets it)
        """
        form = {'_gtk': gtk, 'draw': '1'}
        for index, column in enumerate(cls._STATS_COLUMNS):
            prefix = f'columns[{index}]'
            form[f'{prefix}[data]'] = column
            form[f'{prefix}[name]'] = ''
            form[f'{prefix}[searchable]'] = 'true'
            form[f'{prefix}[orderable]'] = 'false' if column in cls._UNSORTABLE_COLUMNS else 'true'
            form[f'{prefix}[search][value]'] = ''
            form[f'{prefix}[search][regex]'] = 'false'
        form.update({
            'order[0][column]': '2',
            'order[0][dir]': 'desc',
            'start': '0',
            'length': '20',
            'search[value]': '',
            'search[regex]': 'false',
            'area': '',
            'eid': None,
            'type': 'team',
            'gametype': '2',
            'filter': '{"team":{},"player":{},"meta":{}}',
        })
        return form

    @staticmethod
    def _open_collection(db_name, col_name, unique_key):
        """
        Select a collection and make sure it has a unique ascending index.

        :param db_name: MongoDB database name
        :param col_name: MongoDB collection name
        :param unique_key: field that gets the unique index
        :return: the collection object returned by mongo_utils
        """
        mongo_utils.use_db(db_name)
        collection = mongo_utils.use_collection(col_name)
        collection.create_index([(unique_key, ASCENDING)], unique=True)
        return collection

    async def _request(self, method, url, **kwargs):
        """
        Run a blocking ``requests`` call in the default executor.

        Headers, cookies, proxies and a timeout are always attached; without the
        timeout a stalled connection would block the stage forever.

        :param method: ``requests.get`` or ``requests.post``
        :param url: target URL
        :param kwargs: extra keyword arguments for the call (``data=``, ``params=``)
        :return: the ``requests`` response
        :raises requests.RequestException: on connection errors or timeout
        """
        call = partial(
            method,
            url=url,
            headers=self.headers,
            cookies=self.cookies,
            proxies=self.proxies,
            timeout=self.REQUEST_TIMEOUT,
            **kwargs,
        )
        return await asyncio.get_running_loop().run_in_executor(None, call)

    async def _store_records(self, collection, records, task_label, done_label, queue_size=100):
        """
        Push records through a bounded queue and insert them one by one.

        Records whose unique key already exists are skipped, but still counted as processed.

        :param collection: target collection
        :param records: list of documents to insert
        :param task_label: label used in the two progress-task names
        :param done_label: label used in the completion log message
        :param queue_size: maximum number of records waiting in the queue
        :return: None
        """
        total = len(records)
        queue = asyncio.Queue(maxsize=queue_size)
        fetch_task_id = self.rich_progress.add_task(f"[Wanplus] {task_label}生产", total=total)
        store_task_id = self.rich_progress.add_task(f"[Wanplus] {task_label}入库", total=total)

        async def producer():
            for record in records:
                await queue.put(record)
                self.rich_progress.advance(fetch_task_id)
            # None tells the consumer that no more records will arrive.
            await queue.put(None)

        async def consumer():
            count = 0
            while True:
                item = await queue.get()
                if item is None:
                    break
                try:
                    collection.insert_one(item)
                except errors.DuplicateKeyError:
                    # Already stored by an earlier run; the unique index rejects it.
                    pass
                count += 1
                self.rich_progress.advance(store_task_id)
            self.rich_progress.update(store_task_id, completed=total)
            rich_logger.info(f"[Wanplus] {done_label}爬取完成丨共{count}条")

        await asyncio.gather(producer(), consumer())

    async def get_eids(self, db_name: str, col_name: str) -> None:
        """
        Fetch the event ids and store them.

        :param db_name: MongoDB database name
        :param col_name: MongoDB collection name
        :return: None
        """
        collection = self._open_collection(db_name, col_name, "eid")
        response = await self._request(requests.post, self.start_url, data=self.data)
        if response.status_code != 200:
            rich_logger.error(f"获取eids失败,状态码: {response.status_code}")
            return
        eid_dict = orjson.loads(response.text).get('eventList', {})
        if not eid_dict:
            rich_logger.warning("未获取到eids数据")
            return
        eids = [
            {"eid": eid, "name": info.get('name')}
            for eid, info in eid_dict.items()
        ]
        await self._store_records(collection, eids, "eids", "赛事ID", queue_size=20)

    async def get_teamids(self, db_name: str, col_name: str, eid_list: list) -> None:
        """
        Fetch the team ids of every event and store them.

        :param db_name: MongoDB database name
        :param col_name: MongoDB collection name
        :param eid_list: list of event ids
        :return: None
        """
        collection = self._open_collection(db_name, col_name, "teamid")
        teamid_records = []
        for _eid in eid_list:
            # The form only holds strings / None, so a shallow copy is enough.
            form = dict(self.data, eid=_eid)
            try:
                response = await self._request(requests.post, self.start_url, data=form)
                if response.status_code != 200:
                    continue
                response_json = orjson.loads(response.text)
                if response_json.get('msg') != 'success':
                    continue
                for d in response_json.get('data') or []:
                    if d.get('teamid'):
                        teamid_records.append({
                            'teamid': d.get('teamid'),
                            'teamname': d.get('teamname'),
                        })
            except Exception as e:
                # Best effort: one failing event must not stop the crawl, but it should be visible.
                rich_logger.warning(f"[Wanplus] teamids获取失败: eid={_eid} {e}")
                continue
        await self._store_records(collection, teamid_records, "teamids", "队伍ID")

    async def get_scheduleids(self, db_name: str, col_name: str, teamid_list: list) -> None:
        """
        Fetch the schedule ids of every team and store them.

        :param db_name: MongoDB database name
        :param col_name: MongoDB collection name
        :param teamid_list: list of team ids
        :return: None
        """
        collection = self._open_collection(db_name, col_name, "scheduleid")
        scheduleid_records = []
        for _teamid in teamid_list:
            params = dict(self.params, teamId=_teamid)
            try:
                response = await self._request(requests.post, self.team_url, params=params)
                if response.status_code != 200:
                    continue
                for d in orjson.loads(response.text).get('data') or []:
                    if d.get('scheduleid'):
                        scheduleid_records.append({
                            'scheduleid': d.get('scheduleid'),
                            'desc': f"{d.get('oneseedname')} vs {d.get('twoseedname')}{d.get('starttime')}",
                        })
            except Exception as e:
                # Best effort: one failing team must not stop the crawl, but it should be visible.
                rich_logger.warning(f"[Wanplus] scheduleids获取失败: teamid={_teamid} {e}")
                continue
        await self._store_records(collection, scheduleid_records, "scheduleids", "赛程ID")

    @staticmethod
    def _iter_boid_records(html):
        """
        Yield one BO record per game link found on a schedule page.

        Nothing is yielded when the team header or the game list is absent. A missing
        element inside the team header raises AttributeError, which the caller handles.

        :param html: HTML text of the schedule page
        :return: generator of ``{'boid': ..., 'desc': ...}`` dicts
        """
        soup = BeautifulSoup(html, 'html.parser')
        team_detail_ov = soup.find('ul', attrs={'class': 'team-detail ov'})
        if not team_detail_ov:
            return
        team_left = team_detail_ov.find('li', attrs={'class': 'team-left'}).find('a').find('span').text
        bo_time = team_detail_ov.find('span', attrs={'class': 'time'}).text
        team_right = team_detail_ov.find('li', attrs={'class': 'team-right tr'}).find('a').find('span').text
        game_div = soup.find('div', attrs={'class': 'game'})
        if not game_div:
            return
        for anchor in game_div.find_all('a'):
            # The last character of the link text is used as the game number.
            bo_count = anchor.text[-1]
            bo_id = anchor.get('data-matchid')
            if bo_id:
                yield {
                    'boid': bo_id,
                    'desc': f"{bo_time}{team_left} vs {team_right}丨BO{bo_count}",
                }

    async def get_boids(self, db_name: str, col_name: str, scheduleid_list: list) -> None:
        """
        Fetch the BO ids of every schedule page and store them.

        :param db_name: MongoDB database name
        :param col_name: MongoDB collection name
        :param scheduleid_list: list of schedule ids
        :return: None
        """
        collection = self._open_collection(db_name, col_name, "boid")
        boid_records = []
        for _scheduleid in scheduleid_list:
            url = self.match_url.format(_scheduleid)
            try:
                response = await self._request(requests.get, url)
                if response.status_code != 200:
                    continue
                # Append one at a time so records parsed before a failure are kept.
                for record in self._iter_boid_records(response.text):
                    boid_records.append(record)
            except Exception as e:
                # Best effort: one unparsable page must not stop the crawl, but it should be visible.
                rich_logger.warning(f"[Wanplus] boids获取失败: scheduleid={_scheduleid} {e}")
                continue
        await self._store_records(collection, boid_records, "boids", "boid")

    def _build_match_record(self, match, hero_win_rates):
        """
        Flatten one fetched match into the document stored in MongoDB.

        :param match: dict with 'boid' and 'match_data' as queued by the fetcher
        :param hero_win_rates: mapping of "<heroId><lane>" to a win rate; 0.50 is used when a key is missing
        :return: the flattened document, or None when the match is malformed or any
            player entry lacks 'cpheroid' / 'heroname'
        :raises KeyError: when 'info' lacks the team or winner fields
        :raises TypeError: when a player slot key is not in ``po_dict``
        """
        if not match or not isinstance(match, dict):
            return None
        if 'boid' not in match or 'match_data' not in match:
            return None
        detail = match['match_data']
        if not detail or not isinstance(detail, dict):
            return None
        pl_list = detail.get("plList")
        info = detail.get("info")
        if not pl_list or not info:
            return None

        team_a, team_b = info["oneteam"], info["twoteam"]
        record = {
            "boid": match['boid'],
            "teamAId": team_a["teamid"],
            "teamAName": team_a["teamalias"],
            "teamBId": team_b["teamid"],
            "teamBName": team_b["teamalias"],
            "matchWin": 1 if info["winner"] == team_a["teamid"] else 0,
        }

        team_po = "A"
        complete = True
        for players in pl_list:
            if not isinstance(players, dict):
                continue  # skip empty lists or other unexpected shapes
            for idx, player in players.items():
                location = self.po_dict.get(idx)
                try:
                    hero_id, hero_name = player["cpheroid"], player["heroname"]
                except (KeyError, TypeError, IndexError):
                    # A player without hero data makes the whole match unusable.
                    complete = False
                    continue
                key = f"{hero_id}{location[:3].upper()}"
                record.update({
                    f"{team_po}{idx}playerLocation": location,
                    f"{team_po}{idx}heroId": hero_id,
                    f"{team_po}{idx}heroName": f"{hero_name}",
                    f"{team_po}{idx}heroWinRate": hero_win_rates.get(key, 0.50),
                })
            # Entries after the first processed team are labelled as team B.
            team_po = "B"
        return record if complete else None

    async def get_match_data(self, boids_list: List[str], db_name: str, col_name: str):
        """
        Fetch the match detail of every BO id concurrently and store the flattened result.

        Fetching and storing run as separate tasks connected by a queue, each with its own progress bar.

        :param boids_list: list of BO ids
        :param db_name: MongoDB database name
        :param col_name: MongoDB collection name
        :return: None
        """
        session_timeout = aiohttp.ClientTimeout(total=30)
        total_count = len(boids_list)
        fetch_task_id = self.rich_progress.add_task("[Wanplus] match_data爬取", total=total_count)
        process_task_id = self.rich_progress.add_task("[Wanplus] match_data入库", total=total_count)
        hero_win_rates = self.hero_win_rates
        process_queue = asyncio.Queue(maxsize=500)
        sem = asyncio.Semaphore(100)

        async def fetch_one(session, boid):
            url = self.bo_url.format(boid)
            async with sem:
                try:
                    async with session.get(url, headers=self.headers, cookies=self.cookies) as resp:
                        # Pre-set so the error message below works even when reading the body fails.
                        text = ""
                        try:
                            text = await resp.text()
                            data = orjson.loads(text)
                        except Exception as e:
                            rich_logger.error(f"[Wanplus] match_data爬取失败: {boid} {resp.status}, message='{e}', url='{url}', resp_text='{text[:100]}'")
                            return
                    # The response is released before waiting on the queue.
                    if data.get('msg') == 'success' and data.get('data'):
                        match_data = data['data']
                        if isinstance(match_data, dict):
                            await process_queue.put({"boid": boid, "match_data": match_data})
                except Exception as e:
                    rich_logger.error(f"[Wanplus] match_data爬取失败: {boid} {e}")
                finally:
                    # Count every attempt, including the early returns above.
                    self.rich_progress.advance(fetch_task_id)

        async def fetcher():
            try:
                async with aiohttp.ClientSession(timeout=session_timeout) as session:
                    await asyncio.gather(*(fetch_one(session, boid) for boid in boids_list))
            finally:
                # Always send the sentinel, otherwise the processor would wait forever.
                await process_queue.put(None)

        async def processor():
            mongo_utils.use_db(db_name)
            collection = mongo_utils.use_collection(col_name)
            while True:
                match = await process_queue.get()
                if match is None:
                    break
                try:
                    record = self._build_match_record(match, hero_win_rates)
                    if record is not None:
                        collection.insert_one(record)
                except Exception as e:
                    rich_logger.error(f"[Wanplus] process_data单条处理失败: {str(e)} | 数据: {match}")
                self.rich_progress.advance(process_task_id)
            # Matches that failed to download never reach the queue; close the bar explicitly.
            self.rich_progress.update(process_task_id, completed=total_count)

        await asyncio.gather(fetcher(), processor())
        rich_logger.info("[Wanplus] match_data已全部入库")

    async def main(self, wanplus_db=None, col_eid=None, col_team=None, col_schedule=None, col_boid=None, col_match=None) -> None:
        """
        Run the whole pipeline: event ids, team ids, schedule ids, BO ids, match data.

        Each stage reads its input from the collection written by the previous stage.

        :param wanplus_db: MongoDB database name used by the crawler
        :param col_eid: collection name for event ids
        :param col_team: collection name for team ids
        :param col_schedule: collection name for schedule ids
        :param col_boid: collection name for BO ids
        :param col_match: collection name for the flattened match data
        :return: None
        """
        mongo_utils.use_db(wanplus_db)

        def stored_ids(col_name, field):
            # Read back only the id field of every stored document.
            cursor = mongo_utils.use_collection(col_name).find({}, {field: 1, '_id': 0})
            return [item[field] for item in cursor]

        await self.get_eids(wanplus_db, col_eid)
        await self.get_teamids(wanplus_db, col_team, stored_ids(col_eid, 'eid'))
        await self.get_scheduleids(wanplus_db, col_schedule, stored_ids(col_team, 'teamid'))
        await self.get_boids(wanplus_db, col_boid, stored_ids(col_schedule, 'scheduleid'))
        await self.get_match_data(stored_ids(col_boid, 'boid'), wanplus_db, col_match)
        rich_logger.info("[Wanplus] main流程执行完毕")