import os

import boto3
from dotenv import load_dotenv

# Load environment variables from the .env file
load_dotenv()

ACCOUNT_ID = os.getenv("S3_ACCOUNT_ID")
ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
SECRET_KEY = os.getenv("S3_SECRET_KEY")
BUCKET = "scrapydocs"

# The R2 endpoint is the account-level URL; boto3 appends the bucket name
# itself, so the bucket must not be included in the endpoint path. Using the
# ACCOUNT_ID loaded above also avoids hardcoding the account hash.
s3 = boto3.client(
    "s3",
    endpoint_url=f"https://{ACCOUNT_ID}.r2.cloudflarestorage.com",
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
    region_name="auto",
)


def upload_folder_contents(folder_path, bucket, prefix=""):
    try:
        file_count = 0
        for root, _, files in os.walk(folder_path):
            for file in files:
                full_path = os.path.join(root, file)
                # Preserve the subdirectory structure in the object key;
                # keying on the bare file name would make files with the
                # same name in different subfolders overwrite each other.
                rel_path = os.path.relpath(full_path, folder_path).replace(os.sep, "/")
                key = f"{prefix}/{rel_path}" if prefix else rel_path
                try:
                    s3.upload_file(full_path, bucket, key)
                    print(f"✔ Successfully uploaded {key}")
                    file_count += 1
                except Exception as e:
                    print(f"✖ Failed to upload {key}: {e}")
        print(f"\nUpload complete. Total files processed: {file_count}")
        return file_count
    except Exception as e:
        print(f"✖ Fatal error: {e}")
        return 0


if __name__ == "__main__":
    print("Starting S3 upload process...")
    folder_path = "D:/app/JupyterLab/crawl4ai/output/docs_scrapy"
    if not os.path.exists(folder_path):
        print(f"✖ Error: Folder not found at {folder_path}")
    else:
        print(f"Testing connection to {s3.meta.endpoint_url}")
        print(s3.list_buckets())  # This verifies that your credentials are valid
        upload_folder_contents(folder_path, BUCKET)
        print("Process completed.")
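
# A minimal sketch of the .env file this script expects, based on the three
# variable names read above. The values shown are placeholders, not real
# credentials; substitute your own Cloudflare R2 account ID and API tokens.
#
#   S3_ACCOUNT_ID=your-cloudflare-account-id
#   S3_ACCESS_KEY=your-r2-access-key-id
#   S3_SECRET_KEY=your-r2-secret-access-key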