Spaces:
Running
Running
File size: 2,915 Bytes
2012550 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
import cv2
from .processor import MediaProcessor
import subprocess
class LiveStreamProcessor:
    """Censor detections on a live video source and forward the frames.

    Frames are read with OpenCV, each detection whose confidence meets the
    threshold is passed to the censor, and the resulting frame is piped as
    raw BGR bytes into an ``ffmpeg`` child process that publishes it.
    """

    # Output back-ends that ``process_stream`` knows how to feed.
    SUPPORTED_OUTPUT_TYPES = ("rtmp",)
    # Frame rate used when the capture device does not report a usable one
    # (live sources frequently report 0 or NaN for CAP_PROP_FPS).
    DEFAULT_FPS = 30

    def __init__(self, detector, censor):
        """Store the detector/censor pair and build the shared MediaProcessor.

        Args:
            detector: Object exposing ``detect(frame)``; each returned item is
                indexed at position 4 for its confidence score.
            censor: Object exposing ``apply(frame, bbox)`` returning a frame.
        """
        self.detector = detector
        self.censor = censor
        self.processor = MediaProcessor(detector, censor)

    @staticmethod
    def _sanitize_fps(raw_fps, default=DEFAULT_FPS):
        """Return ``raw_fps`` as a positive int, or ``default`` if unusable."""
        try:
            fps = int(raw_fps)
        except (TypeError, ValueError, OverflowError):
            # int() rejects None, NaN and infinity respectively.
            return default
        return fps if fps > 0 else default

    def process_stream(self,
                       source,
                       output_type=None,
                       output_dest=None,
                       conf_thresh=0.4,
                       preview_stream=False):
        """Read ``source`` frame by frame, censor it and push it to the output.

        Args:
            source: Anything accepted by ``cv2.VideoCapture``.
            output_type: Output back-end; only ``"rtmp"`` is supported.
            output_dest: Destination handed to the back-end (the RTMP URL).
            conf_thresh: Minimum ``bbox[4]`` value for a detection to be
                censored.
            preview_stream: When true, show each processed frame in a window;
                pressing ``q`` stops the stream.

        Raises:
            ValueError: If ``output_type`` is unsupported, ``output_dest`` is
                ``None``, or the video source cannot be opened.
        """
        # Validate the arguments before acquiring any resource, so a bad call
        # neither leaks the capture handle nor processes a frame first.
        if output_type not in self.SUPPORTED_OUTPUT_TYPES:
            raise ValueError(f"Invalid output_type: {output_type}, try 'rtmp'?")
        if output_dest is None:
            raise ValueError("output_dest cannot be None, provide valid output destination.")

        capture = cv2.VideoCapture(source)
        output = None
        try:
            if not capture.isOpened():
                raise ValueError(f"Could not open video source {source}")

            width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = self._sanitize_fps(capture.get(cv2.CAP_PROP_FPS))

            # Created inside the try block so the capture is released even if
            # the encoder process cannot be started.
            output = self.initialize_output(output_type, output_dest, width, height, fps)

            while True:
                ret, frame = capture.read()
                if not ret:
                    break

                for bbox in self.detector.detect(frame):
                    if bbox[4] >= conf_thresh:
                        frame = self.censor.apply(frame, bbox)

                # ffmpeg was started with "-f rawvideo -pix_fmt bgr24", so it
                # expects the frame buffer as-is.
                output.stdin.write(frame.tobytes())

                if preview_stream:
                    cv2.imshow("Stream Preview", frame)
                    if cv2.waitKey(1) & 0xFF == ord("q"):
                        break
        finally:
            capture.release()
            try:
                if output is not None:
                    self.cleanup_output(output, output_type)
            finally:
                # Only touch the GUI layer when a window was requested:
                # headless OpenCV builds raise on destroyAllWindows().
                if preview_stream:
                    cv2.destroyAllWindows()

    def initialize_output(self, output_type, output_dest, width, height, fps):
        """Start the process that receives the processed frames.

        Args:
            output_type: Output back-end name.
            output_dest: Destination for the back-end (the RTMP URL).
            width: Frame width in pixels, passed to ffmpeg's ``-s``.
            height: Frame height in pixels, passed to ffmpeg's ``-s``.
            fps: Input frame rate, passed to ffmpeg's ``-r``.

        Returns:
            The ``subprocess.Popen`` handle (with a writable ``stdin``) for
            ``"rtmp"``, or ``None`` for any other ``output_type``.

        Raises:
            ValueError: If ``output_dest`` is ``None``.
        """
        if output_dest is None:
            raise ValueError("output_dest cannot be None, provide valid output destination.")

        if output_type == "rtmp":
            command = [
                "ffmpeg",
                "-y",  # Overwrite output files
                "-f", "rawvideo",
                "-vcodec", "rawvideo",
                "-pix_fmt", "bgr24",
                "-s", f"{width}x{height}",
                "-r", str(fps),
                "-i", "-",  # Input from pipe
                "-c:v", "libx264",
                "-pix_fmt", "yuv420p",
                "-preset", "ultrafast",
                "-f", "flv",
                output_dest,  # RTMP URL
            ]
            return subprocess.Popen(command, stdin=subprocess.PIPE)

        return None

    def cleanup_output(self, output, output_type):
        """Close the pipe to the output process and wait for it to exit.

        Args:
            output: Handle returned by ``initialize_output``.
            output_type: Output back-end name; only ``"rtmp"`` needs cleanup.
        """
        if output_type == "rtmp":
            try:
                output.stdin.close()
            except OSError:
                # The encoder already exited (e.g. broken pipe while flushing).
                # Nothing is left to flush, and re-raising here would mask the
                # error that ended the stream.
                pass
            finally:
                # Always reap the child so it does not linger as a zombie.
                output.wait()
|