"""Copy Hugging Face datasets from one namespace to another.

For each name in DATASET_NAMES, download a snapshot of the source dataset
and re-upload it as a private dataset under the destination user, creating
the destination repo if needed.
"""
import os
import tempfile
import logging
from pathlib import Path

from huggingface_hub import HfApi, snapshot_download, create_repo
from dotenv import load_dotenv

# Configure source and destination usernames
SOURCE_USERNAME = "open-llm-leaderboard"
DESTINATION_USERNAME = "tfrere"
# Get the backend directory path
BACKEND_DIR = Path(__file__).parent.parent
ROOT_DIR = BACKEND_DIR.parent
# Load environment variables from .env file in root directory
load_dotenv(ROOT_DIR / ".env")
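
# The .env file must define HF_TOKEN; the token needs write access to the
# destination namespace for create_repo and upload_folder to succeed.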
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(message)s'
)
logger = logging.getLogger(__name__)
# List of dataset names to sync
DATASET_NAMES = [
    "votes",
    "results",
    "requests",
    "contents",
    "official-providers",
]
# Build list of datasets with their source and destination paths
DATASETS = [
    (name, f"{SOURCE_USERNAME}/{name}", f"{DESTINATION_USERNAME}/{name}")
    for name in DATASET_NAMES
]
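# Each entry looks like: ("votes", "open-llm-leaderboard/votes", "tfrere/votes")
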
# Initialize Hugging Face API
api = HfApi()


def ensure_repo_exists(repo_id, token):
    """Ensure the repository exists, create it if it doesn't"""
    try:
        api.repo_info(repo_id=repo_id, repo_type="dataset")
        logger.info(f"β Repository {repo_id} already exists")
    except Exception:
        logger.info(f"Creating repository {repo_id}...")
        create_repo(
            repo_id=repo_id,
            repo_type="dataset",
            token=token,
            private=True
        )
        logger.info(f"β Repository {repo_id} created")
def process_dataset(dataset_info, token):
    """Process a single dataset"""
    name, source_dataset, destination_dataset = dataset_info
    try:
        logger.info(f"\nπ₯ Processing dataset: {name}")
        
        # Ensure destination repository exists
        ensure_repo_exists(destination_dataset, token)
        
        # Create a temporary directory for this dataset
        with tempfile.TemporaryDirectory() as temp_dir:
            try:
                # List files in source dataset
                logger.info(f"Listing files in {source_dataset}...")
                files = api.list_repo_files(source_dataset, repo_type="dataset")
                logger.info(f"Detected structure: {len(files)} files")
                
                # Download dataset
                logger.info(f"Downloading from {source_dataset}...")
                local_dir = snapshot_download(
                    repo_id=source_dataset,
                    repo_type="dataset",
                    local_dir=temp_dir,
                    token=token
                )
                logger.info(f"β Download complete")
                
                # Upload to destination while preserving structure
                logger.info(f"π€ Uploading to {destination_dataset}...")
                api.upload_folder(
                    folder_path=local_dir,
                    repo_id=destination_dataset,
                    repo_type="dataset",
                    token=token
                )
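                # Optional sanity check (a sketch, not part of the original
                # script): the destination should now hold at least the
                # files that were listed in the source.
                # dest_files = api.list_repo_files(destination_dataset, repo_type="dataset")
                # assert len(dest_files) >= len(files)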
                logger.info(f"β
 {name} copied successfully!")
                return True
                
            except Exception as e:
                logger.error(f"β Error processing {name}: {str(e)}")
                return False
    except Exception as e:
        logger.error(f"β Error for {name}: {str(e)}")
        return False
def copy_datasets():
    try:
        logger.info("π Checking authentication...")
        # Get token from .env file
        token = os.getenv("HF_TOKEN")
        if not token:
            raise ValueError("HF_TOKEN not found in .env file")
        
        # Process datasets sequentially
        results = []
        for dataset_info in DATASETS:
            success = process_dataset(dataset_info, token)
            results.append((dataset_info[0], success))
            
        # Print final summary
        logger.info("\nπ Final summary:")
        for dataset, success in results:
            status = "β
 Success" if success else "β Failure"
            logger.info(f"{dataset}: {status}")
            
    except Exception as e:
        logger.error(f"β Global error: {str(e)}")
if __name__ == "__main__":
    copy_datasets()
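
# Usage sketch (filename and location are assumptions; BACKEND_DIR implies
# the script lives two levels below the repo root, e.g. backend/scripts/):
#   echo "HF_TOKEN=hf_xxx" > .env
#   python backend/scripts/sync_datasets.py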
