import os import shutil import hashlib import time import base64 LOGS_FOLDER = '/content/Applio-RVC-Fork/logs' WEIGHTS_FOLDER = '/content/Applio-RVC-Fork/weights' GOOGLE_DRIVE_PATH = '/content/drive/MyDrive/RVC_Backup' def import_google_drive_backup(): print("Importing Google Drive backup...") weights_exist = False for root, dirs, files in os.walk(GOOGLE_DRIVE_PATH): for filename in files: filepath = os.path.join(root, filename) if os.path.isfile(filepath) and not filepath.startswith(os.path.join(GOOGLE_DRIVE_PATH, 'weights')): backup_filepath = os.path.join(LOGS_FOLDER, os.path.relpath(filepath, GOOGLE_DRIVE_PATH)) backup_folderpath = os.path.dirname(backup_filepath) if not os.path.exists(backup_folderpath): os.makedirs(backup_folderpath) print(f'Created backup folder: {backup_folderpath}', flush=True) shutil.copy2(filepath, backup_filepath) # copy file with metadata print(f'Imported file from Google Drive backup: {filename}') elif filepath.startswith(os.path.join(GOOGLE_DRIVE_PATH, 'weights')) and filename.endswith('.pth'): weights_exist = True weights_filepath = os.path.join(WEIGHTS_FOLDER, os.path.relpath(filepath, os.path.join(GOOGLE_DRIVE_PATH, 'weights'))) weights_folderpath = os.path.dirname(weights_filepath) if not os.path.exists(weights_folderpath): os.makedirs(weights_folderpath) print(f'Created weights folder: {weights_folderpath}', flush=True) shutil.copy2(filepath, weights_filepath) # copy file with metadata print(f'Imported file from weights: {filename}') if weights_exist: print("Copied weights from Google Drive backup to local weights folder.") else: print("No weights found in Google Drive backup.") print("Google Drive backup import completed.") def get_md5_hash(file_path): hash_md5 = hashlib.md5() with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) return hash_md5.hexdigest() def copy_weights_folder_to_drive(): destination_folder = os.path.join(GOOGLE_DRIVE_PATH, 'weights') try: if not os.path.exists(destination_folder): os.makedirs(destination_folder) num_copied = 0 for filename in os.listdir(WEIGHTS_FOLDER): if filename.endswith('.pth'): source_file = os.path.join(WEIGHTS_FOLDER, filename) destination_file = os.path.join(destination_folder, filename) if not os.path.exists(destination_file): shutil.copy2(source_file, destination_file) num_copied += 1 print(f"Copied {filename} to Google Drive!") if num_copied == 0: print("No new finished models found for copying.") else: print(f"Finished copying {num_copied} files to Google Drive!") except Exception as e: print(f"An error occurred while copying weights: {str(e)}") # You can log the error or take appropriate actions here. def backup_files(): print("\nStarting backup loop...") last_backup_timestamps_path = os.path.join(LOGS_FOLDER, 'last_backup_timestamps.txt') fully_updated = False # boolean to track if all files are up to date while True: try: updated = False # flag to check if any files were updated last_backup_timestamps = {} try: with open(last_backup_timestamps_path, 'r') as f: last_backup_timestamps = dict(line.strip().split(':') for line in f) except FileNotFoundError: pass # File does not exist yet, which is fine for root, dirs, files in os.walk(LOGS_FOLDER): for filename in files: if filename != 'last_backup_timestamps.txt': filepath = os.path.join(root, filename) if os.path.isfile(filepath): backup_filepath = os.path.join(GOOGLE_DRIVE_PATH, os.path.relpath(filepath, LOGS_FOLDER)) backup_folderpath = os.path.dirname(backup_filepath) if not os.path.exists(backup_folderpath): os.makedirs(backup_folderpath) print(f'Created backup folder: {backup_folderpath}', flush=True) # check if file has changed since last backup last_backup_timestamp = last_backup_timestamps.get(filepath) current_timestamp = os.path.getmtime(filepath) if last_backup_timestamp is None or float(last_backup_timestamp) < current_timestamp: shutil.copy2(filepath, backup_filepath) # copy file with metadata last_backup_timestamps[filepath] = str(current_timestamp) # update last backup timestamp if last_backup_timestamp is None: print(f'Backed up file: {filename}') else: print(f'Updating backed up file: {filename}') updated = True fully_updated = False # if a file is updated, all files are not up to date # check if any files were deleted in Colab and delete them from the backup drive for filepath in list(last_backup_timestamps.keys()): if not os.path.exists(filepath): backup_filepath = os.path.join(GOOGLE_DRIVE_PATH, os.path.relpath(filepath, LOGS_FOLDER)) if os.path.exists(backup_filepath): os.remove(backup_filepath) print(f'Deleted file: {filepath}') del last_backup_timestamps[filepath] updated = True fully_updated = False # if a file is deleted, all files are not up to date if not updated and not fully_updated: print("Files are up to date.") fully_updated = True # if all files are up to date, set the boolean to True copy_weights_folder_to_drive() sleep_time = 15 else: sleep_time = 0.1 with open(last_backup_timestamps_path, 'w') as f: for filepath, timestamp in last_backup_timestamps.items(): f.write(f'{filepath}:{timestamp}\n') time.sleep(sleep_time) # wait for 15 seconds before checking again, or 0.1s if not fully up to date to speed up backups except Exception as e: print(f"An error occurred: {str(e)}") # You can log the error or take appropriate actions here.