# EC2BimefyModel / app.py
import os
import json
import tempfile
import time
import cv2
import numpy as np
import gradio as gr
import boto3
import torch
import subprocess
import threading
import sys
import traceback
from concurrent.futures import ThreadPoolExecutor
from botocore.exceptions import NoCredentialsError, ClientError
from ultralytics import YOLO
from dotenv import load_dotenv
load_dotenv()
# force immediate flush
sys.stdout.reconfigure(line_buffering=True)
sys.stderr.reconfigure(line_buffering=True)
print("✅ DEBUG: Logging is now forced to flush immediately.")
print(f"DEBUG: PyTorch is using CUDA: {torch.cuda.is_available()}")
print(f"DEBUG: Available GPUs: {torch.cuda.device_count()}")
if torch.cuda.device_count() > 0:
    print(f"DEBUG: Current GPU: {torch.cuda.get_device_name(0)}")
AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
S3_BUCKET_NAME = os.getenv('S3_BUCKET_NAME')
AWS_REGION = os.getenv('AWS_REGION', 'us-east-1')
SQS_QUEUE_URL = os.getenv("SQS_QUEUE_URL")
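# Illustrative .env layout for local runs (values are placeholders, not real
# credentials); in production these would normally come from the environment:
#
#   AWS_ACCESS_KEY_ID=AKIA...
#   AWS_SECRET_ACCESS_KEY=...
#   S3_BUCKET_NAME=my-bucket
#   AWS_REGION=us-east-1
#   SQS_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123456789012/my-queue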
s3_client = boto3.client(
    's3',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)
sqs_client = boto3.client(
    'sqs',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)
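# Note: if the credential env vars are unset, the explicit arguments above are
# None and boto3 falls back to its default credential chain (shared config, or
# an EC2 instance role), so the clients can still work without a .env file.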
def get_video_properties(video_path):
    """Probe the first video stream with ffprobe and return basic properties."""
    command = [
        'ffprobe', '-v', 'error', '-select_streams', 'v:0',
        '-show_entries', 'stream=codec_name,width,height,r_frame_rate,bit_rate',
        '-of', 'json', video_path
    ]
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"ffprobe failed: {result.stderr.strip()}")
    info = json.loads(result.stdout)
    stream = info['streams'][0]
    codec_name = stream['codec_name']
    width = stream['width']
    height = stream['height']
    # r_frame_rate is a fraction string like "30000/1001"; parse it instead of eval()
    num, den = stream['r_frame_rate'].split('/')
    fps = float(num) / float(den)
    bit_rate = stream.get('bit_rate')
    return codec_name, fps, width, height, int(bit_rate) if bit_rate else None
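# For reference, the ffprobe JSON parsed above looks roughly like this for a
# typical 1080p/30fps H.264 stream (values are illustrative):
#
#   {"streams": [{"codec_name": "h264", "width": 1920, "height": 1080,
#                 "r_frame_rate": "30/1", "bit_rate": "4000000"}]}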
def run_ffmpeg_with_fallback(input_path, output_path, resize_filter, fps, codec_name):
    """
    GPU-only encode: software decode + h264_nvenc encode, clamping dimensions
    to ≤2032px and embedding Rec.709 metadata via a bitstream filter with the
    correct option names. Despite the name, there is no CPU fallback; failures
    raise RuntimeError. `resize_filter` and `codec_name` are currently unused
    (the clamp filter below is always applied).
    """
    if not os.path.exists(input_path):
        raise RuntimeError(f"Input file does not exist: {input_path}")
    # 1) NVENC pass (width and height are clamped independently to 2032px)
    clamp_filter = '-vf "scale=min(2032\\,iw):min(2032\\,ih)"'
    nvenc_cmd = (
        f'ffmpeg -y '
        f'-i "{input_path}" '
        f'{clamp_filter} '
        f'-pix_fmt yuv420p '
        f'-color_range tv '
        f'-color_primaries bt709 '
        f'-color_trc bt709 '
        f'-colorspace bt709 '
        f'-c:v h264_nvenc -preset p7 -tune hq -rc:v vbr_hq '
        f'-cq:v 19 -qmin:v 19 -qmax:v 21 '
        f'-b:v 0 -maxrate:v 130M -bufsize:v 130M '
        f'-profile:v high -r {fps} '
        f'-c:a aac -b:a 192k '
        f'"{output_path}"'
    )
    print("Running GPU encoding:\n", nvenc_cmd)
    res = subprocess.run(nvenc_cmd, shell=True,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    print("NVENC STDERR:", res.stderr)
    if res.returncode != 0:
        raise RuntimeError(f"GPU encoding failed (exit {res.returncode}): {res.stderr.strip()}")
    if not os.path.exists(output_path) or os.path.getsize(output_path) == 0:
        raise RuntimeError("GPU encoding produced no output or output is empty")
    # 2) Patch the bitstream VUI using the correct field names:
    #    "colour_primaries" (British spelling), "transfer_characteristics",
    #    and "matrix_coefficients", plus the full_range flag
    fixed_path = output_path.replace(".mp4", "_fixed.mp4")
    bsf_cmd = (
        f'ffmpeg -y '
        f'-i "{output_path}" '
        f'-c copy '
        f'-bsf:v h264_metadata='
        f'video_full_range_flag=0:'
        f'colour_primaries=1:'
        f'transfer_characteristics=1:'
        f'matrix_coefficients=1 '
        f'"{fixed_path}"'
    )
    print("Patching VUI with bitstream filter:\n", bsf_cmd)
    res2 = subprocess.run(bsf_cmd, shell=True,
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    print("BSF STDERR:", res2.stderr)
    if res2.returncode != 0:
        raise RuntimeError(f"Failed to patch metadata (exit {res2.returncode}): {res2.stderr.strip()}")
    # Replace the original file with the patched one
    os.replace(fixed_path, output_path)
    print("✅ GPU encoding succeeded and Rec.709 VUI injected")
def upload_to_s3(file_path, s3_key, extra_metadata=None):
    try:
        print(f"DEBUG: Uploading '{file_path}' to S3 as '{s3_key}' …")
        # S3 user metadata keys and values must be strings
        extra_args = {"Metadata": {
            str(k).replace(" ", "-").lower(): str(v)
            for k, v in (extra_metadata or {}).items()
        }} if extra_metadata else {}
        s3_client.upload_file(file_path, S3_BUCKET_NAME, s3_key, ExtraArgs=extra_args)
        url = f"https://{S3_BUCKET_NAME}.s3.{AWS_REGION}.amazonaws.com/{s3_key}"
        print("✅ Uploaded to S3:", url)
        return url
    except Exception as e:
        print("❌ S3 upload failed:", e)
        traceback.print_exc()
        return None
def s3_upload_with_timeout(file_path, s3_key, extra_metadata, timeout=60):
    # Run the upload in a worker thread so we stop waiting after `timeout`
    # seconds (note: the executor's shutdown still joins the thread on exit)
    with ThreadPoolExecutor(max_workers=1) as ex:
        fut = ex.submit(upload_to_s3, file_path, s3_key, extra_metadata)
        try:
            return fut.result(timeout=timeout)
        except Exception as e:
            print("❌ S3 upload timed out/failed:", e)
            return None
def load_model():
    print("DEBUG: Loading YOLO model…")
    m = YOLO("bests.pt").to("cuda" if torch.cuda.is_available() else "cpu")
    return m
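# Sanity-check sketch (assumes the model's class names line up with the
# counts dict defined in detect_ppe_video below):
#
#   model = load_model()
#   print(model.names)  # e.g. {0: 'Hardhat', 1: 'Mask', ...}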
def detect_ppe_video(video_file, original_key):
    if not video_file or "annotated" in os.path.basename(original_key).lower():
        return None, "No video or already annotated"
    if not torch.cuda.is_available():
        return None, "CUDA GPU is required for video processing"
    video_path = video_file.name
    model = load_model()
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return None, "Failed to open input video"
    fps_cv = cap.get(cv2.CAP_PROP_FPS)
    width_cv = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height_cv = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    if width_cv <= 0 or height_cv <= 0:
        cap.release()
        return None, "Invalid video dimensions"
    # Prepare S3 key and final output
    key_parts = original_key.split('/')
    key_parts[0] = "annotated"
    key_parts[-1] = "annotated.mp4"
    processed_s3_key = "/".join(key_parts)
    final_output_path = "/tmp/annotated.mp4"
    # Class-index order must match the model's training labels, since cls_id
    # indexes into this list when labeling detections below
    counts = {k: 0 for k in [
        'Hardhat', 'Mask', 'NO-Hardhat', 'NO-Mask',
        'NO-Safety Vest', 'Person', 'Safety Cone',
        'Safety Vest', 'machinery', 'vehicle'
    ]}
    tracked_ids = set()
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_pipe_path = os.path.join(temp_dir, "pipe_encoded.mp4")
        # 1) Pipe in rawvideo -> encode with libx264 ultrafast into temp_pipe_path
        vf_chain = "format=yuv420p,scale=in_range=full:out_range=limited"
        ffmpeg_cmd = (
            f'ffmpeg -y '
            f'-f rawvideo -pix_fmt bgr24 -s {width_cv}x{height_cv} '
            f'-r {fps_cv} -i pipe: '
            f'-vf "{vf_chain}" '
            f'-color_primaries bt709 -color_trc bt709 '
            f'-colorspace bt709 -color_range tv '
            f'-c:v libx264 -preset ultrafast -pix_fmt yuv420p '
            f'"{temp_pipe_path}"'
        )
        proc = subprocess.Popen(
            ffmpeg_cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            shell=True,
            bufsize=10**8,
        )

        # Drain stderr to log FFmpeg messages without blocking the pipe
        def _drain(pipe):
            for line in iter(pipe.readline, b''):
                print("FFmpeg:", line.decode(errors="ignore").rstrip())

        threading.Thread(target=_drain, args=(proc.stderr,), daemon=True).start()
        frame_count = 0
        batch = []

        def _annotate_and_write(frames):
            # Run tracking on a batch, draw boxes for newly seen track IDs,
            # update counts, and stream each frame into the FFmpeg pipe
            results = model.track(frames, conf=0.4, iou=0.3, persist=True)
            for i, out_frame in enumerate(frames):
                if results[i] and results[i].boxes is not None:
                    boxes = results[i].boxes.xyxy.cpu().numpy()
                    classes = results[i].boxes.cls.cpu().numpy()
                    try:
                        tids = results[i].boxes.id.cpu().numpy()
                    except AttributeError:
                        tids = [None] * len(boxes)
                    for box, cls_id, tid in zip(boxes, classes, tids):
                        # Each track ID is counted and drawn only once, on the
                        # first frame in which it appears
                        if tid is not None and int(tid) in tracked_ids:
                            continue
                        if tid is not None:
                            tracked_ids.add(int(tid))
                        x1, y1, x2, y2 = map(int, box)
                        lbl = list(counts.keys())[int(cls_id)]
                        counts[lbl] += 1
                        cv2.rectangle(out_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                        cv2.putText(out_frame, lbl, (x1, y1 - 5),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
                # Ensure FFmpeg is still alive before writing
                if proc.poll() is not None:
                    err = proc.stderr.read().decode(errors="ignore")
                    raise RuntimeError(f"FFmpeg exited early: {err!r}")
                proc.stdin.write(out_frame.tobytes())

        # Read frames and process them in batches of 5
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_count += 1
            batch.append(frame)
            if len(batch) == 5:
                _annotate_and_write(batch)
                batch.clear()
        # Flush any remaining frames
        if batch:
            _annotate_and_write(batch)
        # Close the pipe and wait for the encoder to finish
        proc.stdin.close()
        try:
            retcode = proc.wait(timeout=30)
            if retcode != 0:
                err = proc.stderr.read().decode(errors="ignore")
                return None, f"Pipe encoding failed (code={retcode}): {err}"
        except subprocess.TimeoutExpired:
            proc.terminate()
            proc.wait()
            return None, "Pipe encoding timed out"
        # 2) Final NVENC pass: still inside the `with`, so temp_pipe_path exists
        codec_name, fps, w, h, _ = get_video_properties(video_path)
        resize_filter = ''
        if w > 2032 or h > 2032:
            resize_filter = '-vf "scale=w=min(2032\\,iw):h=min(2032\\,ih)"'
        try:
            run_ffmpeg_with_fallback(temp_pipe_path, final_output_path, resize_filter, fps, codec_name)
        except Exception as e:
            return None, f"Final FFmpeg pass failed: {e}"
    # End of `with`: temp_dir is removed here, but final_output_path remains
    cap.release()
    if not os.path.exists(final_output_path):
        return None, "Final conversion failed"
    # Upload the annotated video with per-class counts as S3 object metadata
    s3_url = s3_upload_with_timeout(final_output_path, processed_s3_key, counts)
    if not s3_url:
        return None, "Upload failed"
    summary = "\n".join(f"{k}: {v}" for k, v in counts.items() if v > 0) or "No detections found."
    return s3_url, summary
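# Example return value from detect_ppe_video (illustrative bucket and key):
#   ("https://my-bucket.s3.us-east-1.amazonaws.com/annotated/job1/annotated.mp4",
#    "Person: 3\nHardhat: 2")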
def submit_job(video_file, original_key):
    # Note: `original_key` is currently unused; the input S3 key is derived
    # from the temp file's basename below
    with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(video_file.name)[1]) as tmp:
        tmp.write(video_file.read())
        tmp_path = tmp.name
    s3_input_key = f"input/{os.path.basename(tmp_path)}"
    url = upload_to_s3(tmp_path, s3_input_key)
    if not url:
        return "S3 upload failed"
    # "presigned_url" carries the plain public object URL here, not a presigned URL
    msg = {"bucket": S3_BUCKET_NAME, "object_key": s3_input_key, "presigned_url": url}
    sqs_client.send_message(QueueUrl=SQS_QUEUE_URL, MessageBody=json.dumps(msg))
    return "Job submitted successfully!"
def sqs_worker():
    # Long-poll the queue forever; each message describes one uploaded video
    while True:
        resp = sqs_client.receive_message(
            QueueUrl=SQS_QUEUE_URL,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20
        )
        msgs = resp.get('Messages', [])
        for m in msgs:
            try:
                job = json.loads(m['Body'])
                bucket, key = job["bucket"], job["object_key"]
                local = f"/tmp/{os.path.basename(key)}"
                s3_client.download_file(bucket, key, local)
                # Capture the result here
                with open(local, 'rb') as vf:
                    s3_url, summary = detect_ppe_video(vf, key)
                if s3_url:
                    print(f"✅ Annotated video uploaded to S3: {s3_url}")
                else:
                    print(f"❌ Annotation or upload failed: {summary}")
            except Exception as e:
                print("ERROR in SQS loop:", e)
                traceback.print_exc()
            finally:
                # Only delete after logging success/failure
                sqs_client.delete_message(
                    QueueUrl=SQS_QUEUE_URL,
                    ReceiptHandle=m['ReceiptHandle']
                )
        time.sleep(1)
with gr.Blocks() as demo:
    gr.Markdown("# YOLOv8 PPE Detection with S3 Metadata")
    video_input = gr.File(label="Upload Video", file_types=[".mp4", ".avi", ".mov"])
    original_key_input = gr.Textbox(label="S3 Key (original path)")
    btn = gr.Button("Submit Job (Fire-and-Forget)")
    out = gr.Textbox(label="Status")
    btn.click(fn=submit_job, inputs=[video_input, original_key_input], outputs=[out])

if __name__ == "__main__":
    threading.Thread(target=sqs_worker, daemon=True).start()
    demo.launch(server_name="0.0.0.0", server_port=7860)