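"""
Batch transcription CLI built around the NVIDIA Parakeet TDT 0.6B v2 model
(via NeMo). For each audio/video file in a target directory it resamples to
16 kHz mono, transcribes with segment- and word-level timestamps (splitting
very long audio into overlapping chunks), and writes CSV/SRT/VTT/JSON/LRC
transcripts next to the input file.
"""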
import argparse
import csv
import gc
import json
import os
import re
import shutil
import subprocess
import sys
import time
import traceback
from pathlib import Path
from typing import List, Optional, Set, Tuple

import torch
from nemo.collections.asr.models import ASRModel
from pydub import AudioSegment


MODEL_NAME = "nvidia/parakeet-tdt-0.6b-v2"
TARGET_SAMPLE_RATE = 16000

# Above this duration, the model is switched to local attention (long-audio mode).
LONG_AUDIO_THRESHOLD_SECONDS = 480
# Above this duration (3 hours), the audio is split into overlapping chunks.
VERY_LONG_AUDIO_THRESHOLD_SECONDS = 10800

CHUNK_LENGTH_SECONDS = 1800
CHUNK_OVERLAP_SECONDS = 60

MAX_SEGMENT_LENGTH_SECONDS = 15
MAX_SEGMENT_CHARS = 100
MIN_SEGMENT_GAP_SECONDS = 0.3

MAX_VTT_SIZE_BYTES = 10 * 1024 * 1024

SENTENCE_ENDINGS = ['.', '!', '?', '。', '!', '?']
SENTENCE_PAUSES = [',', '、', ';', ';', ':', ':']

INPUT_PRIORITY_EXTENSIONS: List[str] = ['.wav', '.mp3', '.mp4']

DEFAULT_OUTPUT_FORMATS: List[str] = ["csv", "srt", "vtt", "json", "lrc"]


def preprocess_audio_cli(audio_path_str: str, output_dir_for_temp_files: str) -> Tuple[Optional[str], Optional[str], Optional[float]]:
    """
    Preprocess an audio file (resample to the target rate, convert to mono).
    On success, returns (path for transcription, display name, duration in seconds).
    On failure, returns (None, None, None).
    """
    try:
        audio_file_path = Path(audio_path_str)
        original_path_name = audio_file_path.name
        audio_name_stem = audio_file_path.stem

        print(f" Loading audio file: {original_path_name}")

        duration_sec = get_audio_duration_with_ffprobe(audio_path_str)
        if duration_sec is None:
            print("Error: failed to get the audio duration with ffprobe")
            return None, None, None

        print(f" Duration: {duration_sec:.2f} s")

        file_size = Path(audio_path_str).stat().st_size
        file_size_gb = file_size / (1024**3)
        print(f" File size: {file_size_gb:.2f} GB")

        # Very large or very long files are converted with ffmpeg directly,
        # since pydub cannot handle WAV data over 4 GB.
        if file_size > 4 * 1024**3 or duration_sec > VERY_LONG_AUDIO_THRESHOLD_SECONDS:
            print(f" Large file ({file_size_gb:.2f} GB, {duration_sec/3600:.2f} h); converting with ffmpeg and chunking directly.")

            temp_mono_path = Path(output_dir_for_temp_files) / f"{audio_name_stem}_mono_temp.wav"
            try:
                cmd = [
                    'ffmpeg', '-y', '-i', audio_path_str,
                    '-ac', '1',
                    '-ar', str(TARGET_SAMPLE_RATE),
                    str(temp_mono_path)
                ]
                subprocess.run(cmd, capture_output=True, check=True)
                return temp_mono_path.as_posix(), f"{original_path_name} (large, mono)", duration_sec
            except subprocess.CalledProcessError as e:
                print(f" ffmpeg mono conversion failed: {e}")
                return audio_path_str, f"{original_path_name} (large)", duration_sec

        try:
            audio = AudioSegment.from_file(audio_path_str)
        except Exception as pydub_e:
            if "4GB" in str(pydub_e) or "Unable to process" in str(pydub_e):
                print(f" pydub hit its 4 GB limit; falling back to ffmpeg handling: {pydub_e}")
                return audio_path_str, f"{original_path_name} (large)", duration_sec
            else:
                raise

        resampled = False
        mono_converted = False

        if audio.frame_rate != TARGET_SAMPLE_RATE:
            try:
                print(f" Resampling: {audio.frame_rate} Hz -> {TARGET_SAMPLE_RATE} Hz")
                audio = audio.set_frame_rate(TARGET_SAMPLE_RATE)
                resampled = True
            except Exception as resample_e:
                print(f"Error: failed to resample audio: {resample_e}")
                return None, None, None

        if audio.channels > 1:
            try:
                print(f" Converting to mono ({audio.channels}ch -> 1ch)")
                audio = audio.set_channels(1)
                mono_converted = True
            except Exception as mono_e:
                print(f"Error: failed to convert audio to mono: {mono_e}")
                return None, None, None
        elif audio.channels == 1:
            print(" Audio is already mono.")

        processed_temp_file_path_obj = None

        if resampled or mono_converted:
            try:
                safe_stem = re.sub(r'[^\w\-_\.]', '_', audio_name_stem)
                temp_suffix = "_preprocessed_temp.wav"
                processed_temp_file_path_obj = Path(output_dir_for_temp_files, f"{safe_stem}{temp_suffix}")

                print(f" Writing preprocessed audio to a temp file: {processed_temp_file_path_obj.name}")
                audio.export(processed_temp_file_path_obj, format="wav")

                path_for_transcription = processed_temp_file_path_obj.as_posix()
                display_name_for_info = f"{original_path_name} (preprocessed)"
            except Exception as export_e:
                print(f"Error: failed to export preprocessed audio: {export_e}")
                if processed_temp_file_path_obj and processed_temp_file_path_obj.exists():
                    try:
                        os.remove(processed_temp_file_path_obj)
                    except OSError:
                        pass
                return None, None, None
        else:
            print(" No preprocessing needed; using the original file.")
            path_for_transcription = audio_path_str
            display_name_for_info = original_path_name

        return path_for_transcription, display_name_for_info, duration_sec

    except FileNotFoundError:
        print(f"Error: audio file not found: {audio_path_str}")
        return None, None, None
    except Exception as load_e:
        print(f"Error: failed to load/decode audio file '{original_path_name}': {load_e}")
        return None, None, None


def get_audio_duration_with_ffprobe(audio_path_str: str) -> Optional[float]:
    """Get the audio duration via ffprobe (not subject to pydub's 4 GB limit)."""
    try:
        if not shutil.which('ffprobe'):
            print("Warning: ffprobe not found; the duration cannot be determined this way.")
            return None

        cmd = [
            'ffprobe', '-v', 'quiet', '-show_entries', 'format=duration',
            '-of', 'csv=p=0', audio_path_str
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)

        if result.returncode == 0 and result.stdout.strip():
            return float(result.stdout.strip())
        else:
            print(f"ffprobe error: {result.stderr}")
            return None

    except subprocess.TimeoutExpired:
        print("ffprobe timed out")
        return None
    except Exception as e:
        print(f"Error getting the audio duration with ffprobe: {e}")
        return None


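# For reference, the ffprobe invocation above prints just the duration in
# seconds; illustrative output for a hypothetical input.wav:
#   $ ffprobe -v quiet -show_entries format=duration -of csv=p=0 input.wav
#   5025.227000

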
def find_natural_break_point(text: str, max_length: int) -> int:
    """Find a natural break point in the text at or before max_length characters."""
    if len(text) <= max_length:
        return len(text)

    # Prefer sentence-ending punctuation...
    for i in range(max_length, 0, -1):
        if i < len(text) and text[i] in SENTENCE_ENDINGS:
            return i + 1

    # ...then mid-sentence pauses...
    for i in range(max_length, 0, -1):
        if i < len(text) and text[i] in SENTENCE_PAUSES:
            return i + 1

    # ...then any whitespace.
    for i in range(max_length, 0, -1):
        if i < len(text) and text[i].isspace():
            return i + 1

    # No natural break found; cut hard at max_length.
    return max_length


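# For example, find_natural_break_point("Hello world. More text", 15) scans
# backwards from index 15, hits the "." at index 11, and returns 12, so the
# first piece would be "Hello world.".

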
def split_segment(segment: dict, max_length_seconds: float, max_chars: int) -> List[dict]:
    """Split a segment at natural break points so it respects the length limits."""
    if (segment['end'] - segment['start']) <= max_length_seconds and len(segment['segment']) <= max_chars:
        return [segment]

    result = []
    current_text = segment['segment']
    current_start = segment['start']
    total_duration = segment['end'] - segment['start']

    while current_text:
        break_point = find_natural_break_point(current_text, max_chars)

        # Allocate duration proportionally to the share of characters consumed.
        text_ratio = break_point / len(segment['segment'])
        segment_duration = total_duration * text_ratio

        # If the piece is still too long in time, shrink the break point accordingly.
        if segment_duration > max_length_seconds:
            time_ratio = max_length_seconds / total_duration
            break_point = int(len(segment['segment']) * time_ratio)
            break_point = find_natural_break_point(current_text, break_point)
            segment_duration = max_length_seconds

        # Guard against a zero-length piece, which would loop forever.
        break_point = max(1, break_point)

        new_segment = {
            'start': current_start,
            'end': current_start + segment_duration,
            'segment': current_text[:break_point].strip()
        }
        result.append(new_segment)

        current_text = current_text[break_point:].strip()
        current_start = new_segment['end']

    return result


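# Illustrative walk-through: with MAX_SEGMENT_LENGTH_SECONDS = 15 and
# MAX_SEGMENT_CHARS = 100, a 24 s / 150-character segment is first cut at a
# natural break at or before character 100; since 24 s * (100/150) = 16 s
# exceeds the limit, the break point is shrunk (15/24 of the text, i.e. about
# 93 characters) so the piece fits in 15 s.

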
def transcribe_audio_cli(
    transcribe_path_str: str,
    model: ASRModel,
    duration_sec: float,
    device: str
) -> Tuple[Optional[List], Optional[List], Optional[List]]:
    """
    Transcribe one audio file with segment and word timestamps.
    Returns (segment rows, raw segment times, word rows), or (None, None, None) on failure.
    """
    long_audio_settings_applied = False
    original_model_dtype = model.dtype

    try:
        if device == 'cuda':
            torch.cuda.empty_cache()
            gc.collect()

        model.to(device)

        # For long audio, switch to local attention to keep memory bounded.
        if duration_sec > LONG_AUDIO_THRESHOLD_SECONDS:
            try:
                print(f" Info: duration ({duration_sec:.0f}s) exceeds the threshold ({LONG_AUDIO_THRESHOLD_SECONDS}s); applying long-audio settings.")
                model.change_attention_model(
                    self_attention_model="rel_pos_local_attn",
                    att_context_size=[128, 128]
                )
                model.change_subsampling_conv_chunking_factor(1)
                long_audio_settings_applied = True
                if device == 'cuda':
                    torch.cuda.empty_cache()
                    gc.collect()
            except Exception as setting_e:
                print(f" Warning: failed to apply long-audio settings: {setting_e}. Continuing with defaults.")

        if device == 'cuda' and torch.cuda.is_bf16_supported():
            print(" Info: running inference with the model cast to bfloat16.")
            model.to(torch.bfloat16)
        elif model.dtype != original_model_dtype:
            model.to(original_model_dtype)

        print(f" Transcribing (device: {device}, model dtype: {model.dtype})...")
        output = model.transcribe(
            [transcribe_path_str],
            timestamps=True,
            batch_size=2
        )

        if not output or not isinstance(output, list) or not output[0] or \
           not hasattr(output[0], 'timestamp') or not output[0].timestamp or \
           'segment' not in output[0].timestamp:
            print(" Error: transcription failed or returned an unexpected output format.")
            return None, None, None

        segment_timestamps = output[0].timestamp['segment']

        # Merge short neighboring segments, then re-split anything too long.
        processed_segments = []
        current_segment = None

        for ts in segment_timestamps:
            if current_segment is None:
                current_segment = ts
            else:
                time_gap = ts['start'] - current_segment['end']
                current_text = current_segment['segment']
                next_text = ts['segment']

                should_merge = (
                    time_gap < MIN_SEGMENT_GAP_SECONDS and
                    len(current_text) + len(next_text) < MAX_SEGMENT_CHARS and
                    (current_segment['end'] - current_segment['start']) < MAX_SEGMENT_LENGTH_SECONDS and
                    (ts['end'] - ts['start']) < MAX_SEGMENT_LENGTH_SECONDS and
                    not any(current_text.strip().endswith(p) for p in SENTENCE_ENDINGS)
                )

                if should_merge:
                    current_segment['end'] = ts['end']
                    current_segment['segment'] += ' ' + ts['segment']
                else:
                    split_segments = split_segment(current_segment, MAX_SEGMENT_LENGTH_SECONDS, MAX_SEGMENT_CHARS)
                    processed_segments.extend(split_segments)
                    current_segment = ts

        if current_segment is not None:
            split_segments = split_segment(current_segment, MAX_SEGMENT_LENGTH_SECONDS, MAX_SEGMENT_CHARS)
            processed_segments.extend(split_segments)

        vis_data = [[f"{ts['start']:.2f}", f"{ts['end']:.2f}", ts['segment']] for ts in processed_segments]
        raw_times_data = [[ts['start'], ts['end']] for ts in processed_segments]

        # Keep only the words that fall inside one of the processed segments.
        word_timestamps_raw = output[0].timestamp.get("word", [])
        word_vis_data = []

        for w in word_timestamps_raw:
            if not isinstance(w, dict) or not all(k in w for k in ['start', 'end', 'word']):
                continue

            word_start = float(w['start'])
            word_end = float(w['end'])

            for seg in processed_segments:
                if word_start >= seg['start'] - 0.05 and word_end <= seg['end'] + 0.05:
                    word_vis_data.append([f"{word_start:.2f}", f"{word_end:.2f}", w["word"]])
                    break

        print(" Transcription finished.")
        return vis_data, raw_times_data, word_vis_data

    except torch.cuda.OutOfMemoryError as oom_e:
        print(f" Fatal error: CUDA out of memory. {oom_e}")
        print(" Try reducing the batch size or closing other applications that use the GPU.")
        return None, None, None
    except Exception as e:
        print(f" Error: unexpected error during transcription: {e}")
        traceback.print_exc()
        return None, None, None
    finally:
        if long_audio_settings_applied:
            try:
                print(" Reverting long-audio settings.")
                model.change_attention_model(self_attention_model="rel_pos")
                model.change_subsampling_conv_chunking_factor(-1)
            except Exception as revert_e:
                print(f" Warning: failed to revert long-audio settings: {revert_e}")

        model.to(original_model_dtype)
        if model.device.type != 'cpu':
            model.cpu()

        if device == 'cuda':
            torch.cuda.empty_cache()
            gc.collect()


def save_transcripts_cli(output_dir_str: str, audio_file_stem: str,
                         vis_data: List, word_vis_data: List, formats: Optional[List[str]] = None):
    """Save the transcript in each requested format (csv/srt/vtt/json/lrc)."""
    if formats is None:
        formats_to_save = DEFAULT_OUTPUT_FORMATS
    else:
        formats_to_save = formats

    output_dir_path = Path(output_dir_str)
    output_dir_path.mkdir(parents=True, exist_ok=True)
    saved_files_count = 0

    print(f" Saving results (formats: {', '.join(formats_to_save)})...")
    try:
        if "csv" in formats_to_save:
            csv_file_path = output_dir_path / f"{audio_file_stem}.csv"
            csv_headers = ["Start (s)", "End (s)", "Segment"]
            with open(csv_file_path, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                writer.writerow(csv_headers)
                writer.writerows(vis_data)
            print(f" Saved CSV file: {csv_file_path.name}")
            saved_files_count += 1
        if "srt" in formats_to_save:
            srt_file_path = output_dir_path / f"{audio_file_stem}.srt"
            write_srt(vis_data, srt_file_path)
            print(f" Saved SRT file: {srt_file_path.name}")
            saved_files_count += 1
        if "vtt" in formats_to_save:
            vtt_file_path = output_dir_path / f"{audio_file_stem}.vtt"
            try:
                write_vtt(vis_data, word_vis_data, vtt_file_path)
                print(f" Saved VTT file: {vtt_file_path.name}")
                saved_files_count += 1
            except ValueError as e:
                if "VTT file size limit exceeded" in str(e):
                    print(f" Error: {e}")
                    # Remove the partially written, oversized VTT file.
                    if vtt_file_path.exists():
                        vtt_file_path.unlink()
                raise
        if "json" in formats_to_save:
            json_file_path = output_dir_path / f"{audio_file_stem}.json"
            write_json(vis_data, word_vis_data, json_file_path)
            print(f" Saved JSON file: {json_file_path.name}")
            saved_files_count += 1
        if "lrc" in formats_to_save:
            lrc_file_path = output_dir_path / f"{audio_file_stem}.lrc"
            write_lrc(vis_data, lrc_file_path)
            print(f" Saved LRC file: {lrc_file_path.name}")
            saved_files_count += 1

        if saved_files_count == 0 and formats_to_save:
            print(f" Warning: no files were saved for the requested formats {formats_to_save}.")
    except Exception as e:
        print(f" Error while saving transcript files: {e}")
        raise


def write_srt(segments: List, path: Path):
    """Write segments as an SRT subtitle file."""
    def sec2srt(t_float: float) -> str:
        h, rem = divmod(int(t_float), 3600)
        m, s = divmod(rem, 60)
        ms = int((t_float - int(t_float)) * 1000)
        return f"{h:02}:{m:02}:{s:02},{ms:03}"

    with open(path, "w", encoding="utf-8") as f:
        for i, seg_list in enumerate(segments, 1):
            f.write(f"{i}\n{sec2srt(float(seg_list[0]))} --> {sec2srt(float(seg_list[1]))}\n{seg_list[2]}\n\n")


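# For example, sec2srt(3723.5) yields "01:02:03,500"; SRT uses a comma before
# the milliseconds, while VTT uses a period.

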
def write_vtt(segments: List, words: List, path: Path):
    """
    Write a VTT file. With word timestamps, emit karaoke-style cues that
    highlight the current word; otherwise fall back to plain segment cues.
    Raises ValueError once the file grows past MAX_VTT_SIZE_BYTES.
    """
    def sec2vtt(t_float: float) -> str:
        h, rem = divmod(int(t_float), 3600)
        m, s = divmod(rem, 60)
        ms = int((t_float - int(t_float)) * 1000)
        return f"{h:02}:{m:02}:{s:02}.{ms:03}"

    def check_size(f):
        # Abort once the output grows past the configured limit.
        if f.tell() > MAX_VTT_SIZE_BYTES:
            print(f"Warning: the VTT file exceeded {MAX_VTT_SIZE_BYTES/1024/1024:.1f} MB. Aborting.")
            raise ValueError("VTT file size limit exceeded")

    with open(path, "w", encoding="utf-8") as f:
        f.write("WEBVTT\n\n")
        f.write("STYLE\n")
        f.write("::cue(.current) { color: #ffff00; font-weight: bold; }\n")
        f.write("::cue(.past) { color: #888888; }\n")
        f.write("::cue(.future) { color: #ffffff; }\n")
        f.write("::cue(.line) { background: rgba(0,0,0,0.7); padding: 4px; }\n\n")

        if not words:
            # No word timestamps: write one plain cue per segment.
            for i, seg_list in enumerate(segments, 1):
                f.write(f"NOTE Segment {i}\n")
                f.write(f"{sec2vtt(float(seg_list[0]))} --> {sec2vtt(float(seg_list[1]))}\n{seg_list[2]}\n\n")
                check_size(f)
            return

        for seg_data in segments:
            seg_start = float(seg_data[0])
            seg_end = float(seg_data[1])

            # Collect the words that belong to this segment.
            segment_words = []
            for word_idx, word_data in enumerate(words):
                word_start = float(word_data[0])
                word_end = float(word_data[1])
                if word_start >= seg_start - 0.1 and word_end <= seg_end + 0.1:
                    segment_words.append((word_idx, word_data))

            if not segment_words:
                continue

            all_words = [w_data[2] for _, w_data in segment_words]

            # Lead-in cue before the first word is spoken.
            first_word_start = float(segment_words[0][1][0])
            if seg_start < first_word_start - 0.05:
                f.write(f"{sec2vtt(seg_start)} --> {sec2vtt(first_word_start)}\n")
                f.write(f'<c.line>{" ".join(f"<c.future>{w}</c>" for w in all_words)}</c>\n\n')

            check_size(f)

            # One cue per word, highlighting the current word.
            for local_idx, (_, word_data) in enumerate(segment_words):
                w_start = float(word_data[0])
                w_end = float(word_data[1])

                f.write(f"{sec2vtt(w_start)} --> {sec2vtt(w_end)}\n")

                line_parts = []
                for i, w in enumerate(all_words):
                    if i == local_idx:
                        line_parts.append(f'<c.current>{w}</c>')
                    elif i < local_idx:
                        line_parts.append(f'<c.past>{w}</c>')
                    else:
                        line_parts.append(f'<c.future>{w}</c>')

                f.write(f'<c.line>{" ".join(line_parts)}</c>\n\n')

                check_size(f)

                # Filler cue for any audible gap before the next word.
                if local_idx < len(segment_words) - 1:
                    next_word_start = float(segment_words[local_idx + 1][1][0])
                    gap_duration = next_word_start - w_end

                    if gap_duration > 0.05:
                        f.write(f"{sec2vtt(w_end)} --> {sec2vtt(next_word_start)}\n")
                        f.write(f'<c.line>{" ".join(f"<c.past>{w}</c>" if i <= local_idx else f"<c.future>{w}</c>" for i, w in enumerate(all_words))}</c>\n\n')

                    check_size(f)

            # Tail cue after the last word until the segment ends.
            last_word_end = float(segment_words[-1][1][1])
            if last_word_end < seg_end - 0.05:
                f.write(f"{sec2vtt(last_word_end)} --> {sec2vtt(seg_end)}\n")
                f.write(f'<c.line>{" ".join(f"<c.past>{w}</c>" for w in all_words)}</c>\n\n')

            check_size(f)


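# Illustrative cue shape: while the second of three words is spoken, the
# emitted cue looks like this (times are hypothetical):
#
#   00:00:01.200 --> 00:00:01.450
#   <c.line><c.past>hello</c> <c.current>brave</c> <c.future>world</c></c>

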
def write_json(segments: List, words: List, path: Path):
    """Write segments (with their nested word timings) as JSON."""
    result = {"segments": []}
    word_idx = 0
    for seg_data in segments:
        s_start_time = float(seg_data[0])
        s_end_time = float(seg_data[1])
        s_text = seg_data[2]
        segment_words_list: List[dict] = []
        temp_current_word_idx = word_idx
        if words:
            while temp_current_word_idx < len(words):
                w_data = words[temp_current_word_idx]
                w_start_time = float(w_data[0])
                w_end_time = float(w_data[1])
                if w_start_time >= s_start_time and w_end_time <= s_end_time + 0.1:
                    segment_words_list.append({"start": w_start_time, "end": w_end_time, "word": w_data[2]})
                    temp_current_word_idx += 1
                elif w_start_time < s_start_time:
                    temp_current_word_idx += 1
                elif w_start_time > s_end_time:
                    break
                else:
                    temp_current_word_idx += 1
            word_idx = temp_current_word_idx
        result["segments"].append({"start": s_start_time, "end": s_end_time, "text": s_text, "words": segment_words_list})
    with open(path, "w", encoding="utf-8") as f:
        json.dump(result, f, ensure_ascii=False, indent=2)


def write_lrc(segments: List, path: Path):
    """Write segments as an LRC lyrics file."""
    def sec2lrc(t_float: float) -> str:
        m, s = divmod(float(t_float), 60)
        return f"[{int(m):02d}:{s:05.2f}]"

    with open(path, "w", encoding="utf-8") as f:
        for seg_list in segments:
            f.write(f"{sec2lrc(float(seg_list[0]))}{seg_list[2]}\n")


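# For example, a segment starting at 187.26 s produces the LRC line:
#   [03:07.26]segment text here

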
def split_audio_with_overlap_cli(
    audio_path_str: str,
    output_dir_for_chunks: str,
    chunk_length_sec: int = CHUNK_LENGTH_SECONDS,
    overlap_sec: int = CHUNK_OVERLAP_SECONDS
) -> List[str]:
    """Split audio into overlapping chunks; returns the list of chunk file paths."""
    print(f" Splitting audio: base chunk length {chunk_length_sec}s, overlap {overlap_sec}s")

    file_size = Path(audio_path_str).stat().st_size
    file_size_gb = file_size / (1024**3)

    # pydub cannot load files over 4 GB, so split those with ffmpeg instead.
    if file_size > 4 * 1024**3:
        print(f" Large file ({file_size_gb:.2f} GB); splitting with ffmpeg.")
        return split_audio_with_ffmpeg(audio_path_str, output_dir_for_chunks, chunk_length_sec, overlap_sec)

    try:
        audio = AudioSegment.from_file(audio_path_str)
    except Exception as e:
        if "4GB" in str(e) or "Unable to process" in str(e):
            print(f" pydub hit its 4 GB limit; splitting with ffmpeg instead: {e}")
            return split_audio_with_ffmpeg(audio_path_str, output_dir_for_chunks, chunk_length_sec, overlap_sec)
        else:
            print(f" Error loading audio file '{Path(audio_path_str).name}' for splitting: {e}")
            return []

    duration_ms = len(audio)
    chunk_length_ms = chunk_length_sec * 1000
    overlap_ms = overlap_sec * 1000
    chunk_paths_list: List[str] = []
    start_ms = 0
    chunk_idx = 0
    audio_file_stem = Path(audio_path_str).stem
    while start_ms < duration_ms:
        # Extend each chunk by the overlap on both sides (except at the edges).
        actual_chunk_start_ms = max(0, start_ms - (overlap_ms if start_ms > 0 else 0))
        base_chunk_end_ms = start_ms + chunk_length_ms
        actual_chunk_end_ms = min(base_chunk_end_ms + (overlap_ms if base_chunk_end_ms < duration_ms else 0), duration_ms)
        if actual_chunk_start_ms >= actual_chunk_end_ms:
            if start_ms >= duration_ms:
                break
            print(" Warning: unexpected state in chunk calculation. Skipping.")
            start_ms += chunk_length_ms
            continue
        chunk_segment = audio[actual_chunk_start_ms:actual_chunk_end_ms]
        chunk_file_name = f"{audio_file_stem}_chunk_{chunk_idx:03d}_temp.wav"
        chunk_file_path_obj = Path(output_dir_for_chunks, chunk_file_name)
        try:
            chunk_segment.export(chunk_file_path_obj, format="wav")
            chunk_paths_list.append(chunk_file_path_obj.as_posix())
        except Exception as export_chunk_e:
            print(f" Error: failed to export temporary chunk file {chunk_file_name}: {export_chunk_e}")
        start_ms += chunk_length_ms
        chunk_idx += 1
    print(f" Split audio into {len(chunk_paths_list)} chunks.")
    return chunk_paths_list


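# Resulting chunk layout in seconds with the defaults (chunk_length_sec=1800,
# overlap_sec=60): chunk 0 covers [0, 1860], chunk 1 covers [1740, 3660],
# chunk 2 covers [3540, 5460], and so on; chunk i (i >= 1) starts at
# i*1800 - 60.

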
def split_audio_with_ffmpeg(
    audio_path_str: str,
    output_dir_for_chunks: str,
    chunk_length_sec: int,
    overlap_sec: int
) -> List[str]:
    """Split a large file into overlapping chunks with ffmpeg."""
    try:
        if not shutil.which('ffmpeg'):
            print("Error: ffmpeg not found. ffmpeg is required to process files over 4 GB.")
            return []

        duration_sec = get_audio_duration_with_ffprobe(audio_path_str)
        if duration_sec is None:
            print("Error: could not determine the audio duration for ffmpeg splitting")
            return []

        chunk_paths_list: List[str] = []
        audio_file_stem = Path(audio_path_str).stem
        start_sec = 0
        chunk_idx = 0

        while start_sec < duration_sec:
            # Same overlap scheme as split_audio_with_overlap_cli.
            actual_start_sec = max(0, start_sec - (overlap_sec if start_sec > 0 else 0))
            base_end_sec = start_sec + chunk_length_sec
            actual_end_sec = min(base_end_sec + (overlap_sec if base_end_sec < duration_sec else 0), duration_sec)

            if actual_start_sec >= actual_end_sec:
                break

            chunk_duration = actual_end_sec - actual_start_sec
            chunk_file_name = f"{audio_file_stem}_chunk_{chunk_idx:03d}_temp.wav"
            chunk_file_path = Path(output_dir_for_chunks) / chunk_file_name

            # -ss before -i seeks quickly; output is 16 kHz mono 16-bit PCM.
            cmd = [
                'ffmpeg', '-y', '-loglevel', 'error',
                '-ss', str(actual_start_sec),
                '-i', audio_path_str,
                '-t', str(chunk_duration),
                '-acodec', 'pcm_s16le',
                '-ar', str(TARGET_SAMPLE_RATE),
                '-ac', '1',
                str(chunk_file_path)
            ]

            try:
                result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
                if result.returncode == 0:
                    chunk_paths_list.append(chunk_file_path.as_posix())
                    print(f" Chunk {chunk_idx+1}: {actual_start_sec:.1f}s - {actual_end_sec:.1f}s -> {chunk_file_name}")
                else:
                    print(f" Error: failed to create chunk {chunk_idx}: {result.stderr}")
            except subprocess.TimeoutExpired:
                print(f" Error: creating chunk {chunk_idx} timed out")

            start_sec += chunk_length_sec
            chunk_idx += 1

        print(f" Split audio into {len(chunk_paths_list)} chunks with ffmpeg.")
        return chunk_paths_list

    except Exception as e:
        print(f" Error while splitting audio with ffmpeg: {e}")
        return []


def process_single_file(
    input_file_path_obj: Path,
    asr_model_instance: ASRModel,
    device_to_use: str,
    output_formats_list: List[str]
) -> bool:
    """Preprocess, transcribe, and save one file; returns True on success."""
    input_file_stem = input_file_path_obj.stem
    output_and_temp_dir_str = input_file_path_obj.parent.as_posix()

    file_processing_start_time = time.time()
    actual_audio_duration_sec: Optional[float] = None
    success_status = False

    temp_preprocessed_audio_path_str: Optional[str] = None
    temp_chunk_file_paths_str_list: List[str] = []

    try:
        print(f"--- Step 1/3: preprocessing audio for {input_file_stem} ---")
        processed_path_for_asr, _, duration_sec_val = preprocess_audio_cli(
            input_file_path_obj.as_posix(), output_and_temp_dir_str
        )
        if not processed_path_for_asr or duration_sec_val is None:
            raise Exception("Preprocessing failed")

        actual_audio_duration_sec = duration_sec_val
        if processed_path_for_asr != input_file_path_obj.as_posix():
            temp_preprocessed_audio_path_str = processed_path_for_asr

        print(f"--- Step 2/3: transcribing {input_file_stem} (duration: {actual_audio_duration_sec:.2f}s) ---")
        final_vis_data: Optional[List] = None
        final_word_vis_data: Optional[List] = None

        if actual_audio_duration_sec > VERY_LONG_AUDIO_THRESHOLD_SECONDS:
            print(f" Info: duration exceeds {VERY_LONG_AUDIO_THRESHOLD_SECONDS/3600:.1f} hours; processing in chunks.")
            chunk_file_paths_str = split_audio_with_overlap_cli(
                processed_path_for_asr, output_and_temp_dir_str,
                chunk_length_sec=CHUNK_LENGTH_SECONDS, overlap_sec=CHUNK_OVERLAP_SECONDS
            )
            if not chunk_file_paths_str:
                raise Exception(f"Failed to split {input_file_path_obj.name} into chunks.")
            temp_chunk_file_paths_str_list = chunk_file_paths_str[:]
            all_vis_data_merged: List[List[str]] = []
            all_word_vis_data_merged: List[List[str]] = []
            current_global_time_offset_sec = 0.0
            last_global_segment_end_time_sec = 0.0

            if device_to_use == 'cuda':
                torch.cuda.empty_cache()
                gc.collect()
                print(f" Initial GPU memory usage: {torch.cuda.memory_allocated() / 1024**2:.1f}MB")

            for i, chunk_file_path_str in enumerate(temp_chunk_file_paths_str_list):
                print(f" Processing chunk {i+1}/{len(temp_chunk_file_paths_str_list)} ({Path(chunk_file_path_str).name})...")
                try:
                    if device_to_use == 'cuda':
                        torch.cuda.empty_cache()
                        gc.collect()
                        print(f" GPU memory usage before chunk: {torch.cuda.memory_allocated() / 1024**2:.1f}MB")

                    estimated_chunk_duration_for_asr_settings = CHUNK_LENGTH_SECONDS + CHUNK_OVERLAP_SECONDS
                    vis_data_chunk, _, word_vis_data_chunk = transcribe_audio_cli(
                        chunk_file_path_str, asr_model_instance,
                        estimated_chunk_duration_for_asr_settings, device_to_use
                    )

                    if device_to_use == 'cuda':
                        print(f" GPU memory usage after chunk: {torch.cuda.memory_allocated() / 1024**2:.1f}MB")

                    if not vis_data_chunk:
                        print(f" Warning: transcription of chunk {Path(chunk_file_path_str).name} failed. Skipping.")
                        if i < len(temp_chunk_file_paths_str_list) - 1:
                            current_global_time_offset_sec = (i + 1) * CHUNK_LENGTH_SECONDS - CHUNK_OVERLAP_SECONDS
                        continue

                    # Map chunk-local times to global times; drop anything that
                    # overlaps what an earlier chunk already covered.
                    for seg_row_list in vis_data_chunk:
                        s_local_sec = float(seg_row_list[0])
                        e_local_sec = float(seg_row_list[1])
                        text_seg = seg_row_list[2]
                        s_global_sec = s_local_sec + current_global_time_offset_sec
                        e_global_sec = e_local_sec + current_global_time_offset_sec
                        if s_global_sec >= last_global_segment_end_time_sec - 0.1:
                            all_vis_data_merged.append([f"{s_global_sec:.2f}", f"{e_global_sec:.2f}", text_seg])
                            last_global_segment_end_time_sec = max(last_global_segment_end_time_sec, e_global_sec)

                    temp_last_word_global_end_time_sec = float(all_word_vis_data_merged[-1][1]) if all_word_vis_data_merged else 0.0
                    if word_vis_data_chunk:
                        for word_row_list in word_vis_data_chunk:
                            w_s_local_sec = float(word_row_list[0])
                            w_e_local_sec = float(word_row_list[1])
                            text_word = word_row_list[2]
                            w_s_global_sec = w_s_local_sec + current_global_time_offset_sec
                            w_e_global_sec = w_e_local_sec + current_global_time_offset_sec
                            if w_s_global_sec >= temp_last_word_global_end_time_sec - 0.05:
                                all_word_vis_data_merged.append([f"{w_s_global_sec:.2f}", f"{w_e_global_sec:.2f}", text_word])
                                temp_last_word_global_end_time_sec = max(temp_last_word_global_end_time_sec, w_e_global_sec)

                    if i < len(temp_chunk_file_paths_str_list) - 1:
                        # Chunk i+1 starts at (i+1)*chunk_length - overlap seconds
                        # of the original audio (see split_audio_with_overlap_cli),
                        # so set the offset absolutely; accumulating
                        # (length - overlap) per chunk would drift by one overlap
                        # per chunk.
                        current_global_time_offset_sec = (i + 1) * CHUNK_LENGTH_SECONDS - CHUNK_OVERLAP_SECONDS

                    if device_to_use == 'cuda':
                        torch.cuda.empty_cache()
                        gc.collect()
                        print(f" GPU memory usage after cleanup: {torch.cuda.memory_allocated() / 1024**2:.1f}MB")

                except Exception as chunk_proc_e:
                    print(f" Error while processing chunk {Path(chunk_file_path_str).name}: {chunk_proc_e}")
                    if i < len(temp_chunk_file_paths_str_list) - 1:
                        current_global_time_offset_sec = (i + 1) * CHUNK_LENGTH_SECONDS - CHUNK_OVERLAP_SECONDS

            final_vis_data = all_vis_data_merged
            final_word_vis_data = all_word_vis_data_merged
            if not final_vis_data:
                raise Exception("No usable transcription data was produced from the chunks.")
        else:
            vis_data_single, _, word_vis_data_single = transcribe_audio_cli(
                processed_path_for_asr, asr_model_instance, actual_audio_duration_sec, device_to_use
            )
            if not vis_data_single:
                raise Exception(f"Transcription of {input_file_path_obj.name} failed.")
            final_vis_data = vis_data_single
            final_word_vis_data = word_vis_data_single

        if final_vis_data:
            print(f"--- Step 3/3: saving transcripts for {input_file_stem} ---")
            save_transcripts_cli(output_and_temp_dir_str, input_file_stem,
                                 final_vis_data, final_word_vis_data if final_word_vis_data else [],
                                 formats=output_formats_list)
            success_status = True
        else:
            print(f"Info: the transcription result for {input_file_path_obj.name} is empty; no files were saved.")
            success_status = True
    except Exception as e:
        print(f"Error while processing file {input_file_path_obj.name}: {e}")
        success_status = False
    finally:
        file_processing_end_time = time.time()
        time_taken_seconds = file_processing_end_time - file_processing_start_time
        proc_m = int(time_taken_seconds // 60)
        proc_s = time_taken_seconds % 60

        summary_message = f" --- {input_file_stem}: processing summary ---\n"
        if actual_audio_duration_sec is not None:
            audio_m = int(actual_audio_duration_sec // 60)
            audio_s = actual_audio_duration_sec % 60
            summary_message += f" Audio duration: {audio_m}m {audio_s:.2f}s ({actual_audio_duration_sec:.2f}s)\n"
        else:
            summary_message += " Audio duration: unknown (preprocessing may have failed)\n"
        summary_message += f" Total processing time for this file: {proc_m}m {proc_s:.2f}s ({time_taken_seconds:.2f}s)\n"
        summary_message += f" Status: {'success' if success_status else 'failure'}"
        print(summary_message)

        # Clean up temporary files.
        if temp_preprocessed_audio_path_str and Path(temp_preprocessed_audio_path_str).exists():
            try:
                os.remove(temp_preprocessed_audio_path_str)
                print(f" Removed temporary file {Path(temp_preprocessed_audio_path_str).name}.")
            except OSError as e_os:
                print(f" Warning: failed to remove temporary file {Path(temp_preprocessed_audio_path_str).name}: {e_os}")
        for chunk_f_str in temp_chunk_file_paths_str_list:
            if Path(chunk_f_str).exists():
                try:
                    os.remove(chunk_f_str)
                    print(f" Removed temporary chunk file {Path(chunk_f_str).name}.")
                except OSError as e_os_chunk:
                    print(f" Warning: failed to remove temporary chunk file {Path(chunk_f_str).name}: {e_os_chunk}")

    return success_status


def batch_process_directory(
    target_dir_str: str,
    asr_model_instance: ASRModel,
    device_to_use: str,
    output_formats: Optional[List[str]] = None
):
    """Transcribe every matching file in a directory, honoring extension priority."""
    batch_start_time = time.time()
    if output_formats is None:
        output_formats_to_use = DEFAULT_OUTPUT_FORMATS
    else:
        output_formats_to_use = output_formats

    target_dir_path = Path(target_dir_str)
    if not target_dir_path.is_dir():
        print(f"Error: the specified path '{target_dir_str}' is not a valid directory.")
        return

    print(f"Target directory: {target_dir_path.resolve()}")
    print(f"Input extension priority: {', '.join(INPUT_PRIORITY_EXTENSIONS)}")
    print(f"Output formats: {', '.join(output_formats_to_use)}")

    all_files_in_dir = list(target_dir_path.iterdir())
    potential_stems: Set[str] = set()
    for f_path_obj in all_files_in_dir:
        if f_path_obj.is_file() and f_path_obj.suffix.lower() in INPUT_PRIORITY_EXTENSIONS:
            potential_stems.add(f_path_obj.stem)

    if not potential_stems:
        print(f"Info: no files with the target extensions were found in '{target_dir_path.name}'.")
        return
    print(f"Found {len(potential_stems)} unique file name candidates. Selecting inputs by extension priority...")

    files_to_actually_process: List[Path] = []
    for stem_name in sorted(list(potential_stems)):
        selected_file_for_this_stem: Optional[Path] = None
        for ext_priority in INPUT_PRIORITY_EXTENSIONS:
            potential_file = target_dir_path / f"{stem_name}{ext_priority}"
            if potential_file.exists() and potential_file.is_file():
                selected_file_for_this_stem = potential_file
                print(f" Stem '{stem_name}': selected '{potential_file.name}' for processing.")
                break
        if selected_file_for_this_stem:
            files_to_actually_process.append(selected_file_for_this_stem)

    if not files_to_actually_process:
        print("Info: after applying the priority rules, there are no files to process.")
        return
    print(f"Number of files to process: {len(files_to_actually_process)}")

    processed_successfully_count = 0
    skipped_due_to_existing_csv_count = 0
    failed_count = 0

    for input_file_to_process_obj in files_to_actually_process:
        print(f"\n======== Starting file: {input_file_to_process_obj.name} ========")
        is_skipped_at_batch_level = False
        if "csv" in output_formats_to_use:
            output_csv_path_check = input_file_to_process_obj.with_suffix('.csv')
            if output_csv_path_check.exists():
                print(f"Skipping (batch level): CSV '{output_csv_path_check.name}' already exists.")
                skipped_due_to_existing_csv_count += 1
                is_skipped_at_batch_level = True
                print(f"======== Finished file (skipped): {input_file_to_process_obj.name} ========\n")

        if not is_skipped_at_batch_level:
            success_flag = process_single_file(
                input_file_to_process_obj,
                asr_model_instance,
                device_to_use,
                output_formats_to_use
            )
            if success_flag:
                processed_successfully_count += 1
            else:
                failed_count += 1

    print("\n======== Batch processing of all files finished ========")
    total_considered = len(files_to_actually_process)
    print(f"Total files considered (after priority selection): {total_considered}")
    print(f" Successfully processed: {processed_successfully_count}")
    print(f" Skipped (CSV already exists): {skipped_due_to_existing_csv_count}")
    print(f" Failed: {failed_count}")

    batch_end_time = time.time()
    total_batch_time_seconds = batch_end_time - batch_start_time
    batch_m = int(total_batch_time_seconds // 60)
    batch_s = total_batch_time_seconds % 60
    print(f"Total batch time: {batch_m}m {batch_s:.2f}s ({total_batch_time_seconds:.2f}s)")


if __name__ == "__main__":

    target_directory_arg: Optional[str] = None
    formats_arg_str: str = ",".join(DEFAULT_OUTPUT_FORMATS)
    device_arg_str: Optional[str] = None

    if len(sys.argv) == 1:
        print("Started without command-line arguments. Selecting a directory via the GUI.")
        try:
            import tkinter as tk
            from tkinter import filedialog

            def get_directory_from_gui_local() -> Optional[str]:
                """Show a directory picker dialog and return the selected path."""
                root = tk.Tk()
                root.withdraw()
                # Keep the dialog on top of other windows.
                root.attributes('-topmost', True)

                initial_dir = "/mnt/t/demucs_folder/htdemucs"
                selected_path = filedialog.askdirectory(
                    title="Select the directory to process",
                    initialdir=initial_dir
                )
                root.attributes('-topmost', False)
                root.destroy()
                return selected_path if selected_path else None

            target_directory_arg = get_directory_from_gui_local()
            if not target_directory_arg:
                print("No directory was selected. Aborting.")
                sys.exit(0)

            print(f"Directory selected via GUI: {target_directory_arg}")
            print(f"Output formats (default): {formats_arg_str}")

        except ImportError:
            print("Error: the Tkinter library required for GUI mode was not found.")
            print("Install Tkinter, or run the script with command-line arguments, e.g.:")
            print(f" python {Path(sys.argv[0]).name} /path/to/your/audio_directory")
            sys.exit(1)
        except Exception as e_gui:
            print(f"Unexpected error while showing the GUI: {e_gui}")
            sys.exit(1)
    else:
        parser = argparse.ArgumentParser(
            description="Transcribe the audio/video files in a directory with the NVIDIA Parakeet ASR model.\n"
                        f"When several files share a name, they are processed with priority {' > '.join(INPUT_PRIORITY_EXTENSIONS)}.",
            formatter_class=argparse.RawTextHelpFormatter
        )
        parser.add_argument(
            "target_directory", type=str,
            help="Path to the directory containing the files to process."
        )
        parser.add_argument(
            "--formats", type=str, default=",".join(DEFAULT_OUTPUT_FORMATS),
            help=(f"Comma-separated list of transcript formats to write.\n"
                  f"Example: csv,srt (default: {','.join(DEFAULT_OUTPUT_FORMATS)})\n"
                  f"Available formats: {','.join(DEFAULT_OUTPUT_FORMATS)}")
        )
        parser.add_argument(
            "--device", type=str, default=None, choices=['cuda', 'cpu'],
            help="Device to use (cuda or cpu). Auto-detected if omitted."
        )
        args = parser.parse_args()

        target_directory_arg = args.target_directory
        formats_arg_str = args.formats
        device_arg_str = args.device

    if device_arg_str:
        selected_device = device_arg_str
    else:
        selected_device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Device: {selected_device.upper()}")
    if selected_device == "cuda":
        if not torch.cuda.is_available():
            print("Warning: CUDA was requested but is not available. Falling back to CPU.")
            selected_device = "cpu"
        else:
            try:
                print(f"CUDA device name: {torch.cuda.get_device_name(0)}")
            except Exception as e_cuda_name:
                print(f"Failed to get the CUDA device name: {e_cuda_name}")

    print(f"Loading ASR model '{MODEL_NAME}'...")
    asr_model_main: Optional[ASRModel] = None
    try:
        asr_model_main = ASRModel.from_pretrained(model_name=MODEL_NAME)
        asr_model_main.eval()
        print(f"Finished loading model '{MODEL_NAME}'.")
    except Exception as model_load_e:
        print(f"Fatal error: failed to load ASR model '{MODEL_NAME}': {model_load_e}")
        sys.exit(1)

    output_formats_requested = [fmt.strip().lower() for fmt in formats_arg_str.split(',') if fmt.strip()]
    final_output_formats_to_use = [fmt for fmt in output_formats_requested if fmt in DEFAULT_OUTPUT_FORMATS]
    if output_formats_requested and not final_output_formats_to_use:
        print(f"Warning: none of the requested output formats '{formats_arg_str}' are valid.")
    if not final_output_formats_to_use:
        print(f"Info: no valid output formats were specified; writing all default formats ({','.join(DEFAULT_OUTPUT_FORMATS)}).")
        final_output_formats_to_use = DEFAULT_OUTPUT_FORMATS

    if not target_directory_arg:
        print("Error: no target directory was specified. Aborting.")
        sys.exit(1)

    if not asr_model_main:
        print("Fatal error: the ASR model is not loaded. Aborting.")
        sys.exit(1)

    batch_process_directory(
        target_directory_arg, asr_model_main, selected_device,
        output_formats=final_output_formats_to_use
    )
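
# Usage (illustrative; substitute the actual script file name):
#   python this_script.py /path/to/audio_dir
#   python this_script.py /path/to/audio_dir --formats csv,srt --device cuda
# Run without arguments to choose the directory via a Tkinter dialog instead.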