Spaces:
Running
on
Zero
Running
on
Zero
File size: 6,448 Bytes
e6af450 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# Copyright (c) 2023 OpenGVLab
# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates.
# SPDX-License-Identifier: MIT
#
# This file has been modified by ByteDance Ltd. and/or its affiliates. on 2025-05-20.
#
# Original file was released under MIT, with the full license text
# available at https://github.com/OpenGVLab/InternVL/blob/main/LICENSE.
#
# This modified file is released under the same license.
import io
import os
import random
import re
import numpy as np
import decord
from PIL import Image
def get_frame_indices(num_frames, vlen, sample='rand', fix_start=None, input_fps=1, max_num_frames=-1):
if sample in ['rand', 'middle']: # uniform sampling
acc_samples = min(num_frames, vlen)
# split the video into `acc_samples` intervals, and sample from each interval.
intervals = np.linspace(start=0, stop=vlen, num=acc_samples + 1).astype(int)
ranges = []
for idx, interv in enumerate(intervals[:-1]):
ranges.append((interv, intervals[idx + 1] - 1))
if sample == 'rand':
try:
frame_indices = [random.choice(range(x[0], x[1])) for x in ranges]
except:
frame_indices = np.random.permutation(vlen)[:acc_samples]
frame_indices.sort()
frame_indices = list(frame_indices)
elif fix_start is not None:
frame_indices = [x[0] + fix_start for x in ranges]
elif sample == 'middle':
frame_indices = [(x[0] + x[1]) // 2 for x in ranges]
else:
raise NotImplementedError
if len(frame_indices) < num_frames: # padded with last frame
padded_frame_indices = [frame_indices[-1]] * num_frames
padded_frame_indices[:len(frame_indices)] = frame_indices
frame_indices = padded_frame_indices
elif 'fps' in sample: # fps0.5, sequentially sample frames at 0.5 fps
output_fps = float(sample[3:])
duration = float(vlen) / input_fps
delta = 1 / output_fps # gap between frames, this is also the clip length each frame represents
frame_seconds = np.arange(0 + delta / 2, duration + delta / 2, delta)
frame_indices = np.around(frame_seconds * input_fps).astype(int)
frame_indices = [e for e in frame_indices if e < vlen]
if max_num_frames > 0 and len(frame_indices) > max_num_frames:
frame_indices = frame_indices[:max_num_frames]
else:
raise ValueError
return frame_indices
def read_frames_decord(video_path, num_frames, sample='rand', fix_start=None, clip=None, min_num_frames=4):
video_reader = decord.VideoReader(video_path, num_threads=1)
vlen = len(video_reader)
fps = video_reader.get_avg_fps()
duration = vlen / float(fps)
if clip:
start, end = clip
duration = end - start
vlen = int(duration * fps)
start_index = int(start * fps)
t_num_frames = np.random.randint(min_num_frames, num_frames + 1)
frame_indices = get_frame_indices(
t_num_frames, vlen, sample=sample, fix_start=fix_start,
input_fps=fps
)
if clip:
frame_indices = [f + start_index for f in frame_indices]
frames = video_reader.get_batch(frame_indices).asnumpy() # (T, H, W, C), np.uint8
frames = [Image.fromarray(frames[i]) for i in range(frames.shape[0])]
return frames
def extract_frame_number(filename):
# Extract the numeric part from the filename using regular expressions
match = re.search(r'_(\d+).jpg$', filename)
return int(match.group(1)) if match else -1
def sort_frames(frame_paths):
# Extract filenames from each path and sort by their numeric part
return sorted(frame_paths, key=lambda x: extract_frame_number(os.path.basename(x)))
def read_frames_folder(video_path, num_frames, sample='rand', fix_start=None, min_num_frames=4):
image_list = sort_frames(list(os.listdir(video_path)))
frames = []
for image in image_list:
fp = os.path.join(video_path, image)
frame = Image.open(fp).convert('RGB')
frames.append(frame)
vlen = len(frames)
t_num_frames = np.random.randint(min_num_frames, num_frames + 1)
if vlen > t_num_frames:
frame_indices = get_frame_indices(
t_num_frames, vlen, sample=sample, fix_start=fix_start
)
frames = [frames[i] for i in frame_indices]
return frames
class FrameSampler:
def __init__(self, max_num_frames=-1, min_num_frames=8, sample='rand'):
self.max_num_frames = max_num_frames
self.min_num_frames = min_num_frames
self.sample = sample
def __call__(self, file_name):
fn = read_frames_folder if file_name.endswith('/') else read_frames_decord
frames = fn(file_name, num_frames=self.max_num_frames, min_num_frames=self.min_num_frames, sample=self.sample)
return frames
def decode_video_byte(video_bytes):
video_stream = io.BytesIO(video_bytes)
vr = decord.VideoReader(video_stream)
return vr
def sample_mp4_frames(mp4_p, n_frames=None, fps=None, return_frame_indices=False, random_sample=False):
if isinstance(mp4_p, str):
vr = decord.VideoReader(mp4_p, num_threads=1)
elif isinstance(mp4_p, decord.video_reader.VideoReader):
vr = mp4_p
video_fps = vr.get_avg_fps() # 获取视频的帧率
video_duration = len(vr) / video_fps
if n_frames is not None:
if random_sample:
frame_indices = sorted(random.sample(range(len(vr)), n_frames))
else:
frame_indices = np.linspace(0, len(vr)-1, n_frames, dtype=int).tolist()
else:
frame_indices = [int(i) for i in np.arange(0, len(vr)-1, video_fps/fps)]
frames = vr.get_batch(frame_indices).asnumpy() # 转换为 numpy 数组
frames = [Image.fromarray(frame).convert("RGB") for frame in frames]
if not return_frame_indices:
return frames, video_duration
else:
return frames, video_duration, frame_indices
def sample_mp4_frames_by_indices(mp4_p, frame_indices: list):
if isinstance(mp4_p, str):
vr = decord.VideoReader(mp4_p, num_threads=1)
elif isinstance(mp4_p, decord.video_reader.VideoReader):
vr = mp4_p
# sample the frames in frame_indices
frames = vr.get_batch(frame_indices).asnumpy() # 转换为 numpy 数组
frames = [Image.fromarray(frame).convert("RGB") for frame in frames]
return frames |