# dms3_demo/utils/os_util.py
# -*- coding: utf-8 -*-
import os
import re
import random
import urllib.request as request
from glob import glob
import cv2
import numpy as np
import requests
from utils.common import clock_custom, Color, colorprint, log_error
from utils.multiprogress import MultiThreading
from utils.labels import load_labels
def is_day(img_path):
    # Infer day/night from the YYYYMMDDHHMM timestamp embedded in the file name.
    # Returns 1 for day (06:30-18:30), 0 for night, -1 if no timestamp is found.
    pattern = re.compile('.*20[1-2][0-9][0-1][0-9][0-3][0-9]([0-2][0-9][0-5][0-9]).*')
res = pattern.match(img_path)
if res is None:
return -1
cur_time = int(res.group(1))
if 630 <= cur_time <= 1830:
return 1
else:
return 0
# Collect the full paths of all files with matching extensions in a directory.
def get_file_paths(folder, upper=False, sort=True, abs_path=True, mod='img'):
if mod == 'img':
extensions = ['jpg', 'jpeg', 'png', 'bmp']
elif mod == 'vdo':
extensions = ['mp4', 'avi', 'mov']
else:
extensions = [mod]
img_files = []
for ext in extensions:
ext = ext.upper() if upper else ext
        files = glob('%s/*.%s' % (folder, ext))  # glob all files with this extension
if not abs_path:
files = [os.path.basename(path) for path in files]
img_files += files
return sorted(img_files) if sort else img_files
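# Illustrative usage (hypothetical folders): with abs_path=False only the file names
# are returned; mod can also be a single custom extension such as 'mp4'.
# names = get_file_paths('/data/frames', abs_path=False)       # e.g. ['0001.jpg', ...]
# clips = get_file_paths('/data/clips', mod='mp4', sort=False)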
def get_files_paths_batch(data_dir, upper=False, sort=True, abs_path=True, mod='img'):
subs = [os.path.join(data_dir, sub) for sub in os.listdir(data_dir)
if not sub.startswith('.') and os.path.isdir(os.path.join(data_dir, sub))]
print(subs)
all_paths = []
for sub in subs:
all_paths += get_file_paths(sub, upper, sort, abs_path, mod)
return all_paths
# Read an image whose path may contain non-ASCII (e.g. Chinese) characters.
def read_img(img_path):
img = cv2.imdecode(np.fromfile(img_path, dtype=np.uint8), -1)
return img
# Write an image to a path that may contain non-ASCII (e.g. Chinese) characters.
def save_img(save_path, img):
    cv2.imencode('.jpg', img)[1].tofile(save_path)
def read_online_image(img_url):
try:
response = request.urlopen(img_url)
img_array = np.array(bytearray(response.read()), dtype=np.uint8)
img = cv2.imdecode(img_array, -1)
return img
except Exception:
print('{} read failed!'.format(img_url))
return
# @clock_custom('[{elapsed:0.8f}s] {name}()')
# Fast path: writes the raw HTTP response bytes straight to disk.
def download_file(file_url, save_path):
if os.path.exists(save_path):
colorprint(Color.YELLOW, 'Image %s already exists. Skipping download.' % save_path)
return 0
try:
resp = requests.get(file_url)
file = resp.content
with open(save_path, 'wb') as fp:
fp.write(file)
return 1
    except Exception:
colorprint(Color.RED, 'Warning: can not download from {}'.format(file_url))
return 0
def download_files(url_list, save_dir, workers=8):
os.makedirs(save_dir, exist_ok=True)
def kernel(url):
name = os.path.basename(url)
save_path = os.path.join(save_dir, name)
res = download_file(url, save_path)
return res
exe = MultiThreading(url_list, workers)
result = exe.run(kernel)
print(f"Download {result.count(1)} files in '{save_dir}', {result.count(0)} failed.")
# @clock_custom('[{elapsed:0.8f}s] {name}()')
def download_image(img_url, save_path):
if os.path.exists(save_path):
print('Image %s already exists. Skipping download.' % save_path)
return
img = read_online_image(img_url)
if img is None:
return
save_img(save_path, img)
def download_video(vdo_url, save_path=None):
if save_path is None:
save_path = vdo_url.split('/')[-1]
# print("开始下载:%s" % os.path.basename(save_path))
if os.path.exists(save_path):
print('Video %s already exists. Skipping download.' % save_path)
return
    r = requests.get(vdo_url, stream=True)
    with open(save_path, 'wb') as f:
        for chunk in r.iter_content(chunk_size=1024 * 1024):
            f.write(chunk)
print("%s 下载完成!\n" % os.path.basename(save_path))
return
def check_images(img_dir, tmp_dir=None, batch=False):
if tmp_dir is None:
tmp_dir = os.path.join(img_dir, 'tmp')
if not os.path.exists(tmp_dir):
os.makedirs(tmp_dir)
if batch:
img_paths = get_files_paths_batch(img_dir, sort=False)
else:
img_paths = get_file_paths(img_dir, sort=False)
    def check(img_path):
        try:
            img = cv2.imread(img_path)
            if img is None:
                raise ValueError('unreadable image')
            return 0
        except Exception:
            mv_cmd = f'mv {img_path} {tmp_dir}'
            print(os.path.basename(img_path))
            os.system(mv_cmd)
            return 1
exe = MultiThreading(img_paths, 6)
res = exe.run(check)
print(f"total {len(img_paths)} images, {sum(res)} wrong.")
def divide_by_shape(img_dir, batch=False, b100=(1280, 720), b200=(1280, 960)):
tmp_dir = os.path.join(img_dir, 'tmp')
b100_dir = os.path.join(img_dir, 'b100')
b200_dir = os.path.join(img_dir, 'b200')
for sub in [tmp_dir, b200_dir, b100_dir]:
if not os.path.exists(sub):
os.makedirs(sub)
if batch:
img_paths = get_files_paths_batch(img_dir, sort=False)
else:
img_paths = get_file_paths(img_dir, sort=False)
def divide(img_path):
try:
img = cv2.imread(img_path)
h, w = img.shape[:2]
if (w, h) == b100:
mv_cmd = f'mv {img_path} {b100_dir}'
print(mv_cmd)
os.system(mv_cmd)
return 1
elif (w, h) == b200:
mv_cmd = f'mv {img_path} {b200_dir}'
print(mv_cmd)
os.system(mv_cmd)
return 2
else:
return 3
except Exception:
mv_cmd = f'mv {img_path} {tmp_dir}'
print(mv_cmd)
os.system(mv_cmd)
return 0
exe = MultiThreading(img_paths, 6)
res = list(exe.run(divide))
print(f"total {len(img_paths)} images, {res.count(1)} b100 {res.count(2)} b200 "
f"{res.count(3)} other {res.count(0)} wrong.")
def copy_files(ori_dir, dst_dir, file_type='img'):
if not os.path.exists(dst_dir):
os.makedirs(dst_dir)
if isinstance(ori_dir, str) and os.path.isdir(ori_dir):
print("load images, please wait ...")
img_paths = get_file_paths(ori_dir, abs_path=True, mod=file_type)
elif isinstance(ori_dir, list):
img_paths = ori_dir
else:
raise NotImplementedError(f"check input, '{ori_dir}' should be a dir or list of paths")
print(f"total {len(img_paths)} images")
def copy(img_path):
new_path = os.path.join(dst_dir, os.path.basename(img_path))
if os.path.exists(new_path):
return 0
cp_cmd = f"cp {img_path} {new_path}"
os.system(cp_cmd)
return 1
exe = MultiThreading(img_paths, 16)
res = exe.run(copy)
print(f"total {len(img_paths)} images, copy {res.count(1)} files, skip {res.count(0)} files")
def copy_minute_images(data_dir, save_dir, width=1280):
if not os.path.exists(save_dir):
os.makedirs(save_dir)
img_paths = get_file_paths(data_dir)
def copy(info):
i, img_path = info
img = cv2.imread(img_path)
w = img.shape[1]
if w != width:
return 0
cp_cmd = f"cp {img_path} {save_dir}"
print(i, cp_cmd)
        # os.system(cp_cmd)  # NOTE: the actual copy is disabled; uncomment to perform it
return 1
exe = MultiThreading(list(enumerate(img_paths)), 16)
res = exe.run(copy)
print(f"total {len(img_paths)} images, {res.count(1)} minute images")
def day_or_night(img_path, day=(630, 1830)):
    # Returns 1 for day, 0 for night, -1 if no HHMM timestamp is found in the path.
    pat_str = r'.+/202[0-1][0-1][0-9][0-3][0-9]([0-2][0-9][0-5][0-9])[0-9]+_.+'
pattern = re.compile(pat_str)
res = pattern.match(img_path)
if res is None:
return -1
cur_time = int(res.group(1))
if day[0] <= cur_time <= day[1]:
return 1
else:
return 0
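# Illustrative example (hypothetical file name): the HHMM group after the date decides
# the result, e.g. '/imgs/202103150735001_front.jpg' -> 0735 -> day.
# day_or_night('/imgs/202103150735001_front.jpg')   # returns 1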
def divide_by_time(img_dir, batch=False, day=(630, 1830)):
day_dir = os.path.join(img_dir, 'day')
night_dir = os.path.join(img_dir, 'night')
for sub in [day_dir, night_dir]:
if not os.path.exists(sub):
os.makedirs(sub)
if batch:
img_paths = get_files_paths_batch(img_dir, sort=False)
else:
img_paths = get_file_paths(img_dir, sort=False)
    def divide(img_path):
        r = day_or_night(img_path, day)
        if r == -1:
            # No timestamp in the file name: leave the image in place.
            return r
        mv_cmd = f'mv {img_path} {day_dir}' if r == 1 else f'mv {img_path} {night_dir}'
        print(mv_cmd)
        os.system(mv_cmd)
        return r
exe = MultiThreading(img_paths, 6)
res = list(exe.run(divide))
print(f"total {len(img_paths)} images, {res.count(1)} day {res.count(0)} night.")
def sample_images(img_dir, sample, mod='mv', save_dir=None):
img_paths = get_file_paths(img_dir, sort=False)
random.shuffle(img_paths)
sampled = random.sample(img_paths, sample)
sampled = [(idx, img_path) for idx, img_path in enumerate(sampled)]
if not save_dir:
save_dir = img_dir + '_sample'
if not os.path.exists(save_dir):
os.makedirs(save_dir)
def mv_img(info):
idx, img_path = info
img_name = os.path.basename(img_path)
new_path = os.path.join(save_dir, img_name)
cmd = f'{mod} {img_path} {new_path}'
print(idx, cmd)
os.system(cmd)
return 1
exe = MultiThreading(sampled, 12)
res = exe.run(mv_img)
print(sum(list(res)))
return sampled
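# Illustrative usage (hypothetical folder): randomly picks `sample` images and moves
# (mod='mv') or copies (mod='cp') them into '<img_dir>_sample' or the given save_dir.
# sample_images('/data/frames', sample=500, mod='cp')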
def pick_files(label_file, data_dir, save_dir=None, label_cond=None, os_cmd='cp'):
label_dict = load_labels(label_file)
if save_dir is None:
save_dir = data_dir.rstrip('/') + '_pick'
os.makedirs(save_dir, exist_ok=True)
if label_cond is None:
picked = list(label_dict.keys())
else:
picked = [n for n, l in label_dict.items() if l == label_cond]
# picked = [n for n, l in label_dict.items() if l[5] == 1]
# picked = [n for n, l in label_dict.items()]
assert os_cmd in ['cp', 'mv']
def _pick(img_name):
img_path = os.path.join(data_dir, img_name)
if not os.path.exists(img_path):
log_error(f"{img_path} not exist.")
return 0
new_path = os.path.join(save_dir, img_name)
cmd = f'{os_cmd} {img_path} {new_path}'
os.system(cmd)
return 1
exe = MultiThreading(picked, workers=10)
res = exe.run(_pick)
print(f"total {len(picked)} items, {os_cmd} {res.count(1)} items, {res.count(0)} not exist.")
def load_images(input_data, workers=12):
if input_data and isinstance(input_data, list) and os.path.isfile(input_data[0]):
img_paths = input_data
elif os.path.isdir(input_data):
img_paths = get_file_paths(input_data, sort=False)
else:
raise NotImplementedError
def load(img_path):
try:
img = read_img(img_path)
return img_path, img
except Exception:
return img_path, None
exe = MultiThreading(img_paths, max(workers, 8))
res = exe.run(load)
out_paths = [r[0] for r in res]
assert img_paths == out_paths
cache_images = [r[1] for r in res if r[1] is not None]
assert len(cache_images) == len(img_paths), \
f"Not load complete! Input paths length {len(img_paths)} != load length {len(cache_images)}"
return tuple(cache_images)
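# Illustrative usage (hypothetical folder): accepts a directory or a list of image
# paths and returns the decoded images; it asserts that every path could be decoded.
# images = load_images('/data/frames', workers=8)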
def export_binary_files(inp_list, save_dir):
os.makedirs(save_dir, exist_ok=True)
save_file = save_dir.rstrip('/') + '.txt'
def export(info):
img_name, raw_img = info
new_name = os.path.splitext(img_name)[0] + '.raw'
save_path = os.path.join(save_dir, new_name)
relative_path = os.path.join(os.path.basename(save_dir), new_name)
raw_img.tofile(save_path)
return relative_path
exe = MultiThreading(inp_list)
res = exe.run(export)
with open(save_file, 'w') as f:
f.write('\n'.join(res))
print(f"Save {len(res)} binary files in '{save_file}'")
def sequence_to_file(lst, save_file):
if not lst:
return
assert isinstance(lst, (list, tuple, set))
lst = [str(l) for l in lst]
with open(save_file, "w") as f:
f.write('\n'.join(lst))
print(f"save {len(lst)} items in '{save_file}'")
if __name__ == '__main__':
img_dir = '/nfs/volume-236-2/qilongyu/person_seats/classify/images'
# copy_minute_images(img_dir, save_dir='/nfs/volume-236-2/iov_ann/minute_images')
# sample_occ_images(img_dir, sample=1, mod='cp', save_dir='/Users/didi/Desktop/视频分类/dataset/ps_occ')
# sample_images(img_dir, 1000, save_dir='/Users/didi/Desktop/人数座次/personData/fhm/fhm_1', mod='mv')
# data_dir = '/Users/didi/Desktop/视频分类/dataset/inner'
# save_dir = '/Users/didi/Desktop/视频分类/dataset/min_1208'
# for sub in os.listdir(data_dir):
# if sub.startswith('20201208'):
# cmd = f"mv {data_dir}/{sub} {save_dir}/{sub}"
# print(cmd)
# os.system(cmd)
# data_dir = '/Users/didi/Desktop/视频分类/dataset/anomaly_video2/ppp'
# print(os.listdir(data_dir))
# get_files_paths_batch(data_dir)
# download_video('http://100.69.239.80:8002/static/tac_permanent_ns_inner/20220629163931_991745085639127040_338345_173696_.mp4'
# )
# load_images('/Users/didi/Desktop/MTL/dataset/images')
check_images("/mnt/qly/dms3/images")