| """ |
| Implementation of a custom transfer agent for the transfer type "multipart" for |
| git-lfs. |
| |
| Inspired by: |
| github.com/cbartz/git-lfs-swift-transfer-agent/blob/master/git_lfs_swift_transfer.py |
| |
| Spec is: github.com/git-lfs/git-lfs/blob/master/docs/custom-transfers.md |
| |
| |
| To launch debugger while developing: |
| |
| ``` [lfs "customtransfer.multipart"] |
| path = /path/to/huggingface_hub/.env/bin/python args = -m debugpy --listen 5678 |
| --wait-for-client |
| /path/to/huggingface_hub/src/huggingface_hub/commands/huggingface_cli.py |
| lfs-multipart-upload ```""" |
|
|
| import json |
| import os |
| import subprocess |
| import sys |
| from typing import Annotated, Optional |
|
|
| import typer |
|
|
| from huggingface_hub.lfs import LFS_MULTIPART_UPLOAD_COMMAND |
|
|
| from ..utils import get_session, hf_raise_for_status, logging |
| from ..utils._lfs import SliceFileObj |
|
|
|
|
| logger = logging.get_logger(__name__) |
|
|
|
|
| def lfs_enable_largefiles( |
| path: Annotated[ |
| str, |
| typer.Argument( |
| help="Local path to repository you want to configure.", |
| ), |
| ], |
| ) -> None: |
| """ |
| Configure a local git repository to use the multipart transfer agent for large files. |
| |
| This command sets up git-lfs to use the custom multipart transfer agent |
| which enables efficient uploading of large files in chunks. |
| """ |
| local_path = os.path.abspath(path) |
| if not os.path.isdir(local_path): |
| print("This does not look like a valid git repo.") |
| raise typer.Exit(code=1) |
| subprocess.run( |
| "git config lfs.customtransfer.multipart.path hf".split(), |
| check=True, |
| cwd=local_path, |
| ) |
| subprocess.run( |
| f"git config lfs.customtransfer.multipart.args {LFS_MULTIPART_UPLOAD_COMMAND}".split(), |
| check=True, |
| cwd=local_path, |
| ) |
| print("Local repo set up for largefiles") |
|
|
|
|
| def write_msg(msg: dict): |
| """Write out the message in Line delimited JSON.""" |
| msg_str = json.dumps(msg) + "\n" |
| sys.stdout.write(msg_str) |
| sys.stdout.flush() |
|
|
|
|
| def read_msg() -> Optional[dict]: |
| """Read Line delimited JSON from stdin.""" |
| msg = json.loads(sys.stdin.readline().strip()) |
|
|
| if "terminate" in (msg.get("type"), msg.get("event")): |
| |
| return None |
|
|
| if msg.get("event") not in ("download", "upload"): |
| logger.critical("Received unexpected message") |
| sys.exit(1) |
|
|
| return msg |
|
|
|
|
| def lfs_multipart_upload() -> None: |
| """Internal git-lfs custom transfer agent for multipart uploads. |
| |
| This function implements the custom transfer protocol for git-lfs multipart uploads. |
| Handles chunked uploads of large files to Hugging Face Hub. |
| """ |
| |
| |
| |
| init_msg = json.loads(sys.stdin.readline().strip()) |
| if not (init_msg.get("event") == "init" and init_msg.get("operation") == "upload"): |
| write_msg({"error": {"code": 32, "message": "Wrong lfs init operation"}}) |
| sys.exit(1) |
|
|
| |
| |
| |
| |
| write_msg({}) |
|
|
| |
| |
| while True: |
| msg = read_msg() |
| if msg is None: |
| |
| |
| |
| |
| sys.exit(0) |
|
|
| oid = msg["oid"] |
| filepath = msg["path"] |
| completion_url = msg["action"]["href"] |
| header = msg["action"]["header"] |
| chunk_size = int(header.pop("chunk_size")) |
| presigned_urls: list[str] = list(header.values()) |
|
|
| |
| |
| |
| write_msg( |
| { |
| "event": "progress", |
| "oid": oid, |
| "bytesSoFar": 1, |
| "bytesSinceLast": 0, |
| } |
| ) |
|
|
| parts = [] |
| with open(filepath, "rb") as file: |
| for i, presigned_url in enumerate(presigned_urls): |
| with SliceFileObj( |
| file, |
| seek_from=i * chunk_size, |
| read_limit=chunk_size, |
| ) as data: |
| r = get_session().put(presigned_url, data=data) |
| hf_raise_for_status(r) |
| parts.append( |
| { |
| "etag": r.headers.get("etag"), |
| "partNumber": i + 1, |
| } |
| ) |
| |
| |
| write_msg( |
| { |
| "event": "progress", |
| "oid": oid, |
| "bytesSoFar": (i + 1) * chunk_size, |
| "bytesSinceLast": chunk_size, |
| } |
| ) |
|
|
| r = get_session().post( |
| completion_url, |
| json={ |
| "oid": oid, |
| "parts": parts, |
| }, |
| ) |
| hf_raise_for_status(r) |
|
|
| write_msg({"event": "complete", "oid": oid}) |
|
|