|
import os |
|
import shutil |
|
import hashlib |
|
import time |
|
import base64 |
|
|
|
|
|
|
|
|
|
# Local directory whose files backup_files() mirrors into GOOGLE_DRIVE_PATH.
LOGS_FOLDER = "/content/Applio-RVC-Fork/logs"

# Local directory holding the '.pth' weight files exchanged with the backup.
WEIGHTS_FOLDER = "/content/Applio-RVC-Fork/weights"

# Backup root: its 'weights' subfolder stores weight files, the rest of the
# tree corresponds to LOGS_FOLDER.
GOOGLE_DRIVE_PATH = "/content/drive/MyDrive/RVC_Backup"
|
|
|
def import_google_drive_backup():
    """Restore the backup stored under GOOGLE_DRIVE_PATH into the local folders.

    Walks GOOGLE_DRIVE_PATH recursively and copies files with
    ``shutil.copy2``:

    * ``.pth`` files below the ``weights`` subfolder go to WEIGHTS_FOLDER;
    * other files below ``weights`` are skipped;
    * every remaining file goes to LOGS_FOLDER.

    Relative sub-paths are preserved and missing destination folders are
    created. Progress is reported with ``print``; nothing is returned.
    """
    print("Importing Google Drive backup...")
    weights_exist = False
    # Loop-invariant, so computed once instead of for every file.
    drive_weights_root = os.path.join(GOOGLE_DRIVE_PATH, 'weights')

    for root, _dirs, files in os.walk(GOOGLE_DRIVE_PATH):
        for filename in files:
            filepath = os.path.join(root, filename)
            if not os.path.isfile(filepath):
                # e.g. a dangling symlink: nothing that copy2 could copy.
                continue

            relative_path = os.path.relpath(filepath, GOOGLE_DRIVE_PATH)
            path_parts = relative_path.split(os.sep)
            # Compare whole path components instead of a string prefix: a
            # prefix test also matches siblings such as 'weights_old/', whose
            # path relative to the weights root starts with '..' and would be
            # copied outside WEIGHTS_FOLDER.
            in_weights = len(path_parts) > 1 and path_parts[0] == 'weights'

            if not in_weights:
                backup_filepath = os.path.join(LOGS_FOLDER, relative_path)
                backup_folderpath = os.path.dirname(backup_filepath)
                if not os.path.exists(backup_folderpath):
                    os.makedirs(backup_folderpath)
                    print(f'Created backup folder: {backup_folderpath}', flush=True)
                shutil.copy2(filepath, backup_filepath)
                print(f'Imported file from Google Drive backup: {filename}')
            elif filename.endswith('.pth'):
                weights_exist = True
                weights_filepath = os.path.join(
                    WEIGHTS_FOLDER,
                    os.path.relpath(filepath, drive_weights_root),
                )
                weights_folderpath = os.path.dirname(weights_filepath)
                if not os.path.exists(weights_folderpath):
                    os.makedirs(weights_folderpath)
                    print(f'Created weights folder: {weights_folderpath}', flush=True)
                shutil.copy2(filepath, weights_filepath)
                print(f'Imported file from weights: {filename}')

    if weights_exist:
        print("Copied weights from Google Drive backup to local weights folder.")
    else:
        print("No weights found in Google Drive backup.")
    print("Google Drive backup import completed.")
|
|
|
def get_md5_hash(file_path):
    """Return the hexadecimal MD5 digest of the file at *file_path*.

    The file is opened in binary mode and consumed in 4096-byte pieces, so
    its content is never held in memory all at once.
    """
    digest = hashlib.md5()
    with open(file_path, "rb") as stream:
        while True:
            piece = stream.read(4096)
            if not piece:
                # read() returns b"" at end of file.
                break
            digest.update(piece)
    return digest.hexdigest()
|
|
|
def copy_weights_folder_to_drive():
    """Copy '.pth' files from WEIGHTS_FOLDER that the Drive backup lacks.

    The destination is the 'weights' subfolder of GOOGLE_DRIVE_PATH, which is
    created when missing. A file whose name already exists at the destination
    is left untouched. Any exception raised along the way is caught and
    reported with ``print``; nothing is returned.
    """
    drive_weights_dir = os.path.join(GOOGLE_DRIVE_PATH, 'weights')
    try:
        if not os.path.exists(drive_weights_dir):
            os.makedirs(drive_weights_dir)

        copied_count = 0
        for entry in os.listdir(WEIGHTS_FOLDER):
            if not entry.endswith('.pth'):
                continue
            target = os.path.join(drive_weights_dir, entry)
            if os.path.exists(target):
                # Same name already backed up: never overwritten here.
                continue
            shutil.copy2(os.path.join(WEIGHTS_FOLDER, entry), target)
            copied_count += 1
            print(f"Copied {entry} to Google Drive!")

        if copied_count:
            print(f"Finished copying {copied_count} files to Google Drive!")
        else:
            print("No new finished models found for copying.")
    except Exception as error:
        print(f"An error occurred while copying weights: {str(error)}")
|
|
|
|
|
def _load_backup_timestamps(timestamps_path):
    """Read the 'path:mtime' records at *timestamps_path* into a dict of strings."""
    timestamps = {}
    try:
        with open(timestamps_path, 'r') as f:
            for line in f:
                # Split on the LAST colon only: the stored mtime never
                # contains one, but the file path may.
                filepath, separator, timestamp = line.strip().rpartition(':')
                if not separator or not filepath:
                    continue  # Blank or garbled line.
                try:
                    float(timestamp)
                except ValueError:
                    # Unusable mtime: drop the record so the file is simply
                    # treated as not backed up yet.
                    continue
                timestamps[filepath] = timestamp
    except FileNotFoundError:
        pass  # No backup has been recorded yet.
    return timestamps


def _save_backup_timestamps(timestamps_path, timestamps):
    """Write *timestamps* to *timestamps_path*, one 'path:mtime' record per line."""
    with open(timestamps_path, 'w') as f:
        for filepath, timestamp in timestamps.items():
            f.write(f'{filepath}:{timestamp}\n')


def backup_files():
    """Mirror LOGS_FOLDER into GOOGLE_DRIVE_PATH in an endless loop.

    Each iteration:

    1. loads the recorded modification times from
       ``last_backup_timestamps.txt`` inside LOGS_FOLDER;
    2. copies every file under LOGS_FOLDER (except files with that name)
       that is new or has a newer modification time than recorded;
    3. removes the backup copy of every recorded file that no longer exists
       locally;
    4. when nothing changed and this has not been reported yet, prints a
       notice and calls ``copy_weights_folder_to_drive()``;
    5. saves the records and sleeps.

    Exceptions raised inside an iteration are printed and the loop carries
    on after a short pause. The function never returns.
    """
    print("\nStarting backup loop...")
    last_backup_timestamps_path = os.path.join(LOGS_FOLDER, 'last_backup_timestamps.txt')
    # True once "Files are up to date." has been printed for the current state.
    fully_updated = False
    # Pause after a failed iteration so a persistent error (e.g. the Drive
    # folder being unavailable) does not spin the loop at full speed.
    error_retry_delay = 1

    while True:
        try:
            updated = False
            last_backup_timestamps = _load_backup_timestamps(last_backup_timestamps_path)

            for root, _dirs, files in os.walk(LOGS_FOLDER):
                for filename in files:
                    if filename == 'last_backup_timestamps.txt':
                        continue
                    filepath = os.path.join(root, filename)
                    if not os.path.isfile(filepath):
                        continue

                    backup_filepath = os.path.join(
                        GOOGLE_DRIVE_PATH, os.path.relpath(filepath, LOGS_FOLDER)
                    )
                    backup_folderpath = os.path.dirname(backup_filepath)
                    if not os.path.exists(backup_folderpath):
                        os.makedirs(backup_folderpath)
                        print(f'Created backup folder: {backup_folderpath}', flush=True)

                    last_backup_timestamp = last_backup_timestamps.get(filepath)
                    current_timestamp = os.path.getmtime(filepath)
                    if last_backup_timestamp is None or float(last_backup_timestamp) < current_timestamp:
                        shutil.copy2(filepath, backup_filepath)
                        last_backup_timestamps[filepath] = str(current_timestamp)
                        if last_backup_timestamp is None:
                            print(f'Backed up file: {filename}')
                        else:
                            print(f'Updating backed up file: {filename}')
                        updated = True
                        fully_updated = False

            # Iterate over a copy of the keys because entries are deleted
            # inside the loop.
            for filepath in list(last_backup_timestamps):
                if os.path.exists(filepath):
                    continue
                backup_filepath = os.path.join(
                    GOOGLE_DRIVE_PATH, os.path.relpath(filepath, LOGS_FOLDER)
                )
                if os.path.exists(backup_filepath):
                    os.remove(backup_filepath)
                    print(f'Deleted file: {filepath}')
                del last_backup_timestamps[filepath]
                updated = True
                fully_updated = False

            if not updated and not fully_updated:
                print("Files are up to date.")
                fully_updated = True
                copy_weights_folder_to_drive()
                sleep_time = 15
            else:
                # NOTE(review): this branch also runs on every idle iteration
                # after the notice above, so an idle loop polls every 0.1 s.
                # Kept as is; confirm whether the long sleep was meant for
                # the idle case.
                sleep_time = 0.1

            _save_backup_timestamps(last_backup_timestamps_path, last_backup_timestamps)

            time.sleep(sleep_time)

        except Exception as e:
            print(f"An error occurred: {str(e)}")
            time.sleep(error_retry_delay)
|
|
|
|