import os | |
import zipfile | |
from pathlib import Path | |
def extract_multipart_zip(): | |
# 设置工作目录和文件路径 | |
target_dir = '/data/workspace/nips_2025_database/assets' | |
base_dir = '/data/workspace/nips_2025_database/assets' | |
# 创建目标文件夹 | |
# os.makedirs(target_dir, exist_ok=True) | |
try: | |
# 获取所有分卷文件 | |
zip_parts = sorted([f for f in os.listdir(base_dir) if f.startswith('lvm_2032fbx.zip.')]) | |
if not zip_parts: | |
print("错误:未找到分卷文件") | |
return | |
print(f"找到以下分卷文件:") | |
for part in zip_parts: | |
print(f"- {part}") | |
# 合并所有分卷到一个临时文件 | |
temp_zip = os.path.join(base_dir, 'temp_complete.zip') | |
print("\n开始合并分卷文件...") | |
with open(temp_zip, 'wb') as outfile: | |
for part in zip_parts: | |
part_path = os.path.join(base_dir, part) | |
print(f"正在处理: {part}") | |
with open(part_path, 'rb') as infile: | |
outfile.write(infile.read()) | |
# 解压合并后的文件 | |
print("\n开始解压文件...") | |
with zipfile.ZipFile(temp_zip, 'r') as zip_ref: | |
zip_ref.extractall(target_dir) | |
# 清理临时文件 | |
os.remove(temp_zip) | |
print(f"\n解压完成!文件已保存到: {target_dir}") | |
except Exception as e: | |
print(f"解压过程中出现错误:{str(e)}") | |
if os.path.exists(temp_zip): | |
os.remove(temp_zip) | |
if __name__ == "__main__": | |
print("开始处理分卷文件...") | |
extract_multipart_zip() |