# LVP/utils/video_utils.py
# Page-header residue kept for provenance:
#   "kiwhansong's picture" / commit message "add demo" / commit 142a1ac
import av
from pathlib import Path
import io
from PIL import Image
def write_numpy_to_mp4(video_data, output_path, fps: int = 30) -> None:
    """
    Write a numpy array into a mp4 file using pyav.

    Args:
        video_data (numpy.ndarray): The video data to write. Should be of shape
            (num_frames, height, width, channels) with 3 (RGB) channels. Each
            frame is handed to PyAV as "rgb24".
            NOTE(review): presumably uint8 data is expected -- confirm against callers.
        output_path (str or pathlib.Path): The path to the output mp4 file.
            Its parent directory must already exist.
        fps (int): Frames per second for the output video.

    Raises:
        ValueError: If video_data is not 4-dimensional or does not have
            exactly 3 channels.
        FileNotFoundError: If the parent directory of output_path does not exist.
    """
    # Validate the rank explicitly so callers get a descriptive message instead
    # of a tuple-unpacking error.
    if video_data.ndim != 4:
        raise ValueError(
            "Video data should be of shape (num_frames, height, width, channels)."
        )
    _, height, width, channels = video_data.shape
    if channels != 3:
        raise ValueError("Video data should have 3 channels (RGB).")

    output_dir = Path(output_path).parent
    if not output_dir.exists():
        raise FileNotFoundError(f"The directory {output_dir} does not exist.")

    # The context manager closes the container even when encoding or muxing
    # raises, so the output file handle is never leaked.
    with av.open(str(output_path), mode="w") as container:
        stream = container.add_stream("h264", rate=fps)
        stream.width = width
        stream.height = height
        stream.pix_fmt = "yuv420p"

        for frame_data in video_data:
            frame = av.VideoFrame.from_ndarray(frame_data, format="rgb24")
            for packet in stream.encode(frame):
                container.mux(packet)

        # Flush the encoder: encode() with no frame drains buffered packets.
        for packet in stream.encode():
            container.mux(packet)
def numpy_to_mp4_bytes(video_data, fps: int = 30) -> bytes:
    """
    Convert a numpy array to MP4 bytes in memory using PyAV.

    Args:
        video_data (numpy.ndarray): The video data to convert. Should be of shape
            (num_frames, height, width, 3) for RGB video, with even width and
            height. Each frame is handed to PyAV as "rgb24".
            NOTE(review): presumably uint8 data is expected -- confirm against callers.
        fps (int): Frames per second for the output video.

    Returns:
        bytes: The MP4 video data as bytes.

    Raises:
        ValueError: If video_data is not of shape (num_frames, height, width, 3),
            or if width or height is odd.
    """
    if video_data.ndim != 4 or video_data.shape[-1] != 3:
        raise ValueError(
            "Video data should be of shape (num_frames, height, width, 3) for RGB video."
        )

    _, height, width, _ = video_data.shape

    # Check that dimensions are even (required by many players and codecs)
    if width % 2 != 0 or height % 2 != 0:
        raise ValueError(
            f"Video dimensions must be even. Got width={width}, height={height}"
        )

    # Encode into an in-memory buffer instead of a file on disk.
    buffer = io.BytesIO()

    # The context manager closes (and thereby finalizes) the container even if
    # encoding or muxing raises, so the container is never leaked.
    with av.open(buffer, mode="w", format="mp4") as container:
        stream = container.add_stream("h264", rate=fps)
        stream.width = width
        stream.height = height
        stream.pix_fmt = "yuv420p"

        # Codec options are passed as strings; profile and level use their
        # numeric values rather than symbolic names.
        # NOTE(review): the original comment refers to libopenh264; which of
        # these options the selected h264 encoder honors depends on the FFmpeg
        # build -- confirm.
        stream.options = {
            "profile": "66",  # 66 = Baseline profile in H.264
            "level": "30",  # 30 = Level 3.0 (must be integer value)
            "preset": "medium",
            "crf": "23",
        }

        # Encode frames directly from numpy array
        for frame_data in video_data:
            frame = av.VideoFrame.from_ndarray(frame_data, format="rgb24")
            for packet in stream.encode(frame):
                container.mux(packet)

        # Flush the encoder: encode() with no frame drains buffered packets.
        for packet in stream.encode():
            container.mux(packet)

    # getvalue() returns the entire buffer regardless of the stream position,
    # so no seek is needed once the container has been closed.
    return buffer.getvalue()