| """
|
| R2 Storage integration for OpenManus
|
| Provides interface to Cloudflare R2 storage operations
|
| """
|
|
|
import io
from typing import Any, BinaryIO, Dict, List, Optional
from urllib.parse import urlencode

from app.logger import logger

from .client import CloudflareClient, CloudflareError
|
|
|
|
|
class R2Storage:
    """Cloudflare R2 storage client.

    Wraps a ``CloudflareClient`` and exposes file operations against two
    logical buckets, ``"storage"`` and ``"assets"``. The operations are
    upload, download, delete, list, copy, move, presigned URLs and stats.
    Most operations can be routed either through the worker HTTP API
    (``use_worker=True``, the default) or through the account-scoped R2
    REST endpoint (``use_worker=False``).
    """

    def __init__(
        self,
        client: CloudflareClient,
        storage_bucket: str,
        assets_bucket: Optional[str] = None,
    ):
        """Create an R2 storage wrapper.

        Args:
            client: Cloudflare API client used for every request.
            storage_bucket: Bucket name used for ``bucket_type="storage"``.
            assets_bucket: Bucket name used for ``bucket_type="assets"``.
                Falls back to ``storage_bucket`` when omitted or empty.
        """
        self.client = client
        self.storage_bucket = storage_bucket
        self.assets_bucket = assets_bucket or storage_bucket
        # Prefix for the non-worker (REST) endpoints. The bucket and object
        # path segments are appended per call.
        self.base_endpoint = f"accounts/{client.account_id}/r2/buckets"

    @staticmethod
    def _query_string(params: Dict[str, Any]) -> str:
        """URL-encode ``params`` into a query string, skipping falsy values."""
        # urlencode escapes reserved characters ("&", "=", "#", spaces, ...),
        # so a caller-supplied prefix cannot break the query apart.
        return urlencode({k: v for k, v in params.items() if v})

    def _get_bucket_name(self, bucket_type: str = "storage") -> str:
        """Map a logical bucket type to the configured bucket name.

        ``"assets"`` maps to ``assets_bucket``. Any other value maps to
        ``storage_bucket``.
        """
        if bucket_type == "assets":
            return self.assets_bucket
        return self.storage_bucket

    async def upload_file(
        self,
        key: str,
        file_data: bytes,
        content_type: str = "application/octet-stream",
        bucket_type: str = "storage",
        metadata: Optional[Dict[str, str]] = None,
        use_worker: bool = True,
    ) -> Dict[str, Any]:
        """Upload ``file_data`` to R2 under ``key``.

        Args:
            key: Object key inside the bucket.
            file_data: Raw bytes to store.
            content_type: MIME type sent with the object.
            bucket_type: Logical bucket (``"storage"`` or ``"assets"``).
            metadata: Optional custom metadata. Via the worker it is sent as a
                ``metadata`` form field. Via REST it is sent as
                ``x-amz-meta-<name>`` headers.
            use_worker: Use the worker API instead of the REST endpoint.

        Returns:
            A summary dict (key, bucket, bucket name, size, content type and a
            relative URL) merged with the client's response. Response keys
            override the summary keys on collision.

        Raises:
            CloudflareError: If the request fails. The error is logged first.
        """
        bucket_name = self._get_bucket_name(bucket_type)

        try:
            if use_worker:
                form_data = {
                    "file": file_data,
                    "bucket": bucket_type,
                    "key": key,
                    "contentType": content_type,
                }
                if metadata:
                    form_data["metadata"] = metadata

                response = await self.client.post(
                    "api/files", data=form_data, use_worker=True
                )
            else:
                headers = {"Content-Type": content_type}
                if metadata:
                    for k, v in metadata.items():
                        headers[f"x-amz-meta-{k}"] = v

                response = await self.client.upload_file(
                    f"{self.base_endpoint}/{bucket_name}/objects/{key}",
                    file_data,
                    content_type,
                    headers,
                )

            return {
                "success": True,
                "key": key,
                "bucket": bucket_type,
                "bucket_name": bucket_name,
                "size": len(file_data),
                "content_type": content_type,
                "url": f"/{bucket_type}/{key}",
                **response,
            }

        except CloudflareError as e:
            logger.error(f"R2 upload failed: {e}")
            raise

    async def upload_file_stream(
        self,
        key: str,
        file_stream: BinaryIO,
        content_type: str = "application/octet-stream",
        bucket_type: str = "storage",
        metadata: Optional[Dict[str, str]] = None,
        use_worker: bool = True,
    ) -> Dict[str, Any]:
        """Upload the remaining contents of a binary stream.

        The stream is read fully into memory with a single ``read()`` and then
        handed to :meth:`upload_file`. ``use_worker`` is forwarded, so the
        REST path can be selected as well (it defaults to the worker, as
        before).
        """
        file_data = file_stream.read()
        return await self.upload_file(
            key, file_data, content_type, bucket_type, metadata, use_worker
        )

    async def get_file(
        self, key: str, bucket_type: str = "storage", use_worker: bool = True
    ) -> Optional[Dict[str, Any]]:
        """Download an object from R2.

        Returns:
            A dict with the key, bucket info and the client's response under
            ``"data"``. Returns ``None`` in two cases: the request fails with
            status 404, or the worker returns a falsy response.

        Raises:
            CloudflareError: For any non-404 failure. The error is logged first.
        """
        bucket_name = self._get_bucket_name(bucket_type)

        try:
            if use_worker:
                response = await self.client.get(
                    f"api/files/{key}?bucket={bucket_type}", use_worker=True
                )
                if response:
                    return {
                        "key": key,
                        "bucket": bucket_type,
                        "bucket_name": bucket_name,
                        "data": response,
                        "exists": True,
                    }
            else:
                response = await self.client.get(
                    f"{self.base_endpoint}/{bucket_name}/objects/{key}"
                )
                return {
                    "key": key,
                    "bucket": bucket_type,
                    "bucket_name": bucket_name,
                    "data": response,
                    "exists": True,
                }

        except CloudflareError as e:
            if e.status_code == 404:
                return None
            logger.error(f"R2 get file failed: {e}")
            raise

        # Only reached when the worker returned an empty/falsy response.
        return None

    async def delete_file(
        self, key: str, bucket_type: str = "storage", use_worker: bool = True
    ) -> Dict[str, Any]:
        """Delete an object from R2.

        Returns:
            A summary dict merged with the client's response. Response keys
            override the summary keys on collision.

        Raises:
            CloudflareError: If the request fails. The error is logged first.
        """
        bucket_name = self._get_bucket_name(bucket_type)

        try:
            if use_worker:
                response = await self.client.delete(
                    f"api/files/{key}?bucket={bucket_type}", use_worker=True
                )
            else:
                response = await self.client.delete(
                    f"{self.base_endpoint}/{bucket_name}/objects/{key}"
                )

            return {
                "success": True,
                "key": key,
                "bucket": bucket_type,
                "bucket_name": bucket_name,
                **response,
            }

        except CloudflareError as e:
            logger.error(f"R2 delete failed: {e}")
            raise

    async def list_files(
        self,
        bucket_type: str = "storage",
        prefix: str = "",
        limit: int = 1000,
        use_worker: bool = True,
    ) -> Dict[str, Any]:
        """List objects in a bucket, optionally filtered by key prefix.

        Args:
            bucket_type: Logical bucket (``"storage"`` or ``"assets"``).
            prefix: Only return keys starting with this prefix. An empty
                string means no filter, and it is omitted from the query.
            limit: Maximum number of objects requested. It is sent as
                ``limit`` to the worker and as ``max-keys`` to REST, and
                omitted when falsy.
            use_worker: Use the worker API instead of the REST endpoint.

        Returns:
            A dict with ``"files"`` (the response's ``objects``, default
            ``[]``) and ``"truncated"`` (default ``False``), merged with the
            raw response. Response keys override on collision.

        Raises:
            CloudflareError: If the request fails. The error is logged first.
        """
        bucket_name = self._get_bucket_name(bucket_type)

        try:
            if use_worker:
                query = self._query_string(
                    {"bucket": bucket_type, "prefix": prefix, "limit": limit}
                )
                response = await self.client.get(
                    f"api/files/list?{query}", use_worker=True
                )
            else:
                query = self._query_string({"prefix": prefix, "max-keys": limit})
                response = await self.client.get(
                    f"{self.base_endpoint}/{bucket_name}/objects?{query}"
                )

            return {
                "bucket": bucket_type,
                "bucket_name": bucket_name,
                "prefix": prefix,
                "files": response.get("objects", []),
                "truncated": response.get("truncated", False),
                **response,
            }

        except CloudflareError as e:
            logger.error(f"R2 list files failed: {e}")
            raise

    async def get_file_metadata(
        self, key: str, bucket_type: str = "storage", use_worker: bool = True
    ) -> Optional[Dict[str, Any]]:
        """Fetch object metadata without downloading the whole object.

        The worker path calls a dedicated ``/metadata`` route. The REST path
        issues a GET with ``Range: bytes=0-0`` so that at most one byte of
        content is requested.

        Returns:
            The response merged with key and bucket info. Returns ``None`` on
            a 404 or when the response is falsy.

        Raises:
            CloudflareError: For any non-404 failure. The error is logged first.
        """
        bucket_name = self._get_bucket_name(bucket_type)

        try:
            if use_worker:
                response = await self.client.get(
                    f"api/files/{key}/metadata?bucket={bucket_type}", use_worker=True
                )
            else:
                response = await self.client.get(
                    f"{self.base_endpoint}/{bucket_name}/objects/{key}",
                    headers={"Range": "bytes=0-0"},
                )

            if response:
                return {
                    "key": key,
                    "bucket": bucket_type,
                    "bucket_name": bucket_name,
                    **response,
                }

        except CloudflareError as e:
            if e.status_code == 404:
                return None
            logger.error(f"R2 get metadata failed: {e}")
            raise

        return None

    async def copy_file(
        self,
        source_key: str,
        destination_key: str,
        source_bucket: str = "storage",
        destination_bucket: str = "storage",
        use_worker: bool = True,
    ) -> Dict[str, Any]:
        """Copy an object within a bucket or between the logical buckets.

        The worker path delegates to ``api/files/copy``. The REST path
        downloads the source with :meth:`get_file` and re-uploads its
        ``"data"`` with :meth:`upload_file`. That re-upload uses
        ``upload_file``'s default content type and does not pass metadata.

        Raises:
            CloudflareError: If the source is missing or a request fails. The
                error is logged first.
        """
        try:
            if use_worker:
                copy_data = {
                    "sourceKey": source_key,
                    "destinationKey": destination_key,
                    "sourceBucket": source_bucket,
                    "destinationBucket": destination_bucket,
                }
                response = await self.client.post(
                    "api/files/copy", data=copy_data, use_worker=True
                )
            else:
                source_file = await self.get_file(source_key, source_bucket, False)
                if not source_file:
                    raise CloudflareError(f"Source file {source_key} not found")

                response = await self.upload_file(
                    destination_key,
                    source_file["data"],
                    bucket_type=destination_bucket,
                    use_worker=False,
                )

            return {
                "success": True,
                "source_key": source_key,
                "destination_key": destination_key,
                "source_bucket": source_bucket,
                "destination_bucket": destination_bucket,
                **response,
            }

        except CloudflareError as e:
            logger.error(f"R2 copy failed: {e}")
            raise

    async def move_file(
        self,
        source_key: str,
        destination_key: str,
        source_bucket: str = "storage",
        destination_bucket: str = "storage",
        use_worker: bool = True,
    ) -> Dict[str, Any]:
        """Move an object by copying it and then deleting the source.

        The two steps are not atomic. If the delete fails after a successful
        copy, the object exists in both locations.

        Returns:
            A dict containing both the copy result and the delete result.

        Raises:
            CloudflareError: If either step fails. The error is logged first.
        """
        try:
            copy_result = await self.copy_file(
                source_key,
                destination_key,
                source_bucket,
                destination_bucket,
                use_worker,
            )

            delete_result = await self.delete_file(
                source_key, source_bucket, use_worker
            )

            return {
                "success": True,
                "source_key": source_key,
                "destination_key": destination_key,
                "source_bucket": source_bucket,
                "destination_bucket": destination_bucket,
                "copy_result": copy_result,
                "delete_result": delete_result,
            }

        except CloudflareError as e:
            logger.error(f"R2 move failed: {e}")
            raise

    async def generate_presigned_url(
        self,
        key: str,
        bucket_type: str = "storage",
        expires_in: int = 3600,
        method: str = "GET",
    ) -> Dict[str, Any]:
        """Ask the worker for a presigned URL for direct access to ``key``.

        There is no REST fallback; this always goes through
        ``api/files/presigned-url`` on the worker.

        Args:
            key: Object key.
            bucket_type: Logical bucket (``"storage"`` or ``"assets"``).
            expires_in: Lifetime forwarded as ``expiresIn``. Presumably
                seconds; confirm against the worker.
            method: HTTP method the URL should authorize.

        Raises:
            CloudflareError: If the request fails. The error is logged first.
        """
        try:
            url_data = {
                "key": key,
                "bucket": bucket_type,
                "expiresIn": expires_in,
                "method": method,
            }

            response = await self.client.post(
                "api/files/presigned-url", data=url_data, use_worker=True
            )

            return {
                "success": True,
                "key": key,
                "bucket": bucket_type,
                "method": method,
                "expires_in": expires_in,
                **response,
            }

        except CloudflareError as e:
            logger.error(f"R2 presigned URL generation failed: {e}")
            raise

    async def get_storage_stats(self, use_worker: bool = True) -> Dict[str, Any]:
        """Return per-bucket and total file counts and sizes.

        With ``use_worker`` the worker's ``api/files/stats`` response is
        returned as-is. Otherwise both buckets are listed via REST and the
        ``size`` fields of the listed objects are summed.

        Raises:
            CloudflareError: If a request fails. The error is logged first.
        """
        try:
            if use_worker:
                return await self.client.get("api/files/stats", use_worker=True)

            # NOTE(review): list_files runs with its default limit (1000), and
            # truncated listings are not followed. Counts and sizes may
            # therefore be undercounted for larger buckets.
            storage_listing = await self.list_files("storage", use_worker=False)
            assets_listing = await self.list_files("assets", use_worker=False)
            storage_files = storage_listing.get("files", [])
            assets_files = assets_listing.get("files", [])

            storage_size = sum(obj.get("size", 0) for obj in storage_files)
            assets_size = sum(obj.get("size", 0) for obj in assets_files)

            return {
                "storage": {
                    "file_count": len(storage_files),
                    "total_size": storage_size,
                },
                "assets": {
                    "file_count": len(assets_files),
                    "total_size": assets_size,
                },
                "total": {
                    "file_count": len(storage_files) + len(assets_files),
                    "total_size": storage_size + assets_size,
                },
            }

        except CloudflareError as e:
            logger.error(f"R2 storage stats failed: {e}")
            raise

    def create_file_stream(self, data: bytes) -> io.BytesIO:
        """Wrap ``data`` in an in-memory binary stream."""
        return io.BytesIO(data)

    def get_public_url(self, key: str, bucket_type: str = "storage") -> str:
        """Build a URL for reading ``key``.

        When the client has a ``worker_url``, this returns the worker's
        ``/api/files/<key>`` route, after stripping any trailing slash from
        the base to avoid a ``//`` in the path. Otherwise it returns an
        ``r2.dev`` URL built from the bucket name. That fallback only works
        if the bucket is configured for public access.
        """
        bucket_name = self._get_bucket_name(bucket_type)

        if self.client.worker_url:
            base = str(self.client.worker_url).rstrip("/")
            return f"{base}/api/files/{key}?bucket={bucket_type}"

        # NOTE(review): Cloudflare's public r2.dev hostnames are normally
        # keyed by a per-bucket ID rather than the bucket name. Verify this
        # fallback against the bucket's actual public URL.
        return f"https://pub-{bucket_name}.r2.dev/{key}"
|
|
|