Raffael-Kultyshev's picture
Upload app.py with huggingface_hub
9b7cf70 verified
#!/usr/bin/env python3
"""
DI LeRobot Converter API
========================
Receives episode data (JSON + video URL) from the iOS app,
creates a LeRobot v2.0 parquet file, uploads parquet + video
to the HuggingFace dataset repo, and updates meta/info.json.
Deployed as a HuggingFace Space with Gradio.
The iOS app calls the /api/convert endpoint after uploading to GCS.
"""
import gradio as gr
import json
import os
import tempfile
import shutil
from pathlib import Path
import pandas as pd
import numpy as np
from huggingface_hub import HfApi, hf_hub_download
# Config
HF_DATASET_REPO = "DynamicIntelligence/humanoid-robots-training-dataset"
HF_TOKEN = os.environ.get("HF_TOKEN", "")
GCS_BUCKET = "di_record_intern_data"
CHUNKS_SIZE = 100
def convert_episode(episode_json: str) -> str:
"""
Convert episode data to LeRobot v2.0 format and upload to dataset repo.
Input JSON schema:
{
"episode_index": int, # auto-assigned if -1
"language_instruction": str,
"fps": int,
"frames": [
{
"timestamp": float,
"pose": {"x": f, "y": f, "z": f, "yaw": f, "pitch": f, "roll": f},
"left_hand": [x, y, z] or null,
"right_hand": [x, y, z] or null
}, ...
],
"video_gcs_path": str # GCS path to rgb_video.mp4
}
"""
try:
data = json.loads(episode_json)
except json.JSONDecodeError as e:
return json.dumps({"error": f"Invalid JSON: {e}"})
api = HfApi(token=HF_TOKEN)
# Determine episode index
episode_index = data.get("episode_index", -1)
if episode_index < 0:
# Auto-assign: read current info.json to get next index
try:
info_path = hf_hub_download(
repo_id=HF_DATASET_REPO, filename="meta/info.json",
repo_type="dataset", token=HF_TOKEN
)
with open(info_path) as f:
info = json.load(f)
episode_index = info.get("total_episodes", 0)
except Exception:
episode_index = 0
lang = data.get("language_instruction", "")
fps = data.get("fps", 30) or 30
frames = data.get("frames", [])
num_frames = len(frames)
if num_frames == 0:
return json.dumps({"error": "No frames in episode data"})
# Build parquet rows
rows = []
for i, frame in enumerate(frames):
pose = frame.get("pose", {})
cam_x = pose.get("x", 0)
cam_y = pose.get("y", 0)
cam_z = pose.get("z", 0)
cam_roll = pose.get("roll", 0)
cam_pitch = pose.get("pitch", 0)
cam_yaw = pose.get("yaw", 0)
camera_pose = [cam_x, cam_y, cam_z, cam_roll, cam_pitch, cam_yaw]
# Hand data: [x, y, z] from end_effector → pad to 9 values (3 joints × xyz)
lh = frame.get("left_hand") or [0, 0, 0]
rh = frame.get("right_hand") or [0, 0, 0]
# Pad single palm position to 3-joint format (wrist=palm, others=0)
left_hand = list(lh[:3]) + [0.0] * 6
right_hand = list(rh[:3]) + [0.0] * 6
# Action deltas
if i > 0:
prev = frames[i - 1]
pp = prev.get("pose", {})
prev_cam = [pp.get("x", 0), pp.get("y", 0), pp.get("z", 0),
pp.get("roll", 0), pp.get("pitch", 0), pp.get("yaw", 0)]
cam_delta = [camera_pose[j] - prev_cam[j] for j in range(6)]
plh = prev.get("left_hand") or [0, 0, 0]
prh = prev.get("right_hand") or [0, 0, 0]
lh_delta = [lh[j] - plh[j] if j < len(lh) and j < len(plh) else 0 for j in range(3)] + [0.0] * 6
rh_delta = [rh[j] - prh[j] if j < len(rh) and j < len(prh) else 0 for j in range(3)] + [0.0] * 6
else:
cam_delta = [0.0] * 6
lh_delta = [0.0] * 9
rh_delta = [0.0] * 9
rows.append({
"episode_index": episode_index,
"frame_index": i,
"timestamp": frame.get("timestamp", i / fps),
"observation.camera_pose": camera_pose,
"observation.left_hand": left_hand,
"observation.right_hand": right_hand,
"action.camera_delta": cam_delta,
"action.left_hand_delta": lh_delta,
"action.right_hand_delta": rh_delta,
"language_instruction": lang,
"next.done": i == num_frames - 1,
})
# Create parquet
tmp = Path(tempfile.mkdtemp())
try:
df = pd.DataFrame(rows)
chunk_idx = episode_index // CHUNKS_SIZE
parquet_path = tmp / f"episode_{episode_index:06d}.parquet"
df.to_parquet(parquet_path, index=False)
# Upload parquet
api.upload_file(
path_or_fileobj=str(parquet_path),
path_in_repo=f"data/chunk-{chunk_idx:03d}/episode_{episode_index:06d}.parquet",
repo_id=HF_DATASET_REPO, repo_type="dataset", token=HF_TOKEN,
)
# Upload video from GCS if provided
video_gcs_path = data.get("video_gcs_path", "")
video_gcs_url = data.get("video_gcs_url", "")
video_uploaded = False
if video_gcs_url:
# Download from GCS public URL and re-upload to HF
import urllib.request
video_local = tmp / "rgb_video.mp4"
try:
urllib.request.urlretrieve(video_gcs_url, str(video_local))
api.upload_file(
path_or_fileobj=str(video_local),
path_in_repo=f"videos/chunk-{chunk_idx:03d}/rgb/episode_{episode_index:06d}.mp4",
repo_id=HF_DATASET_REPO, repo_type="dataset", token=HF_TOKEN,
)
video_uploaded = True
except Exception as ve:
pass # Video upload is optional
# Update meta/info.json
try:
existing_info_path = hf_hub_download(
repo_id=HF_DATASET_REPO, filename="meta/info.json",
repo_type="dataset", token=HF_TOKEN
)
with open(existing_info_path) as f:
info = json.load(f)
info["total_episodes"] = max(info.get("total_episodes", 0), episode_index + 1)
info["total_frames"] = info.get("total_frames", 0) + num_frames
info["splits"] = {"train": f"0:{info['total_episodes']}"}
info["total_chunks"] = (info["total_episodes"] - 1) // CHUNKS_SIZE + 1
if video_uploaded:
info["total_videos"] = info.get("total_videos", 0) + 1
except Exception:
info = build_default_info(episode_index, num_frames)
meta_dir = tmp / "meta"
meta_dir.mkdir(exist_ok=True)
with open(meta_dir / "info.json", "w") as f:
json.dump(info, f, indent=2)
api.upload_folder(
folder_path=str(meta_dir), path_in_repo="meta",
repo_id=HF_DATASET_REPO, repo_type="dataset", token=HF_TOKEN,
)
result = {
"success": True,
"episode_index": episode_index,
"num_frames": num_frames,
"parquet_uploaded": True,
"video_uploaded": video_uploaded,
"dataset_url": f"https://huggingface.co/datasets/{HF_DATASET_REPO}",
}
return json.dumps(result)
finally:
shutil.rmtree(tmp, ignore_errors=True)
def build_default_info(episode_index, num_frames):
return {
"codebase_version": "v2.0",
"robot_type": "unknown",
"total_episodes": episode_index + 1,
"total_frames": num_frames,
"total_tasks": 1,
"total_videos": 1,
"total_chunks": 1,
"chunks_size": CHUNKS_SIZE,
"fps": 30,
"splits": {"train": f"0:{episode_index + 1}"},
"data_path": "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet",
"video_path": "videos/chunk-{episode_chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4",
"features": {
"observation.camera_pose": {"dtype": "float32", "shape": [6],
"names": ["x", "y", "z", "roll", "pitch", "yaw"]},
"observation.left_hand": {"dtype": "float32", "shape": [9],
"names": ["wrist_x", "wrist_y", "wrist_z", "thumb_x", "thumb_y", "thumb_z",
"index_x", "index_y", "index_z"]},
"observation.right_hand": {"dtype": "float32", "shape": [9],
"names": ["wrist_x", "wrist_y", "wrist_z", "index_x", "index_y", "index_z",
"middle_x", "middle_y", "middle_z"]},
"action.camera_delta": {"dtype": "float32", "shape": [6],
"names": ["dx", "dy", "dz", "droll", "dpitch", "dyaw"]},
"action.left_hand_delta": {"dtype": "float32", "shape": [9],
"names": ["wrist_dx", "wrist_dy", "wrist_dz", "thumb_dx", "thumb_dy",
"thumb_dz", "index_dx", "index_dy", "index_dz"]},
"action.right_hand_delta": {"dtype": "float32", "shape": [9],
"names": ["wrist_dx", "wrist_dy", "wrist_dz", "index_dx", "index_dy",
"index_dz", "middle_dx", "middle_dy", "middle_dz"]},
"language_instruction": {"dtype": "string", "shape": [1], "names": None},
"timestamp": {"dtype": "float64", "shape": [1], "names": None},
"frame_index": {"dtype": "int64", "shape": [1], "names": None},
"episode_index": {"dtype": "int64", "shape": [1], "names": None},
"next.done": {"dtype": "bool", "shape": [1], "names": None},
"rgb": {"dtype": "video", "shape": [480, 640, 3],
"names": ["height", "width", "channels"],
"video_info": {"video.fps": 30, "video.codec": "h264",
"video.pix_fmt": "yuv420p", "video.is_depth_map": False,
"has_audio": False}},
},
"videos": {
"rgb": {"video_info": {"video.fps": 30, "video.codec": "h264",
"video.pix_fmt": "yuv420p", "video.is_depth_map": False,
"has_audio": False}}
},
}
# Gradio UI (also exposes /api/convert endpoint automatically)
demo = gr.Interface(
fn=convert_episode,
inputs=gr.Textbox(label="Episode JSON", lines=10, placeholder="Paste episode JSON here..."),
outputs=gr.Textbox(label="Result"),
title="DI LeRobot Converter",
description="Converts episode data from DI iOS app to LeRobot v2.0 format and uploads to HuggingFace dataset repo.",
api_name="convert",
)
if __name__ == "__main__":
demo.launch(server_name="0.0.0.0", server_port=7860)