import os
import json
import tempfile
import time
import uuid
import cv2
import numpy as np
import gradio as gr
import boto3
import torch
import subprocess
import threading
import sys
import platform
import traceback
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from botocore.exceptions import NoCredentialsError, ClientError
from ultralytics import YOLO
from dotenv import load_dotenv

load_dotenv()

# Force immediate flush so log lines appear in real time
sys.stdout.reconfigure(line_buffering=True)
sys.stderr.reconfigure(line_buffering=True)

print("✅ DEBUG: Logging is now forced to flush immediately.")
print(f"DEBUG: PyTorch is using CUDA: {torch.cuda.is_available()}")
print(f"DEBUG: Available GPUs: {torch.cuda.device_count()}")
if torch.cuda.device_count() > 0:
    print(f"DEBUG: Current GPU: {torch.cuda.get_device_name(0)}")

AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
S3_BUCKET_NAME = os.getenv('S3_BUCKET_NAME')
AWS_REGION = os.getenv('AWS_REGION', 'us-east-1')
SQS_QUEUE_URL = os.environ.get("SQS_QUEUE_URL")

s3_client = boto3.client(
    's3',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)
sqs_client = boto3.client(
    'sqs',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)


def get_video_properties(video_path):
    """Probe codec, frame rate, dimensions, and bitrate of the first video stream via ffprobe."""
    command = [
        'ffprobe', '-v', 'error',
        '-select_streams', 'v:0',
        '-show_entries', 'stream=codec_name,width,height,r_frame_rate,bit_rate',
        '-of', 'json', video_path
    ]
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    info = json.loads(result.stdout)
    stream = info['streams'][0]
    codec_name = stream['codec_name']
    width = stream['width']
    height = stream['height']
    # r_frame_rate is a fraction such as "30000/1001"; parse it instead of eval()
    num, den = stream['r_frame_rate'].split('/')
    fps = float(num) / float(den)
    bit_rate = stream.get('bit_rate', None)
    return codec_name, fps, width, height, int(bit_rate) if bit_rate else None


def run_ffmpeg_with_fallback(input_path, output_path, resize_filter, fps, codec_name):
    """
    GPU‐only encode: software decode + h264_nvenc encode, clamping dimensions to
    ≤2032 px and embedding Rec.709 metadata via a bitstream filter with the
    correct option names.

    Note: resize_filter and codec_name are currently unused; dimensions are
    clamped by the hard-coded clamp_filter below.
    """
    if not os.path.exists(input_path):
        raise RuntimeError(f"Input file does not exist: {input_path}")

    # 1) NVENC pass
    clamp_filter = '-vf "scale=min(2032\\,iw):min(2032\\,ih)"'
    nvenc_cmd = (
        f'ffmpeg -y '
        f'-i "{input_path}" '
        f'{clamp_filter} '
        f'-pix_fmt yuv420p '
        f'-color_range tv '
        f'-color_primaries bt709 '
        f'-color_trc bt709 '
        f'-colorspace bt709 '
        f'-c:v h264_nvenc -preset p7 -tune hq -rc:v vbr_hq '
        f'-cq:v 19 -qmin:v 19 -qmax:v 21 '
        f'-b:v 0 -maxrate:v 130M -bufsize:v 130M '
        f'-profile:v high -r {fps} '
        f'-c:a aac -b:a 192k '
        f'"{output_path}"'
    )
    print("Running GPU encoding:\n", nvenc_cmd)
    res = subprocess.run(nvenc_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    print("NVENC STDERR:", res.stderr)
    if res.returncode != 0:
        raise RuntimeError(f"GPU encoding failed (exit {res.returncode}): {res.stderr.strip()}")
    if not os.path.exists(output_path) or os.path.getsize(output_path) == 0:
        raise RuntimeError("GPU encoding produced no output or output is empty")

    # 2) Patch the bitstream VUI using the correct field names:
    #    "colour_primaries" (British spelling), "transfer_characteristics",
    #    and "matrix_coefficients", plus the full_range flag.
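    #    (In the H.264 VUI code tables, a value of 1 for colour_primaries,
    #    transfer_characteristics, and matrix_coefficients designates BT.709,
    #    and video_full_range_flag=0 marks limited/"tv" range, matching the
    #    -color_* flags passed to the NVENC pass above.)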
    fixed_path = output_path.replace(".mp4", "_fixed.mp4")
    bsf_cmd = (
        f'ffmpeg -y '
        f'-i "{output_path}" '
        f'-c copy '
        f'-bsf:v h264_metadata='
        f'video_full_range_flag=0:'
        f'colour_primaries=1:'
        f'transfer_characteristics=1:'
        f'matrix_coefficients=1 '
        f'"{fixed_path}"'
    )
    print("Patching VUI with bitstream filter:\n", bsf_cmd)
    res2 = subprocess.run(bsf_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    print("BSF STDERR:", res2.stderr)
    if res2.returncode != 0:
        raise RuntimeError(f"Failed to patch metadata (exit {res2.returncode}): {res2.stderr.strip()}")

    # Replace the original file with the patched one
    os.replace(fixed_path, output_path)
    print("✅ GPU encoding succeeded and Rec.709 VUI injected")


def upload_to_s3(file_path, s3_key, extra_metadata=None):
    try:
        print(f"DEBUG: Uploading '{file_path}' to S3 as '{s3_key}' …")
        extra_args = {"Metadata": {
            str(k).replace(" ", "-").lower(): str(v)
            for k, v in (extra_metadata or {}).items()
        }} if extra_metadata else {}
        s3_client.upload_file(file_path, S3_BUCKET_NAME, s3_key, ExtraArgs=extra_args)
        url = f"https://{S3_BUCKET_NAME}.s3.{AWS_REGION}.amazonaws.com/{s3_key}"
        print("✅ Uploaded to S3:", url)
        return url
    except Exception as e:
        print("❌ S3 upload failed:", e)
        traceback.print_exc()
        return None


def s3_upload_with_timeout(file_path, s3_key, extra_metadata, timeout=60):
    """Run upload_to_s3 in a worker thread and give up after `timeout` seconds."""
    with ThreadPoolExecutor(max_workers=1) as ex:
        fut = ex.submit(upload_to_s3, file_path, s3_key, extra_metadata)
        try:
            return fut.result(timeout=timeout)
        except Exception as e:
            print("❌ S3 upload timed out/failed:", e)
            return None


def load_model():
    print("DEBUG: Loading YOLO model…")
    return YOLO("bests.pt").to("cuda" if torch.cuda.is_available() else "cpu")


def detect_ppe_video(video_file, original_key):
    if not video_file or "annotated" in os.path.basename(original_key).lower():
        return None, "No video or already annotated"
    if not torch.cuda.is_available():
        return None, "CUDA GPU is required for video processing"

    video_path = video_file.name
    model = load_model()

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return None, "Failed to open input video"
    fps_cv = cap.get(cv2.CAP_PROP_FPS)
    width_cv = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height_cv = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    if width_cv <= 0 or height_cv <= 0:
        cap.release()
        return None, "Invalid video dimensions"

    # Prepare S3 key and final output
    key_parts = original_key.split('/')
    key_parts[0] = "annotated"
    key_parts[-1] = "annotated.mp4"
    processed_s3_key = "/".join(key_parts)
    final_output_path = "/tmp/annotated.mp4"

    # Class order must match the model's class indices
    counts = {k: 0 for k in [
        'Hardhat', 'Mask', 'NO-Hardhat', 'NO-Mask',
        'NO-Safety Vest', 'Person', 'Safety Cone',
        'Safety Vest', 'machinery', 'vehicle'
    ]}
    tracked_ids = set()

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_pipe_path = os.path.join(temp_dir, "pipe_encoded.mp4")

        # 1) Pipe‐in rawvideo → encode with libx264 ultrafast into temp_pipe_path
        vf_chain = "format=yuv420p,scale=in_range=full:out_range=limited"
        ffmpeg_cmd = (
            f'ffmpeg -y '
            f'-f rawvideo -pix_fmt bgr24 -s {width_cv}x{height_cv} '
            f'-r {fps_cv} -i pipe: '
            f'-vf "{vf_chain}" '
            f'-color_primaries bt709 -color_trc bt709 '
            f'-colorspace bt709 -color_range tv '
            f'-c:v libx264 -preset ultrafast -pix_fmt yuv420p '
            f'"{temp_pipe_path}"'
        )
        proc = subprocess.Popen(
            ffmpeg_cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            shell=True,
            bufsize=10**8,
        )

        # Drain stderr to log FFmpeg messages
        def _drain(pipe):
            for line in iter(pipe.readline, b''):
                print("FFmpeg:", line.decode(errors="ignore").rstrip())
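        # Draining stderr in a background thread matters: FFmpeg writes progress
        # and warnings to stderr, and if that pipe buffer fills up while this
        # process is blocked writing raw frames to stdin, the two processes can
        # deadlock.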
        threading.Thread(target=_drain, args=(proc.stderr,), daemon=True).start()

        def _process_batch(frames):
            """Track a batch of frames, draw boxes for newly seen track IDs, and pipe frames to FFmpeg."""
            results = model.track(frames, conf=0.4, iou=0.3, persist=True)
            for i, out_frame in enumerate(frames):
                # draw boxes & update counts
                if results[i] and results[i].boxes is not None:
                    try:
                        tids = results[i].boxes.id.cpu().numpy()
                    except AttributeError:
                        # Tracker returned no IDs for this frame
                        tids = [None] * len(results[i].boxes.xyxy.cpu().numpy())
                    for box, cls_id, tid in zip(
                        results[i].boxes.xyxy.cpu().numpy(),
                        results[i].boxes.cls.cpu().numpy(),
                        tids
                    ):
                        # Each track ID is counted (and drawn) only once
                        if tid is not None and int(tid) in tracked_ids:
                            continue
                        if tid is not None:
                            tracked_ids.add(int(tid))
                        x1, y1, x2, y2 = map(int, box)
                        lbl = list(counts.keys())[int(cls_id)]
                        counts[lbl] += 1
                        cv2.rectangle(out_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                        cv2.putText(out_frame, lbl, (x1, y1 - 5),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
                # ensure FFmpeg is alive before writing the frame
                if proc.poll() is not None:
                    err = proc.stderr.read().decode(errors="ignore")
                    raise RuntimeError(f"FFmpeg exited early: {err!r}")
                proc.stdin.write(out_frame.tobytes())

        frame_count = 0
        batch = []

        # Read, detect, draw, and write frames in batches of 5
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_count += 1
            batch.append(frame)
            if len(batch) == 5:
                _process_batch(batch)
                batch.clear()

        # Flush any remaining frames
        if batch:
            _process_batch(batch)

        # Close pipe and wait for the first-pass encode to finish
        proc.stdin.close()
        try:
            retcode = proc.wait(timeout=30)
            if retcode != 0:
                err = proc.stderr.read().decode(errors="ignore")
                return None, f"Pipe encoding failed (code={retcode}): {err}"
        except subprocess.TimeoutExpired:
            proc.terminate()
            proc.wait()
            return None, "Pipe encoding timed out"

        # 2) Final NVENC pass: still inside the `with`, so temp_pipe_path exists
        codec_name, fps, w, h, _ = get_video_properties(video_path)
        resize_filter = ''
        if w > 2032 or h > 2032:
            resize_filter = '-vf "scale=w=min(2032,iw):h=min(2032,ih)"'
        try:
            run_ffmpeg_with_fallback(temp_pipe_path, final_output_path, resize_filter, fps, codec_name)
        except Exception as e:
            return None, f"Final FFmpeg pass failed: {e}"

    # end of `with` — temp_dir is removed here, but final_output_path remains
    cap.release()
    if not os.path.exists(final_output_path):
        return None, "Final conversion failed"

    # Upload and return
    s3_url = s3_upload_with_timeout(final_output_path, processed_s3_key, counts)
    if not s3_url:
        return None, "Upload failed"
    summary = "\n".join(f"{k}: {v}" for k, v in counts.items() if v > 0) or "No detections found."
    return s3_url, summary


def submit_job(video_file, original_key):
    # Note: original_key is currently unused; the upload key is derived from the temp file name
    with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(video_file.name)[1]) as tmp:
        tmp.write(video_file.read())
        tmp_path = tmp.name
    s3_input_key = f"input/{os.path.basename(tmp_path)}"
    url = upload_to_s3(tmp_path, s3_input_key)
    if not url:
        return "S3 upload failed"
    msg = {"bucket": S3_BUCKET_NAME, "object_key": s3_input_key, "presigned_url": url}
    sqs_client.send_message(QueueUrl=SQS_QUEUE_URL, MessageBody=json.dumps(msg))
    return "Job submitted successfully!"


def sqs_worker():
    while True:
        resp = sqs_client.receive_message(
            QueueUrl=SQS_QUEUE_URL,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20
        )
        msgs = resp.get('Messages', [])
        for m in msgs:
            try:
                job = json.loads(m['Body'])
                bucket, key = job["bucket"], job["object_key"]
                local = f"/tmp/{os.path.basename(key)}"
                s3_client.download_file(bucket, key, local)
                # Capture the result here
                with open(local, 'rb') as vf:
                    s3_url, summary = detect_ppe_video(vf, key)
                if s3_url:
                    print(f"✅ Annotated video uploaded to S3: {s3_url}")
                else:
                    print(f"❌ Annotation or upload failed: {summary}")
            except Exception as e:
                print("ERROR in SQS loop:", e)
                traceback.print_exc()
            finally:
                # only delete after logging success/failure
                sqs_client.delete_message(
                    QueueUrl=SQS_QUEUE_URL,
                    ReceiptHandle=m['ReceiptHandle']
                )
        time.sleep(1)


with gr.Blocks() as demo:
    gr.Markdown("# YOLOv8 PPE Detection with S3 Metadata")
    video_input = gr.File(label="Upload Video", file_types=[".mp4", ".avi", ".mov"])
    original_key_input = gr.Textbox(label="S3 Key (original path)")
    btn = gr.Button("Submit Job (Fire-and-Forget)")
    out = gr.Textbox(label="Job Status")
    btn.click(fn=submit_job, inputs=[video_input, original_key_input], outputs=[out])


if __name__ == "__main__":
    threading.Thread(target=sqs_worker, daemon=True).start()
    demo.launch(server_name="0.0.0.0", server_port=7860)
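
# Example .env for local runs (illustrative values only; the variable names match
# the os.getenv/os.environ lookups at the top of this file):
#
#   AWS_ACCESS_KEY_ID=AKIA...
#   AWS_SECRET_ACCESS_KEY=...
#   S3_BUCKET_NAME=my-ppe-bucket
#   AWS_REGION=us-east-1
#   SQS_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123456789012/ppe-jobs
#
# Jobs on that queue are expected to carry the JSON body produced by submit_job():
#   {"bucket": "<bucket>", "object_key": "input/<file>.mp4", "presigned_url": "<url>"}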