MogensR committed
Commit 7a8a04b · 1 Parent(s): 968e618

Update processing/two_stage/two_stage_processor.py

processing/two_stage/two_stage_processor.py CHANGED
@@ -3,353 +3,449 @@
  Two-Stage Green Screen Processing System
  Stage 1: Original → Green Screen
  Stage 2: Green Screen → Final Background
  """

  import cv2
  import numpy as np
  import os
  import pickle
  import logging
- from pathlib import Path
  import tempfile
  import traceback
- from utils.refinement.mask_refiner import refine_mask_hq  # Updated import path

- logger = logging.getLogger(__name__)

  class TwoStageProcessor:
-     """Handle two-stage video processing with green screen intermediate"""
-
      def __init__(self, sam2_predictor=None, matanyone_model=None):
-         self.sam2_predictor = sam2_predictor
-         self.matanyone_model = matanyone_model
          self.mask_cache_dir = Path("/tmp/mask_cache")
          self.mask_cache_dir.mkdir(exist_ok=True, parents=True)
-
-     def stage1_extract_to_greenscreen(self, video_path, output_path, progress_callback=None):
          """
-         Stage 1: Extract person and create green screen video
-         Also saves masks for potential reuse
          """
          def _prog(pct: float, desc: str):
              if progress_callback:
-                 progress_callback(pct, desc)
-
          try:
-             _prog(0.0, "Stage 1: Extracting to green screen...")
-
              cap = cv2.VideoCapture(video_path)
              if not cap.isOpened():
-                 logger.error("Could not open video file")
-                 return None, "Could not open video file"
-
-             fps = cap.get(cv2.CAP_PROP_FPS)
-             total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
              width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
              height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
-
-             # Pure green background for chroma keying
-             green_bg = np.zeros((height, width, 3), dtype=np.uint8)
-             green_bg[:, :] = [0, 255, 0]  # Pure green in BGR
-
-             # Setup output using core app's create_video_writer
-             from core.app import create_video_writer  # Updated import path
-             out, actual_output_path = create_video_writer(output_path, fps, width, height)
-             if out is None:
                  cap.release()
-                 logger.error("Could not create output video file")
-                 return None, "Could not create output video file"
-             output_path = actual_output_path
-
-             # Storage for masks (for potential reuse)
-             masks = []
-             frame_count = 0
-
              while True:
-                 ret, frame = cap.read()
-                 if not ret:
                      break
-
-                 _prog(0.1 + (frame_count / max(1, total_frames)) * 0.8,
-                       f"Stage 1: Processing frame {frame_count + 1}/{total_frames}")
-
-                 # Get mask using SAM2
-                 mask = self._extract_person_mask(frame)
                  masks.append(mask)
-
-                 # Refine mask every 3rd frame with MatAnyone
-                 if frame_count % 3 == 0 and self.matanyone_model:
-                     mask = self._refine_mask(frame, mask)
-
-                 # Apply green screen with HARD edges for clean keying
-                 result = self._apply_greenscreen_hard(frame, mask, green_bg)
-                 out.write(result)
-
-                 frame_count += 1
-
              cap.release()
-             out.release()
-
-             # Save masks for potential reuse
-             mask_file = self.mask_cache_dir / f"{Path(output_path).stem}_masks.pkl"
              try:
-                 with open(mask_file, 'wb') as f:
                      pickle.dump(masks, f)
-                 logger.info(f"Masks saved to {mask_file}")
              except Exception as e:
-                 logger.warning(f"Failed to save masks: {e}")
-
-             _prog(1.0, "Stage 1 complete: Green screen created")
-             return output_path, f"Green screen created: {frame_count} frames"
-
          except Exception as e:
              logger.error(f"Stage 1 error: {e}\n{traceback.format_exc()}")
-             return None, f"Stage 1 failed: {str(e)}"
-
-     def stage2_greenscreen_to_final(self, greenscreen_path, background, output_path,
-                                     chroma_settings=None, progress_callback=None):
          """
-         Stage 2: Replace green screen with final background using chroma keying
          """
          def _prog(pct: float, desc: str):
              if progress_callback:
-                 progress_callback(pct, desc)
-
-         if chroma_settings is None:
-             chroma_settings = CHROMA_PRESETS['standard']
-
          try:
-             _prog(0.0, "Stage 2: Applying final background...")
-
              cap = cv2.VideoCapture(greenscreen_path)
              if not cap.isOpened():
-                 logger.error("Could not open green screen video")
                  return None, "Could not open green screen video"
-
-             fps = cap.get(cv2.CAP_PROP_FPS)
-             total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
              width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
              height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
-
-             # Prepare background
              if isinstance(background, str):
-                 bg = cv2.imread(background)
                  if bg is None:
-                     logger.error("Could not load background image")
                      return None, "Could not load background image"
              else:
                  bg = background
-
-             bg = cv2.resize(bg, (width, height))
-
-             # Setup output using core app's create_video_writer
-             from core.app import create_video_writer  # Updated import path
-             out, actual_output_path = create_video_writer(output_path, fps, width, height)
-             if out is None:
-                 cap.release()
-                 logger.error("Could not create output video file")
-                 return None, "Could not create output video file"
-             output_path = actual_output_path
-
-             frame_count = 0
-
              while True:
-                 ret, frame = cap.read()
-                 if not ret:
                      break
-
-                 _prog(0.1 + (frame_count / max(1, total_frames)) * 0.8,
-                       f"Stage 2: Compositing frame {frame_count + 1}/{total_frames}")
-
-                 # Apply chroma keying
-                 result = self._chroma_key_advanced(frame, bg, chroma_settings)
-                 out.write(result)
-
-                 frame_count += 1
-
              cap.release()
-             out.release()
-
-             _prog(1.0, "Stage 2 complete: Final video created")
-             return output_path, f"Final video created: {frame_count} frames"
-
          except Exception as e:
              logger.error(f"Stage 2 error: {e}\n{traceback.format_exc()}")
-             return None, f"Stage 2 failed: {str(e)}"
-
-     def _extract_person_mask(self, frame):
-         """Extract person mask using SAM2"""
-         if self.sam2_predictor is None:
-             logger.warning("SAM2 predictor not available, using fallback mask")
-             h, w = frame.shape[:2]
-             mask = np.zeros((h, w), dtype=np.uint8)
-             mask[h//6:5*h//6, w//4:3*w//4] = 255
-             return mask
-
          try:
-             self.sam2_predictor.set_image(frame)
-             h, w = frame.shape[:2]
-
-             # Strategic points for person
-             points = np.array([
-                 [w//2, h//3],    # Head
-                 [w//2, h//2],    # Torso
-                 [w//2, 2*h//3],  # Lower body
-                 [w//3, h//2],    # Left
-                 [2*w//3, h//2],  # Right
-             ])
-             labels = np.ones(len(points))
-
-             masks, scores, _ = self.sam2_predictor.predict(
-                 point_coords=points,
-                 point_labels=labels,
-                 multimask_output=True
              )
-
-             best_idx = np.argmax(scores)
-             mask = masks[best_idx]
-
-             if mask.dtype != np.uint8:
-                 mask = (mask * 255).astype(np.uint8)
-
-             return mask
-
-         except Exception as e:
-             logger.error(f"Mask extraction error: {e}\n{traceback.format_exc()}")
-             h, w = frame.shape[:2]
-             mask = np.zeros((h, w), dtype=np.uint8)
-             mask[h//6:5*h//6, w//4:3*w//4] = 255
-             return mask
-
-     def _refine_mask(self, frame, mask):
-         """Refine mask using MatAnyone if available"""
-         if self.matanyone_model is None:
-             logger.warning("MatAnyone not available, using original mask")
-             return mask
-
          try:
-             refined_mask = refine_mask_hq(frame, mask, self.matanyone_model)
-             logger.info("MatAnyone mask refinement successful")
-             return refined_mask
          except Exception as e:
-             logger.warning(f"MatAnyone refinement failed: {e}\n{traceback.format_exc()}")
-             return mask
-
-     def _apply_greenscreen_hard(self, frame, mask, green_bg):
-         """Apply green screen with hard edges for clean chroma keying"""
          try:
-             # Binary threshold for clean edges
-             _, mask_binary = cv2.threshold(mask, 140, 255, cv2.THRESH_BINARY)
-
-             # No feathering - we want hard edges for chroma keying
-             mask_3ch = cv2.cvtColor(mask_binary, cv2.COLOR_GRAY2BGR)
-             mask_norm = mask_3ch.astype(float) / 255
-
-             # Composite
-             result = frame * mask_norm + green_bg * (1 - mask_norm)
-             return result.astype(np.uint8)
          except Exception as e:
-             logger.error(f"Greenscreen application error: {e}\n{traceback.format_exc()}")
-             return frame
-
-     def _chroma_key_advanced(self, frame, background, settings):
-         """Advanced chroma keying with spill suppression"""
          try:
-             key_color = np.array(settings['key_color'], dtype=np.uint8)
-             tolerance = settings['tolerance']
-             softness = settings['edge_softness']
-             spill_suppress = settings['spill_suppression']
-
-             # Convert to float for processing
-             frame_float = frame.astype(np.float32)
-             bg_float = background.astype(np.float32)
-
-             # Calculate color distance from key color
-             diff = np.abs(frame_float - key_color)
-             distance = np.sqrt(np.sum(diff ** 2, axis=2))
-
-             # Create mask based on distance
-             mask = np.where(distance < tolerance, 0, 1)
-
-             # Edge softening
-             if softness > 0:
-                 mask = cv2.GaussianBlur(mask.astype(np.float32),
-                                         (softness*2+1, softness*2+1),
-                                         softness)
-
-             # Spill suppression - reduce green in edges
-             if spill_suppress > 0:
-                 green_channel = frame_float[:, :, 1]
-                 spill_mask = np.where(mask < 1, 1 - mask, 0)
-                 green_suppression = green_channel * spill_mask * spill_suppress
-                 frame_float[:, :, 1] -= green_suppression
-                 frame_float = np.clip(frame_float, 0, 255)
-
-             # Expand mask to 3 channels
-             mask_3ch = np.stack([mask] * 3, axis=2)
-
-             # Composite
-             result = frame_float * mask_3ch + bg_float * (1 - mask_3ch)
-             return np.clip(result, 0, 255).astype(np.uint8)
          except Exception as e:
-             logger.error(f"Chroma keying error: {e}\n{traceback.format_exc()}")
              return frame
-
-     def process_full_pipeline(self, video_path, background, final_output,
-                               chroma_settings=None, progress_callback=None):
          """
-         Run the complete two-stage pipeline
          """
          try:
-             # Stage 1: Create green screen
-             greenscreen_path = tempfile.mktemp(suffix='_greenscreen.mp4')
-             gs_result, gs_msg = self.stage1_extract_to_greenscreen(
-                 video_path, greenscreen_path, progress_callback
-             )
-
-             if gs_result is None:
-                 logger.error(f"Stage 1 failed: {gs_msg}")
-                 return None, gs_msg
-
-             # Stage 2: Apply final background
-             final_result, final_msg = self.stage2_greenscreen_to_final(
-                 greenscreen_path, background, final_output,
-                 chroma_settings, progress_callback
-             )
-
-             # Cleanup
-             try:
-                 os.remove(greenscreen_path)
-             except Exception as e:
-                 logger.warning(f"Failed to clean up greenscreen file: {e}")
-
-             if final_result is None:
-                 logger.error(f"Stage 2 failed: {final_msg}")
-                 return None, final_msg
-
-             return final_result, final_msg
-         except Exception as e:
-             logger.error(f"Full pipeline error: {e}\n{traceback.format_exc()}")
-             return None, f"Full pipeline failed: {str(e)}"

- # Chroma key settings presets (aligned with app.py and ui_components.py)
- CHROMA_PRESETS = {
-     'standard': {
-         'key_color': [0, 255, 0],
-         'tolerance': 40,
-         'edge_softness': 2,
-         'spill_suppression': 0.3
-     },
-     'studio': {
-         'key_color': [0, 255, 0],
-         'tolerance': 30,
-         'edge_softness': 1,
-         'spill_suppression': 0.4
-     },
-     'outdoor': {
-         'key_color': [0, 255, 0],
-         'tolerance': 50,
-         'edge_softness': 3,
-         'spill_suppression': 0.2
-     }
- }

  Two-Stage Green Screen Processing System
  Stage 1: Original → Green Screen
  Stage 2: Green Screen → Final Background
+
+ This version is aligned with the current project structure:
+ - Uses segment/refine helpers from utils.cv_processing
+ - Has its own safe create_video_writer (no core.app dependency)
+ - Supports cancel via stop_event
+ - Robust SAM2 predictor handling
  """

+ from __future__ import annotations
+
  import cv2
  import numpy as np
  import os
+ import io
+ import gc
  import pickle
  import logging
  import tempfile
  import traceback
+ from pathlib import Path
+ from typing import Optional, Dict, Any, Callable
+
+ from utils.cv_processing import (
+     segment_person_hq,
+     refine_mask_hq,
+ )
+
+ try:
+     from utils.logger import get_logger
+     logger = get_logger(__name__)
+ except Exception:
+     logger = logging.getLogger(__name__)
+
+
+ # ---------------------------
+ # Small local video I/O helper
+ # ---------------------------
+ def create_video_writer(
+     output_path: str,
+     fps: float,
+     width: int,
+     height: int,
+     prefer_mp4: bool = True,
+ ):
+     """
+     Create a cv2.VideoWriter with sane defaults.
+     Returns (writer, actual_output_path) or (None, output_path) on failure.
+     """
+     try:
+         ext = ".mp4" if prefer_mp4 else ".avi"
+         if not output_path:
+             output_path = tempfile.mktemp(suffix=ext)
+         else:
+             base, curr_ext = os.path.splitext(output_path)
+             if curr_ext.lower() not in [".mp4", ".avi", ".mov", ".mkv"]:
+                 output_path = base + ext
+
+         # pick codec
+         # mp4v works widely on Spaces; if that fails, try XVID
+         fourcc = cv2.VideoWriter_fourcc(*"mp4v") if prefer_mp4 else cv2.VideoWriter_fourcc(*"XVID")
+         writer = cv2.VideoWriter(output_path, fourcc, float(fps), (int(width), int(height)))
+         if not writer or not writer.isOpened():
+             # Fallback
+             alt_ext = ".avi" if prefer_mp4 else ".mp4"
+             alt_fourcc = cv2.VideoWriter_fourcc(*"XVID") if prefer_mp4 else cv2.VideoWriter_fourcc(*"mp4v")
+             alt_path = os.path.splitext(output_path)[0] + alt_ext
+             writer = cv2.VideoWriter(alt_path, alt_fourcc, float(fps), (int(width), int(height)))
+             if not writer or not writer.isOpened():
+                 return None, output_path
+             return writer, alt_path
+
+         return writer, output_path
+     except Exception as e:
+         logger.error(f"create_video_writer failed: {e}")
+         return None, output_path
+
+
+ # ---------------------------
+ # Chroma key presets
+ # ---------------------------
+ CHROMA_PRESETS: Dict[str, Dict[str, Any]] = {
+     'standard': {
+         'key_color': [0, 255, 0],   # pure green (BGR)
+         'tolerance': 38,            # color distance threshold
+         'edge_softness': 2,         # Gaussian kernel radius
+         'spill_suppression': 0.35,  # 0..1
+     },
+     'studio': {
+         'key_color': [0, 255, 0],
+         'tolerance': 30,
+         'edge_softness': 1,
+         'spill_suppression': 0.45,
+     },
+     'outdoor': {
+         'key_color': [0, 255, 0],
+         'tolerance': 50,
+         'edge_softness': 3,
+         'spill_suppression': 0.25,
+     },
+ }


  class TwoStageProcessor:
+     """
+     Handle two-stage video processing with a green screen intermediate.
+     - Stage 1: generate clean green screen video (hard edges; great for chroma key)
+     - Stage 2: chroma-key that green to your final background
+     """
+
      def __init__(self, sam2_predictor=None, matanyone_model=None):
+         # We expect `sam2_predictor` to behave like SAM2ImagePredictor:
+         #   .set_image(np.ndarray)
+         #   .predict(point_coords=..., point_labels=..., multimask_output=True)
+         # If you passed a wrapper, we’ll try to unwrap it.
+         self.sam2 = self._unwrap_sam2(sam2_predictor)
+         self.matanyone = matanyone_model
+
          self.mask_cache_dir = Path("/tmp/mask_cache")
          self.mask_cache_dir.mkdir(exist_ok=True, parents=True)
+
+         logger.info("TwoStageProcessor initialized. "
+                     f"SAM2 available: {self.sam2 is not None} | "
+                     f"MatAnyOne available: {self.matanyone is not None}")
+
+     # ---------------------------
+     # Stage 1: Original → Green
+     # ---------------------------
+     def stage1_extract_to_greenscreen(
+         self,
+         video_path: str,
+         output_path: str,
+         progress_callback: Optional[Callable[[float, str], None]] = None,
+         stop_event: Optional["threading.Event"] = None,
+     ):
          """
+         Extract foreground to a pure green background.
+         Saves per-frame masks (pickle) next to the output for optional reuse.
          """
          def _prog(pct: float, desc: str):
              if progress_callback:
+                 try:
+                     progress_callback(float(pct), str(desc))
+                 except Exception:
+                     pass
+
          try:
+             _prog(0.0, "Stage 1: Preparing…")
              cap = cv2.VideoCapture(video_path)
              if not cap.isOpened():
+                 return None, "Could not open input video"
+
+             fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
+             total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 0
              width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
              height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
+
+             writer, output_path = create_video_writer(output_path, fps, width, height)
+             if writer is None:
                  cap.release()
+                 return None, "Could not create output writer"
+
+             green_bg = np.zeros((height, width, 3), dtype=np.uint8)
+             green_bg[:, :] = [0, 255, 0]  # BGR Pure Green
+
+             masks: list[np.ndarray] = []
+             frame_idx = 0
+
              while True:
+                 if stop_event is not None and stop_event.is_set():
+                     _prog(1.0, "Stage 1: Cancelled")
+                     break
+
+                 ok, frame = cap.read()
+                 if not ok:
                      break
+
+                 # 1) get a mask (SAM2 w/ smart points via segment_person_hq)
+                 mask = self._get_mask(frame)
+
+                 # 2) refine occasionally with MatAnyOne to keep it light
+                 if (self.matanyone is not None) and (frame_idx % 3 == 0):
+                     try:
+                         mask = refine_mask_hq(frame, mask, self.matanyone, fallback_enabled=True)
+                     except Exception as e:
+                         logger.warning(f"MatAnyOne refine failed (frame {frame_idx}): {e}")
+
                  masks.append(mask)
+
+                 # 3) HARD-edge composite to green (no feather here)
+                 green = self._apply_greenscreen_hard(frame, mask, green_bg)
+                 writer.write(green)
+
+                 frame_idx += 1
+                 if total > 0:
+                     pct = 0.05 + 0.9 * (frame_idx / total)
+                 else:
+                     pct = min(0.95, 0.05 + frame_idx * 0.002)
+                 _prog(pct, f"Stage 1: {frame_idx}/{total or '?'} frames")
+
              cap.release()
+             writer.release()
+
+             # Save masks (best-effort)
              try:
+                 mask_file = self.mask_cache_dir / (Path(output_path).stem + "_masks.pkl")
+                 with open(mask_file, "wb") as f:
                      pickle.dump(masks, f)
+                 logger.info(f"Stage 1: saved masks {mask_file}")
              except Exception as e:
+                 logger.warning(f"Stage 1: failed to save masks: {e}")
+
+             _prog(1.0, "Stage 1: Complete")
+             return output_path, f"Green screen video created ({frame_idx} frames)"
+
          except Exception as e:
              logger.error(f"Stage 1 error: {e}\n{traceback.format_exc()}")
+             return None, f"Stage 1 failed: {e}"
+
+     # ---------------------------
+     # Stage 2: Green → Final BG
+     # ---------------------------
+     def stage2_greenscreen_to_final(
+         self,
+         greenscreen_path: str,
+         background: np.ndarray | str,
+         output_path: str,
+         chroma_settings: Optional[Dict[str, Any]] = None,
+         progress_callback: Optional[Callable[[float, str], None]] = None,
+         stop_event: Optional["threading.Event"] = None,
+     ):
          """
+         Replace green screen with the given background using chroma keying.
+         `background` may be a path or an already-loaded image (BGR).
          """
          def _prog(pct: float, desc: str):
              if progress_callback:
+                 try:
+                     progress_callback(float(pct), str(desc))
+                 except Exception:
+                     pass
+
          try:
+             _prog(0.0, "Stage 2: Preparing…")
              cap = cv2.VideoCapture(greenscreen_path)
              if not cap.isOpened():
                  return None, "Could not open green screen video"
+
+             fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
+             total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 0
              width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
              height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
+
+             writer, output_path = create_video_writer(output_path, fps, width, height)
+             if writer is None:
+                 cap.release()
+                 return None, "Could not create output writer"
+
+             # Load/resize background
              if isinstance(background, str):
+                 bg = cv2.imread(background, cv2.IMREAD_COLOR)
                  if bg is None:
+                     cap.release()
+                     writer.release()
                      return None, "Could not load background image"
              else:
                  bg = background
+             bg = cv2.resize(bg, (width, height), interpolation=cv2.INTER_LANCZOS4)
+
+             settings = dict(CHROMA_PRESETS.get('standard', {}))
+             if chroma_settings:
+                 settings.update(chroma_settings)
+
+             frame_idx = 0
+
              while True:
+                 if stop_event is not None and stop_event.is_set():
+                     _prog(1.0, "Stage 2: Cancelled")
                      break
+
+                 ok, frame = cap.read()
+                 if not ok:
+                     break
+
+                 out = self._chroma_key_advanced(frame, bg, settings)
+                 writer.write(out)
+
+                 frame_idx += 1
+                 if total > 0:
+                     pct = 0.05 + 0.9 * (frame_idx / total)
+                 else:
+                     pct = min(0.95, 0.05 + frame_idx * 0.002)
+                 _prog(pct, f"Stage 2: {frame_idx}/{total or '?'} frames")
+
              cap.release()
+             writer.release()
+             _prog(1.0, "Stage 2: Complete")
+
+             return output_path, f"Final video created ({frame_idx} frames)"
+
          except Exception as e:
              logger.error(f"Stage 2 error: {e}\n{traceback.format_exc()}")
+             return None, f"Stage 2 failed: {e}"
+
+     # ---------------------------
+     # Full pipeline
+     # ---------------------------
+     def process_full_pipeline(
+         self,
+         video_path: str,
+         background: np.ndarray | str,
+         final_output: str,
+         chroma_settings: Optional[Dict[str, Any]] = None,
+         progress_callback: Optional[Callable[[float, str], None]] = None,
+         stop_event: Optional["threading.Event"] = None,
+     ):
+         """
+         Stage 1 (to temp greenscreen) → Stage 2 (final composite).
+         """
+         gs_temp = tempfile.mktemp(suffix="_greenscreen.mp4")
          try:
+             gs_path, msg1 = self.stage1_extract_to_greenscreen(
+                 video_path, gs_temp, progress_callback=progress_callback, stop_event=stop_event
              )
+             if gs_path is None:
+                 return None, msg1
+
+             result, msg2 = self.stage2_greenscreen_to_final(
+                 gs_path, background, final_output,
+                 chroma_settings=chroma_settings,
+                 progress_callback=progress_callback,
+                 stop_event=stop_event
+             )
+             if result is None:
+                 return None, msg2
+
+             return result, msg2
+
+         finally:
+             # best-effort cleanup
+             try:
+                 if os.path.exists(gs_temp):
+                     os.remove(gs_temp)
+             except Exception:
+                 pass
+             gc.collect()
+
+     # ---------------------------
+     # Internals
+     # ---------------------------
+     def _unwrap_sam2(self, obj):
+         """
+         Try to get a callable SAM2-like predictor from whatever was passed.
+         Accepts:
+           - direct predictor (has set_image + predict)
+           - wrapper with .model that has set_image + predict
+           - wrapper with .predictor
+         """
          try:
+             if obj is None:
+                 return None
+             # predictor directly?
+             if hasattr(obj, "set_image") and hasattr(obj, "predict"):
+                 return obj
+             # wrapper.model?
+             model = getattr(obj, "model", None)
+             if model is not None and hasattr(model, "set_image") and hasattr(model, "predict"):
+                 return model
+             # wrapper.predictor?
+             predictor = getattr(obj, "predictor", None)
+             if predictor is not None and hasattr(predictor, "set_image") and hasattr(predictor, "predict"):
+                 return predictor
          except Exception as e:
+             logger.warning(f"SAM2 unwrap failed: {e}")
+         return None
+
+     def _get_mask(self, frame: np.ndarray) -> np.ndarray:
+         """
+         Use our project’s enhanced segmentation helper so validation/fallbacks are consistent.
+         """
+         predictor = self.sam2
          try:
+             mask = segment_person_hq(frame, predictor, fallback_enabled=True)
+             return mask
          except Exception as e:
+             logger.warning(f"Segmentation failed, using geometric fallback: {e}")
+             h, w = frame.shape[:2]
+             m = np.zeros((h, w), dtype=np.uint8)
+             m[h//6:5*h//6, w//4:3*w//4] = 255
+             return m
+
+     def _apply_greenscreen_hard(self, frame: np.ndarray, mask: np.ndarray, green_bg: np.ndarray) -> np.ndarray:
+         """
+         Hard-edge composite to pure green for very clean keying later.
+         """
          try:
+             if mask.ndim == 3:
+                 mask = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY)
+             if mask.dtype != np.uint8:
+                 mask = (np.clip(mask, 0, 1) * 255).astype(np.uint8)
+
+             _, mask_bin = cv2.threshold(mask, 140, 255, cv2.THRESH_BINARY)
+             mask3 = cv2.cvtColor(mask_bin, cv2.COLOR_GRAY2BGR).astype(np.float32) / 255.0
+
+             out = frame.astype(np.float32) * mask3 + green_bg.astype(np.float32) * (1.0 - mask3)
+             return np.clip(out, 0, 255).astype(np.uint8)
          except Exception as e:
+             logger.error(f"Greenscreen composite failed: {e}")
              return frame
+
+     def _chroma_key_advanced(self, frame_bgr: np.ndarray, bg_bgr: np.ndarray, settings: Dict[str, Any]) -> np.ndarray:
          """
+         Distance-to-key color mask + soft edge + spill suppression (green reduction).
          """
          try:
+             key = np.array(settings.get("key_color", [0, 255, 0]), dtype=np.float32)
+             tol = float(settings.get("tolerance", 40))
+             soft = int(settings.get("edge_softness", 2))
+             spill = float(settings.get("spill_suppression", 0.3))

+             f = frame_bgr.astype(np.float32)
+             b = bg_bgr.astype(np.float32)
+
+             # distance (BGR space)
+             diff = f - key
+             dist = np.sqrt((diff ** 2).sum(axis=2))
+
+             # inside green → 0, far from green → 1
+             mask = np.clip((dist - tol) / max(tol, 1.0), 0.0, 1.0)
+
+             if soft > 0:
+                 ksize = max(1, soft * 2 + 1)
+                 mask = cv2.GaussianBlur(mask.astype(np.float32), (ksize, ksize), soft)
+
+             # spill suppression
+             if spill > 0:
+                 # where mask < 1.0 (near edges), reduce green channel proportionally
+                 spill_zone = 1.0 - mask
+                 g = f[:, :, 1]
+                 f[:, :, 1] = np.clip(g - g * spill_zone * spill, 0, 255)
+
+             mask3 = np.stack([mask] * 3, axis=2)
+             out = f * mask3 + b * (1.0 - mask3)
+             return np.clip(out, 0, 255).astype(np.uint8)
+         except Exception as e:
+             logger.error(f"Chroma keying failed: {e}")
+             return frame_bgr
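
For reference, a minimal driver sketch for the API introduced by this commit. Everything here besides the TwoStageProcessor interface and CHROMA_PRESETS is an assumption for illustration: the import path mirrors the file location, the file names are placeholders, and in the real app the SAM2/MatAnyOne models would be loaded elsewhere and passed in.

    # Hypothetical driver; illustrates the calling convention only.
    import threading
    from processing.two_stage.two_stage_processor import TwoStageProcessor, CHROMA_PRESETS

    def on_progress(pct: float, desc: str) -> None:
        print(f"[{pct:6.1%}] {desc}")

    stop = threading.Event()  # call stop.set() from another thread to cancel mid-run

    # Passing None for both models is allowed: the module's fallbacks then
    # produce a rough geometric mask, useful for smoke-testing the pipeline.
    proc = TwoStageProcessor(sam2_predictor=None, matanyone_model=None)
    result, message = proc.process_full_pipeline(
        video_path="input.mp4",
        background="beach.jpg",                    # path or pre-loaded BGR ndarray
        final_output="final.mp4",
        chroma_settings=CHROMA_PRESETS["studio"],  # merged over the 'standard' preset
        progress_callback=on_progress,
        stop_event=stop,
    )
    print(message if result is None else f"Wrote {result}")

Because stage 2 merges the supplied settings over the 'standard' preset, a partial dict such as {'tolerance': 45} is also a valid chroma_settings value.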
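
For intuition about the new keying math: unlike the removed version's hard np.where(distance < tolerance, 0, 1) threshold, _chroma_key_advanced now builds its alpha mask as a linear ramp on the Euclidean BGR distance from the key color, clip((dist - tol) / tol, 0, 1). Pixels within one tolerance of pure green are fully keyed out, pixels beyond twice the tolerance are fully kept, and the zone in between gets partial alpha even before Gaussian softening. A tiny self-contained check of that ramp (NumPy only; the sample pixel values are arbitrary):

    import numpy as np

    # One row of three BGR "pixels": pure green, greenish edge, skin tone.
    pixels = np.array([[[0, 255, 0], [40, 220, 40], [140, 160, 210]]], dtype=np.float32)
    key = np.array([0, 255, 0], dtype=np.float32)
    tol = 38.0  # the 'standard' preset tolerance

    dist = np.sqrt(((pixels - key) ** 2).sum(axis=2))
    mask = np.clip((dist - tol) / max(tol, 1.0), 0.0, 1.0)

    print(dist.round(1))  # approx [[  0.   66.5 269.7]]
    print(mask.round(2))  # approx [[0.   0.75 1.  ]] -> keyed out, edge, kept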