File size: 4,557 Bytes
66e0644 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
import os
import random
import sys
import time
from concurrent.futures import ThreadPoolExecutor
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from typing import Dict
from rich.align import Align
from rich.live import Live
from rich.panel import Panel
from rich.progress import (BarColumn, MofNCompleteColumn, Progress,
SpinnerColumn, TextColumn, TimeElapsedColumn,
TimeRemainingColumn, TransferSpeedColumn)
from tool_utils.file_utils import ProjectRootFinder
from tool_utils.global_console import get_console
from tool_utils.log_utils import RichLogger
rich_logger = RichLogger()
console = get_console()
project_root_finder = ProjectRootFinder(project_type="Python")
project_root = project_root_finder.get_root_name()
class RichProgressUtils:
"""
同步多任务进度条管理类,所有进度条在一个Panel内,Panel始终在终端下方。
日志输出请使用日志类,进度条类不负责日志打印。
"""
def __init__(self, panel_title=project_root, panel_width=160):
self.progress = Progress(
TextColumn("🍔 [#ff8c00]{task.description}[/#ff8c00]"),
SpinnerColumn(spinner_name="dots", style="#ffa500"),
BarColumn(bar_width=None, complete_style="#ffe066", finished_style="#43ea80"),
"🍢",
MofNCompleteColumn(separator="/"),
"🍰",
TransferSpeedColumn(),
"🧁",
TextColumn("[#9370db]{task.percentage:>5.1f}%[/#9370db]"),
"🍭",
TimeElapsedColumn(),
"🍸",
TimeRemainingColumn(),
expand=True,
console=console,
)
self.panel_title = panel_title
self.panel_width = panel_width
self.live = None
self._task_ids: Dict[str, int] = {}
def _render_panel(self):
table = self.progress.make_tasks_table(self.progress.tasks)
panel = Panel(
Align.left(table),
title=self.panel_title,
border_style="white",
width=self.panel_width,
)
return panel
def start(self):
if self.live is None:
self.live = Live(self._render_panel(), refresh_per_second=10, transient=False, console=console)
self.live.start()
def stop(self):
if self.live:
self.live.stop()
self.live = None
def add_task(self, description, total, start=True):
task_id = self.progress.add_task(description, total=total, start=start)
self._refresh()
return task_id
def advance(self, task_id, step=1):
self.progress.advance(task_id, step)
self._refresh()
def update(self, task_id, **kwargs):
self.progress.update(task_id, **kwargs)
self._refresh()
def remove_task(self, task_id):
self.progress.remove_task(task_id)
self._refresh()
def _refresh(self):
if self.live:
self.live.update(self._render_panel())
# ================== 用法示例 =====================
if __name__ == "__main__":
rich_progress = RichProgressUtils()
rich_progress.start()
try:
def inner_worker(task_id, steps, sleep_min=0.01, sleep_max=0.05):
for _ in range(steps):
time.sleep(random.uniform(sleep_min, sleep_max))
rich_progress.advance(task_id)
def outer_worker(task_id, total_steps, inner_threads=5):
steps_per_thread = total_steps // inner_threads
with ThreadPoolExecutor(max_workers=inner_threads) as inner_executor:
futures = [inner_executor.submit(inner_worker, task_id, steps_per_thread) for _ in range(inner_threads)]
for f in futures:
f.result()
# 补齐余数
remainder = total_steps % inner_threads
if remainder:
inner_worker(task_id, remainder)
rich_progress.update(task_id, completed=total_steps)
num_tasks = 4
steps_per_task = 200
task_ids = []
for i in range(num_tasks):
task_ids.append(rich_progress.add_task(f"多线程任务{i + 1}", total=steps_per_task))
with ThreadPoolExecutor(max_workers=num_tasks) as executor:
futures = [executor.submit(outer_worker, task_id, steps_per_task) for task_id in task_ids]
for f in futures:
f.result()
finally:
rich_progress.stop()
|