| | |
| | """ |
| | Hugging Face Data Persistence Tool |
| | |
| | This module provides functionality for interacting with Hugging Face Dataset, including: |
| | - Uploading archive files |
| | - Managing archive file count |
| | - LFS storage cleanup |
| | - Downloading and restoring archive files |
| | - Listing available archive files |
| | """ |
| |
|
| | import sys |
| | import os |
| | import traceback |
| | import time |
| | import tempfile |
| | import subprocess |
| | import argparse |
| | from pathlib import Path |
| | from huggingface_hub import HfApi |
| |
|
| | |
| | os.environ['TZ'] = 'UTC' |
| | time.tzset() |
| |
|
| | |
| | cache_dir = '/home/user/.cache/huggingface' |
| | os.makedirs(cache_dir, exist_ok=True) |
| | os.environ['HF_HOME'] = cache_dir |
| | os.environ['HUGGINGFACE_HUB_CACHE'] = cache_dir |
| |
|
| |
|
| | class HFPersistenceManager: |
| | """Hugging Face Data Persistence Manager""" |
| | |
| | def __init__(self, token: str, dataset_id: str): |
| | """ |
| | Initialize the manager |
| | |
| | Args: |
| | token: Hugging Face access token |
| | dataset_id: Dataset ID |
| | """ |
| | self.token = token |
| | self.dataset_id = dataset_id |
| | self.api = None |
| | |
| | |
| | os.environ['HUGGING_FACE_HUB_TOKEN'] = token |
| | |
| | def _get_api(self) -> HfApi: |
| | """Get HfApi instance""" |
| | if self.api is None: |
| | self.api = HfApi() |
| | return self.api |
| | |
| | def upload_archive(self, local_path: str, remote_path: str) -> bool: |
| | """ |
| | Upload archive file to Hugging Face Dataset |
| | |
| | Args: |
| | local_path: Local file path |
| | remote_path: Remote file path |
| | |
| | Returns: |
| | bool: Whether upload was successful |
| | """ |
| | try: |
| | api = self._get_api() |
| | |
| | |
| | try: |
| | api.repo_info(repo_id=self.dataset_id, repo_type='dataset') |
| | print(f'β Dataset exists: {self.dataset_id}') |
| | except Exception: |
| | print(f'π Dataset does not exist, creating private dataset: {self.dataset_id}') |
| | api.create_repo(repo_id=self.dataset_id, repo_type='dataset', private=True) |
| | print(f'β Private dataset created: {self.dataset_id}') |
| | |
| | api.upload_file( |
| | path_or_fileobj=local_path, |
| | path_in_repo=remote_path, |
| | repo_id=self.dataset_id, |
| | repo_type='dataset' |
| | ) |
| | print(f'β Archive uploaded successfully: {remote_path}') |
| | return True |
| | except Exception as e: |
| | print(f'β Archive upload failed: {str(e)}') |
| | traceback.print_exc() |
| | return False |
| | |
| | def check_and_cleanup_before_upload(self, archive_prefix: str, archive_extension: str, max_files: int) -> bool: |
| | """ |
| | Check if adding new archive would exceed limit, if so, force recreate dataset first |
| | |
| | Args: |
| | archive_prefix: Archive file prefix |
| | archive_extension: Archive file extension |
| | max_files: Maximum number of files to keep |
| | |
| | Returns: |
| | bool: Whether operation was successful |
| | """ |
| | try: |
| | api = self._get_api() |
| | files = api.list_repo_files(repo_id=self.dataset_id, repo_type='dataset') |
| | archive_files = [f for f in files if f.startswith(archive_prefix) and f.endswith(f'.{archive_extension}')] |
| | archive_files.sort(reverse=True) |
| | |
| | |
| | if len(archive_files) + 1 > max_files: |
| | print(f'π¨ Adding new archive would exceed limit ({len(archive_files)} + 1 > {max_files})') |
| | print('π¨ Starting force dataset recreation to clean up old archives') |
| | |
| | |
| | files_to_keep = archive_files[:max_files-1] if max_files > 1 else [] |
| | |
| | if not self._cleanup_lfs_recreate(api, files_to_keep, max_files, archive_prefix, archive_extension): |
| | return False |
| | |
| | print(f'β Dataset recreation completed, ready for new archive upload') |
| | else: |
| | print(f'β Archive count check passed ({len(archive_files)} + 1 <= {max_files})') |
| | |
| | return True |
| | except Exception as e: |
| | print(f'β Pre-upload cleanup failed: {str(e)}') |
| | return False |
| | |
| |
|
| | def _cleanup_lfs_recreate(self, api: HfApi, files_to_keep: list, max_files: int, |
| | archive_prefix: str, archive_extension: str) -> bool: |
| | """Force delete and recreate dataset to clean LFS storage""" |
| | print('π¨ WARNING: Force recreate mode enabled') |
| | print('π This will delete and recreate the entire dataset to clean LFS storage') |
| | |
| | |
| | remaining_files = files_to_keep |
| | if remaining_files: |
| | print(f'π¦ Backing up {len(remaining_files)} files for restoration...') |
| | backup_data = [] |
| | for file_name in remaining_files: |
| | try: |
| | |
| | file_path = api.hf_hub_download( |
| | repo_id=self.dataset_id, |
| | filename=file_name, |
| | repo_type='dataset' |
| | ) |
| | with open(file_path, 'rb') as f: |
| | backup_data.append((file_name, f.read())) |
| | print(f'β Backed up: {file_name}') |
| | except Exception as e: |
| | print(f'β Backup failed for {file_name}: {str(e)}') |
| | |
| | |
| | try: |
| | print('ποΈ Deleting dataset to clean LFS storage...') |
| | api.delete_repo(repo_id=self.dataset_id, repo_type='dataset') |
| | print('β Dataset deleted successfully') |
| | |
| | |
| | time.sleep(10) |
| | |
| | |
| | print('π¨ Recreating dataset...') |
| | api.create_repo(repo_id=self.dataset_id, repo_type='dataset', exist_ok=True, private=True) |
| | print('β Dataset recreated successfully') |
| | |
| | |
| | print('π€ Restoring backed up files...') |
| | for file_name, file_content in backup_data: |
| | try: |
| | |
| | with tempfile.NamedTemporaryFile(delete=False) as temp_file: |
| | temp_file.write(file_content) |
| | temp_path = temp_file.name |
| | |
| | |
| | api.upload_file( |
| | path_or_fileobj=temp_path, |
| | path_in_repo=file_name, |
| | repo_id=self.dataset_id, |
| | repo_type='dataset' |
| | ) |
| | |
| | |
| | os.unlink(temp_path) |
| | print(f'β Restored: {file_name}') |
| | except Exception as e: |
| | print(f'β Restore failed for {file_name}: {str(e)}') |
| | |
| | print('π Dataset recreation and LFS cleanup completed!') |
| | return True |
| | |
| | except Exception as e: |
| | print(f'β Dataset recreation failed: {str(e)}') |
| | print('β οΈ Manual intervention may be required') |
| | return False |
| | else: |
| | print('π No files to preserve, proceeding with dataset cleanup') |
| | |
| | try: |
| | print('ποΈ Deleting dataset to clean LFS storage...') |
| | api.delete_repo(repo_id=self.dataset_id, repo_type='dataset') |
| | print('β Dataset deleted successfully') |
| | |
| | |
| | time.sleep(10) |
| | |
| | |
| | print('π¨ Recreating dataset...') |
| | api.create_repo(repo_id=self.dataset_id, repo_type='dataset', exist_ok=True, private=True) |
| | print('β Dataset recreated successfully') |
| | |
| | print('π Dataset recreation and LFS cleanup completed!') |
| | return True |
| | |
| | except Exception as e: |
| | print(f'β Dataset recreation failed: {str(e)}') |
| | print('β οΈ Manual intervention may be required') |
| | return False |
| | |
| | def list_available_archives(self, archive_prefix: str, archive_extension: str) -> tuple[bool, str]: |
| | """ |
| | List available archive files |
| | |
| | Args: |
| | archive_prefix: Archive file prefix |
| | archive_extension: Archive file extension |
| | |
| | Returns: |
| | tuple: (success status, latest archive filename) |
| | """ |
| | try: |
| | api = self._get_api() |
| | files = api.list_repo_files(self.dataset_id, repo_type='dataset') |
| | archive_files = [f for f in files if f.startswith(archive_prefix) and f.endswith(f'.{archive_extension}')] |
| | archive_files.sort(reverse=True) |
| | |
| | if archive_files: |
| | print('Available archive list:') |
| | for i, archive in enumerate(archive_files, 1): |
| | print(f' {i}. {archive}') |
| | |
| | print(f'LATEST_BACKUP:{archive_files[0]}') |
| | return True, archive_files[0] |
| | else: |
| | print('No archive files found') |
| | return False, "" |
| | except Exception as e: |
| | print(f'Failed to get archive list: {str(e)}') |
| | traceback.print_exc() |
| | return False, "" |
| | |
| | def restore_from_archive(self, archive_name: str, restore_path: str) -> bool: |
| | """ |
| | Restore archive from Hugging Face Dataset |
| | |
| | Args: |
| | archive_name: Archive filename |
| | restore_path: Restore path |
| | |
| | Returns: |
| | bool: Whether restoration was successful |
| | """ |
| | try: |
| | api = self._get_api() |
| | |
| | |
| | download_dir = '/home/user/download' |
| | os.makedirs(download_dir, exist_ok=True) |
| |
|
| | |
| | print(f'Downloading archive: {archive_name}') |
| | local_path = api.hf_hub_download( |
| | repo_id=self.dataset_id, |
| | filename=archive_name, |
| | repo_type='dataset', |
| | local_dir=download_dir |
| | ) |
| |
|
| | |
| | print(f'Extracting archive to: {restore_path}') |
| |
|
| | |
| | |
| | env = os.environ.copy() |
| | env['TZ'] = 'UTC' |
| |
|
| | extract_cmd = [ |
| | 'tar', '-xzf', local_path, '-C', restore_path, |
| | '--warning=no-timestamp', |
| | '--warning=no-unknown-keyword', |
| | '--no-same-owner', |
| | '--no-same-permissions', |
| | '--touch' |
| | ] |
| |
|
| | result = subprocess.run(extract_cmd, capture_output=True, text=True, env=env) |
| |
|
| | if result.returncode == 0: |
| | print(f'β Archive restored successfully: {archive_name}') |
| | print('β Timestamps normalized to UTC timezone') |
| |
|
| | |
| | os.remove(local_path) |
| | return True |
| | else: |
| | print(f'β Archive extraction failed with return code: {result.returncode}') |
| | if result.stderr: |
| | print(f'Error output: {result.stderr}') |
| | return False |
| |
|
| | except Exception as e: |
| | print(f'β Archive restoration failed: {str(e)}') |
| | traceback.print_exc() |
| | return False |
| |
|
| |
|
| | def main(): |
| | """Command line entry point""" |
| | parser = argparse.ArgumentParser(description='Hugging Face Data Persistence Tool') |
| | parser.add_argument('action', choices=['upload', 'list', 'restore'], |
| | help='Action to perform') |
| | parser.add_argument('--token', required=True, help='Hugging Face access token') |
| | parser.add_argument('--dataset-id', required=True, help='Dataset ID') |
| | parser.add_argument('--archive-file', help='Archive file path (for upload)') |
| | parser.add_argument('--filename', help='Remote filename (for upload)') |
| | parser.add_argument('--archive-prefix', default='backup', help='Archive file prefix') |
| | parser.add_argument('--archive-extension', default='tar.gz', help='Archive file extension') |
| | parser.add_argument('--max-archives', type=int, default=5, help='Maximum number of archives to keep') |
| | parser.add_argument('--archive-name', help='Archive name to restore (for restore)') |
| | parser.add_argument('--restore-path', default='./', help='Restore path (for restore)') |
| | |
| | args = parser.parse_args() |
| | |
| | manager = HFPersistenceManager(args.token, args.dataset_id) |
| | |
| | if args.action == 'upload': |
| | if not args.archive_file or not args.filename: |
| | print('β upload action requires --archive-file and --filename parameters') |
| | sys.exit(1) |
| | |
| | |
| | success = manager.check_and_cleanup_before_upload(args.archive_prefix, args.archive_extension, args.max_archives) |
| | if success: |
| | success = manager.upload_archive(args.archive_file, args.filename) |
| | |
| | sys.exit(0 if success else 1) |
| | |
| | elif args.action == 'list': |
| | success, latest = manager.list_available_archives(args.archive_prefix, args.archive_extension) |
| | sys.exit(0 if success else 1) |
| | |
| | elif args.action == 'restore': |
| | if not args.archive_name: |
| | print('β restore action requires --archive-name parameter') |
| | sys.exit(1) |
| | |
| | success = manager.restore_from_archive(args.archive_name, args.restore_path) |
| | sys.exit(0 if success else 1) |
| |
|
| |
|
| | if __name__ == '__main__': |
| | main() |