# -*- coding:utf-8 -*-
# @Software :PyCharm
# @Project  :LOL-DeepWinPredictor
# @Path     :/Data_CrawlProcess
# @FileName :Concat.py
# @Time     :2025/5/5 01:52
# @Author   :Viper373
# @GitHub   :https://github.com/Viper373
# @Home     :https://viper3.top
# @Blog     :https://blog.viper3.top

import asyncio

from Data_CrawlProcess.Process import Process
from tool_utils.log_utils import RichLogger
from tool_utils.mongo_utils import MongoUtils

process = Process()
mongo_utils = MongoUtils()
rich_logger = RichLogger()


class Concat:

    def __init__(self, rich_progress=None):
        """
        Initialize the Concat class.
        :param rich_progress: RichProgressUtils instance (used for the global progress bar)
        :return: None
        """
        self.rich_progress = rich_progress

    async def concat_db(self, db1: str, col1: str, db2: str, col2: str, target_db: str, target_col: str, chunk_size: int = 1000, consumer_num: int = 1):
        """
        Merge two MongoDB collections into one target collection (async queue: writes while processing and advances the progress bar in real time).
        :param db1: source database 1
        :param col1: source collection 1
        :param db2: source database 2
        :param col2: source collection 2
        :param target_db: target database
        :param target_col: target collection
        :param chunk_size: number of documents per batch write
        :param consumer_num: number of consumers
        :return: None
        """
        mongo_utils.use_db(db1)
        lpl_col = mongo_utils.use_collection(col1)
        mongo_utils.use_db(db2)
        wanplus_col = mongo_utils.use_collection(col2)
        mongo_utils.use_db(target_db)
        target_collection = mongo_utils.use_collection(target_col)

        total_count = lpl_col.count_documents({}) + wanplus_col.count_documents({})
        total_task = self.rich_progress.add_task("[Concat] Merging LPL and Wanplus", total=total_count)

        lpl_docs = list(lpl_col.find({}, {'_id': 0}))
        wanplus_docs = list(wanplus_col.find({}, {'_id': 0}))
        all_docs = lpl_docs + wanplus_docs
        queue = asyncio.Queue(maxsize=10)

        async def producer():
            # Slice the documents into chunks and feed them to the queue.
            for i in range(0, len(all_docs), chunk_size):
                await queue.put(all_docs[i:i + chunk_size])
            for _ in range(consumer_num):
                await queue.put(None)  # sentinel: one stop signal per consumer

        async def consumer():
            while True:
                chunk = await queue.get()
                if chunk is None:
                    break
                # insert_many is a blocking pymongo call, so run it in a worker thread.
                await asyncio.to_thread(target_collection.insert_many, chunk)
                await asyncio.to_thread(self.rich_progress.advance, total_task, len(chunk))

        await asyncio.gather(
            producer(),
            *(consumer() for _ in range(consumer_num))
        )
        rich_logger.info(f"[Concat] Merge finished | [{total_count}] documents in total | moba_lol_data")

    async def main(self, db1: str, col1: str, db2: str, col2: str, target_db: str, target_col: str, json_data: dict) -> None:
        """
        Main flow: run concat_db asynchronously.
        :param db1: source database 1
        :param col1: source collection 1
        :param db2: source database 2
        :param col2: source collection 2
        :param target_db: target database
        :param target_col: target collection
        :param json_data: JSON data carrying counter information (not used by this flow)
        """
        await self.concat_db(db1, col1, db2, col2, target_db, target_col)
        rich_logger.info("[Concat] main flow finished")
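

# Minimal usage sketch (assumption: the database/collection names below are
# placeholders, not the project's real ones). The real pipeline passes a
# RichProgressUtils instance; since its constructor is not shown here, a tiny
# stand-in exposes only the two methods concat_db actually calls
# (add_task / advance), so the sketch stays self-contained.
if __name__ == "__main__":

    class _StubProgress:
        """Hypothetical stand-in for RichProgressUtils, for illustration only."""

        def add_task(self, description, total):
            print(f"task: {description} (total={total})")
            return 0

        def advance(self, task_id, step):
            print(f"advance task {task_id} by {step}")

    concat = Concat(rich_progress=_StubProgress())
    asyncio.run(
        concat.main(
            db1="lpl", col1="matches",                 # placeholder source 1
            db2="wanplus", col2="matches",             # placeholder source 2
            target_db="moba", target_col="lol_data",   # placeholder target
            json_data={},                              # counter info unused by this flow
        )
    )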