""" Frame Bridge - Batch Processing Module フォルダ内の動画ファイルを順次結合するバッチ処理モジュール """ import os import glob import logging from pathlib import Path from typing import List, Tuple, Optional from .video_processor import FrameBridge # ログ設定 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class BatchProcessor: """バッチ処理を行うクラス""" def __init__(self, output_dir: str = "output", exclude_edge_frames: bool = True): """ 初期化 Args: output_dir: 出力ディレクトリ exclude_edge_frames: 最初と最後のフレームを除外するかどうか """ self.frame_bridge = FrameBridge(exclude_edge_frames=exclude_edge_frames) self.output_dir = Path(output_dir) self.output_dir.mkdir(exist_ok=True) self.exclude_edge_frames = exclude_edge_frames # サポートする動画形式 self.supported_formats = ['.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv', '.webm'] def get_video_files(self, input_dir: str) -> List[str]: """ 指定ディレクトリから動画ファイルを取得し、名前順にソート Args: input_dir: 入力ディレクトリ Returns: ソートされた動画ファイルのリスト """ input_path = Path(input_dir) if not input_path.exists(): logger.error(f"入力ディレクトリが存在しません: {input_dir}") return [] video_files = [] for ext in self.supported_formats: pattern = str(input_path / f"*{ext}") video_files.extend(glob.glob(pattern)) # ファイル名でソート(自然順序) video_files.sort(key=lambda x: os.path.basename(x).lower()) logger.info(f"検出された動画ファイル数: {len(video_files)}") for i, file in enumerate(video_files): logger.info(f" {i+1}. {os.path.basename(file)}") return video_files def process_sequential_merge(self, input_dir: str, output_filename: str = "merged_sequence.mp4") -> Tuple[bool, str, List[dict]]: """ フォルダ内の動画を順次結合 Args: input_dir: 入力ディレクトリ output_filename: 出力ファイル名 Returns: Tuple[成功フラグ, 最終出力パス, 処理結果リスト] """ video_files = self.get_video_files(input_dir) if len(video_files) < 2: return False, "", [{"error": "結合には最低2つの動画ファイルが必要です"}] results = [] current_video = video_files[0] logger.info(f"順次結合処理開始: {len(video_files)}個のファイル") for i in range(1, len(video_files)): next_video = video_files[i] logger.info(f"結合 {i}/{len(video_files)-1}: {os.path.basename(current_video)} + {os.path.basename(next_video)}") # 中間出力ファイル名 if i == len(video_files) - 1: # 最後の結合は最終ファイル名 temp_output = self.output_dir / output_filename else: # 中間ファイル temp_output = self.output_dir / f"temp_merge_{i}.mp4" # 結合処理 result_text, output_path, frame1_path, frame2_path, similarity = self.frame_bridge.process_video_bridge( current_video, next_video ) if output_path and os.path.exists(output_path): # 結果を指定の場所に移動 import shutil shutil.move(output_path, str(temp_output)) result_info = { "step": i, "video1": os.path.basename(current_video), "video2": os.path.basename(next_video), "similarity": similarity, "output": str(temp_output), "success": True } # 次のループでは結合結果を使用 current_video = str(temp_output) logger.info(f"結合完了 {i}/{len(video_files)-1}: 類似度 {similarity:.3f}") else: result_info = { "step": i, "video1": os.path.basename(current_video), "video2": os.path.basename(next_video), "error": result_text, "success": False } logger.error(f"結合失敗 {i}/{len(video_files)-1}: {result_text}") results.append(result_info) # 中間ファイルのクリーンアップ(最後以外) if i > 1 and i < len(video_files) - 1: prev_temp = self.output_dir / f"temp_merge_{i-1}.mp4" if prev_temp.exists(): prev_temp.unlink() final_output = self.output_dir / output_filename success = final_output.exists() if success: logger.info(f"全結合処理完了: {final_output}") logger.info(f"最終ファイルサイズ: {final_output.stat().st_size / (1024*1024):.1f} MB") return success, str(final_output), results def process_pairwise_merge(self, input_dir: str) -> Tuple[bool, List[str], List[dict]]: """ フォルダ内の動画をペアワイズで結合 Args: input_dir: 入力ディレクトリ Returns: Tuple[成功フラグ, 出力ファイルリスト, 処理結果リスト] """ video_files = self.get_video_files(input_dir) if len(video_files) < 2: return False, [], [{"error": "結合には最低2つの動画ファイルが必要です"}] results = [] output_files = [] logger.info(f"ペアワイズ結合処理開始: {len(video_files)}個のファイル") # ペアごとに処理 for i in range(0, len(video_files) - 1, 2): video1 = video_files[i] video2 = video_files[i + 1] if i + 1 < len(video_files) else None if video2 is None: # 奇数個の場合、最後のファイルはそのままコピー import shutil output_name = f"single_{os.path.basename(video1)}" output_path = self.output_dir / output_name shutil.copy2(video1, output_path) output_files.append(str(output_path)) results.append({ "pair": i // 2 + 1, "video1": os.path.basename(video1), "video2": None, "action": "copied", "output": str(output_path), "success": True }) continue logger.info(f"ペア {i//2 + 1}: {os.path.basename(video1)} + {os.path.basename(video2)}") # 出力ファイル名 output_name = f"merged_pair_{i//2 + 1}_{os.path.basename(video1).split('.')[0]}_{os.path.basename(video2).split('.')[0]}.mp4" output_path = self.output_dir / output_name # 結合処理 result_text, temp_output, frame1_path, frame2_path, similarity = self.frame_bridge.process_video_bridge( video1, video2 ) if temp_output and os.path.exists(temp_output): # 結果を指定の場所に移動 import shutil shutil.move(temp_output, str(output_path)) output_files.append(str(output_path)) result_info = { "pair": i // 2 + 1, "video1": os.path.basename(video1), "video2": os.path.basename(video2), "similarity": similarity, "output": str(output_path), "success": True } logger.info(f"ペア結合完了 {i//2 + 1}: 類似度 {similarity:.3f}") else: result_info = { "pair": i // 2 + 1, "video1": os.path.basename(video1), "video2": os.path.basename(video2), "error": result_text, "success": False } logger.error(f"ペア結合失敗 {i//2 + 1}: {result_text}") results.append(result_info) success = len(output_files) > 0 logger.info(f"ペアワイズ結合完了: {len(output_files)}個のファイル出力") return success, output_files, results def generate_report(self, results: List[dict], output_path: str = None) -> str: """ 処理結果のレポートを生成 Args: results: 処理結果リスト output_path: レポート出力パス Returns: レポート文字列 """ report_lines = [ "🎬 Frame Bridge - バッチ処理レポート", "=" * 60, f"📅 処理日時: {__import__('datetime').datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", f"📊 総処理数: {len(results)}", "" ] success_count = sum(1 for r in results if r.get('success', False)) report_lines.extend([ f"✅ 成功: {success_count}", f"❌ 失敗: {len(results) - success_count}", "" ]) # 詳細結果 for i, result in enumerate(results, 1): if result.get('success', False): if 'similarity' in result: quality = self._evaluate_quality(result['similarity']) report_lines.extend([ f"📋 処理 {i}: ✅ 成功", f" 📹 動画1: {result.get('video1', 'N/A')}", f" 📹 動画2: {result.get('video2', 'N/A')}", f" 📈 類似度: {result['similarity']:.3f} ({quality})", f" 📁 出力: {os.path.basename(result.get('output', 'N/A'))}", "" ]) else: report_lines.extend([ f"📋 処理 {i}: ✅ {result.get('action', '処理完了')}", f" 📹 ファイル: {result.get('video1', 'N/A')}", f" 📁 出力: {os.path.basename(result.get('output', 'N/A'))}", "" ]) else: report_lines.extend([ f"📋 処理 {i}: ❌ 失敗", f" 📹 動画1: {result.get('video1', 'N/A')}", f" 📹 動画2: {result.get('video2', 'N/A')}", f" ⚠️ エラー: {result.get('error', '不明なエラー')}", "" ]) report_text = "\n".join(report_lines) # ファイルに保存 if output_path: with open(output_path, 'w', encoding='utf-8') as f: f.write(report_text) logger.info(f"レポート保存: {output_path}") return report_text def _evaluate_quality(self, similarity: float) -> str: """類似度から品質を評価""" if similarity > 0.8: return "優秀" elif similarity > 0.6: return "良好" elif similarity > 0.4: return "普通" else: return "要確認"