Spaces:
Running
Running
import io | |
import os | |
import mimetypes | |
from uuid import uuid4 | |
from typing import BinaryIO, Optional | |
from .config import settings | |
if settings.STORAGE_PROVIDER != "local": | |
import boto3 | |
import botocore | |
s3 = boto3.client( | |
"s3", | |
endpoint_url=settings.S3_ENDPOINT, | |
aws_access_key_id=settings.S3_ACCESS_KEY, | |
aws_secret_access_key=settings.S3_SECRET_KEY, | |
region_name=getattr(settings, "S3_REGION", None), | |
) | |
else: | |
class DummyS3Client: | |
def __init__(self): | |
pass | |
def get_object(self, **kwargs): | |
raise RuntimeError("S3 client not available in local storage mode") | |
def generate_presigned_url(self, **kwargs): | |
raise RuntimeError("S3 client not available in local storage mode") | |
s3 = DummyS3Client() | |
def _ensure_bucket() -> None: | |
"""Ensure bucket exists. Safe to call on every upload.""" | |
if settings.STORAGE_PROVIDER == "local": | |
os.makedirs(settings.STORAGE_DIR, exist_ok=True) | |
return | |
try: | |
s3.head_bucket(Bucket=settings.S3_BUCKET) | |
except botocore.exceptions.ClientError as e: | |
create_kwargs = {"Bucket": settings.S3_BUCKET} | |
region = getattr(settings, "S3_REGION", None) | |
if region and (settings.S3_ENDPOINT is None or "amazonaws.com" in str(settings.S3_ENDPOINT).lower()): | |
create_kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region} | |
s3.create_bucket(**create_kwargs) | |
def get_object_url(key: str, *, expires_in: int = 3600) -> str: | |
"""Return browser-usable URL for object.""" | |
if settings.STORAGE_PROVIDER == "local": | |
return f"/uploads/{key}" | |
public_base = getattr(settings, "S3_PUBLIC_URL_BASE", None) | |
if public_base: | |
return f"{public_base.rstrip('/')}/{key}" | |
return generate_presigned_url(key, expires_in=expires_in) | |
def generate_presigned_url(key: str, expires_in: int = 3600) -> str: | |
"""Generate presigned URL for GETting object.""" | |
if settings.STORAGE_PROVIDER == "local": | |
return f"/uploads/{key}" | |
return s3.generate_presigned_url( | |
ClientMethod="get_object", | |
Params={"Bucket": settings.S3_BUCKET, "Key": key}, | |
ExpiresIn=expires_in, | |
) | |
def upload_fileobj( | |
fileobj: BinaryIO, | |
filename: str, | |
*, | |
content_type: Optional[str] = None, | |
cache_control: Optional[str] = "public, max-age=31536000, immutable", | |
) -> str: | |
"""Upload file-like object to configured storage. Returns object key.""" | |
if settings.STORAGE_PROVIDER == "local": | |
return _upload_local(fileobj, filename, content_type) | |
else: | |
return _upload_s3(fileobj, filename, content_type, cache_control) | |
def _upload_local( | |
fileobj: BinaryIO, | |
filename: str, | |
content_type: Optional[str] = None, | |
) -> str: | |
"""Upload to local filesystem""" | |
os.makedirs(settings.STORAGE_DIR, exist_ok=True) | |
safe_name = filename or "upload.bin" | |
key = f"maps/{uuid4()}_{safe_name}" | |
filepath = os.path.join(settings.STORAGE_DIR, key) | |
os.makedirs(os.path.dirname(filepath), exist_ok=True) | |
with open(filepath, 'wb') as f: | |
fileobj.seek(0) | |
f.write(fileobj.read()) | |
return key | |
def _upload_s3( | |
fileobj: BinaryIO, | |
filename: str, | |
content_type: Optional[str] = None, | |
cache_control: Optional[str] = "public, max-age=31536000, immutable", | |
) -> str: | |
"""Upload to S3/MinIO""" | |
_ensure_bucket() | |
safe_name = filename or "upload.bin" | |
key = f"maps/{uuid4()}_{safe_name}" | |
ct = content_type or (mimetypes.guess_type(safe_name)[0] or "application/octet-stream") | |
try: | |
fileobj.seek(0) | |
except Exception: | |
pass | |
extra_args = {"ContentType": ct} | |
if cache_control: | |
extra_args["CacheControl"] = cache_control | |
if getattr(settings, "S3_PUBLIC_READ", False): | |
extra_args["ACL"] = "public-read" | |
s3.upload_fileobj(fileobj, settings.S3_BUCKET, key, ExtraArgs=extra_args) | |
return key | |
def upload_bytes( | |
data: bytes, | |
filename: str, | |
*, | |
content_type: Optional[str] = None, | |
cache_control: Optional[str] = "public, max-age=31536000, immutable", | |
) -> str: | |
"""Upload raw bytes. Returns object key.""" | |
buf = io.BytesIO(data) | |
return upload_fileobj(buf, filename, content_type=content_type, cache_control=cache_control) | |
def copy_object( | |
src_key: str, | |
*, | |
new_filename: Optional[str] = None, | |
cache_control: Optional[str] = "public, max-age=31536000, immutable", | |
) -> str: | |
"""Server-side copy within same bucket. Returns new object key.""" | |
if settings.STORAGE_PROVIDER == "local": | |
return _copy_local(src_key, new_filename) | |
else: | |
return _copy_s3(src_key, new_filename, cache_control) | |
def _copy_local(src_key: str, new_filename: Optional[str] = None) -> str: | |
"""Copy file locally""" | |
src_path = os.path.join(settings.STORAGE_DIR, src_key) | |
if not os.path.exists(src_path): | |
raise FileNotFoundError(f"Source file not found: {src_key}") | |
tail = new_filename or src_key.split("/")[-1] | |
dest_key = f"maps/{uuid4()}_{tail}" | |
dest_path = os.path.join(settings.STORAGE_DIR, dest_key) | |
os.makedirs(os.path.dirname(dest_path), exist_ok=True) | |
with open(src_path, 'rb') as src, open(dest_path, 'wb') as dest: | |
dest.write(src.read()) | |
return dest_key | |
def _copy_s3(src_key: str, new_filename: Optional[str] = None, cache_control: Optional[str] = None) -> str: | |
"""Copy file in S3""" | |
_ensure_bucket() | |
tail = new_filename or src_key.split("/")[-1] | |
dest_key = f"maps/{uuid4()}_{tail}" | |
extra_args = {} | |
if cache_control: | |
extra_args["CacheControl"] = cache_control | |
if getattr(settings, "S3_PUBLIC_READ", False): | |
extra_args["ACL"] = "public-read" | |
s3.copy( | |
{"Bucket": settings.S3_BUCKET, "Key": src_key}, | |
settings.S3_BUCKET, | |
dest_key, | |
ExtraArgs=extra_args or None, | |
) | |
return dest_key | |
def delete_object(key: str) -> None: | |
"""Delete object (best-effort).""" | |
if settings.STORAGE_PROVIDER == "local": | |
_delete_local(key) | |
else: | |
_delete_s3(key) | |
def _delete_local(key: str) -> None: | |
"""Delete file locally""" | |
try: | |
file_path = os.path.join(settings.STORAGE_DIR, key) | |
if os.path.exists(file_path): | |
os.remove(file_path) | |
except (OSError, FileNotFoundError): | |
pass | |
def _delete_s3(key: str) -> None: | |
"""Delete file in S3""" | |
try: | |
s3.delete_object(Bucket=settings.S3_BUCKET, Key=key) | |
except botocore.exceptions.ClientError: | |
pass | |