aelitta's picture
Upload folder using huggingface_hub
4bdb245 verified
raw
history blame
No virus
4.22 kB
import datetime
import logging
import math
import time
from collections import deque
from typing import Any
logger = logging.getLogger(__name__)
def human_time(*args: Any, **kwargs: Any) -> str:
def timedelta_total_seconds(timedelta: datetime.timedelta) -> float:
return (
timedelta.microseconds
+ 0.0
+ (timedelta.seconds + timedelta.days * 24 * 3600) * 10**6
) / 10**6
secs = float(timedelta_total_seconds(datetime.timedelta(*args, **kwargs)))
# We want (ms) precision below 2 seconds
if secs < 2:
return f"{secs * 1000}ms"
units = [("y", 86400 * 365), ("d", 86400), ("h", 3600), ("m", 60), ("s", 1)]
parts = []
for unit, mul in units:
if secs / mul >= 1 or mul == 1:
if mul > 1:
n = int(math.floor(secs / mul))
secs -= n * mul
else:
# >2s we drop the (ms) component.
n = int(secs)
if n:
parts.append(f"{n}{unit}")
return " ".join(parts)
def eta(iterator: list[Any]) -> Any:
"""Report an ETA after 30s and every 60s thereafter."""
total = len(iterator)
_eta = ETA(total)
_eta.needReport(30)
for processed, data in enumerate(iterator, start=1):
yield data
_eta.update(processed)
if _eta.needReport(60):
logger.info(f"{processed}/{total} - ETA {_eta.human_time()}")
class ETA:
"""Predict how long something will take to complete."""
def __init__(self, total: int):
self.total: int = total # Total expected records.
self.rate: float = 0.0 # per second
self._timing_data: deque[tuple[float, int]] = deque(maxlen=100)
self.secondsLeft: float = 0.0
self.nexttime: float = 0.0
def human_time(self) -> str:
if self._calc():
return f"{human_time(seconds=self.secondsLeft)} @ {int(self.rate * 60)}/min"
return "(computing)"
def update(self, count: int) -> None:
# count should be in the range 0 to self.total
assert count > 0
assert count <= self.total
self._timing_data.append((time.time(), count)) # (X,Y) for pearson
def needReport(self, whenSecs: int) -> bool:
now = time.time()
if now > self.nexttime:
self.nexttime = now + whenSecs
return True
return False
def _calc(self) -> bool:
# A sample before a prediction. Need two points to compute slope!
if len(self._timing_data) < 3:
return False
# http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient
# Calculate means and standard deviations.
samples = len(self._timing_data)
# column wise sum of the timing tuples to compute their mean.
mean_x, mean_y = (
sum(i) / samples for i in zip(*self._timing_data, strict=False)
)
std_x = math.sqrt(
sum(pow(i[0] - mean_x, 2) for i in self._timing_data) / (samples - 1)
)
std_y = math.sqrt(
sum(pow(i[1] - mean_y, 2) for i in self._timing_data) / (samples - 1)
)
# Calculate coefficient.
sum_xy, sum_sq_v_x, sum_sq_v_y = 0.0, 0.0, 0
for x, y in self._timing_data:
x -= mean_x
y -= mean_y
sum_xy += x * y
sum_sq_v_x += pow(x, 2)
sum_sq_v_y += pow(y, 2)
pearson_r = sum_xy / math.sqrt(sum_sq_v_x * sum_sq_v_y)
# Calculate regression line.
# y = mx + b where m is the slope and b is the y-intercept.
m = self.rate = pearson_r * (std_y / std_x)
y = self.total
b = mean_y - m * mean_x
x = (y - b) / m
# Calculate fitted line (transformed/shifted regression line horizontally).
fitted_b = self._timing_data[-1][1] - (m * self._timing_data[-1][0])
fitted_x = (y - fitted_b) / m
_, count = self._timing_data[-1] # adjust last data point progress count
adjusted_x = ((fitted_x - x) * (count / self.total)) + x
eta_epoch = adjusted_x
self.secondsLeft = max([eta_epoch - time.time(), 0])
return True