import os
import json
import tempfile
import time
import cv2
import gradio as gr
import boto3
import torch
import subprocess
import threading
import sys
import traceback
from concurrent.futures import ThreadPoolExecutor
from fractions import Fraction
from dotenv import load_dotenv
from ultralytics import YOLO

load_dotenv()
# Force immediate flush so log lines show up in real time
sys.stdout.reconfigure(line_buffering=True)
sys.stderr.reconfigure(line_buffering=True)

print("✅ DEBUG: Logging is now forced to flush immediately.")
print(f"DEBUG: PyTorch is using CUDA: {torch.cuda.is_available()}")
print(f"DEBUG: Available GPUs: {torch.cuda.device_count()}")
if torch.cuda.device_count() > 0:
    print(f"DEBUG: Current GPU: {torch.cuda.get_device_name(0)}")
AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
S3_BUCKET_NAME = os.getenv('S3_BUCKET_NAME')
AWS_REGION = os.getenv('AWS_REGION', 'us-east-1')
SQS_QUEUE_URL = os.environ.get("SQS_QUEUE_URL")
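# Expected environment variables (e.g. via a local .env file; values illustrative):
#   AWS_ACCESS_KEY_ID=...
#   AWS_SECRET_ACCESS_KEY=...
#   S3_BUCKET_NAME=my-ppe-bucket
#   AWS_REGION=us-east-1
#   SQS_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123456789012/ppe-jobs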
s3_client = boto3.client(
    's3',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)
sqs_client = boto3.client(
    'sqs',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)
def get_video_properties(video_path):
    command = [
        'ffprobe', '-v', 'error', '-select_streams', 'v:0',
        '-show_entries', 'stream=codec_name,width,height,r_frame_rate,bit_rate',
        '-of', 'json', video_path
    ]
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"ffprobe failed: {result.stderr.strip()}")
    info = json.loads(result.stdout)
    stream = info['streams'][0]
    codec_name = stream['codec_name']
    width = stream['width']
    height = stream['height']
    # r_frame_rate comes back as a fraction string like "30000/1001";
    # parse it with Fraction instead of eval() for safety.
    fps = float(Fraction(stream['r_frame_rate']))
    bit_rate = stream.get('bit_rate')
    return codec_name, fps, width, height, int(bit_rate) if bit_rate else None
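# For reference, ffprobe's JSON for the selected stream looks roughly like this
# (illustrative values, not from a real file):
#   {"streams": [{"codec_name": "h264", "width": 1920, "height": 1080,
#                 "r_frame_rate": "30000/1001", "bit_rate": "8000000"}]}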
def run_ffmpeg_with_fallback(input_path, output_path, resize_filter, fps, codec_name):
    """
    GPU encode: software decode + h264_nvenc encode, clamping dimensions to
    <=2032 px and embedding Rec.709 metadata via a bitstream filter with the
    correct option names.

    Note: `resize_filter` and `codec_name` are accepted for interface
    compatibility but are currently unused; the clamp filter below is
    hardcoded.
    """
    if not os.path.exists(input_path):
        raise RuntimeError(f"Input file does not exist: {input_path}")

    # 1) NVENC pass
    # (note: this clamps each dimension independently, so a frame over
    # 2032 px in only one dimension is scaled without preserving aspect ratio)
    clamp_filter = '-vf "scale=min(2032\\,iw):min(2032\\,ih)"'
    nvenc_cmd = (
        f'ffmpeg -y '
        f'-i "{input_path}" '
        f'{clamp_filter} '
        f'-pix_fmt yuv420p '
        f'-color_range tv '
        f'-color_primaries bt709 '
        f'-color_trc bt709 '
        f'-colorspace bt709 '
        f'-c:v h264_nvenc -preset p7 -tune hq -rc:v vbr_hq '
        f'-cq:v 19 -qmin:v 19 -qmax:v 21 '
        f'-b:v 0 -maxrate:v 130M -bufsize:v 130M '
        f'-profile:v high -r {fps} '
        f'-c:a aac -b:a 192k '
        f'"{output_path}"'
    )
    print("Running GPU encoding:\n", nvenc_cmd)
    res = subprocess.run(nvenc_cmd, shell=True,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    print("NVENC STDERR:", res.stderr)
    if res.returncode != 0:
        raise RuntimeError(f"GPU encoding failed (exit {res.returncode}): {res.stderr.strip()}")
    if not os.path.exists(output_path) or os.path.getsize(output_path) == 0:
        raise RuntimeError("GPU encoding produced no output or output is empty")

    # 2) Patch the bitstream VUI using the correct field names:
    #    "colour_primaries" (British spelling), "transfer_characteristics",
    #    and "matrix_coefficients", plus the full_range flag.
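    #    In the H.264 VUI, the value 1 for colour_primaries,
    #    transfer_characteristics, and matrix_coefficients is the code point
    #    for BT.709, and video_full_range_flag=0 marks limited ("tv") range.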
    fixed_path = output_path.replace(".mp4", "_fixed.mp4")
    bsf_cmd = (
        f'ffmpeg -y '
        f'-i "{output_path}" '
        f'-c copy '
        f'-bsf:v h264_metadata='
        f'video_full_range_flag=0:'
        f'colour_primaries=1:'
        f'transfer_characteristics=1:'
        f'matrix_coefficients=1 '
        f'"{fixed_path}"'
    )
    print("Patching VUI with bitstream filter:\n", bsf_cmd)
    res2 = subprocess.run(bsf_cmd, shell=True,
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    print("BSF STDERR:", res2.stderr)
    if res2.returncode != 0:
        raise RuntimeError(f"Failed to patch metadata (exit {res2.returncode}): {res2.stderr.strip()}")

    # Replace the original file with the patched one
    os.replace(fixed_path, output_path)
    print("✅ GPU encoding succeeded and Rec.709 VUI injected")
def upload_to_s3(file_path, s3_key, extra_metadata=None):
    try:
        print(f"DEBUG: Uploading '{file_path}' to S3 as '{s3_key}' …")
        extra_args = {"Metadata": {
            str(k).replace(" ", "-").lower(): str(v)
            for k, v in (extra_metadata or {}).items()
        }} if extra_metadata else {}
        s3_client.upload_file(file_path, S3_BUCKET_NAME, s3_key, ExtraArgs=extra_args)
        url = f"https://{S3_BUCKET_NAME}.s3.{AWS_REGION}.amazonaws.com/{s3_key}"
        print("✅ Uploaded to S3:", url)
        return url
    except Exception as e:
        print("❌ S3 upload failed:", e)
        traceback.print_exc()
        return None
def s3_upload_with_timeout(file_path, s3_key, extra_metadata, timeout=60):
    with ThreadPoolExecutor(max_workers=1) as ex:
        fut = ex.submit(upload_to_s3, file_path, s3_key, extra_metadata)
        try:
            return fut.result(timeout=timeout)
        except Exception as e:
            print("❌ S3 upload timed out/failed:", e)
            return None
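# Note: fut.result(timeout=...) raises concurrent.futures.TimeoutError if the
# upload hangs, which the broad except above treats like any other failure.
# Caveat: leaving the `with` block still calls shutdown(wait=True), so a truly
# stuck upload blocks until its thread finishes; the timeout only bounds result().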
def load_model():
    print("DEBUG: Loading YOLO model…")
    m = YOLO("bests.pt").to("cuda" if torch.cuda.is_available() else "cpu")
    return m
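# Note: detect_ppe_video() below loads the model afresh on every call; for a
# long-running worker it may be worth caching it at module level instead.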
def detect_ppe_video(video_file, original_key):
    if not video_file or "annotated" in os.path.basename(original_key).lower():
        return None, "No video or already annotated"
    if not torch.cuda.is_available():
        return None, "CUDA GPU is required for video processing"

    video_path = video_file.name
    model = load_model()
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return None, "Failed to open input video"
    fps_cv = cap.get(cv2.CAP_PROP_FPS)
    width_cv = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height_cv = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    if width_cv <= 0 or height_cv <= 0:
        cap.release()
        return None, "Invalid video dimensions"

    # Prepare S3 key and final output
    key_parts = original_key.split('/')
    key_parts[0] = "annotated"
    key_parts[-1] = "annotated.mp4"
    processed_s3_key = "/".join(key_parts)
    final_output_path = "/tmp/annotated.mp4"
    counts = {k: 0 for k in [
        'Hardhat', 'Mask', 'NO-Hardhat', 'NO-Mask',
        'NO-Safety Vest', 'Person', 'Safety Cone',
        'Safety Vest', 'machinery', 'vehicle'
    ]}
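    # NOTE: the key order above must match the model's class-index order,
    # since int(cls_id) is used below to index into list(counts.keys()).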
    tracked_ids = set()

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_pipe_path = os.path.join(temp_dir, "pipe_encoded.mp4")

        # 1) Pipe in rawvideo → encode with libx264 ultrafast into temp_pipe_path
        vf_chain = "format=yuv420p,scale=in_range=full:out_range=limited"
        ffmpeg_cmd = (
            f'ffmpeg -y '
            f'-f rawvideo -pix_fmt bgr24 -s {width_cv}x{height_cv} '
            f'-r {fps_cv} -i pipe: '
            f'-vf "{vf_chain}" '
            f'-color_primaries bt709 -color_trc bt709 '
            f'-colorspace bt709 -color_range tv '
            f'-c:v libx264 -preset ultrafast -pix_fmt yuv420p '
            f'"{temp_pipe_path}"'
        )
        proc = subprocess.Popen(
            ffmpeg_cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            shell=True,
            bufsize=10**8,
        )

        # Drain stderr to log FFmpeg messages
        def _drain(pipe):
            for line in iter(pipe.readline, b''):
                print("FFmpeg:", line.decode(errors="ignore").rstrip())
        threading.Thread(target=_drain, args=(proc.stderr,), daemon=True).start()
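        # Without this drain, FFmpeg could block once its stderr pipe buffer
        # fills, deadlocking the frame-writing loop below.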
        def _process_batch(frames):
            """Track a batch of frames, draw boxes, update counts, and pipe
            each annotated frame to FFmpeg."""
            results = model.track(frames, conf=0.4, iou=0.3, persist=True)
            for i, out_frame in enumerate(frames):
                # draw boxes & update counts
                if results[i] and results[i].boxes is not None:
                    try:
                        tids = results[i].boxes.id.cpu().numpy()
                    except AttributeError:
                        tids = [None] * len(results[i].boxes.xyxy.cpu().numpy())
                    for box, cls_id, tid in zip(
                        results[i].boxes.xyxy.cpu().numpy(),
                        results[i].boxes.cls.cpu().numpy(),
                        tids
                    ):
                        # count (and draw) each tracked object only once
                        if tid is not None and int(tid) in tracked_ids:
                            continue
                        if tid is not None:
                            tracked_ids.add(int(tid))
                        x1, y1, x2, y2 = map(int, box)
                        lbl = list(counts.keys())[int(cls_id)]
                        counts[lbl] += 1
                        cv2.rectangle(out_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                        cv2.putText(out_frame, lbl, (x1, y1 - 5),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
                # ensure FFmpeg is alive
                if proc.poll() is not None:
                    err = proc.stderr.read().decode(errors="ignore")
                    raise RuntimeError(f"FFmpeg exited early: {err!r}")
                # write frame
                proc.stdin.write(out_frame.tobytes())

        frame_count = 0
        batch = []
        # Read, detect, draw, and write frames in batches of 5
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_count += 1
            batch.append(frame)
            if len(batch) == 5:
                _process_batch(batch)
                batch.clear()

        # Flush any remaining frames
        if batch:
            _process_batch(batch)
        # Close pipe and wait
        proc.stdin.close()
        try:
            retcode = proc.wait(timeout=30)
            if retcode != 0:
                err = proc.stderr.read().decode(errors="ignore")
                return None, f"Pipe encoding failed (code={retcode}): {err}"
        except subprocess.TimeoutExpired:
            proc.terminate()
            proc.wait()
            return None, "Pipe encoding timed out"

        # 2) Final NVENC pass: still inside the `with`, so temp_pipe_path exists
        codec_name, fps, w, h, _ = get_video_properties(video_path)
        resize_filter = ''
        if w > 2032 or h > 2032:
            # quote the expressions so the commas inside min() are not taken as
            # filter separators (run_ffmpeg_with_fallback currently applies its
            # own hardcoded clamp and ignores this argument)
            resize_filter = '-vf "scale=w=\'min(2032,iw)\':h=\'min(2032,ih)\'"'
        try:
            run_ffmpeg_with_fallback(temp_pipe_path, final_output_path, resize_filter, fps, codec_name)
        except Exception as e:
            return None, f"Final FFmpeg pass failed: {e}"
        # end of `with` — temp_dir is removed here, but final_output_path remains

    cap.release()
    if not os.path.exists(final_output_path):
        return None, "Final conversion failed"

    # Upload and return
    s3_url = s3_upload_with_timeout(final_output_path, processed_s3_key, counts)
    if not s3_url:
        return None, "Upload failed"
    summary = "\n".join(f"{k}: {v}" for k, v in counts.items() if v > 0) or "No detections found."
    return s3_url, summary
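# Illustrative usage (hypothetical path and key):
#   with open("/tmp/site_footage.mp4", "rb") as f:
#       url, summary = detect_ppe_video(f, "input/site_footage.mp4")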
def submit_job(video_file, original_key):
    # original_key is currently unused; the S3 input key is derived from the
    # temp file name below.
    with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(video_file.name)[1]) as tmp:
        tmp.write(video_file.read())
        tmp_path = tmp.name
    s3_input_key = f"input/{os.path.basename(tmp_path)}"
    url = upload_to_s3(tmp_path, s3_input_key)
    if not url:
        return "S3 upload failed"
    # note: despite the key name, this is the plain object URL returned by
    # upload_to_s3, not an actual presigned URL
    msg = {"bucket": S3_BUCKET_NAME, "object_key": s3_input_key, "presigned_url": url}
    sqs_client.send_message(QueueUrl=SQS_QUEUE_URL, MessageBody=json.dumps(msg))
    return "Job submitted successfully!"
def sqs_worker():
    while True:
        resp = sqs_client.receive_message(
            QueueUrl=SQS_QUEUE_URL,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20
        )
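        # WaitTimeSeconds=20 enables SQS long polling, which cuts down on empty
        # receives. A job that outlives the queue's visibility timeout can be
        # redelivered before the delete_message call below is reached.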
        msgs = resp.get('Messages', [])
        for m in msgs:
            try:
                job = json.loads(m['Body'])
                bucket, key = job["bucket"], job["object_key"]
                local = f"/tmp/{os.path.basename(key)}"
                s3_client.download_file(bucket, key, local)
                # Capture the result of annotation + upload here
                with open(local, 'rb') as vf:
                    s3_url, summary = detect_ppe_video(vf, key)
                if s3_url:
                    print(f"✅ Annotated video uploaded to S3: {s3_url}")
                else:
                    print(f"❌ Annotation or upload failed: {summary}")
            except Exception as e:
                print("ERROR in SQS loop:", e)
                traceback.print_exc()
            finally:
                # only delete after logging success/failure
                sqs_client.delete_message(
                    QueueUrl=SQS_QUEUE_URL,
                    ReceiptHandle=m['ReceiptHandle']
                )
        time.sleep(1)
with gr.Blocks() as demo:
    gr.Markdown("# YOLOv8 PPE Detection with S3 Metadata")
    video_input = gr.File(label="Upload Video", file_types=[".mp4", ".avi", ".mov"])
    original_key_input = gr.Textbox(label="S3 Key (original path)")
    btn = gr.Button("Submit Job (Fire-and-Forget)")
    out = gr.Textbox(label="Status")
    btn.click(fn=submit_job, inputs=[video_input, original_key_input], outputs=[out])

if __name__ == "__main__":
    threading.Thread(target=sqs_worker, daemon=True).start()
    demo.launch(server_name="0.0.0.0", server_port=7860)