chenzihong-gavin
init
acd7cf4
import time
from datetime import datetime, timedelta
import asyncio
from graphgen.utils import logger
class RPM:
    """Asynchronous requests-per-minute limiter keyed on wall-clock minutes.

    Usage is tracked in ``self.record``, a dict holding the current minute
    slot (``'rpm_slot'``) and how many requests were admitted in that slot
    (``'counter'``). Slots are minutes since local midnight, so the same slot
    value recurs every 24 hours.
    """

    def __init__(self, rpm: int = 1000):
        """Create a limiter admitting at most ``rpm`` requests per minute slot."""
        self.rpm = rpm
        self.record = {'rpm_slot': self.get_minute_slot(), 'counter': 0}

    def get_minute_slot(self, timestamp=None):
        """Return the minute slot (minutes since local midnight).

        Args:
            timestamp: Optional POSIX timestamp to convert. When omitted the
                current time is used, which matches the original zero-argument
                behaviour.

        Returns:
            ``hour * 60 + minute`` of the local time for ``timestamp``.
        """
        if timestamp is None:
            timestamp = time.time()
        moment = datetime.fromtimestamp(timestamp)
        return moment.hour * 60 + moment.minute

    async def wait(self, silent=False):
        """Block until one more request fits into the current minute slot.

        The quota is re-checked after every sleep. Several coroutines may be
        sleeping at the same time; if each of them simply reset the counter on
        wake-up, all of them would proceed and the limit would be exceeded.
        There is no ``await`` between the final check and the increment, so
        coroutines sharing one event loop cannot interleave there.

        Args:
            silent: When true, suppress the info/debug log messages.
        """
        # A non-positive limit could never be satisfied and would spin
        # forever; treat it as "one request per minute".
        limit = max(self.rpm, 1)

        while True:
            # Sample the clock once so the slot and the sleep duration are
            # derived from the same instant.
            now = time.time()
            slot = self.get_minute_slot(now)

            if self.record['rpm_slot'] != slot:
                # A new minute started: begin with a fresh counter.
                self.record = {'rpm_slot': slot, 'counter': 0}

            if self.record['counter'] < limit:
                break

            # Quota exhausted: sleep for the remainder of the current minute.
            # Plain duration arithmetic avoids converting a naive local
            # datetime back to a timestamp, which is ambiguous around DST
            # transitions.
            moment = datetime.fromtimestamp(now)
            sleep_time = 60.0 - (moment.second + moment.microsecond / 1_000_000)
            if not silent:
                logger.info('RPM sleep %s', sleep_time)
            await asyncio.sleep(sleep_time)

        self.record['counter'] += 1

        if not silent:
            logger.debug(self.record)
class TPM:
    """Asynchronous tokens-per-minute limiter keyed on wall-clock minutes.

    Usage is tracked in ``self.record``, a dict holding the current minute
    slot (``'tpm_slot'``) and the number of tokens admitted in that slot
    (``'counter'``). Slots are minutes since local midnight, so the same slot
    value recurs every 24 hours.
    """

    def __init__(self, tpm: int = 20000):
        """Create a limiter admitting at most ``tpm`` tokens per minute slot."""
        self.tpm = tpm
        self.record = {'tpm_slot': self.get_minute_slot(), 'counter': 0}

    def get_minute_slot(self, timestamp=None):
        """Return the minute slot (minutes since local midnight).

        Args:
            timestamp: Optional POSIX timestamp to convert. When omitted the
                current time is used, which matches the original zero-argument
                behaviour.

        Returns:
            ``hour * 60 + minute`` of the local time for ``timestamp``.
        """
        if timestamp is None:
            timestamp = time.time()
        moment = datetime.fromtimestamp(timestamp)
        return moment.hour * 60 + moment.minute

    async def wait(self, token_count, silent=False):
        """Block until ``token_count`` tokens fit into the current minute slot.

        The budget is re-checked after every sleep. Several coroutines may be
        sleeping at the same time; if each of them overwrote the counter with
        its own ``token_count`` on wake-up, the usage of the others would be
        lost and the limit exceeded. There is no ``await`` between the final
        check and the increment, so coroutines sharing one event loop cannot
        interleave there.

        A request is always admitted when nothing has been consumed in the
        current slot, even if ``token_count`` alone exceeds ``tpm``; otherwise
        such a request could never proceed.

        Args:
            token_count: Number of tokens the caller is about to consume.
            silent: When true, suppress the info/debug log messages.
        """
        while True:
            # Sample the clock once so the slot and the sleep duration are
            # derived from the same instant.
            now = time.time()
            slot = self.get_minute_slot(now)

            if self.record['tpm_slot'] != slot:
                # A new minute started: begin with a fresh counter.
                self.record = {'tpm_slot': slot, 'counter': 0}

            used = self.record['counter']
            # Check whether the TPM budget would be exceeded.
            if used <= 0 or used + token_count <= self.tpm:
                break

            # Budget exhausted: sleep for the remainder of the current minute.
            # Plain duration arithmetic avoids converting a naive local
            # datetime back to a timestamp, which is ambiguous around DST
            # transitions.
            moment = datetime.fromtimestamp(now)
            sleep_time = 60.0 - (moment.second + moment.microsecond / 1_000_000)
            if not silent:
                logger.info('TPM sleep %s', sleep_time)
            await asyncio.sleep(sleep_time)

        self.record['counter'] += token_count

        if not silent:
            logger.debug(self.record)