|
import datetime |
|
import logging |
|
import logging.handlers |
|
import os |
|
import sys |
|
import numpy as np |
|
|
|
import requests |
|
|
|
from .constants import LOGDIR |
|
|
|
server_error_msg = "**NETWORK ERROR DUE TO HIGH TRAFFIC. PLEASE REGENERATE OR REFRESH THIS PAGE.**" |
|
moderation_msg = "I am sorry. Your input may violate our content moderation guidelines. Please avoid using harmful or offensive content." |
|
|
|
handler = None |
|
|
|
import torch.distributed as dist |
|
|
|
try: |
|
import av |
|
except ImportError: |
|
print("Please install pyav to use video processing functions.") |
|
|
|
|
|
def process_video_with_pyav(video_file, data_args): |
|
container = av.open(video_file) |
|
stream = container.streams.video[0] |
|
total_frame_num = stream.frames |
|
avg_fps = round(stream.average_rate / data_args.video_fps) |
|
frame_idx = [i for i in range(0, total_frame_num, avg_fps)] |
|
if data_args.frames_upbound > 0: |
|
if len(frame_idx) > data_args.frames_upbound: |
|
uniform_sampled_frames = np.linspace(0, total_frame_num - 1, data_args.frames_upbound, dtype=int) |
|
frame_idx = uniform_sampled_frames.tolist() |
|
|
|
video_frames = [] |
|
for index, frame in enumerate(container.decode(video=0)): |
|
if index in frame_idx: |
|
video_frames.append(frame.to_rgb().to_ndarray()) |
|
if len(video_frames) == len(frame_idx): |
|
break |
|
|
|
video = np.stack(video_frames) |
|
return video |
|
|
|
|
|
def rank0_print(*args): |
|
if dist.is_initialized(): |
|
if dist.get_rank() == 0: |
|
print(f"Rank {dist.get_rank()}: ", *args) |
|
else: |
|
print(*args) |
|
|
|
|
|
def build_logger(logger_name, logger_filename): |
|
global handler |
|
|
|
formatter = logging.Formatter( |
|
fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s", |
|
datefmt="%Y-%m-%d %H:%M:%S", |
|
) |
|
|
|
|
|
if not logging.getLogger().handlers: |
|
logging.basicConfig(level=logging.INFO) |
|
logging.getLogger().handlers[0].setFormatter(formatter) |
|
|
|
|
|
stdout_logger = logging.getLogger("stdout") |
|
stdout_logger.setLevel(logging.INFO) |
|
sl = StreamToLogger(stdout_logger, logging.INFO) |
|
sys.stdout = sl |
|
|
|
stderr_logger = logging.getLogger("stderr") |
|
stderr_logger.setLevel(logging.ERROR) |
|
sl = StreamToLogger(stderr_logger, logging.ERROR) |
|
sys.stderr = sl |
|
|
|
|
|
logger = logging.getLogger(logger_name) |
|
logger.setLevel(logging.INFO) |
|
|
|
|
|
if handler is None: |
|
os.makedirs(LOGDIR, exist_ok=True) |
|
filename = os.path.join(LOGDIR, logger_filename) |
|
handler = logging.handlers.TimedRotatingFileHandler(filename, when="D", utc=True) |
|
handler.setFormatter(formatter) |
|
|
|
for name, item in logging.root.manager.loggerDict.items(): |
|
if isinstance(item, logging.Logger): |
|
item.addHandler(handler) |
|
|
|
return logger |
|
|
|
|
|
class StreamToLogger(object): |
|
""" |
|
Fake file-like stream object that redirects writes to a logger instance. |
|
""" |
|
|
|
def __init__(self, logger, log_level=logging.INFO): |
|
self.terminal = sys.stdout |
|
self.logger = logger |
|
self.log_level = log_level |
|
self.linebuf = "" |
|
|
|
def __getattr__(self, attr): |
|
return getattr(self.terminal, attr) |
|
|
|
def write(self, buf): |
|
temp_linebuf = self.linebuf + buf |
|
self.linebuf = "" |
|
for line in temp_linebuf.splitlines(True): |
|
|
|
|
|
|
|
|
|
|
|
if line[-1] == "\n": |
|
self.logger.log(self.log_level, line.rstrip()) |
|
else: |
|
self.linebuf += line |
|
|
|
def flush(self): |
|
if self.linebuf != "": |
|
self.logger.log(self.log_level, self.linebuf.rstrip()) |
|
self.linebuf = "" |
|
|
|
|
|
def disable_torch_init(): |
|
""" |
|
Disable the redundant torch default initialization to accelerate model creation. |
|
""" |
|
import torch |
|
|
|
setattr(torch.nn.Linear, "reset_parameters", lambda self: None) |
|
setattr(torch.nn.LayerNorm, "reset_parameters", lambda self: None) |
|
|
|
|
|
def violates_moderation(text): |
|
""" |
|
Check whether the text violates OpenAI moderation API. |
|
""" |
|
url = "https://api.openai.com/v1/moderations" |
|
headers = {"Content-Type": "application/json", "Authorization": "Bearer " + os.environ["OPENAI_API_KEY"]} |
|
text = text.replace("\n", "") |
|
data = "{" + '"input": ' + f'"{text}"' + "}" |
|
data = data.encode("utf-8") |
|
try: |
|
ret = requests.post(url, headers=headers, data=data, timeout=5) |
|
flagged = ret.json()["results"][0]["flagged"] |
|
except requests.exceptions.RequestException as e: |
|
print(f"######################### Moderation Error: {e} #########################") |
|
flagged = False |
|
except KeyError as e: |
|
print(f"######################### Moderation Error: {e} #########################") |
|
flagged = False |
|
|
|
return flagged |
|
|
|
|
|
def pretty_print_semaphore(semaphore): |
|
if semaphore is None: |
|
return "None" |
|
return f"Semaphore(value={semaphore._value}, locked={semaphore.locked()})" |
|
|