File size: 4,215 Bytes
4bdb245
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import datetime
import logging
import math
import time
from collections import deque
from typing import Any

logger = logging.getLogger(__name__)


def human_time(*args: Any, **kwargs: Any) -> str:
    def timedelta_total_seconds(timedelta: datetime.timedelta) -> float:
        return (
            timedelta.microseconds
            + 0.0
            + (timedelta.seconds + timedelta.days * 24 * 3600) * 10**6
        ) / 10**6

    secs = float(timedelta_total_seconds(datetime.timedelta(*args, **kwargs)))
    # We want (ms) precision below 2 seconds
    if secs < 2:
        return f"{secs * 1000}ms"
    units = [("y", 86400 * 365), ("d", 86400), ("h", 3600), ("m", 60), ("s", 1)]
    parts = []
    for unit, mul in units:
        if secs / mul >= 1 or mul == 1:
            if mul > 1:
                n = int(math.floor(secs / mul))
                secs -= n * mul
            else:
                # >2s we drop the (ms) component.
                n = int(secs)
            if n:
                parts.append(f"{n}{unit}")
    return " ".join(parts)


def eta(iterator: list[Any]) -> Any:
    """Report an ETA after 30s and every 60s thereafter."""
    total = len(iterator)
    _eta = ETA(total)
    _eta.needReport(30)
    for processed, data in enumerate(iterator, start=1):
        yield data
        _eta.update(processed)
        if _eta.needReport(60):
            logger.info(f"{processed}/{total} - ETA {_eta.human_time()}")


class ETA:
    """Predict how long something will take to complete."""

    def __init__(self, total: int):
        self.total: int = total  # Total expected records.
        self.rate: float = 0.0  # per second
        self._timing_data: deque[tuple[float, int]] = deque(maxlen=100)
        self.secondsLeft: float = 0.0
        self.nexttime: float = 0.0

    def human_time(self) -> str:
        if self._calc():
            return f"{human_time(seconds=self.secondsLeft)} @ {int(self.rate * 60)}/min"
        return "(computing)"

    def update(self, count: int) -> None:
        # count should be in the range 0 to self.total
        assert count > 0
        assert count <= self.total
        self._timing_data.append((time.time(), count))  # (X,Y) for pearson

    def needReport(self, whenSecs: int) -> bool:
        now = time.time()
        if now > self.nexttime:
            self.nexttime = now + whenSecs
            return True
        return False

    def _calc(self) -> bool:
        # A sample before a prediction.   Need two points to compute slope!
        if len(self._timing_data) < 3:
            return False

        # http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient
        # Calculate means and standard deviations.
        samples = len(self._timing_data)
        # column wise sum of the timing tuples to compute their mean.
        mean_x, mean_y = (
            sum(i) / samples for i in zip(*self._timing_data, strict=False)
        )
        std_x = math.sqrt(
            sum(pow(i[0] - mean_x, 2) for i in self._timing_data) / (samples - 1)
        )
        std_y = math.sqrt(
            sum(pow(i[1] - mean_y, 2) for i in self._timing_data) / (samples - 1)
        )

        # Calculate coefficient.
        sum_xy, sum_sq_v_x, sum_sq_v_y = 0.0, 0.0, 0
        for x, y in self._timing_data:
            x -= mean_x
            y -= mean_y
            sum_xy += x * y
            sum_sq_v_x += pow(x, 2)
            sum_sq_v_y += pow(y, 2)
        pearson_r = sum_xy / math.sqrt(sum_sq_v_x * sum_sq_v_y)

        # Calculate regression line.
        # y = mx + b where m is the slope and b is the y-intercept.
        m = self.rate = pearson_r * (std_y / std_x)
        y = self.total
        b = mean_y - m * mean_x
        x = (y - b) / m

        # Calculate fitted line (transformed/shifted regression line horizontally).
        fitted_b = self._timing_data[-1][1] - (m * self._timing_data[-1][0])
        fitted_x = (y - fitted_b) / m
        _, count = self._timing_data[-1]  # adjust last data point progress count
        adjusted_x = ((fitted_x - x) * (count / self.total)) + x
        eta_epoch = adjusted_x

        self.secondsLeft = max([eta_epoch - time.time(), 0])
        return True