IsidoreSong's picture
Upload 8 files
479eb2f verified
raw
history blame contribute delete
No virus
2.09 kB
import io
import os
import boto3
from botocore.exceptions import NoCredentialsError
ACCESS_KEY = os.environ.get('ACCESS_KEY')
SECRET_KEY = os.environ.get('SECRET_KEY')
ENDPOINT = os.environ.get("ENDPOINT")
BUCKET_NAME = 'host'
def create_s3_client(access_key, secret_key, endpoint_url):
try:
s3_client = boto3.client(
's3',
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
endpoint_url=endpoint_url
)
return s3_client
except NoCredentialsError:
print("Credentials not available")
return None
# 上传文件到 S3(或兼容 S3 的服务,如 IDrive e2)
def upload_file_stream_to_s3(file_stream, bucket_name, key):
s3_client = create_s3_client(ACCESS_KEY, SECRET_KEY, ENDPOINT)
file_stream.seek(0)
s3_client.upload_fileobj(
file_stream,
bucket_name,
key
)
# with open("output/output.pdf", 'wb') as f:
# file_stream.seek(0)
# f.write(file_stream.getvalue())
print(f"{key} has been uploaded to {bucket_name}")
return True
def download_file_stream_from_s3(bucket_name, key):
fileobj = io.BytesIO()
s3_client = create_s3_client(ACCESS_KEY, SECRET_KEY, ENDPOINT)
s3_client.download_fileobj(
bucket_name,
key,
fileobj,
)
return fileobj
# FILE_PATH = "src/assets/InvitationTemplate.pdf"
# OBJECT_NAME = 'test.pdf'
# ACCESS_KEY = os.environ.get('ACCESS_KEY')
# SECRET_KEY = os.environ.get('SECRET_KEY')
# ENDPOINT = os.environ.get("ENDPOINT")
# BUCKET_NAME = 'host'
# FILE_PATH = "src/assets/InvitationTemplate.pdf"
# OBJECT_NAME = 'test.pdf'
# # 创建 S3 客户端
# s3_client = create_s3_client(ACCESS_KEY, SECRET_KEY, ENDPOINT)
# # 上传文件
# if s3_client:
# # upload_file_to_s3(s3_client, FILE_PATH, BUCKET_NAME, OBJECT_NAME)
# fileobj = download_file_stream_from_s3(s3_client, BUCKET_NAME, OBJECT_NAME)
# with open(OBJECT_NAME, 'wb') as f:
# f.write(fileobj.getvalue())