|
from typing import Dict, List, Any |
|
from PIL import Image |
|
import torch |
|
from torch import autocast |
|
from tqdm.auto import tqdm |
|
from point_e.diffusion.configs import DIFFUSION_CONFIGS, diffusion_from_config |
|
from point_e.diffusion.sampler import PointCloudSampler |
|
from point_e.models.download import load_checkpoint |
|
from point_e.models.configs import MODEL_CONFIGS, model_from_config |
|
from point_e.util.plotting import plot_point_cloud |
|
from point_e.util.pc_to_mesh import marching_cubes_mesh |
|
from point_e.util.point_cloud import PointCloud |
|
import json |
|
import base64 |
|
import numpy as np |
|
from io import BytesIO |
|
import os |
|
|
|
|
|
|
|
# Select the compute device once at import time; every model below is placed on it.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# This handler refuses to run without a GPU: abort the import instead of
# falling back to CPU inference.
if not device.type == 'cuda':
    raise ValueError("need to run on GPU")
|
|
|
class EndpointHandler():
    """Request handler that generates Point-E point clouds and meshes.

    A request is a dict (or a JSON string encoding one). Its optional
    ``"command"`` key selects the operation:

    * absent / ``"null"`` -- generate a point cloud, then mesh it.
    * ``"generate_pc"``   -- generate a point cloud only.
    * ``"generate_mesh"`` -- mesh the point cloud given under ``"raw_pc"``.
    * ``"status"``        -- return the handler's ``active`` flag.
    """

    def __init__(self, path=""):
        """Create all models and load their checkpoints onto ``device``.

        Args:
            path: Not used by this implementation.
        """
        self.active = True

        os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

        print('creating base model...')
        self.base_name = 'base40M-textvec'
        self.base_model = model_from_config(MODEL_CONFIGS[self.base_name], device)
        self.base_model.eval()
        self.base_diffusion = diffusion_from_config(DIFFUSION_CONFIGS[self.base_name])

        print('creating image model...')
        self.base_image_name = 'base40M'
        self.base_image_model = model_from_config(MODEL_CONFIGS[self.base_image_name], device)
        self.base_image_model.eval()
        # Stored under its own attribute so the text-conditioned diffusion
        # assigned to self.base_diffusion above is not overwritten.
        self.base_image_diffusion = diffusion_from_config(DIFFUSION_CONFIGS[self.base_image_name])

        print('creating upsample model...')
        self.upsampler_model = model_from_config(MODEL_CONFIGS['upsample'], device)
        self.upsampler_model.eval()
        self.upsampler_diffusion = diffusion_from_config(DIFFUSION_CONFIGS['upsample'])

        print('downloading base checkpoint...')
        self.base_model.load_state_dict(load_checkpoint(self.base_name, device))
        self.base_image_model.load_state_dict(load_checkpoint(self.base_image_name, device))

        print('downloading upsampler checkpoint...')
        self.upsampler_model.load_state_dict(load_checkpoint('upsample', device))

        print('creating SDF model...')
        self.sdf_name = 'sdf'
        self.sdf_model = model_from_config(MODEL_CONFIGS[self.sdf_name], device)
        self.sdf_model.eval()

        print('loading SDF model...')
        self.sdf_model.load_state_dict(load_checkpoint(self.sdf_name, device))

    def __call__(self, input_data: Any) -> Any:
        """Dispatch one request to the operation named by its ``"command"``.

        Args:
            input_data: A request dict, or a JSON string that decodes to one.

        Returns:
            The selected operation's result, or ``None`` when the request is
            malformed (invalid JSON, not a JSON object, missing/invalid
            ``raw_pc``) or names an unknown command.
        """
        if isinstance(input_data, str):
            print("input_data is a string, attempting to deserialize...")
            try:
                input_data = json.loads(input_data)
            except json.JSONDecodeError as e:
                print(f"Failed to parse JSON: {e}")
                return None

        # Valid JSON can still be a list/number/etc.; everything below needs a dict.
        if not isinstance(input_data, dict):
            print(f"input_data must be a JSON object, got {type(input_data).__name__}")
            return None

        command = input_data.get("command", "null")
        print(f"the command is: {command}")

        if command == "null":
            temp_pc = self.generate_point_cloud(input_data)
            return self.generate_mesh_from_pc(temp_pc)
        if command == "generate_pc":
            return self.generate_point_cloud(input_data)
        if command == "generate_mesh":
            return self._handle_generate_mesh(input_data)
        if command == "status":
            return self.check_status()

        print(f"unknown command: {command}")
        return None

    def _handle_generate_mesh(self, input_data: Dict[str, Any]) -> Any:
        """Validate the ``raw_pc`` field of a request and mesh it.

        Returns ``None`` (after printing the reason) when ``raw_pc`` is
        missing, is not valid JSON, or does not decode to a dict holding
        both ``"data"`` and ``"channels"``.
        """
        print("generate_mesh command received...")
        raw_pc = input_data.get("raw_pc")

        if raw_pc is None:
            print("raw_pc not found in input_data!")
            return None

        if isinstance(raw_pc, str):
            print("raw_pc is a string, attempting to deserialize...")
            try:
                raw_pc = json.loads(raw_pc)
            except json.JSONDecodeError as e:
                # Same policy as for the outer payload: report and return None.
                print(f"Failed to parse JSON: {e}")
                return None

        if not isinstance(raw_pc, dict) or "data" not in raw_pc or "channels" not in raw_pc:
            print("raw_pc must be an object with 'data' and 'channels' keys!")
            return None

        print("Calling generate_mesh_from_pc...")
        return self.generate_mesh_from_pc(raw_pc)

    def check_status(self) -> bool:
        """Return the ``active`` flag set in ``__init__``."""
        return self.active

    def _make_sampler(self, use_image: bool) -> Any:
        """Build the two-stage (base + upsampler) sampler for the chosen modality."""
        if use_image:
            return PointCloudSampler(
                device=device,
                models=[self.base_image_model, self.upsampler_model],
                diffusions=[self.base_image_diffusion, self.upsampler_diffusion],
                num_points=[1024, 4096 - 1024],
                aux_channels=['R', 'G', 'B'],
                guidance_scale=[3.0, 3.0],
            )
        return PointCloudSampler(
            device=device,
            models=[self.base_model, self.upsampler_model],
            diffusions=[self.base_diffusion, self.upsampler_diffusion],
            num_points=[1024, 4096 - 1024],
            aux_channels=['R', 'G', 'B'],
            guidance_scale=[3.0, 0.0],
            # NOTE(review): the empty second entry presumably stops the text
            # prompt from being forwarded to the upsampler stage -- confirm
            # against PointCloudSampler.
            model_kwargs_key_filter=('texts', ''),
        )

    def generate_point_cloud(self, data: Any) -> Dict[str, str]:
        """Sample a point cloud from an image or a text prompt.

        Args:
            data: Request dict. If it has an ``"image"`` key, its value is
                treated as a base64-encoded image and used as conditioning;
                otherwise ``"inputs"`` must hold the text prompt. The dict
                is not modified.

        Returns:
            A dict with two JSON-encoded strings: ``"data"`` (the point
            coordinates) and ``"channels"`` (a mapping of channel name to
            values).

        Raises:
            ValueError: If no image is given and ``"inputs"`` is missing or
                is not a string.
        """
        print("generate pc called...")

        use_image = "image" in data
        if use_image:
            print('image data found')
            image_bytes = base64.b64decode(data["image"])
            model_kwargs = dict(images=[Image.open(BytesIO(image_bytes))])
        else:
            print('no image data found')
            prompt = data.get("inputs")
            # Reject a missing prompt up front instead of handing a non-string
            # to the text encoder.
            if not isinstance(prompt, str):
                raise ValueError("request needs an 'image' or a string 'inputs' prompt")
            model_kwargs = dict(texts=[prompt])

        sampler = self._make_sampler(use_image)

        samples = None
        with autocast(device.type):
            # Only the last item yielded by the progressive sampler is kept.
            for x in tqdm(sampler.sample_batch_progressive(batch_size=1, model_kwargs=model_kwargs)):
                samples = x

        pc = sampler.output_to_point_clouds(samples)[0]

        # Arrays are converted to nested lists so they can be JSON-encoded.
        serializable_channels = {key: value.tolist() for key, value in pc.channels.items()}
        return {
            'data': json.dumps(pc.coords.tolist()),
            'channels': json.dumps(serializable_channels),
        }

    @staticmethod
    def _decode_json_field(value: Any) -> Any:
        """Return ``value`` decoded from JSON if it is text, else unchanged."""
        if isinstance(value, (str, bytes, bytearray)):
            return json.loads(value)
        return value

    def generate_mesh_from_pc(self, pc_data: Any) -> Any:
        """Convert a serialized point cloud into a mesh using the SDF model.

        Args:
            pc_data: Dict with ``"data"`` (coordinates) and ``"channels"``
                (channel name -> values), each either a JSON string as
                produced by ``generate_point_cloud`` or the already-decoded
                list/dict.

        Returns:
            The mesh object returned by ``marching_cubes_mesh``. As a side
            effect the mesh is also written to ``mesh.ply`` in the current
            working directory.
        """
        print("generate mesh called...")

        coords_list = self._decode_json_field(pc_data['data'])
        channels_dict = self._decode_json_field(pc_data['channels'])

        point_cloud = PointCloud(
            coords=np.array(coords_list, dtype=np.float32),
            channels={
                name: np.array(array, dtype=np.float32)
                for name, array in channels_dict.items()
            },
        )

        mesh = marching_cubes_mesh(
            pc=point_cloud,
            model=self.sdf_model,
            batch_size=4096,
            grid_size=32,
            progress=True,
        )

        with open('mesh.ply', 'wb') as f:
            mesh.write_ply(f)
        print('Mesh saved to ply file, returning mesh data...')

        return mesh
|
|
|
|