import boto3 from botocore.exceptions import NoCredentialsError from io import BytesIO import requests AWS_KEY = 'AKIAR6R7JAWA2GSZ4BOY' AWS_SECRET = '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' AWS_BUCKET = 'cloudko.auto.staging.test' AWS_PREFIX = 'koauto/livetouch-bg/' AWS_UPLOAD_URL = 'https://s3.amazonaws.com/' + AWS_BUCKET + '/' + AWS_PREFIX CSAI_LAMBDA_URL = 'https://ppmcgu088b.execute-api.us-east-1.amazonaws.com/Prod/' def save_bytes_to_s3(buffer, s3_file_name): s3 = boto3.resource('s3', aws_access_key_id=AWS_KEY, aws_secret_access_key=AWS_SECRET, region_name='us-east-1') # or your preferred region try: buffer.seek(0) s3.Bucket(AWS_BUCKET).put_object(Key=AWS_PREFIX + s3_file_name, Body=buffer, ACL='public-read') print(f"Successfully uploaded file to {AWS_BUCKET}/{AWS_PREFIX}{s3_file_name}") except requests.exceptions.HTTPError as errh: print(f"HTTP Error: {errh}") except requests.exceptions.ConnectionError as errc: print(f"Error Connecting: {errc}") except requests.exceptions.Timeout as errt: print(f"Timeout Error: {errt}") except requests.exceptions.RequestException as err: print(f"Something Else: {err}") except NoCredentialsError: print("Credentials not available") def save_image_to_s3(image, format, s3_file_name): # create an in-memory bytes buffer buffer = BytesIO() # save the image to the buffer in PNG format image.save(buffer, format) save_bytes_to_s3(buffer, s3_file_name) def save_url_to_s3(url, s3_file_name): response = requests.get(url) response.raise_for_status() buffer = BytesIO(response.content) save_bytes_to_s3(buffer, s3_file_name)