# -*- coding:utf-8 -*-
# @Software       :PyCharm
# @Project        :LOL-DeepWinPredictor
# @Path           :/Data_CrawlProcess
# @FileName       :Concat.py
# @Time           :2025/5/5 01:52
# @Author         :Viper373
# @GitHub         :https://github.com/Viper373
# @Home           :https://viper3.top
# @Blog           :https://blog.viper3.top

import asyncio

from Data_CrawlProcess.Process import Process
from tool_utils.log_utils import RichLogger
from tool_utils.mongo_utils import MongoUtils

process = Process()
mongo_utils = MongoUtils()
rich_logger = RichLogger()


class Concat:
    def __init__(self, rich_progress=None):
        """

        Concat类初始化方法。

        :param rich_progress: RichProgressUtils实例(用于全局进度条)

        :return: None

        """
        self.rich_progress = rich_progress

    async def concat_db(self, db1: str, col1: str, db2: str, col2: str, target_db: str, target_col: str, chunk_size: int = 1000, consumer_num: int = 1):
        """

        合并两个MongoDB集合到一个目标集合(异步队列,边处理边写入并实时推进进度条)

        :param db1: 源数据库1

        :param col1: 源集合1

        :param db2: 源数据库2

        :param col2: 源集合2

        :param target_db: 目标数据库

        :param target_col: 目标集合

        :param chunk_size: 每批写入数量

        :param consumer_num: 消费者数量

        :return: None

        """
        mongo_utils.use_db(db1)
        lpl_col = mongo_utils.use_collection(col1)
        mongo_utils.use_db(db2)
        wanplus_col = mongo_utils.use_collection(col2)
        mongo_utils.use_db(target_db)
        target_collection = mongo_utils.use_collection(target_col)
        total_count = lpl_col.count_documents({}) + wanplus_col.count_documents({})
        total_task = self.rich_progress.add_task("[Concat] Merging LPL and Wanplus", total=total_count)
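        # Both source collections are materialized fully in memory before
        # chunking; fine for modest datasets, but a cursor-based producer
        # would be needed for very large collections.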
        lpl_docs = list(lpl_col.find({}, {'_id': 0}))
        wanplus_docs = list(wanplus_col.find({}, {'_id': 0}))
        all_docs = lpl_docs + wanplus_docs
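        # Bounded queue: the producer blocks once 10 chunks are pending,
        # which applies backpressure whenever consumers fall behind.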
        queue = asyncio.Queue(maxsize=10)

        async def producer():
            for i in range(0, len(all_docs), chunk_size):
                await queue.put(all_docs[i:i + chunk_size])
            for _ in range(consumer_num):
                await queue.put(None)  # sentinel: tells each consumer to stop

        async def consumer():
            while True:
                chunk = await queue.get()
                if chunk is None:
                    break
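                # PyMongo calls are blocking, so run the bulk insert and the
                # progress update in worker threads to keep the event loop free.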
                await asyncio.to_thread(target_collection.insert_many, chunk)
                await asyncio.to_thread(self.rich_progress.advance, total_task, len(chunk))

        await asyncio.gather(
            producer(),
            *(consumer() for _ in range(consumer_num))
        )
        rich_logger.info(f"[Concat] 合并完成丨共计[{total_count}]moba_lol_data")

    async def main(self, db1: str, col1: str, db2: str, col2: str, target_db: str, target_col: str, json_data: dict) -> None:
        """

        主流程方法,依次异步执行concat_db_async和append_counter_async

        :param db1: 源数据库1

        :param col1: 源集合1

        :param db2: 源数据库2

        :param col2: 源集合2

        :param target_db: 目标数据库

        :param target_col: 目标集合

        :param json_data: 包含counter信息的json数据

        """
        await self.concat_db(db1, col1, db2, col2, target_db, target_col)
        rich_logger.info("[Concat] main流程执行完毕")