# Spaces: Running on Zero
import os | |
import fsspec | |
import pyarrow as pa | |
# pyarrow needs the initialization from this import | |
import pyarrow.dataset # pyright: ignore | |
import typer | |
from pyarrow.lib import ArrowInvalid | |
from rich.progress import track | |
def is_valid_arrow_file(path: str) -> bool:
    """Return whether pyarrow can open ``path`` as an Arrow-format dataset.

    Args:
        path: Filesystem path of the candidate ``.arrow`` file.

    Returns:
        ``True`` if ``pyarrow.dataset.dataset(path, format="arrow")``
        succeeds, ``False`` if it raises ``ArrowInvalid``. Any other
        exception propagates to the caller.
    """
    try:
        # Only whether opening succeeds matters; the dataset object itself
        # is not needed, so it is not bound to a name.
        pa.dataset.dataset(path, format="arrow")
    except ArrowInvalid:
        return False
    return True
# URL scheme for S3 locations; paths listed by the S3 filesystem are sliced
# as if this prefix were absent from them.
S3_PREFIX = "s3://"

# Typer application invoked by the ``__main__`` guard of this script.
app = typer.Typer()
def get_fs(path: str, s3_profile: str | None = None) -> fsspec.AbstractFileSystem:
    """Return the fsspec filesystem to use for ``path``.

    Args:
        path: A local path or an S3 URL (``s3://...``).
        s3_profile: Optional value forwarded to the S3 filesystem as its
            ``profile`` argument; ignored for local paths.

    Returns:
        An ``"s3"`` filesystem when ``path`` starts with ``S3_PREFIX``,
        otherwise the local ``"file"`` filesystem.
    """
    # Reuse the module constant rather than repeating the "s3://" literal.
    if not path.startswith(S3_PREFIX):
        return fsspec.filesystem("file")
    if s3_profile is None:
        return fsspec.filesystem("s3")
    return fsspec.filesystem("s3", profile=s3_profile)
@app.command()
def print_local_to_delete(
    blob_dir: str,
    local_dirs: list[str],
    s3_profile: str = "blt",
    output_path: str = "/tmp/files_to_delete.txt",
):
    """List local files that also exist under an S3 directory.

    First verifies that every object under ``blob_dir`` is non-empty
    (objects whose name ends with ``.complete``, case-insensitively, are
    exempt). Then collects every non-symlink file under each of
    ``local_dirs`` whose path relative to that directory also appears
    relative to ``blob_dir``. Prints how many were found and writes their
    paths, one per line, to ``output_path``. Nothing is deleted here.

    Args:
        blob_dir: S3 directory URL; must end with ``/``.
        local_dirs: Local directories to check; each must end with ``/``.
        s3_profile: Passed to the S3 filesystem as ``profile``.
        output_path: File the candidate paths are written to.

    Raises:
        AssertionError: If a directory does not end with ``/`` or a
            non-``.complete`` object under ``blob_dir`` has size zero.
    """
    for s in local_dirs:
        assert s.endswith("/"), "Dirs must end with /"
    assert blob_dir.endswith("/"), "Dirs must end with /"

    blob_fs = fsspec.filesystem("s3", profile=s3_profile)
    # detail=True returns {path: info} from the listing itself, so sizes are
    # available without an extra per-object info request. "size" is the
    # key fsspec defines for every filesystem's info dicts.
    blob_infos = blob_fs.find(blob_dir, detail=True)
    for path, info in track(blob_infos.items()):
        if not path.lower().endswith(".complete"):
            assert info["size"] != 0, f"Size was invalidly zero for {path}"

    # Listed S3 paths are compared without the scheme; strip it from
    # blob_dir (if present) so the prefix length is right either way.
    blob_root = blob_dir.removeprefix(S3_PREFIX)
    blob_relative_paths = {path[len(blob_root) :] for path in blob_infos}

    local_fs = fsspec.filesystem("file")
    files_to_delete = []
    for local_dir in local_dirs:
        for path in local_fs.find(local_dir):
            # relpath rather than slicing: the local filesystem may return
            # absolute paths even when local_dir was given as relative.
            relative_path = os.path.relpath(path, local_dir)
            if relative_path in blob_relative_paths and not os.path.islink(path):
                files_to_delete.append(path)

    print(len(files_to_delete))
    with open(output_path, "w", encoding="utf-8") as out:
        out.writelines(f"{path}\n" for path in files_to_delete)
def _scan_local_sources(source_dirs: list[str]) -> dict[str, int]:
    """Map each usable local file (path relative to its source dir) to its size.

    Symlinks are ignored. Empty files, other than ``.COMPLETE``/``.complete``
    markers, are skipped. So are ``.arrow`` files that
    ``is_valid_arrow_file`` rejects. When the same relative path occurs in
    several source dirs, the largest size is kept. Prints a per-directory
    summary of kept and skipped files.

    Raises:
        AssertionError: If a ``.COMPLETE``/``.complete`` marker is not empty.
    """
    local_fs = fsspec.filesystem("file")
    rel_path_to_size: dict[str, int] = {}
    for source_dir in source_dirs:
        kept = 0
        skipped = []
        for path in local_fs.find(source_dir):
            if os.path.islink(path):
                continue
            size = os.path.getsize(path)
            if path.endswith((".COMPLETE", ".complete")):
                assert size == 0, ".COMPLETE files should be empty"
            elif size == 0 or (
                path.endswith(".arrow") and not is_valid_arrow_file(path)
            ):
                skipped.append(path)
                continue
            # relpath rather than slicing: the local filesystem may return
            # absolute paths even when source_dir was given as relative.
            rel_path = os.path.relpath(path, source_dir)
            rel_path_to_size[rel_path] = max(rel_path_to_size.get(rel_path, 0), size)
            kept += 1
        print(source_dir, kept, "skipped", len(skipped), skipped[:10])
    return rel_path_to_size


def _list_blob_sizes(dst_dir: str, dst_fs: fsspec.AbstractFileSystem) -> dict[str, int]:
    """Map each object under ``dst_dir`` (path relative to it) to its size.

    Uses a single ``find(..., detail=True)`` listing instead of one size
    request per object, and prints the number of objects found.
    """
    blob_infos = dst_fs.find(dst_dir, detail=True)
    print(dst_dir, len(blob_infos))
    # Listed S3 paths are compared without the scheme, so strip it from
    # dst_dir before using its length as the prefix to drop.
    blob_root = dst_dir.removeprefix(S3_PREFIX)
    return {path[len(blob_root) :]: info["size"] for path, info in blob_infos.items()}


def _report_size_mismatches(
    local_sizes: dict[str, int], blob_sizes: dict[str, int], print_sizes: bool
) -> bool:
    """Print every missing or size-mismatched file; return True if none were found."""
    ok = True
    for rel_path in sorted(local_sizes.keys() | blob_sizes.keys()):
        local_size = local_sizes.get(rel_path)
        blob_size = blob_sizes.get(rel_path)
        # Every path comes from the union, so at most one side is missing.
        if local_size is None:
            ok = False
            print(f"Missing file in source: {rel_path}")
        elif blob_size is None:
            ok = False
            print(f"missing file in dst: {rel_path}")
        elif local_size != blob_size:
            ok = False
            print(
                f"Mismatch file size for {rel_path}, Local: {local_size} Blob: {blob_size}"
            )
        elif print_sizes:
            print(f"Matching file size: {blob_size} for {rel_path}")
    return ok


@app.command()
def compare_local_to_blob(
    source_dirs: list[str],
    dst_dir: str,
    s3_profile: str = "blt",
    print_sizes: bool = False,
):
    """Check that the files under local ``source_dirs`` match those under S3 ``dst_dir``.

    Local files are gathered by ``_scan_local_sources``, which skips
    symlinks, empty non-marker files and unreadable ``.arrow`` files. They
    are compared by relative path and size with the objects under
    ``dst_dir``. Summary counts are printed, followed by every missing or
    mismatched file.

    Args:
        source_dirs: Local directories, each ending with ``/``; must be non-empty.
        dst_dir: S3 directory URL starting with ``s3://`` and ending with ``/``.
        s3_profile: Passed to the S3 filesystem as ``profile``.
        print_sizes: Also print the size of every matching file.

    Raises:
        AssertionError: On malformed arguments or a non-empty ``.COMPLETE`` marker.
        ValueError: If any file is missing on either side or sizes differ.
    """
    for s in source_dirs:
        assert s.endswith("/"), "Dirs must end with /"
    assert dst_dir.endswith("/"), "Dirs must end with /"
    assert source_dirs
    assert dst_dir.startswith(S3_PREFIX)

    local_sizes = _scan_local_sources(source_dirs)
    dst_fs = fsspec.filesystem("s3", profile=s3_profile)
    blob_sizes = _list_blob_sizes(dst_dir, dst_fs)

    local_files = local_sizes.keys()
    blob_files = blob_sizes.keys()
    print("Local files", len(local_files))
    print("DST Files", len(blob_files))
    print("Symmetric difference", len(local_files ^ blob_files))
    # Sorted so the printed sample is deterministic between runs.
    dst_only = sorted(blob_files - local_files)
    print("DST only", len(dst_only), dst_only[:10])

    print("Check that files match")
    if _report_size_mismatches(local_sizes, blob_sizes, print_sizes):
        print("All files pass size check")
    else:
        raise ValueError("At least one file failed size comparison check")
def _main() -> None:
    """Script entry point: hand control to the Typer application."""
    app()


if __name__ == "__main__":
    _main()