Spaces:
Running
Running
import time | |
from datetime import datetime, timedelta | |
import asyncio | |
from graphgen.utils import logger | |
class RPM:
    """Requests-per-minute limiter for coroutines sharing one event loop.

    Requests are counted per local wall-clock minute. Once ``rpm`` requests
    have been admitted in the current minute, ``wait`` sleeps until the
    minute rolls over and then re-checks before admitting the caller.

    The check-and-increment in ``wait`` contains no ``await``, so coroutines
    on a single event loop cannot interleave inside it. No locking is done
    for multi-threaded use.
    """

    def __init__(self, rpm: int = 1000):
        """Create a limiter that admits at most ``rpm`` requests per minute."""
        self.rpm = rpm
        # 'rpm_slot' is the minute the counter belongs to; 'counter' is the
        # number of requests admitted during that minute.
        self.record = {'rpm_slot': self.get_minute_slot(), 'counter': 0}

    @staticmethod
    def _minute_slot_of(moment: datetime) -> int:
        """Return the number of minutes since local midnight for ``moment``."""
        return moment.hour * 60 + moment.minute

    def get_minute_slot(self) -> int:
        """Return the current local time as minutes since midnight (0-1439)."""
        return self._minute_slot_of(datetime.fromtimestamp(time.time()))

    async def wait(self, silent: bool = False) -> None:
        """Block until one more request fits in the current minute, then count it.

        Args:
            silent: When true, suppress the info/debug log output.
        """
        # Always admit at least one request per minute so that a
        # non-positive ``rpm`` cannot make the loop below spin forever.
        limit = max(self.rpm, 1)
        while True:
            # Sample the clock once per pass so the slot and the sleep
            # duration are derived from the same instant.
            now = datetime.fromtimestamp(time.time())
            slot = self._minute_slot_of(now)
            if self.record['rpm_slot'] != slot:
                # First caller to observe the new minute starts a fresh
                # counter; later callers (including other waiters that just
                # woke up) keep adding to it instead of resetting it again.
                self.record = {'rpm_slot': slot, 'counter': 0}
            if self.record['counter'] < limit:
                break
            # Remaining part of the current minute, computed arithmetically
            # so it always lies in (0, 60] regardless of DST transitions.
            sleep_time = 60.0 - (now.second + now.microsecond / 1e6)
            if not silent:
                logger.info('RPM sleep %s', sleep_time)
            await asyncio.sleep(sleep_time)
        self.record['counter'] += 1
        if not silent:
            logger.debug(self.record)
class TPM:
    """Tokens-per-minute limiter for coroutines sharing one event loop.

    Token usage is accumulated per local wall-clock minute. A request is
    admitted when its tokens fit into the remaining budget of the current
    minute; otherwise ``wait`` sleeps until the minute rolls over and
    re-checks.

    The check-and-charge in ``wait`` contains no ``await``, so coroutines on
    a single event loop cannot interleave inside it. No locking is done for
    multi-threaded use.
    """

    def __init__(self, tpm: int = 20000):
        """Create a limiter that admits at most ``tpm`` tokens per minute."""
        self.tpm = tpm
        # 'tpm_slot' is the minute the counter belongs to; 'counter' is the
        # number of tokens charged during that minute.
        self.record = {'tpm_slot': self.get_minute_slot(), 'counter': 0}

    @staticmethod
    def _minute_slot_of(moment: datetime) -> int:
        """Return the number of minutes since local midnight for ``moment``."""
        return moment.hour * 60 + moment.minute

    def get_minute_slot(self) -> int:
        """Return the current local time as minutes since midnight (0-1439)."""
        return self._minute_slot_of(datetime.fromtimestamp(time.time()))

    async def wait(self, token_count, silent: bool = False) -> None:
        """Block until ``token_count`` tokens fit in the current minute, then charge them.

        Args:
            token_count: Number of tokens the caller is about to consume.
            silent: When true, suppress the info/debug log output.
        """
        while True:
            # Sample the clock once per pass so the slot and the sleep
            # duration are derived from the same instant.
            now = datetime.fromtimestamp(time.time())
            slot = self._minute_slot_of(now)
            if self.record['tpm_slot'] != slot:
                # First caller to observe the new minute starts a fresh
                # budget; other waiters that wake up afterwards add to it
                # instead of overwriting it.
                self.record = {'tpm_slot': slot, 'counter': 0}
            used = self.record['counter']
            # Check TPM exceed. A request is always admitted into an empty
            # minute, so a single request larger than ``tpm`` cannot block
            # forever.
            if used == 0 or used + token_count <= self.tpm:
                break
            # Remaining part of the current minute, computed arithmetically
            # so it always lies in (0, 60] regardless of DST transitions.
            sleep_time = 60.0 - (now.second + now.microsecond / 1e6)
            if not silent:
                logger.info('TPM sleep %s', sleep_time)
            await asyncio.sleep(sleep_time)
        # Tokens are charged only once the request is admitted, so a waiting
        # request does not inflate the budget seen by other callers.
        self.record['counter'] += token_count
        if not silent:
            logger.debug(self.record)