File size: 4,118 Bytes
8bc2fc9 4d0b8a7 22fe41e 4d0b8a7 22fe41e 8bc2fc9 22fe41e 4d0b8a7 8bc2fc9 4d0b8a7 22fe41e 8bc2fc9 4d0b8a7 8bc2fc9 4d0b8a7 8bc2fc9 4d0b8a7 8bc2fc9 4d0b8a7 22fe41e 8bc2fc9 4d0b8a7 22fe41e 8bc2fc9 4d0b8a7 22fe41e 8bc2fc9 4d0b8a7 22fe41e 8bc2fc9 4d0b8a7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
import logging
import boto3
import os
from botocore.exceptions import ClientError
from botocore.client import Config
import time
from io import BytesIO
from rag.utils import singleton
@singleton
class RAGFlowS3(object):
def __init__(self):
self.conn = None
self.endpoint = os.getenv('ENDPOINT', None)
self.access_key = os.getenv('ACCESS_KEY', None)
self.secret_key = os.getenv('SECRET_KEY', None)
self.region = os.getenv('REGION', None)
self.__open__()
def __open__(self):
try:
if self.conn:
self.__close__()
except Exception:
pass
try:
config = Config(
s3={
'addressing_style': 'virtual'
}
)
self.conn = boto3.client(
's3',
endpoint_url=self.endpoint,
region_name=self.region,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
config=config
)
except Exception:
logging.exception(
"Fail to connect %s" % self.endpoint)
def __close__(self):
del self.conn
self.conn = None
def bucket_exists(self, bucket):
try:
logging.debug(f"head_bucket bucketname {bucket}")
self.conn.head_bucket(Bucket=bucket)
exists = True
except ClientError:
logging.exception(f"head_bucket error {bucket}")
exists = False
return exists
def health(self):
bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1"
if not self.bucket_exists(bucket):
self.conn.create_bucket(Bucket=bucket)
logging.debug(f"create bucket {bucket} ********")
r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm)
return r
def get_properties(self, bucket, key):
return {}
def list(self, bucket, dir, recursive=True):
return []
def put(self, bucket, fnm, binary):
logging.debug(f"bucket name {bucket}; filename :{fnm}:")
for _ in range(1):
try:
if not self.bucket_exists(bucket):
self.conn.create_bucket(Bucket=bucket)
logging.info(f"create bucket {bucket} ********")
r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm)
return r
except Exception:
logging.exception(f"Fail put {bucket}/{fnm}")
self.__open__()
time.sleep(1)
def rm(self, bucket, fnm):
try:
self.conn.delete_object(Bucket=bucket, Key=fnm)
except Exception:
logging.exception(f"Fail rm {bucket}/{fnm}")
def get(self, bucket, fnm):
for _ in range(1):
try:
r = self.conn.get_object(Bucket=bucket, Key=fnm)
object_data = r['Body'].read()
return object_data
except Exception:
logging.exception(f"fail get {bucket}/{fnm}")
self.__open__()
time.sleep(1)
return
def obj_exist(self, bucket, fnm):
try:
if self.conn.head_object(Bucket=bucket, Key=fnm):
return True
except ClientError as e:
if e.response['Error']['Code'] == '404':
return False
else:
raise
def get_presigned_url(self, bucket, fnm, expires):
for _ in range(10):
try:
r = self.conn.generate_presigned_url('get_object',
Params={'Bucket': bucket,
'Key': fnm},
ExpiresIn=expires)
return r
except Exception:
logging.exception(f"fail get url {bucket}/{fnm}")
self.__open__()
time.sleep(1)
return
|