#!/usr/bin/env python3
"""
BackgroundFX Pro – Main Application Entry Point
Refactored modular architecture – orchestrates specialised components
"""

# ─────────────────────────────────────────────────────────────────────────────
# 0) Early env/threading hygiene (must run first)
# ─────────────────────────────────────────────────────────────────────────────
import early_env  # sets OMP/MKL/OPENBLAS + torch threads safely

import logging
import threading
from pathlib import Path
from typing import Optional, Tuple, Dict, Any, Callable

# ─────────────────────────────────────────────────────────────────────────────
# 1) Logging
# ─────────────────────────────────────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("core.app")

# ─────────────────────────────────────────────────────────────────────────────
# 2) Patch Gradio schema early (HF Spaces quirk)
# ─────────────────────────────────────────────────────────────────────────────
try:
    import gradio_client.utils as gc_utils

    _orig_get_type = gc_utils.get_type

    def _patched_get_type(schema):
        if not isinstance(schema, dict):
            if isinstance(schema, bool):
                return "boolean"
            if isinstance(schema, str):
                return "string"
            if isinstance(schema, (int, float)):
                return "number"
            return "string"
        return _orig_get_type(schema)

    gc_utils.get_type = _patched_get_type
    logger.info("Gradio schema patch applied")
except Exception as e:
    logger.warning(f"Gradio patch failed: {e}")
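# Context for the patch above: gradio_client.utils.get_type() expects every
# JSON-schema node to be a dict, but schemas emitted on some Gradio builds can
# contain bare primitives (e.g. "additionalProperties": true), which may crash
# API-schema generation on HF Spaces. Mapping those primitives to a plain type
# name is a best-effort guard; anything unrecognised falls back to "string".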

# ─────────────────────────────────────────────────────────────────────────────
# 3) Core config + components
# ─────────────────────────────────────────────────────────────────────────────
from config.app_config import get_config
from core.exceptions import ModelLoadingError, VideoProcessingError
from utils.hardware.device_manager import DeviceManager
from utils.system.memory_manager import MemoryManager
from models.loaders.model_loader import ModelLoader
from processing.video.video_processor import CoreVideoProcessor
from processing.audio.audio_processor import AudioProcessor
from utils.monitoring.progress_tracker import ProgressTracker

# Optional two-stage processor
try:
    from processing.two_stage.two_stage_processor import (
        TwoStageProcessor,
        CHROMA_PRESETS,
    )

    TWO_STAGE_AVAILABLE = True
except Exception:
    TWO_STAGE_AVAILABLE = False
    CHROMA_PRESETS = {"standard": {}}
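# Falling back to a minimal CHROMA_PRESETS keeps later lookups such as
# CHROMA_PRESETS.get(name, CHROMA_PRESETS["standard"]) working even when the
# two-stage package is absent.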

# Validation helper
from utils.cv_processing import validate_video_file


# ╔══════════════════════════════════════════════════════════════════════════╗
# β•‘                           VideoProcessor class                           β•‘
# β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
class VideoProcessor:
    """
    Main orchestrator – coordinates all specialised components.
    """

    # ─────────────────────────────────────────────────────────────────────
    # Init
    # ─────────────────────────────────────────────────────────────────────
    def __init__(self):
        self.config = get_config()
        self.device_manager = DeviceManager()
        self.memory_manager = MemoryManager(self.device_manager.get_optimal_device())
        self.model_loader = ModelLoader(self.device_manager, self.memory_manager)

        self.audio_processor = AudioProcessor()
        self.core_processor: CoreVideoProcessor | None = None
        self.two_stage_processor: TwoStageProcessor | None = None

        self.models_loaded = False
        self.loading_lock = threading.Lock()
        self.cancel_event = threading.Event()
        self.progress_tracker: ProgressTracker | None = None

        logger.info(f"VideoProcessor on device: {self.device_manager.get_optimal_device()}")

    # ─────────────────────────────────────────────────────────────────────
    # Progress helper
    # ─────────────────────────────────────────────────────────────────────
    def _init_progress(self, video_path: str, cb: Optional[Callable] = None):
        try:
            import cv2

            cap = cv2.VideoCapture(video_path)
            total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            cap.release()
            if total <= 0:
                total = 100
            self.progress_tracker = ProgressTracker(total, cb)
        except Exception as e:
            logger.warning(f"Progress init failed: {e}")
            self.progress_tracker = ProgressTracker(100, cb)
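    # The callback `cb` is forwarded to ProgressTracker unchanged; elsewhere in
    # this module the same callback is invoked as cb(fraction, message) (see
    # load_models), so custom callbacks should accept that two-argument form.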

    # ─────────────────────────────────────────────────────────────────────
    # Model loading
    # ─────────────────────────────────────────────────────────────────────
    def load_models(self, progress_callback: Optional[Callable] = None) -> str:
        with self.loading_lock:
            if self.models_loaded:
                return "Models already loaded and validated"

            try:
                self.cancel_event.clear()
                if progress_callback:
                    progress_callback(
                        0.0, f"Loading on {self.device_manager.get_optimal_device()}"
                    )

                sam2_loaded, mat_loaded = self.model_loader.load_all_models(
                    progress_callback=progress_callback, cancel_event=self.cancel_event
                )

                if self.cancel_event.is_set():
                    return "Model loading cancelled"

                # Unwrap actual predictor / model objects
                sam2_predictor = sam2_loaded.model if sam2_loaded else None
                mat_model = mat_loaded.model if mat_loaded else None

                # Core single-stage processor
                self.core_processor = CoreVideoProcessor(
                    config=self.config, models=self.model_loader
                )

                # Two-stage processor (optional)
                if TWO_STAGE_AVAILABLE and (sam2_predictor or mat_model):
                    try:
                        self.two_stage_processor = TwoStageProcessor(
                            sam2_predictor=sam2_predictor, matanyone_model=mat_model
                        )
                        logger.info("Two-stage processor initialised")
                    except Exception as e:
                        logger.warning(f"Two-stage init failed: {e}")
                        self.two_stage_processor = None

                self.models_loaded = True
                msg = self.model_loader.get_load_summary()
                msg += (
                    "\n✅ Two-stage processor ready"
                    if self.two_stage_processor
                    else "\n⚠️ Two-stage processor not available"
                )
                logger.info(msg)
                return msg

            except (AttributeError, ModelLoadingError) as e:
                self.models_loaded = False
                err = f"Model loading failed: {e}"
                logger.error(err)
                return err
            except Exception as e:
                self.models_loaded = False
                err = f"Unexpected error during model loading: {e}"
                logger.error(err)
                return err

    # ─────────────────────────────────────────────────────────────────────
    # Public entry – process video
    # ─────────────────────────────────────────────────────────────────────
    def process_video(
        self,
        video_path: str,
        background_choice: str,
        custom_background_path: Optional[str] = None,
        progress_callback: Optional[Callable] = None,
        use_two_stage: bool = False,
        chroma_preset: str = "standard",
        key_color_mode: str = "auto",          # NEW
        preview_mask: bool = False,
        preview_greenscreen: bool = False,
    ) -> Tuple[Optional[str], str]:
        """
        Dispatch to single-stage or two-stage pipeline.
        """
        if not self.models_loaded or not self.core_processor:
            return None, "Models not loaded. Please click “Load Models” first."
        if self.cancel_event.is_set():
            return None, "Processing cancelled"

        self._init_progress(video_path, progress_callback)

        ok, why = validate_video_file(video_path)
        if not ok:
            return None, f"Invalid video: {why}"

        try:
            if use_two_stage:
                if not TWO_STAGE_AVAILABLE:
                    return None, "Two-stage processing not available on this build"
                if not self.two_stage_processor:
                    return None, "Two-stage processor not initialised"
                return self._process_two_stage(
                    video_path,
                    background_choice,
                    custom_background_path,
                    progress_callback,
                    chroma_preset,
                    key_color_mode,           # NEW
                )
            else:
                return self._process_single_stage(
                    video_path,
                    background_choice,
                    custom_background_path,
                    progress_callback,
                    preview_mask,
                    preview_greenscreen,
                )

        except VideoProcessingError as e:
            logger.error(f"Processing failed: {e}")
            return None, f"Processing failed: {e}"
        except Exception as e:
            logger.error(f"Unexpected processing error: {e}")
            return None, f"Unexpected error: {e}"

    # ─────────────────────────────────────────────────────────────────────
    # Private – single-stage
    # ─────────────────────────────────────────────────────────────────────
    def _process_single_stage(
        self,
        video_path: str,
        background_choice: str,
        custom_background_path: Optional[str],
        progress_callback: Optional[Callable],
        preview_mask: bool,
        preview_greenscreen: bool,
    ) -> Tuple[Optional[str], str]:
        import time

        ts = int(time.time())
        out_dir = Path(self.config.output_dir) / "single_stage"
        out_dir.mkdir(parents=True, exist_ok=True)
        out_path = str(out_dir / f"processed_{ts}.mp4")

        result = self.core_processor.process_video(
            input_path=video_path,
            output_path=out_path,
            bg_config={
                "background_choice": background_choice,
                "custom_path": custom_background_path,
            },
        )
        if not result:
            return None, "Video processing failed"

        if not (preview_mask or preview_greenscreen):
            final_path = self.audio_processor.add_audio_to_video(
                original_video=video_path, processed_video=out_path
            )
        else:
            final_path = out_path

        msg = (
            "Processing completed.\n"
            f"Frames: {result.get('frames', 'unknown')}\n"
            f"Background: {background_choice}\n"
            f"Mode: Single-stage\n"
            f"Device: {self.device_manager.get_optimal_device()}"
        )
        return final_path, msg

    # ─────────────────────────────────────────────────────────────────────
    # Private – two-stage
    # ─────────────────────────────────────────────────────────────────────
    def _process_two_stage(
        self,
        video_path: str,
        background_choice: str,
        custom_background_path: Optional[str],
        progress_callback: Optional[Callable],
        chroma_preset: str,
        key_color_mode: str,               # NEW
    ) -> Tuple[Optional[str], str]:
        if self.two_stage_processor is None:
            return None, "Two-stage processor not available"

        import cv2
        import time

        cap = cv2.VideoCapture(video_path)
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        cap.release()

        background = self.core_processor.prepare_background(
            background_choice, custom_background_path, w, h
        )
        if background is None:
            return None, "Failed to prepare background"

        ts = int(time.time())
        out_dir = Path(self.config.output_dir) / "two_stage"
        out_dir.mkdir(parents=True, exist_ok=True)
        final_out = str(out_dir / f"final_{ts}.mp4")

        chroma_cfg = CHROMA_PRESETS.get(chroma_preset, CHROMA_PRESETS["standard"])
        logger.info(f"Two-stage with preset: {chroma_preset} and key_color_mode={key_color_mode}")

        result, message = self.two_stage_processor.process_full_pipeline(
            video_path,
            background,
            final_out,
            key_color_mode=key_color_mode,   # NEW
            chroma_settings=chroma_cfg,
            progress_callback=progress_callback,
        )
        if result is None:
            return None, message

        msg = (
            "Two-stage processing completed.\n"
            f"Background: {background_choice}\n"
            f"Chroma Preset: {chroma_preset}\n"
            f"Device: {self.device_manager.get_optimal_device()}"
        )
        return result, msg

    # ─────────────────────────────────────────────────────────────────────
    # Status helpers
    # ─────────────────────────────────────────────────────────────────────
    def get_status(self) -> Dict[str, Any]:
        status = {
            "models_loaded": self.models_loaded,
            "two_stage_available": TWO_STAGE_AVAILABLE
            and (self.two_stage_processor is not None),
            "device": str(self.device_manager.get_optimal_device()),
            "memory_usage": self.memory_manager.get_memory_usage(),
            "config": self.config.to_dict(),
            "core_processor_loaded": self.core_processor is not None,
        }
        try:
            status["sam2_loaded"] = self.model_loader.get_sam2() is not None
            status["matanyone_loaded"] = (
                self.model_loader.get_matanyone() is not None
            )
        except Exception:
            status["sam2_loaded"] = False
            status["matanyone_loaded"] = False

        if self.progress_tracker:
            status["progress"] = self.progress_tracker.get_all_progress()
        return status
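    # Illustrative get_status() payload (keys from the code above, values are
    # examples only): {"models_loaded": True, "two_stage_available": False,
    # "device": "cuda", "memory_usage": {...}, "config": {...},
    # "core_processor_loaded": True, "sam2_loaded": True,
    # "matanyone_loaded": False, "progress": {...}}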

    def cancel_processing(self):
        self.cancel_event.set()
        logger.info("Cancellation requested")

    def cleanup_resources(self):
        self.memory_manager.cleanup_aggressive()
        self.model_loader.cleanup()
        logger.info("Resources cleaned up")


# ╔══════════════════════════════════════════════════════════════════════════╗
# β•‘                         Singleton instance + wrappers                    β•‘
# β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
processor = VideoProcessor()

def load_models_with_validation(progress_callback: Optional[Callable] = None) -> str:
    return processor.load_models(progress_callback)

def process_video_fixed(
    video_path: str,
    background_choice: str,
    custom_background_path: Optional[str],
    progress_callback: Optional[Callable] = None,
    use_two_stage: bool = False,
    chroma_preset: str = "standard",
    key_color_mode: str = "auto",          # NEW
    preview_mask: bool = False,
    preview_greenscreen: bool = False,
) -> Tuple[Optional[str], str]:
    return processor.process_video(
        video_path,
        background_choice,
        custom_background_path,
        progress_callback,
        use_two_stage,
        chroma_preset,
        key_color_mode,                     # NEW
        preview_mask,
        preview_greenscreen,
    )

def get_model_status() -> Dict[str, Any]:
    return processor.get_status()

def get_cache_status() -> Dict[str, Any]:
    # Placeholder – could expose FS cache size, etc.
    return processor.get_status()

PROCESS_CANCELLED = processor.cancel_event
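
# Minimal programmatic usage sketch. The import path "core.app" is inferred from
# the logger name, and the file/background names below are purely illustrative:
#
#   from core.app import load_models_with_validation, process_video_fixed
#
#   print(load_models_with_validation())
#   out_path, summary = process_video_fixed(
#       "examples/input.mp4",   # hypothetical input clip
#       "office",               # hypothetical background choice
#       None,                   # custom_background_path
#       use_two_stage=False,
#   )
#   print(out_path, summary)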


# ╔══════════════════════════════════════════════════════════════════════════╗
# β•‘                                    CLI                                  β•‘
# β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
def main():
    try:
        logger.info("Starting BackgroundFX Pro")
        logger.info(f"Device: {processor.device_manager.get_optimal_device()}")
        logger.info(f"Two-stage available: {TWO_STAGE_AVAILABLE}")

        # UI lives in ui/components.py
        from ui.components import create_interface

        demo = create_interface()
        demo.queue().launch(
            server_name="0.0.0.0",
            server_port=7860,
            show_error=True,
            debug=False,
        )
    finally:
        processor.cleanup_resources()


if __name__ == "__main__":
    main()