import contextlib
import threading
import time
from datetime import datetime
from threading import Thread

import cv2
import requests
import uvicorn


class Server(uvicorn.Server):
    """
    Uvicorn server that can be run in a background thread.

    Use this Server class to serve a uvicorn app in a separate thread, so
    that the caller can proceed to the gradio UI launch.
    https://github.com/encode/uvicorn/issues/742

    A couple of other links for reference --
    https://stackoverflow.com/questions/61577643/python-how-to-use-fastapi-and-uvicorn-run-without-blocking-the-thread
    https://stackoverflow.com/questions/76142431/how-to-run-another-application-within-the-same-running-event-loop/76148361#76148361
    """

    def install_signal_handlers(self):
        """Skip signal-handler installation.

        Python only allows signal handlers to be installed from the main
        thread, and this server is meant to run in a worker thread.
        """
        pass

    @contextlib.contextmanager
    def run_in_thread(self):
        """Run the server in a background thread for the duration of a ``with`` block.

        Blocks until the server reports ``started``, then yields. On exit
        (normal or exceptional) the server is asked to shut down and the
        thread is joined.

        Raises:
            RuntimeError: if the server thread terminates before startup
                completes (for example because the port could not be bound).
        """
        thread = threading.Thread(target=self.run)
        thread.start()
        try:
            while not self.started:
                # Without this check a failed startup would spin forever,
                # because ``started`` is never set once the thread has died.
                if not thread.is_alive() and not self.started:
                    raise RuntimeError(
                        "uvicorn server thread exited before startup completed"
                    )
                time.sleep(1e-3)
            yield
        finally:
            self.should_exit = True
            thread.join()


class CountsPerSec:
    """
    Class that tracks the number of occurrences ("counts") of an
    arbitrary event and returns the frequency in occurrences
    (counts) per second. The caller must increment the count.
    """

    def __init__(self):
        self._start_time = None
        self._num_occurrences = 0

    def start(self):
        """Record the current time as the start of measurement; returns ``self``."""
        self._start_time = datetime.now()
        return self

    def increment(self):
        """Register one more occurrence of the tracked event."""
        self._num_occurrences += 1

    def countsPerSec(self):
        """Return occurrences per second since ``start()``.

        Returns 0 when ``start()`` has not been called yet or when no
        measurable time has elapsed.
        """
        if self._start_time is None:
            # Not started yet: there is no reference time to measure against.
            return 0
        elapsed_time = (datetime.now() - self._start_time).total_seconds()
        return self._num_occurrences / elapsed_time if elapsed_time > 0 else 0


class VideoGet:
    """
    Class that continuously gets frames from a VideoCapture object
    with a dedicated thread.
    """

    def __init__(self, src=0):
        """Open the capture ``src`` and read one initial frame.

        Args:
            src: value passed straight to ``cv2.VideoCapture``.
        """
        self.stream = cv2.VideoCapture(src)
        self.grabbed, self.frame = self.stream.read()
        self.tn = Thread(target=self.get, args=())
        self.stopped = False

    def start(self):
        """Start the frame-grabbing thread; returns ``self``."""
        self.tn.start()
        return self

    def get(self):
        """Thread body: keep reading frames until stopped or a read fails."""
        while not self.stopped:
            if not self.grabbed:
                self.stop()
            else:
                self.grabbed, self.frame = self.stream.read()

    def stop(self):
        """Stop the grabbing thread and release the capture.

        Safe to call from any thread (including the grabbing thread itself),
        before ``start()``, and more than once.
        """
        # The flag must be set before joining; the worker loop only exits
        # once it observes ``stopped``.
        self.stopped = True
        # A thread cannot join itself, and a never-started thread cannot be
        # joined at all; both cases raise RuntimeError in ``Thread.join``.
        if self.tn.is_alive() and self.tn is not threading.current_thread():
            self.tn.join()
        self.stream.release()


class VideoShow:
    """
    Class that continuously shows a frame using a dedicated thread.
    """

    def __init__(self, frame=None):
        """Store the initial frame to display (may be assigned later)."""
        self.frame = frame
        self.tn = Thread(target=self.show, args=())
        self.stopped = False

    def start(self):
        """Start the display thread; returns ``self``."""
        self.tn.start()
        return self

    def show(self):
        """Thread body: display ``self.frame`` until stopped or "q" is pressed."""
        while not self.stopped:
            if self.frame is None:
                # Nothing to draw yet; wait for the caller to assign a frame
                # instead of letting cv2.imshow fail on None.
                time.sleep(1e-3)
                continue
            cv2.imshow("Video", self.frame)
            if cv2.waitKey(1) == ord("q"):
                self.stopped = True

    def stop(self):
        """Stop the display thread.

        Safe to call from any thread, before ``start()``, and more than once.
        """
        # Set the flag first so the display loop can exit, then wait for it.
        self.stopped = True
        if self.tn.is_alive() and self.tn is not threading.current_thread():
            self.tn.join()


def show_fps(frame, iterations_per_sec):
    """
    Draw the iterations-per-second value and the current timestamp on a frame.

    The fps text is drawn at pixel position (1000, 50) and the timestamp
    (``%m/%d/%Y %H:%M:%S``) at (500, 50). The frame is drawn on in place
    and also returned.
    """
    overlays = (
        (f"{iterations_per_sec:.0f} fps", (1000, 50)),
        (datetime.now().strftime("%m/%d/%Y %H:%M:%S"), (500, 50)),
    )
    for text, org in overlays:
        cv2.putText(
            img=frame,
            text=text,
            org=org,
            fontFace=cv2.FONT_HERSHEY_SIMPLEX,
            fontScale=0.8,
            color=(0, 255, 255),
            thickness=1,
            lineType=cv2.LINE_AA,
        )
    return frame


def draw_text(
    img,
    text,
    pos=(0, 0),
    font=cv2.FONT_HERSHEY_SIMPLEX,
    font_scale=1,
    font_thickness=2,
    line_type=cv2.LINE_AA,
    text_color=(0, 255, 0),
    text_color_bg=(0, 0, 0)
) -> tuple[int, int]:
    """Draw text on top of a filled background rectangle on an image.

    Args:
        img: image to draw on; it is modified in place.
        text (str): the text to render.
        pos (tuple, optional): ``(x, y)`` passed to ``cv2.putText`` as the
            text origin. Defaults to (0, 0).
        font (optional): OpenCV font face.
            Defaults to cv2.FONT_HERSHEY_SIMPLEX.
        font_scale (int, optional): font scale factor. Defaults to 1.
        font_thickness (int, optional): stroke thickness of the text.
            Defaults to 2.
        line_type (optional): OpenCV line type. Defaults to cv2.LINE_AA.
        text_color (tuple, optional): color of the text.
            Defaults to (0, 255, 0).
        text_color_bg (tuple, optional): color of the background rectangle.
            Defaults to (0, 0, 0).

    Returns:
        tuple: the ``(width, height)`` of the text as reported by
        ``cv2.getTextSize``.
    """
    x, y = pos
    text_size, _ = cv2.getTextSize(text, font, font_scale, font_thickness)
    text_w, text_h = text_size
    # Background box with a 10 px margin above and below the text; the upper
    # edge is clamped so it never goes above the top of the image.
    cv2.rectangle(
        img,
        (x, y + 10),
        (x + text_w, max(0, y - text_h - 10)),
        text_color_bg,
        -1,
    )
    cv2.putText(
        img=img,
        text=text,
        org=pos,
        fontFace=font,
        fontScale=font_scale,
        color=text_color,
        thickness=font_thickness,
        lineType=line_type,
    )
    return text_size


def try_site(youtube_url: str, timeout: float = 10.0) -> bool:
    """Check if a youtube url is playable

    The page is fetched and searched for the "Video unavailable"
    playability-status marker.

    Args:
        youtube_url (str): a given url for testing
        timeout (float, optional): seconds to wait for the server before
            giving up, so an unresponsive host cannot block the caller
            forever. Defaults to 10.0.

    Returns:
        bool: whether or not that youtube_url is playable

    Raises:
        requests.RequestException: if the request fails or times out.
    """
    pattern = '"playabilityStatus":{"status":"ERROR","reason":"Video unavailable"'
    response = requests.get(youtube_url, timeout=timeout)
    return pattern not in response.text


def make_table_from_dict(obj: dict, selected_key: str) -> list:
    """Convert a dict into a list of ``{"name", "value", "selected"}`` rows.

    Args:
        obj (dict): mapping whose keys become ``name`` and values ``value``.
        selected_key (str): the row whose key equals this gets
            ``selected=True``; all other rows get ``selected=False``.

    Returns:
        list: one row dict per item of ``obj``, in iteration order.
    """
    return [
        {"name": k, "value": v, "selected": bool(k == selected_key)}
        for k, v in obj.items()
    ]


def make_table_from_dict_multiselect(
    obj: dict, selected_vals: list[int]
) -> list:
    """Convert a dict into table rows, marking every row whose value is selected.

    Args:
        obj (dict): mapping whose keys become ``name`` and values ``value``.
        selected_vals (list[int]): rows whose value is contained in this
            list get ``selected=True``; all other rows get ``selected=False``.

    Returns:
        list: one row dict per item of ``obj``, in iteration order.
    """
    return [
        {"name": k, "value": v, "selected": v in selected_vals}
        for k, v in obj.items()
    ]