"""Helpers for moving in-memory file streams to and from S3-compatible storage.

Connection settings are read from the ``ACCESS_KEY``, ``SECRET_KEY`` and
``ENDPOINT`` environment variables when this module is imported.

Example::

    stream = download_file_stream_from_s3(BUCKET_NAME, "test.pdf")
    with open("test.pdf", "wb") as f:
        f.write(stream.getvalue())
"""

import io
import os
from typing import BinaryIO, Optional

import boto3
from botocore.exceptions import NoCredentialsError

ACCESS_KEY = os.environ.get('ACCESS_KEY')
SECRET_KEY = os.environ.get('SECRET_KEY')
ENDPOINT = os.environ.get("ENDPOINT")
BUCKET_NAME = 'host'


def create_s3_client(
    access_key: Optional[str],
    secret_key: Optional[str],
    endpoint_url: Optional[str],
):
    """Build a boto3 S3 client for the given credentials and endpoint.

    Args:
        access_key: Value passed to boto3 as ``aws_access_key_id``.
        secret_key: Value passed to boto3 as ``aws_secret_access_key``.
        endpoint_url: Service endpoint URL (S3 or an S3-compatible service).

    Returns:
        The boto3 client, or ``None`` if boto3 raised ``NoCredentialsError``
        (in which case a message is printed).
    """
    try:
        return boto3.client(
            's3',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            endpoint_url=endpoint_url,
        )
    except NoCredentialsError:
        print("Credentials not available")
        return None


def _require_default_client():
    """Return a client built from the module-level settings, or raise.

    ``create_s3_client`` signals failure by returning ``None``; turning that
    into an explicit exception here avoids an opaque ``AttributeError`` on
    ``NoneType`` at the first client method call.
    """
    s3_client = create_s3_client(ACCESS_KEY, SECRET_KEY, ENDPOINT)
    if s3_client is None:
        raise NoCredentialsError()
    return s3_client


def upload_file_stream_to_s3(file_stream: BinaryIO, bucket_name: str, key: str) -> bool:
    """Upload a file stream to S3 (or an S3-compatible service such as IDrive e2).

    The stream is rewound to its start before uploading, so its full content
    is sent regardless of the current position.

    Args:
        file_stream: Seekable binary file-like object to upload.
        bucket_name: Destination bucket.
        key: Object key to store the content under.

    Returns:
        ``True`` once the upload call has completed.

    Raises:
        NoCredentialsError: If no S3 client could be created.
    """
    s3_client = _require_default_client()
    file_stream.seek(0)
    s3_client.upload_fileobj(file_stream, bucket_name, key)
    print(f"{key} has been uploaded to {bucket_name}")
    return True


def download_file_stream_from_s3(bucket_name: str, key: str) -> io.BytesIO:
    """Download an object from S3 into an in-memory stream.

    Args:
        bucket_name: Bucket that holds the object.
        key: Key of the object to download.

    Returns:
        A ``BytesIO`` containing the object's bytes, positioned at the start.

    Raises:
        NoCredentialsError: If no S3 client could be created.
    """
    s3_client = _require_default_client()
    fileobj = io.BytesIO()
    s3_client.download_fileobj(bucket_name, key, fileobj)
    # The download leaves the cursor at end-of-stream; rewind so that a
    # caller's read() returns the content instead of b"". getvalue() is
    # unaffected by the cursor position.
    fileobj.seek(0)
    return fileobj