Spaces:
Sleeping
Sleeping
import threading | |
import time | |
from typing import Union | |
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, model_validator | |
from crewai.utilities.logger import Logger | |
class RPMController(BaseModel): | |
model_config = ConfigDict(arbitrary_types_allowed=True) | |
max_rpm: Union[int, None] = Field(default=None) | |
logger: Logger = Field(default=None) | |
_current_rpm: int = PrivateAttr(default=0) | |
_timer: threading.Timer | None = PrivateAttr(default=None) | |
_lock: threading.Lock = PrivateAttr(default=None) | |
_shutdown_flag = False | |
def reset_counter(self): | |
if self.max_rpm: | |
if not self._shutdown_flag: | |
self._lock = threading.Lock() | |
self._reset_request_count() | |
return self | |
def check_or_wait(self): | |
if not self.max_rpm: | |
return True | |
with self._lock: | |
if self._current_rpm < self.max_rpm: | |
self._current_rpm += 1 | |
return True | |
else: | |
self.logger.log( | |
"info", "Max RPM reached, waiting for next minute to start." | |
) | |
self._wait_for_next_minute() | |
self._current_rpm = 1 | |
return True | |
def stop_rpm_counter(self): | |
if self._timer: | |
self._timer.cancel() | |
self._timer = None | |
def _wait_for_next_minute(self): | |
time.sleep(60) | |
self._current_rpm = 0 | |
def _reset_request_count(self): | |
with self._lock: | |
self._current_rpm = 0 | |
if self._timer: | |
self._shutdown_flag = True | |
self._timer.cancel() | |
self._timer = threading.Timer(60.0, self._reset_request_count) | |
self._timer.start() | |