# Source path: NDLOCR/cli/core/utils.py
# (Hosting-page residue captured with this file: "3v324v23's picture", "Add files", "history blame", "No virus", "7.77 kB")
# Copyright (c) 2022, National Diet Library, Japan
# This software is released under the CC BY 4.0.
# https://creativecommons.org/licenses/by/4.0/
import copy
import datetime
import glob
import os
import sys
import yaml
def parse_cfg(cfg_dict):
    """
    Parse command-line arguments/options into the config used for inference.

    The input dict is deep-copied, the parameters read from the yml config
    file are merged in, ``proc_range`` is converted into start/end indices,
    ``input_root``/``output_root`` are made absolute, and the list of input
    directories is resolved according to ``input_structure``.

    Parameters
    ----------
    cfg_dict : dict
        Arguments and options given on the command line. Keys read here:
        ``config_file``, ``proc_range``, ``save_xml``, ``save_image``,
        ``input_root``, ``output_root`` and ``input_structure``.
        The dict itself is not modified.

    Returns
    -------
    infer_cfg : dict or None
        Configuration for running the inference process, with the extra keys
        ``proc_range`` (dict with ``start``/``end``), ``partial_infer`` (bool)
        and ``input_dirs`` (list of str). ``None`` is returned, after printing
        an ``[ERROR]`` message to stderr, when the configuration is invalid.
    """
    infer_cfg = copy.deepcopy(cfg_dict)

    # add inference config parameters from yml config file
    if not os.path.isfile(cfg_dict['config_file']):
        print('[ERROR] Config yml file not found.', file=sys.stderr)
        return None

    with open(cfg_dict['config_file'], 'r') as yml:
        yml_config = yaml.safe_load(yml)

    # An empty or scalar-only yml file does not load as a mapping.
    if not isinstance(yml_config, dict):
        print('[ERROR] Config yml file read error.', file=sys.stderr)
        return None

    # Merge the yml parameters into the inference config.
    # NOTE(review): on a key collision the yml value replaces the command-line
    # value -- confirm this precedence is the intended one.
    infer_cfg.update(yml_config)

    # save_xml will be ignored when last proc does not output xml data
    if (infer_cfg['proc_range'] != '0..3') and (infer_cfg['save_xml'] or infer_cfg['save_image']):
        print('[WARNING] save_xml and save_image flags are ignored because this is partial execution.')
        print(' All output of last proc will be saved in output directory.')

    # parse start/end indices of inference process
    # (first and last character of a string such as '0..3')
    start = int(infer_cfg['proc_range'][0])
    end = int(infer_cfg['proc_range'][-1])
    if start > end:
        print('[ERROR] Value of proc_range must be [x..y : x <= y] .', file=sys.stderr)
        return None
    infer_cfg['proc_range'] = {
        'start': start,
        'end': end,
    }
    # Anything other than the full 0..3 range is a partial inference.
    infer_cfg['partial_infer'] = (start != 0) or (end != 3)

    # create input_dirs from input_root
    # input_dirs is list of dirs that contain img (and xml) dir
    infer_cfg['input_root'] = os.path.abspath(infer_cfg['input_root'])
    infer_cfg['output_root'] = os.path.abspath(infer_cfg['output_root'])
    input_root = infer_cfg['input_root']
    structure = infer_cfg['input_structure']

    if structure == 's':
        # - Single input dir mode
        # input_root
        # ├── xml
        # │   └── R[7-digit serial number].xml    (XML data)
        # └── img
        #     └── R[7-digit serial number]_pp.jp2 (image data)

        # validation check for input dir structure
        if not os.path.isdir(os.path.join(input_root, 'img')):
            print('[ERROR] Input img diretctory not found in {}'.format(infer_cfg['input_root']), file=sys.stderr)
            return None
        # NOTE(review): this mode requires the xml dir when start > 2, while
        # the partial inference mode below uses start > 1 -- confirm which
        # threshold is intended.
        if (start > 2) and (not os.path.isdir(os.path.join(input_root, 'xml'))):
            print('[ERROR] Input xml diretctory not found in {}'.format(infer_cfg['input_root']), file=sys.stderr)
            return None
        infer_cfg['input_dirs'] = [input_root]
    elif structure == 'i':
        # - Partial inference mode
        # input_root
        # └── PID
        #     ├── xml
        #     │   └── R[7-digit serial number].xml    (XML data)
        #     └── img
        #         └── R[7-digit serial number]_pp.jp2 (image data)
        infer_cfg['input_dirs'] = []
        for input_dir in glob.glob(os.path.join(input_root, '*')):
            if not os.path.isdir(input_dir):
                continue
            if not os.path.isdir(os.path.join(input_dir, 'img')):
                print('[WARNING] Input directory {0} is skipped(no img diretctory)'.format(input_dir))
                continue
            if (start > 1) and (not os.path.isdir(os.path.join(input_dir, 'xml'))):
                print('[WARNING] Input directory {0} is skipped(no xml diretctory)'.format(input_dir))
                continue
            infer_cfg['input_dirs'].append(input_dir)
    elif structure == 't':
        # - ToshoData mode
        # input_root
        # └── tosho_19XX_bunkei
        #     └── R[7-digit serial number]_pp.jp2 (image data)
        infer_cfg['input_dirs'] = [
            input_dir
            for input_dir in glob.glob(os.path.join(input_root, '*'))
            if os.path.isdir(input_dir)
        ]
        # A sub directory literally named 'img' suggests single input layout.
        if 'img' in [os.path.basename(d) for d in infer_cfg['input_dirs']]:
            print('[WARNING] This input structure might be single input(img diretctory found)')
    elif structure == 'w':
        # - Work station input mode
        # input_root
        # └── workstation
        #     └── [collect(3 digits) or digital(3 digits)] folder
        #         └── [15-digit serial number] folder (1st digit of PID)
        #             └── [3-digit serial number] folder (2nd-4th digits of PID)
        #                 └── [3-digit serial number] folder (5th-7th digits of PID)
        #                     └── R[7-digit serial number]_contents.jp2 (image data)

        # recursive function to get input_dirs in workstation mode
        def get_input_dirs(path, depth):
            """Return the directories found four levels below the top-level path."""
            depth += 1
            current_list = [
                entry
                for entry in glob.glob(os.path.join(path, '*'))
                if os.path.isdir(entry)
            ]
            # Deepest level reached: these directories are the input dirs.
            if depth > 3:
                return current_list
            # Only an empty top level is reported as a structure error.
            if (depth < 2) and (len(current_list) == 0):
                print('[ERROR] Input directory structure dose not match workstation mode', file=sys.stderr)
                return []
            ret_list = []
            for sub_dir in current_list:
                ret_list.extend(get_input_dirs(sub_dir, depth))
            return ret_list

        # check if workstation directory exist
        work_dir = os.path.join(input_root, 'workstation')
        if not os.path.isdir(work_dir):
            print('[ERROR] \'workstation\' directory not found', file=sys.stderr)
            return None

        # get input dir list
        infer_cfg['input_dirs'] = get_input_dirs(work_dir, 0)
    elif structure == 'f':
        # - Image file input mode
        # input_root is equal to input image file path
        infer_cfg['input_dirs'] = [input_root]
    else:
        print('[ERROR] Unexpected input directory structure type: {0}.'.format(infer_cfg['input_structure']), file=sys.stderr)
        return None

    return infer_cfg
def save_xml(xml_to_save, path):
    """
    Write XML data to the given path as UTF-8 with an XML declaration.

    Parameters
    ----------
    xml_to_save : object
        Object exposing ``write(path, encoding=..., xml_declaration=...)``.
        Presumably an ``xml.etree.ElementTree.ElementTree`` -- verify against
        the callers.
    path : str
        Destination file path of the XML data.

    Raises
    ------
    OSError
        When the write fails. The original exception is re-raised after the
        error is reported on stderr, so its errno and message are preserved.
    """
    print('### save xml : {}###'.format(path))
    try:
        xml_to_save.write(path, encoding='utf-8', xml_declaration=True)
    except OSError as err:
        print("[ERROR] XML save error : {0}".format(err), file=sys.stderr)
        # Re-raise the caught error instead of a blank OSError so callers
        # still see the cause (it is an OSError or a subclass of it).
        raise
def mkdir_with_duplication_check(dir_path):
    """
    Create a directory, choosing a new name when the path already exists.

    While the candidate path is an existing directory, a time stamp suffix
    (``_YYYYmmddHHMMSS``) is appended to it. The resulting directory is then
    created, including any missing parent directories.

    Parameters
    ----------
    dir_path : str
        Path of the directory to create.

    Returns
    -------
    dir_path_to_create : str
        Path of the directory that was actually created. It equals
        ``dir_path`` when no directory existed there beforehand.

    Raises
    ------
    OSError
        When the directory cannot be created.
    """
    dir_path_to_create = dir_path

    # prepare output root directory
    while os.path.isdir(dir_path_to_create):
        print('[WARNING] Directory {0} already exist.'.format(dir_path))
        now = datetime.datetime.now()
        time_stamp = now.strftime('_%Y%m%d%H%M%S')
        # Suffixes accumulate if the stamped name is also taken
        # (e.g. two calls within the same second).
        dir_path_to_create += time_stamp

    if dir_path_to_create != dir_path:
        print('[WARNING] Directory is changed to {0}.'.format(dir_path_to_create))

    # Actually create the directory so the returned path is ready to use and
    # a later call with the same dir_path detects the duplication.
    os.makedirs(dir_path_to_create)

    return dir_path_to_create