|
|
from pathlib import Path |
|
|
import os |
|
|
import shutil |
|
|
import string |
|
|
import secrets |
|
|
import hashlib |
|
|
import random |
|
|
import time |
|
|
import re |
|
|
|
|
|
def get_files_count(directory_path):
    """Return the number of entries (files and subdirectories) directly inside *directory_path*.

    Raises:
        FileNotFoundError: if the directory does not exist.
    """
    with os.scandir(directory_path) as entries:
        return sum(1 for _ in entries)
|
|
|
|
|
def generate_random_string(length=10):
    """Return a cryptographically secure random string of ASCII letters.

    Args:
        length: number of characters to generate (default 10).

    Returns:
        A string of *length* random upper/lowercase ASCII letters.
    """
    alphabet = string.ascii_letters
    picks = [secrets.choice(alphabet) for _ in range(length)]
    return ''.join(picks)
|
|
|
|
|
def generate_random_string_from_input(input_string, length=16):
    """Deterministically derive a pseudo-random alphanumeric string from *input_string*.

    The input is hashed with SHA-256 and the hex digest seeds a PRNG, so the
    same input always produces the same output. Not cryptographically secure —
    use for stable identifiers, not secrets.

    Args:
        input_string: arbitrary text to derive the string from.
        length: number of characters to produce (default 16).

    Returns:
        A string of *length* ASCII letters and digits.
    """
    hashed_string = hashlib.sha256(input_string.encode()).hexdigest()

    # Use a dedicated Random instance instead of random.seed(): seeding the
    # module-level PRNG silently clobbered global random state for every
    # other caller. Random(seed) produces the identical sequence.
    rng = random.Random(hashed_string)

    characters = string.ascii_letters + string.digits
    return ''.join(rng.choice(characters) for _ in range(length))
|
|
|
|
|
def is_mostly_black(frame, black_threshold=20, percentage_threshold=0.9, sample_rate=10):
    """
    Fast black frame detection using pixel sampling.

    Args:
        frame: OpenCV BGR frame (NumPy array)
        black_threshold: grayscale value below which a pixel is considered black
        percentage_threshold: fraction of black pixels to consider frame mostly black
        sample_rate: sample every N-th pixel in both dimensions (higher = faster)
    Returns:
        True if mostly black, False otherwise
    """
    import cv2
    import numpy as np

    # Treat a missing or empty frame as black.
    if frame is None or frame.size == 0:
        return True

    grayscale = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # Subsample a coarse grid rather than scanning every pixel.
    subsampled = grayscale[::sample_rate, ::sample_rate]
    dark_fraction = np.sum(subsampled < black_threshold) / subsampled.size
    return dark_fraction >= percentage_threshold
|
|
|
|
|
def only_alpha(text: str) -> str:
    """Strip every non-ASCII-letter character from *text* and return the lowercase result."""
    letters_only = re.sub(r'[^a-zA-Z]', '', text)
    return letters_only.lower()
|
|
|
|
|
def manage_gpu(size_gb: float = 0, gpu_index: int = 0, action: str = "check"):
    """
    Manage GPU memory:
    - check → just prints memory + process table
    - clear_cache → clears PyTorch cache
    - kill → kills all GPU processes

    Args:
        size_gb: minimum free memory (GB) required for a True return.
        gpu_index: NVML index of the GPU to inspect.
        action: one of "check", "clear_cache", "kill".

    Returns:
        True if the GPU has more than *size_gb* GB free; False on any
        NVML/driver failure (best-effort, never raises for those).
    """
    try:
        import pynvml, signal, gc

        pynvml.nvmlInit()
        handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_index)
        info = pynvml.nvmlDeviceGetMemoryInfo(handle)

        free_gb = info.free / 1024**3
        total_gb = info.total / 1024**3

        print(f"\nGPU {gpu_index}: Free {free_gb:.2f} GB / Total {total_gb:.2f} GB")

        processes = pynvml.nvmlDeviceGetComputeRunningProcesses(handle)
        print("\nActive GPU Processes:")
        print(f"{'PID':<8} {'Process Name':<40} {'Used (GB)':<10}")
        print("-" * 60)
        for p in processes:
            used_gb = p.usedGpuMemory / 1024**3
            proc_name = pynvml.nvmlSystemGetProcessName(p.pid).decode(errors="ignore")
            print(f"{p.pid:<8} {proc_name:<40} {used_gb:.2f}")

        if action == "clear_cache":
            try:
                import torch
                gc.collect()
                gc.collect()
                torch.cuda.empty_cache()
                torch.cuda.reset_peak_memory_stats()
                torch.cuda.synchronize()
                time.sleep(1)  # give the driver a moment to reclaim memory
                print("\n🧹 Cleared PyTorch CUDA cache")
            except ImportError:
                print("\n⚠️ PyTorch not installed, cannot clear cache.")

        elif action == "kill":
            for p in processes:
                proc_name = pynvml.nvmlSystemGetProcessName(p.pid).decode(errors="ignore")
                try:
                    os.kill(p.pid, signal.SIGKILL)
                    print(f"❌ Killed {p.pid} ({proc_name})")
                except Exception as e:
                    print(f"⚠️ Could not kill {p.pid}: {e}")
            manage_gpu(action="clear_cache")

        gc.collect()
        gc.collect()

        # Re-query after clear_cache/kill so the return value reflects memory
        # actually freed by the action (previously compared a stale,
        # pre-action reading).
        if action in ("clear_cache", "kill"):
            free_gb = pynvml.nvmlDeviceGetMemoryInfo(handle).free / 1024**3

        return free_gb > size_gb
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # still propagate; any NVML/driver/import failure means the GPU
        # can't be used, so report "not enough free memory".
        return False
|
|
|
|
|
def is_gpu_available(verbose=True):
    """Report whether a usable CUDA device is present.

    Performs a tiny allocation to catch devices that report as available but
    are actually busy/unusable (e.g. exclusive-compute GPUs already held by
    another process).

    Args:
        verbose: if True, print a human-readable status message.

    Returns:
        True if a CUDA device can actually be used, False otherwise.

    Raises:
        RuntimeError: re-raised for CUDA failures not recognized as a
            busy/unavailable condition.
    """
    import torch

    if not torch.cuda.is_available():
        if verbose:
            print("CUDA not available.")
        return False

    try:
        # Allocating forces CUDA context creation, which is what fails on a
        # busy/exclusive device even when is_available() returned True.
        torch.empty(1, device="cuda")
        if verbose:
            print(f"CUDA available. Using device: {torch.cuda.get_device_name(0)}")
        return True
    except RuntimeError as e:
        if "CUDA-capable device(s) is/are busy or unavailable" in str(e) or \
           "CUDA error" in str(e):
            if verbose:
                # Fixed broken user-facing message (was "Please CPU.").
                print("CUDA detected but busy/unavailable. Please use CPU.")
            return False
        raise