Spaces:
Runtime error
Runtime error
File size: 2,335 Bytes
7553410 a689179 7553410 a689179 7553410 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
import os
import boto3
from botocore.exceptions import ClientError
from config import AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, BUCKET_NAME, S3_OBJ_BASE_URL
class S3Handler:
    """Wrapper around a boto3 S3 client that works on ``BUCKET_NAME``.

    Objects are addressed per user: the S3 key is ``"<user_id>/<s3_path>"``.
    Transfer errors raised by boto3 are printed and reported to the caller
    through a ``None`` return value instead of being propagated.
    """

    def __init__(self):
        """Create the S3 client from the credentials imported from ``config``."""
        # Define the attribute up front so it always exists, even when the
        # except-branch below runs.
        self.s3 = None
        try:
            self.s3 = boto3.client(
                's3',
                aws_access_key_id=AWS_ACCESS_KEY_ID,
                aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
            )
        except ClientError as e:
            print(f"Failed to connect to S3: {e}")

    @staticmethod
    def _build_key(user_id, s3_path):
        """Return the S3 object key ``"<user_id>/<s3_path>"``.

        ``user_id`` is formatted with ``str()``, so non-string identifiers
        (for example integers) are accepted as well.
        """
        return f"{user_id}/{s3_path}"

    def upload_file(self, user_id, s3_path, file_path, **kwargs):
        """Upload a local file to the bucket under the user's prefix.

        Args:
            user_id: Identifier used as the key prefix.
            s3_path: Object key relative to the user's prefix.
            file_path: Path of the local file to upload.
            **kwargs: Accepted for call-site compatibility; not used.

        Returns:
            ``S3_OBJ_BASE_URL + '/' + key`` on success, ``None`` when boto3
            reports that the upload failed.
        """
        s3_user_path = self._build_key(user_id, s3_path)
        try:
            # Filename: local file to upload
            # Bucket:   destination bucket
            # Key:      destination object key
            # NOTE: no ExtraArgs (such as {'ACL': 'public-read'}) are passed.
            self.s3.upload_file(
                Filename=file_path,
                Bucket=BUCKET_NAME,
                Key=s3_user_path,
            )
        except (ClientError, boto3.exceptions.S3UploadFailedError) as e:
            # boto3's managed transfer wraps client errors of an upload in
            # S3UploadFailedError, so catching ClientError alone is not enough.
            print(f"Failed to upload file to S3: {e}")
            return None

        # Report the key that was actually written, not the relative path.
        print(f"File uploaded to {BUCKET_NAME} as {s3_user_path}")
        return S3_OBJ_BASE_URL + '/' + s3_user_path

    def download_file(self, user_id, s3_path, file_path, **kwargs):
        """Download an object from the user's prefix to a local file.

        Args:
            user_id: Identifier used as the key prefix.
            s3_path: Object key relative to the user's prefix.
            file_path: Local path the object is written to.
            **kwargs: Accepted for call-site compatibility; not used.

        Returns:
            ``S3_OBJ_BASE_URL + '/' + key`` on success (the object's URL, not
            the local path), ``None`` when boto3 reports that the download
            failed.
        """
        s3_user_path = self._build_key(user_id, s3_path)
        try:
            # Filename: local path to download to
            # Bucket:   source bucket
            # Key:      source object key
            self.s3.download_file(
                Filename=file_path,
                Bucket=BUCKET_NAME,
                Key=s3_user_path,
            )
        except (ClientError, boto3.exceptions.RetriesExceededError) as e:
            # The managed download raises RetriesExceededError once its
            # internal retries are used up; treat it like any other failure.
            print(f"Failed to download file from S3: {e}")
            return None

        print(f"File downloaded from {BUCKET_NAME} as {file_path}")
        return S3_OBJ_BASE_URL + '/' + s3_user_path
# s3_path = "sample_video_srt_2.mp4"
# file_path = "Output/video_1.mp4"
# s3 = S3Handler()
# s3.upload_file(user_id="uzair", s3_path=s3_path, file_path=file_path)