import boto3 import os import tempfile from utility import aws_access_key_id, aws_secret_access_key,terminal_print s3 = boto3.client( 's3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key ) # post or update a file to s3 @terminal_print def upload_file(path, bucket, object_name=None): """Upload a file to an S3 bucket :param path: path to file for upload :param bucket: Bucket to upload to :param object_name: S3 object name. If not specified then file_name is used :return: True if file was uploaded, else False """ file_name = os.path.basename(path) if object_name is None: object_name = file_name try: res = s3.upload_file(path, bucket, object_name) except Exception as e: print(e) return False return res @terminal_print def upload_fileobj(file_obj, bucket, object_name=None): ''' Upload a file object to an S3 bucket :param file_obj: File object to upload :param bucket: Bucket to upload to :param object_name: S3 object name. If not specified then file_name is used :return: True if file was uploaded, else False ''' if object_name is None: object_name = file_obj.name try: res = s3.upload_fileobj(file_obj, bucket, object_name) except Exception as e: print(e) return e return res # get a file from s3 @terminal_print def download_file(bucket, object_name, file_name=None): """Download a file from an S3 bucket :param bucket: Bucket to download from :param file_name: File to download :param object_name: S3 object name. If not specified then file_name is used :return: True if file was downloaded, else False """ if file_name is None: file_name = object_name try: s3.download_file(bucket, object_name, file_name) except Exception as e: print(e) return False return True # download a file object from s3 @terminal_print def download_fileobj(bucket, object_name, temp_obj=None): ''' Download a file object from an S3 bucket :param file_name: File to download :param bucket: Bucket to download from :param object_name: S3 object name. If not specified then file_name is used :return: True if file was downloaded, else False ''' if temp_obj is None: temp_obj = tempfile.TemporaryFile() try: s3.download_fileobj(bucket, object_name, temp_obj) except Exception as e: print(e) return False return temp_obj # delete a file from s3 @terminal_print def delete_file(bucket, object_name): """Delete a file from an S3 bucket :param bucket: Bucket to delete from :param object_name: S3 object name. :return: True if file was deleted, else False """ try: response = s3.delete_object(Bucket=bucket, Key=object_name) except Exception as e: print(e) return False return response # list all files in a bucket @terminal_print def list_files(bucket): """List files in a bucket :param bucket: Bucket to list from :return: List of files in bucket """ try: response = s3.list_objects_v2(Bucket=bucket) except Exception as e: print(e) return False return response['Contents']