Spaces:
Running
Running
#!/usr/bin/env python3 | |
"""
Model Download Script for MiloMusic
Downloads required model files from Hugging Face Hub at runtime to avoid storage limits.
"""
import os | |
import subprocess | |
import shutil | |
from pathlib import Path | |
# Hugging Face repository that hosts the xcodec_mini_infer code and weights.
_XCODEC_REPO_URL = "https://huggingface.co/m-a-p/xcodec_mini_infer"

# Files smaller than this are treated as Git LFS pointer stubs or failed
# downloads rather than real model weights.
_MIN_REAL_FILE_BYTES = 1024


def _fetch_lfs_file(file_info, repo_dir):
    """Download one LFS-tracked file into *repo_dir* with wget.

    Args:
        file_info: Dict with "path" (location relative to the repository
            root), "url" (direct download URL) and "description" (label used
            in progress messages).
        repo_dir: Absolute path of the cloned repository.

    Returns:
        True when wget succeeded and the resulting file is larger than
        ``_MIN_REAL_FILE_BYTES``; False otherwise. Errors are reported on
        stdout and never propagate to the caller.
    """
    target = repo_dir / file_info["path"]
    # Set when wget itself failed or produced a stub, i.e. whatever is on
    # disk at `target` is known to be unusable.
    discard_partial = False
    try:
        print(f"π₯ Downloading {file_info['description']}...")
        os.makedirs(target.parent, exist_ok=True)
        subprocess.run(
            ["wget", "-O", str(target), file_info["url"]],
            check=True,
            capture_output=True,
            text=True,
            timeout=1800,  # 30min timeout for large files
        )
        if target.exists() and target.stat().st_size > _MIN_REAL_FILE_BYTES:
            print(f"β Successfully downloaded {file_info['path']} ({target.stat().st_size // (1024*1024)}MB)")
            return True
        print(f"β οΈ {file_info['path']} download appears incomplete")
        discard_partial = True
    except subprocess.CalledProcessError as e:
        print(f"β Failed to download {file_info['path']}: {e}")
        if e.stderr:
            print(f"Error details: {e.stderr[-500:]}...")  # Last 500 chars
        discard_partial = True
    except subprocess.TimeoutExpired:
        print(f"β Download timeout for {file_info['path']} (large file)")
        discard_partial = True
    except Exception as e:
        print(f"β Unexpected error downloading {file_info['path']}: {e}")

    if discard_partial:
        # `wget -O` leaves a truncated file behind when it fails. Remove it
        # so later size/existence checks cannot mistake it for a model.
        try:
            target.unlink()
        except OSError:
            pass
    return False


def download_xcodec_models():
    """Download xcodec_mini_infer using git clone (no LFS) + wget for large files.

    Any existing ``YuE/inference/xcodec_mini_infer`` directory is deleted,
    the repository is cloned with LFS smudging disabled, and the LFS-tracked
    weight files are then fetched one by one with wget.

    Returns:
        True when both decoder checkpoints exist and are at least
        ``_MIN_REAL_FILE_BYTES`` bytes. Failures of the other LFS files are
        reported but do not affect the result. False when the clone fails,
        times out, raises unexpectedly, or a decoder checkpoint is missing
        or too small.
    """
    # Absolute path, so nothing below depends on (or has to change) the
    # process-wide working directory.
    xcodec_base = Path("YuE/inference/xcodec_mini_infer").resolve()
    print("π₯ Downloading xcodec_mini_infer using git clone + wget strategy...")
    try:
        # Start from a clean slate: git refuses to clone into a non-empty dir.
        if xcodec_base.exists():
            print("ποΈ Removing existing xcodec_mini_infer directory...")
            shutil.rmtree(xcodec_base)
        os.makedirs(xcodec_base.parent, exist_ok=True)

        # Step 1: Clone repository structure without LFS files
        print("π Step 1: Cloning repository structure (no LFS)...")
        env = os.environ.copy()
        # Tell git-lfs to leave pointer stubs instead of downloading blobs.
        env["GIT_LFS_SKIP_SMUDGE"] = "1"
        subprocess.run(
            ["git", "clone", _XCODEC_REPO_URL, str(xcodec_base)],
            check=True,
            capture_output=True,
            text=True,
            timeout=300,
            env=env,
        )
        print("β Repository structure downloaded successfully")

        # Step 2: Download critical LFS files using wget
        print("π Step 2: Downloading critical LFS files with wget...")
        lfs_files = [
            {
                "path": rel_path,
                "url": f"{_XCODEC_REPO_URL}/resolve/main/{rel_path}",
                "description": description,
            }
            for rel_path, description in (
                ("decoders/decoder_131000.pth", "Vocal decoder model (70MB)"),
                ("decoders/decoder_151000.pth", "Instrumental decoder model (70MB)"),
                ("final_ckpt/ckpt_00360000.pth", "Main checkpoint (1.3GB)"),
                ("semantic_ckpts/hf_1_325000/pytorch_model.bin", "Semantic model (361MB)"),
            )
        ]
        success_count = sum(
            1 for file_info in lfs_files if _fetch_lfs_file(file_info, xcodec_base)
        )

        # Debug: Print current working directory and file locations
        print(f"π Current working directory: {os.getcwd()}")
        print(f"π Expected xcodec_base (absolute): {xcodec_base}")
        print(f"π xcodec_base exists: {xcodec_base.exists()}")
        decoders_dir = xcodec_base / "decoders"
        print(f"π Decoders directory: {decoders_dir}")
        print(f"π Decoders directory exists: {decoders_dir.exists()}")
        if decoders_dir.exists():
            all_decoder_files = list(decoders_dir.iterdir())
            print(f"π All files in decoders directory: {[f.name for f in all_decoder_files]}")
            print(f"π File sizes: {[(f.name, f.stat().st_size if f.is_file() else 'dir') for f in all_decoder_files]}")

        # Verify critical decoder files (minimum requirement)
        decoder_files = [
            decoders_dir / "decoder_131000.pth",
            decoders_dir / "decoder_151000.pth",
        ]
        missing_decoders = []
        for decoder_file in decoder_files:
            print(f"π Checking: {decoder_file}")
            exists = decoder_file.exists()
            print(f"π File exists: {exists}")
            if not exists:
                missing_decoders.append(decoder_file.name)
                print(f"β οΈ {decoder_file.name} does not exist")
                continue
            size = decoder_file.stat().st_size
            print(f"π File size: {size} bytes ({size // (1024*1024)} MB)")
            if size < _MIN_REAL_FILE_BYTES:
                missing_decoders.append(decoder_file.name)
                print(f"β οΈ {decoder_file.name} is too small (likely an LFS pointer)")

        if missing_decoders:
            print(f"β Critical decoder files missing or incomplete: {missing_decoders}")
            print("Vocoder functionality will not work without decoder files.")
            return False
        print(f"β Successfully downloaded {success_count}/{len(lfs_files)} LFS files")
        print("β All critical decoder files verified present and complete")
        return True
    except subprocess.CalledProcessError as e:
        # Only the git clone can reach this handler; wget errors are handled
        # inside _fetch_lfs_file.
        print(f"β Git clone failed: {e}")
        if e.stdout:
            print(f"stdout: {e.stdout}")
        if e.stderr:
            print(f"stderr: {e.stderr}")
        return False
    except subprocess.TimeoutExpired:
        print("β Git clone timed out")
        return False
    except Exception as e:
        print(f"β Unexpected error: {e}")
        return False
def ensure_model_availability():
    """
    Ensure all required models are available locally.
    Download them if they don't exist.

    A ``.pth`` weight file counts as missing when it is absent or smaller
    than 1 KB: the repository is cloned with LFS smudging disabled, so a
    weight whose download failed is left behind as a tiny pointer stub that
    would otherwise pass a plain existence check.

    Returns:
        True when every critical file is already usable, or when
        ``download_xcodec_models()`` reports success; False when the
        download fails.
    """
    xcodec_base = Path("YuE/inference/xcodec_mini_infer")
    # Same threshold the downloader uses to tell real weights from LFS stubs.
    min_weight_bytes = 1024
    # Check if critical files exist (both for recons and vocoder stages)
    critical_files = [
        # Vocoder stage files
        xcodec_base / "decoders" / "decoder_131000.pth",
        xcodec_base / "decoders" / "decoder_151000.pth",
        xcodec_base / "decoders" / "config.yaml",
        # Recons stage files (critical for audio decoding)
        xcodec_base / "final_ckpt" / "ckpt_00360000.pth",
        xcodec_base / "final_ckpt" / "config.yaml",
        # Python modules
        xcodec_base / "models" / "soundstream_hubert_new.py",
    ]
    missing_files = [
        f
        for f in critical_files
        if not f.exists()
        or (f.suffix == ".pth" and f.stat().st_size < min_weight_bytes)
    ]
    if not missing_files:
        print("β All critical model files are already present")
        return True

    print(f"β οΈ Missing critical model files: {[f.name for f in missing_files]}")
    print("π Starting model download with git clone...")
    if download_xcodec_models():
        print("β Model download completed successfully!")
        return True
    print("β Model download failed. Vocoder functionality will not work.")
    return False
if __name__ == "__main__":
    # Run model download when script is executed directly.
    print("π΅ MiloMusic Model Download Script")
    print("=" * 50)
    success = ensure_model_availability()
    if success:
        print("\nπ Setup complete! MiloMusic is ready to generate music.")
    else:
        print("\nβ οΈ Setup completed with warnings. Check the logs above.")
        # Raise SystemExit directly: the bare `exit` helper is injected by the
        # `site` module for interactive use and is missing under `python -S`
        # or in frozen builds.
        raise SystemExit(1)