# 3d-nerf / app.py
# Tohru127's picture
# Upload 14 files
# d99a295 verified
import gradio as gr
import cv2
import numpy as np
import os
import subprocess
import json
import tempfile
import shutil
from pathlib import Path
import torch
class NeRFReconstructor:
    """Run the video -> frames -> COLMAP -> Nerfstudio -> PLY pipeline.

    Every artefact lives under one temporary workspace created by
    :meth:`setup_directories`; the other methods read the paths stored on the
    instance and therefore expect that call to have happened first.

    Except for :meth:`extract_frames` (which raises ``ValueError``), each stage
    reports its outcome as a dict whose ``"status"`` key is ``"success"`` or a
    failure tag, with a human-readable ``"message"`` on failure.
    """

    def __init__(self):
        # All paths stay None until setup_directories() is called.
        self.temp_dir = None
        self.frames_dir = None
        self.output_dir = None
        self.colmap_dir = None

    def setup_directories(self):
        """Create the temporary workspace and its sub-directories.

        Returns:
            Path of the workspace root (also stored in ``self.temp_dir``).
        """
        self.temp_dir = tempfile.mkdtemp(prefix="nerf_")
        self.frames_dir = os.path.join(self.temp_dir, "images")
        self.colmap_dir = os.path.join(self.temp_dir, "colmap")
        self.output_dir = os.path.join(self.temp_dir, "output")
        for directory in (self.frames_dir, self.colmap_dir, self.output_dir):
            os.makedirs(directory, exist_ok=True)
        return self.temp_dir

    def extract_frames(self, video_path, fps=2, max_frames=100, progress=gr.Progress()):
        """Sample frames from a video and save them as JPEGs in ``frames_dir``.

        Args:
            video_path: Path of the video file to read.
            fps: Target number of frames to keep per second of video.
            max_frames: Upper bound on the number of frames written.
            progress: Progress callback, called as ``progress(fraction, desc=...)``.

        Returns:
            ``(frame_paths, info)`` where ``frame_paths`` lists the files that
            were written and ``info`` describes the video (frame count, fps,
            duration, resolution).

        Raises:
            ValueError: If the file is missing, cannot be opened, or ``fps``
                is not positive.
        """
        progress(0, desc="📹 Opening video file...")
        if not os.path.exists(video_path):
            raise ValueError("Video file not found")
        if fps <= 0:
            # Guards the division below; previously a ZeroDivisionError.
            raise ValueError("Extraction FPS must be positive")

        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                raise ValueError("Could not open video file")

            video_fps = cap.get(cv2.CAP_PROP_FPS)
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            duration = total_frames / video_fps if video_fps > 0 else 0

            # Keep every N-th decoded frame; never less than every frame.
            frame_interval = max(1, int(video_fps / fps))

            frames_extracted = 0
            frame_count = 0
            frame_paths = []
            progress(0.05, desc=f"🎬 Extracting frames (0/{max_frames})...")

            while frames_extracted < max_frames:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_count % frame_interval == 0:
                    frame_path = os.path.join(
                        self.frames_dir, f"frame_{frames_extracted:05d}.jpg"
                    )
                    # imwrite signals failure through its return value; only
                    # count frames that really reached the disk.
                    if cv2.imwrite(frame_path, frame, [cv2.IMWRITE_JPEG_QUALITY, 95]):
                        frame_paths.append(frame_path)
                        frames_extracted += 1
                        # Extraction occupies the 5%-25% band of the bar.
                        progress_val = 0.05 + (0.20 * frames_extracted / max_frames)
                        progress(
                            progress_val,
                            desc=f"🎬 Extracting frames ({frames_extracted}/{max_frames})...",
                        )
                frame_count += 1
        finally:
            # Release the decoder even when reading or writing raised.
            cap.release()

        return frame_paths, {
            "frames_extracted": frames_extracted,
            "video_fps": video_fps,
            "video_duration": duration,
            "resolution": f"{width}x{height}",
            "total_video_frames": total_frames,
        }

    def run_colmap(self, progress=gr.Progress()):
        """Run COLMAP feature extraction, matching and sparse mapping.

        Args:
            progress: Progress callback, called as ``progress(fraction, desc=...)``.

        Returns:
            ``{"status": "success", "sparse_dir": ...}`` on success, otherwise a
            dict with status ``"colmap_not_found"``, ``"timeout"`` or
            ``"error"`` plus a ``"message"``.
        """
        database_path = os.path.join(self.colmap_dir, "database.db")
        sparse_dir = os.path.join(self.colmap_dir, "sparse")
        os.makedirs(sparse_dir, exist_ok=True)

        if shutil.which("colmap") is None:
            return {"status": "colmap_not_found", "message": "COLMAP not installed"}

        # Only request GPU SIFT when a CUDA device is actually present.
        use_gpu = "1" if torch.cuda.is_available() else "0"

        # (progress fraction, progress text, label for errors, timeout s, args)
        stages = (
            (
                0.25,
                "🔍 COLMAP: Extracting image features...",
                "Feature extraction",
                600,
                [
                    "feature_extractor",
                    "--database_path", database_path,
                    "--image_path", self.frames_dir,
                    "--ImageReader.single_camera", "1",
                    "--ImageReader.camera_model", "OPENCV",
                    "--SiftExtraction.use_gpu", use_gpu,
                ],
            ),
            (
                0.40,
                "🔗 COLMAP: Matching features...",
                "Feature matching",
                600,
                [
                    "exhaustive_matcher",
                    "--database_path", database_path,
                    "--SiftMatching.use_gpu", use_gpu,
                ],
            ),
            (
                0.55,
                "📐 COLMAP: Building sparse 3D reconstruction...",
                "Mapping",
                900,
                [
                    "mapper",
                    "--database_path", database_path,
                    "--image_path", self.frames_dir,
                    "--output_path", sparse_dir,
                ],
            ),
        )

        try:
            for fraction, description, label, timeout, args in stages:
                progress(fraction, desc=description)
                result = subprocess.run(
                    ["colmap", *args],
                    capture_output=True,
                    text=True,
                    timeout=timeout,
                )
                if result.returncode != 0:
                    return {
                        "status": "error",
                        "message": f"{label} failed: {result.stderr}",
                    }
        except subprocess.TimeoutExpired:
            return {"status": "timeout", "message": "COLMAP processing timeout"}
        except Exception as e:
            # Stage boundary: report instead of raising, like the other stages.
            return {"status": "error", "message": str(e)}

        # The mapper writes its first model to <sparse_dir>/0.
        if not os.path.exists(os.path.join(sparse_dir, "0")):
            return {"status": "error", "message": "No reconstruction created"}
        return {"status": "success", "sparse_dir": sparse_dir}

    def convert_colmap_to_nerf(self, progress=gr.Progress()):
        """Build the Nerfstudio dataset from the frames and the COLMAP model.

        The sparse model is copied to ``<output_dir>/colmap/sparse/0`` *before*
        ``ns-process-data --skip-colmap`` runs, so that the tool finds an
        existing reconstruction at the location it reads from.

        Args:
            progress: Progress callback, called as ``progress(fraction, desc=...)``.

        Returns:
            ``{"status": "success"}`` or a failure dict with ``"message"``.
        """
        progress(0.70, desc="🔄 Converting COLMAP to NeRF format...")
        colmap_sparse = os.path.join(self.colmap_dir, "sparse", "0")
        output_colmap = os.path.join(self.output_dir, "colmap", "sparse", "0")

        try:
            os.makedirs(os.path.dirname(output_colmap), exist_ok=True)
            if os.path.exists(colmap_sparse):
                shutil.copytree(colmap_sparse, output_colmap, dirs_exist_ok=True)

            result = subprocess.run(
                [
                    "ns-process-data", "images",
                    "--data", self.frames_dir,
                    "--output-dir", self.output_dir,
                    "--skip-colmap",
                ],
                capture_output=True,
                text=True,
                timeout=300,
            )
            if result.returncode != 0:
                return {
                    "status": "error",
                    "message": f"Conversion failed: {result.stderr}",
                }
            return {"status": "success"}
        except subprocess.TimeoutExpired:
            return {"status": "timeout", "message": "Data conversion timeout"}
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def train_nerf(self, num_iterations=1000, progress=gr.Progress()):
        """Train a ``nerfacto`` model on the prepared dataset with ``ns-train``.

        Args:
            num_iterations: Maximum number of training iterations; coerced to
                ``int`` because UI sliders may deliver floats.
            progress: Progress callback, called as ``progress(fraction, desc=...)``.

        Returns:
            ``{"status": "success", "gpu_used": bool}`` on success, otherwise a
            dict with status ``"nerfstudio_not_found"``, ``"timeout"`` or
            ``"error"`` plus a ``"message"``.
        """
        progress(0.75, desc="🧠 Training NeRF model...")
        if shutil.which("ns-train") is None:
            return {"status": "nerfstudio_not_found", "message": "Nerfstudio not installed"}

        try:
            gpu_available = torch.cuda.is_available()
            gpu_info = f"GPU: {torch.cuda.get_device_name(0)}" if gpu_available else "CPU mode"
            progress(0.75, desc=f"🧠 Training NeRF ({gpu_info})...")

            # The call blocks until training ends, so no intermediate progress
            # can be reported from here.
            result = subprocess.run(
                [
                    "ns-train", "nerfacto",
                    "--data", self.output_dir,
                    "--output-dir", os.path.join(self.output_dir, "models"),
                    "--max-num-iterations", str(int(num_iterations)),
                    "--pipeline.model.predict-normals", "False",
                    "--viewer.quit-on-train-completion", "True",
                    "--vis", "viewer+tensorboard",
                ],
                capture_output=True,
                text=True,
                timeout=3600,
            )
            if result.returncode != 0:
                return {
                    "status": "error",
                    "message": f"Training failed: {result.stderr[:500]}",
                }
            return {"status": "success", "gpu_used": gpu_available}
        except subprocess.TimeoutExpired:
            return {"status": "timeout", "message": "Training timeout (>1 hour)"}
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def export_model(self, progress=gr.Progress()):
        """Export the trained model as ``<output_dir>/point_cloud.ply``.

        ``ns-export pointcloud`` is tried first; if it exits non-zero and a
        COLMAP sparse model exists, that model is converted to PLY instead.

        Args:
            progress: Progress callback, called as ``progress(fraction, desc=...)``.

        Returns:
            ``{"status": "success", "ply_path": ..., "file_size_mb": ...}`` or
            ``{"status": "error", "message": ...}``.
        """
        progress(0.90, desc="💾 Exporting 3D model...")
        try:
            models_dir = os.path.join(self.output_dir, "models")
            if not os.path.isdir(models_dir):
                return {"status": "error", "message": "No trained model found"}

            with os.scandir(models_dir) as entries:
                has_run_dirs = any(entry.is_dir() for entry in entries)
            if not has_run_dirs:
                return {"status": "error", "message": "No model directories found"}

            # ns-train nests each run several levels below --output-dir, so
            # search the whole tree and take the most recently written config.
            config_files = list(Path(models_dir).rglob("config.yml"))
            if not config_files:
                return {"status": "error", "message": "Model config not found"}
            config_path = str(max(config_files, key=lambda p: p.stat().st_mtime))

            output_ply = os.path.join(self.output_dir, "point_cloud.ply")
            export_error = ""
            result = subprocess.run(
                [
                    "ns-export", "pointcloud",
                    "--load-config", config_path,
                    "--output-dir", self.output_dir,
                    "--num-points", "1000000",
                    "--remove-outliers", "True",
                    "--use-bounding-box", "True",
                ],
                capture_output=True,
                text=True,
                timeout=600,
            )
            if result.returncode != 0:
                export_error = result.stderr.strip()[:500]
                # Fall back to the sparse COLMAP points.
                colmap_sparse = os.path.join(self.colmap_dir, "sparse", "0")
                if os.path.exists(colmap_sparse):
                    subprocess.run(
                        [
                            "colmap", "model_converter",
                            "--input_path", colmap_sparse,
                            "--output_path", output_ply,
                            "--output_type", "PLY",
                        ],
                        capture_output=True,
                        text=True,
                        timeout=300,
                    )

            if os.path.exists(output_ply):
                file_size = os.path.getsize(output_ply) / (1024 * 1024)  # MB
                return {
                    "status": "success",
                    "ply_path": output_ply,
                    "file_size_mb": round(file_size, 2),
                }

            message = f"Export failed: {export_error}" if export_error else "Export failed"
            return {"status": "error", "message": message}
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def cleanup(self):
        """Remove the temporary workspace, ignoring filesystem errors."""
        if self.temp_dir and os.path.exists(self.temp_dir):
            # Best effort, but without swallowing KeyboardInterrupt/SystemExit.
            shutil.rmtree(self.temp_dir, ignore_errors=True)
def create_preview_grid(frame_paths, max_frames=9):
    """Tile the first ``max_frames`` readable frames into one RGB image.

    Each frame is converted from BGR to RGB and resized to 256x256. Tiles are
    laid out three per row on a black background that is at least three rows
    tall. Returns ``None`` when no frame could be read.
    """
    tile = 256
    per_row = 3

    thumbnails = []
    for path in frame_paths[:max_frames]:
        loaded = cv2.imread(path)
        if loaded is None:
            continue
        rgb = cv2.cvtColor(loaded, cv2.COLOR_BGR2RGB)
        thumbnails.append(cv2.resize(rgb, (tile, tile)))

    if not thumbnails:
        return None

    # Ceiling division for the rows actually needed, but never fewer than 3.
    row_count = max(3, -(-len(thumbnails) // per_row))
    canvas = np.zeros((row_count * tile, per_row * tile, 3), dtype=np.uint8)
    for position, thumb in enumerate(thumbnails):
        row, col = divmod(position, per_row)
        canvas[row * tile:(row + 1) * tile, col * tile:(col + 1) * tile] = thumb
    return canvas
def _format_success_message(video_info, results):
    """Render the Markdown summary shown after a successful reconstruction."""
    step_lines = [f"- {step['name']}: {step['status']}" for step in results["steps"]]
    gpu_label = "✅ Yes" if results.get("gpu_used") else "❌ No"
    lines = [
        "# ✅ 3D Reconstruction Complete!",
        "",
        "## 📊 Processing Results",
        "",
        "**Video Information:**",
        f"- Frames extracted: {video_info['frames_extracted']}",
        f"- Video resolution: {video_info['resolution']}",
        f"- Video duration: {video_info['video_duration']:.2f} seconds",
        f"- Video FPS: {video_info['video_fps']:.1f}",
        "",
        "**Processing Pipeline:**",
        *step_lines,
        "",
        "**Output:**",
        f"- 3D Point Cloud: {results['file_size_mb']:.2f} MB",
        "- Format: PLY (compatible with CloudCompare, MeshLab, Blender)",
        f"- GPU Acceleration: {gpu_label}",
        "",
        "## 📥 Next Steps",
        "1. Download the PLY file below",
        "2. Import into your GIS software or 3D viewer",
        "3. Use for spatial analysis and visualization",
        "",
        "## 🔬 Suggested Tools",
        "- **CloudCompare**: Point cloud analysis",
        "- **MeshLab**: Mesh processing",
        "- **Blender**: 3D visualization and animation",
        "- **QGIS**: Geographic analysis",
        "",
    ]
    return "\n".join(lines)


def process_video_full_pipeline(
    video_path,
    fps,
    max_frames,
    train_iterations,
    progress=gr.Progress()
):
    """Run the full reconstruction pipeline for one uploaded video.

    Stages: workspace setup, frame extraction, COLMAP, dataset conversion,
    NeRF training and PLY export. The first stage that fails stops the run.

    Args:
        video_path: Path of the uploaded video, or ``None`` if nothing was
            uploaded.
        fps: Frames per second to extract.
        max_frames: Maximum number of frames to extract.
        train_iterations: Iteration budget passed to training.
        progress: Progress callback, called as ``progress(fraction, desc=...)``.

    Returns:
        A 4-tuple ``(status_markdown, preview_image_or_None,
        ply_path_or_None, json_log_string)``.

    On failure the temporary workspace is deleted, because there is nothing to
    download. On success it is kept so the returned PLY path stays valid.
    """
    if video_path is None:
        return (
            "❌ Please upload a video file first.",
            None,
            None,
            json.dumps({"error": "No video uploaded"}, indent=2)
        )

    reconstructor = NeRFReconstructor()
    results = {"steps": []}
    preview_grid = None  # filled in once frames exist; reused on later failures

    def abort(message, log=None):
        """Delete the workspace and build the outputs for a failed run."""
        reconstructor.cleanup()
        if log is None:
            results["workspace_removed"] = True
            log = results
        return (message, preview_grid, None, json.dumps(log, indent=2))

    def stage_failed(prefix, stage_result):
        """Record a stage's error message in the log and abort the run."""
        detail = stage_result.get("message", "Unknown error")
        results["error"] = detail
        return abort(f"{prefix}: {detail}")

    try:
        # Step 1: Setup
        progress(0, desc="⚙️ Setting up workspace...")
        reconstructor.setup_directories()
        results["workspace"] = reconstructor.temp_dir

        # Step 2: Extract frames (0-25%)
        progress(0.05, desc="🎬 Extracting frames...")
        frame_paths, video_info = reconstructor.extract_frames(
            video_path, fps, max_frames, progress
        )
        if len(frame_paths) < 3:
            return abort(
                "❌ Error: Not enough frames extracted (minimum 3 required).",
                log={"error": "Insufficient frames", "frames": len(frame_paths)},
            )
        results["steps"].append({"name": "Frame Extraction", "status": "✅ Complete"})
        results["video_info"] = video_info
        preview_grid = create_preview_grid(frame_paths)

        # Step 3: COLMAP (25-70%)
        progress(0.25, desc="📐 Running COLMAP reconstruction...")
        colmap_result = reconstructor.run_colmap(progress)
        if colmap_result["status"] != "success":
            return stage_failed("❌ COLMAP failed", colmap_result)
        results["steps"].append({"name": "COLMAP SfM", "status": "✅ Complete"})

        # Step 4: Convert to NeRF format (70-75%)
        convert_result = reconstructor.convert_colmap_to_nerf(progress)
        if convert_result["status"] != "success":
            return stage_failed("❌ Conversion failed", convert_result)
        results["steps"].append({"name": "Data Conversion", "status": "✅ Complete"})

        # Step 5: Train NeRF (75-90%)
        progress(0.75, desc="🧠 Training NeRF model (this may take several minutes)...")
        train_result = reconstructor.train_nerf(train_iterations, progress)
        if train_result["status"] != "success":
            return stage_failed("❌ Training failed", train_result)
        results["steps"].append({"name": "NeRF Training", "status": "✅ Complete"})
        results["gpu_used"] = train_result.get("gpu_used", False)

        # Step 6: Export (90-100%)
        export_result = reconstructor.export_model(progress)
        if export_result["status"] != "success":
            return stage_failed("⚠️ Training complete but export failed", export_result)
        results["steps"].append({"name": "Model Export", "status": "✅ Complete"})
        results["output_file"] = export_result.get("ply_path")
        results["file_size_mb"] = export_result.get("file_size_mb")

        progress(1.0, desc="✅ Complete!")

        # The workspace is deliberately kept: the PLY handed to the UI is in it.
        return (
            _format_success_message(video_info, results),
            preview_grid,
            export_result.get("ply_path"),
            json.dumps(results, indent=2)
        )
    except Exception as e:
        # UI boundary: turn any unexpected failure into an error message.
        results["error"] = str(e)
        return abort(f"❌ Error during processing: {str(e)}")
# Create Gradio interface
def create_app():
    """Build the Gradio Blocks UI and wire the button to the pipeline.

    Layout: a header and GPU status line, an input column (video upload,
    extraction and training sliders, start button, usage notes), a results
    column (status text, frame preview, PLY download, JSON log) and a footer
    with background information.

    Returns:
        The configured ``gr.Blocks`` application (not yet launched).
    """
    with gr.Blocks(theme=gr.themes.Soft(), title="NeRF 3D Reconstruction") as app:
        gr.Markdown("""
        # 🎥 NeRF 3D Reconstruction from Insta360 Video

        ### Neural Radiance Fields for Geographic Research

        Transform your 360° Insta360 videos into detailed 3D models using state-of-the-art NeRF technology.
        Powered by COLMAP and Nerfstudio with GPU acceleration.
        """)

        # Report the detected accelerator once, when the UI is built.
        if torch.cuda.is_available():
            gpu_name = torch.cuda.get_device_name(0)
            gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1e9
            gpu_info = f"🚀 **GPU Detected:** {gpu_name} ({gpu_memory:.1f} GB)"
        else:
            gpu_info = "⚠️ **No GPU detected** - Processing will be slower"
        gr.Markdown(gpu_info)

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### 📤 Input Configuration")
                video_input = gr.Video(
                    label="Upload Insta360 Video",
                    height=300
                )

                with gr.Group():
                    gr.Markdown("**Frame Extraction Settings**")
                    fps_slider = gr.Slider(
                        minimum=1,
                        maximum=10,
                        value=2,
                        step=1,
                        label="Extraction FPS",
                        info="Frames per second to extract (2-3 recommended)"
                    )
                    max_frames_slider = gr.Slider(
                        minimum=20,
                        maximum=300,
                        value=100,
                        step=10,
                        label="Maximum Frames",
                        info="More frames = better quality but longer processing"
                    )

                with gr.Group():
                    gr.Markdown("**NeRF Training Settings**")
                    iterations_slider = gr.Slider(
                        minimum=500,
                        maximum=5000,
                        value=2000,
                        step=500,
                        label="Training Iterations",
                        info="More iterations = better quality (2000 recommended)"
                    )

                process_btn = gr.Button(
                    "🚀 Start Full Reconstruction",
                    variant="primary",
                    size="lg"
                )

                gr.Markdown("""
                ### ⏱️ Expected Processing Time

                - Frame extraction: 1-2 minutes
                - COLMAP reconstruction: 5-10 minutes
                - NeRF training: 10-30 minutes
                - **Total: ~20-45 minutes**

                ### 💡 Tips for Best Results

                - Use well-lit outdoor scenes
                - Ensure camera movement has overlap
                - Avoid fast movements
                - 50-150 frames optimal for most scenes
                """)

            with gr.Column(scale=1):
                gr.Markdown("### 📊 Results")
                status_output = gr.Markdown(
                    value="Ready to process. Upload a video and click 'Start Full Reconstruction'.",
                    label="Status"
                )
                preview_output = gr.Image(
                    label="Extracted Frames Preview",
                    height=400
                )
                model_output = gr.File(
                    label="📥 Download 3D Model (PLY)",
                    file_types=[".ply"]
                )
                with gr.Accordion("🔍 Technical Details", open=False):
                    json_output = gr.JSON(label="Processing Log")

        gr.Markdown("""
        ---

        ## 🎓 Geographic Research Applications

        ### Landscape Analysis
        - Terrain modeling and elevation analysis
        - Geomorphological feature detection
        - Erosion and landform studies

        ### Urban Geography
        - 3D city modeling
        - Building reconstruction
        - Urban planning visualization

        ### Environmental Monitoring
        - Vegetation structure analysis
        - Coastal erosion monitoring
        - Land use change detection

        ### Cultural Heritage
        - Archaeological site documentation
        - Historical structure preservation
        - Virtual field trips

        ## 📚 Technical Pipeline

        1. **Frame Extraction**: Extract key frames from 360° video
        2. **COLMAP SfM**: Structure from Motion for camera poses
        3. **NeRF Training**: Neural network learns 3D scene representation
        4. **Export**: Generate point cloud in PLY format

        ## 🔗 Export Compatibility

        The generated PLY files work with:
        - CloudCompare (point cloud processing)
        - MeshLab (mesh editing)
        - Blender (3D modeling)
        - QGIS (geographic analysis)
        - ArcGIS (spatial analysis)

        ---

        **Powered by:** Nerfstudio • COLMAP • PyTorch • Gradio
        """)

        # Event handler: inputs and outputs follow the pipeline's signature
        # and its 4-tuple return value, in order.
        process_btn.click(
            fn=process_video_full_pipeline,
            inputs=[
                video_input,
                fps_slider,
                max_frames_slider,
                iterations_slider
            ],
            outputs=[
                status_output,
                preview_output,
                model_output,
                json_output
            ]
        )

    return app
# Launch app
if __name__ == "__main__":
    # Script entry point: build the UI, then serve it on all interfaces at
    # port 7860 without creating a public share link.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
    }
    app = create_app()
    app.launch(**launch_options)