# -*- coding: utf-8 -*-
"""Small helper around an Aliyun OSS bucket configured via environment variables."""
import datetime
import os
import random
import shutil
import string
import sys
import time

import oss2
import requests

# When True, ossService swaps the configured endpoint for its "-internal" variant.
use_internal_network = False

# Connection settings, read once at import time; each is None when unset.
OSSKEY = os.getenv('OSSAccessKeyId')
OSSPASSWD = os.getenv('OSSAccessKeySecret')
OSSEndpoint = os.getenv('OSSEndpoint')
OSSBucketName = os.getenv('OSSBucketName')
OSSObjectName = os.getenv('OSSObjectName')

_PUBLIC_SUFFIX = ".aliyuncs.com"
_INTERNAL_SUFFIX = "-internal.aliyuncs.com"
_DOWNLOAD_CHUNK_SIZE = 1024 * 1024  # bytes written per chunk in downloadFile


def get_random_string():
    """Return an id of the form ``YYYYMMDD-HHMMSS-ffffff-XXXXXX``.

    The first three fields come from the current local time (``%f`` is the
    zero-padded, 6-digit microsecond field); ``XXXXXX`` is six random
    uppercase ASCII letters.
    """
    timestamp = datetime.datetime.now().strftime('%Y%m%d-%H%M%S-%f')
    letters = ''.join(random.choices(string.ascii_uppercase, k=6))
    return timestamp + "-" + letters


def _mask_secret(value, visible=3):
    """Hide all but the last ``visible`` characters of a credential for logging."""
    if not value:
        return value
    if len(value) <= visible:
        # Too short to reveal any part of it safely.
        return "******"
    return "******" + value[-visible:]


def _to_internal_endpoint(endpoint):
    """Return ``endpoint`` with ``.aliyuncs.com`` replaced by ``-internal.aliyuncs.com``.

    Endpoints that are unset, already internal, or that do not end with
    ``.aliyuncs.com`` are returned unchanged instead of being sliced blindly.
    """
    if not endpoint or endpoint.endswith(_INTERNAL_SUFFIX):
        return endpoint
    if endpoint.endswith(_PUBLIC_SUFFIX):
        return endpoint[:-len(_PUBLIC_SUFFIX)] + _INTERNAL_SUFFIX
    return endpoint


class ossService():
    """Sign, upload and download objects in the bucket named by the environment.

    Methods report their outcome as a tuple whose first element is a status
    flag: 1 on success, 0 on failure. The last element is a message string.
    """

    def __init__(self):
        """Build the ``oss2.Bucket`` client from the module-level settings."""
        self.AccessKeyId = OSSKEY
        self.AccessKeySecret = OSSPASSWD
        self.Endpoint = OSSEndpoint
        if use_internal_network:
            # Internal network: the endpoint host gets an "-internal" suffix.
            self.Endpoint = _to_internal_endpoint(OSSEndpoint)
        self.BucketName = OSSBucketName  # e.g. "vigen-invi", "vigen-video"
        self.ObjectName = OSSObjectName  # e.g. "video_generation", "VideoGeneration"
        self.Prefix = "oss://" + self.BucketName

        # Credentials are masked so they never reach stdout/logs in clear text.
        print('AK:', _mask_secret(self.AccessKeyId))
        print('SK:', _mask_secret(self.AccessKeySecret))
        print('Endpoint:', self.Endpoint)
        print('BucketName:', self.BucketName)
        print('ObjectName:', self.ObjectName)
        print('Prefix:', self.Prefix)

        auth = oss2.Auth(self.AccessKeyId, self.AccessKeySecret)
        self.bucket = oss2.Bucket(auth, self.Endpoint, self.BucketName)

    def sign(self, oss_url, timeout=86400, params=None):
        """Create a signed GET URL for an object.

        Args:
            oss_url: full object URL, e.g. ``oss://BucketName/ObjectName/xxx.mp4``.
                The leading ``oss://<BucketName>/`` is dropped by length, so the
                URL is expected to start with that prefix.
            timeout: expiry value handed to ``Bucket.sign_url``
                (NOTE(review): seconds per the oss2 docs - confirm).
            params: optional query parameters, e.g. ``{'x-oss-process': style}``.

        Returns:
            ``(1, signed_url, 'Success.')`` on success, or
            ``(0, "", error_text)`` if anything raised.
        """
        try:
            oss_path = oss_url[len("oss://" + self.BucketName + "/"):]
            signed_url = self.bucket.sign_url(
                'GET', oss_path, timeout, slash_safe=True, params=params
            )
            return 1, signed_url, 'Success.'
        except Exception as e:
            print("sign error: {}".format(e))
            return 0, "", f'{e}'

    def uploadOssFile(self, oss_full_path, local_full_path):
        """Upload a local file and return a signed URL for it.

        Args:
            oss_full_path: object key inside the bucket, e.g. ``ObjectName/xxx.mp4``.
            local_full_path: path of the local file to upload.

        Returns:
            The ``(status, sign_url, message)`` tuple produced by ``sign`` (with
            a 3600 timeout) after a successful upload, or ``(0, "", error_text)``
            when the upload raises ``oss2.exceptions.OssError``.
        """
        try:
            self.bucket.put_object_from_file(oss_full_path, local_full_path)
            return self.sign(self.Prefix + "/" + oss_full_path, timeout=3600)
        except oss2.exceptions.OssError as e:
            print("oss upload error: ", e)
            return 0, "", f'{e}'

    def downloadOssFile(self, oss_full_path, local_full_path):
        """Download an object from the bucket to a local file.

        Args:
            oss_full_path: object key inside the bucket, e.g. ``ObjectName/xxx.mp4``.
            local_full_path: destination path on the local filesystem.

        Returns:
            ``(1, 'Success.')`` on success, or ``(0, error_text)`` when
            ``oss2.exceptions.OssError`` is raised.
        """
        try:
            self.bucket.get_object_to_file(oss_full_path, local_full_path)
            return 1, 'Success.'
        except oss2.exceptions.OssError as e:
            print("oss download error: ", e)
            return 0, f'{e}'

    def downloadFile(self, file_full_url, local_full_path, timeout=60):
        """Download an arbitrary HTTP(S) URL to a local file.

        The body is streamed in chunks to ``<local_full_path>.part`` and moved
        into place only after the transfer completes, so a failed download
        neither buffers the whole payload in memory nor leaves a truncated
        file at ``local_full_path``.

        Args:
            file_full_url: URL to fetch.
            local_full_path: destination path on the local filesystem.
            timeout: ``requests`` timeout in seconds; without one a stalled
                server would block the caller indefinitely.

        Returns:
            ``(1, 'Success.')`` on HTTP 200, ``(0, str(response))`` on any other
            status, or ``(0, error_text)`` when the request itself fails.
        """
        tmp_path = f"{local_full_path}.part"
        try:
            with requests.get(file_full_url, stream=True, timeout=timeout) as response:
                if response.status_code != 200:
                    print("oss download error. ")
                    return 0, f'{response}'
                with open(tmp_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=_DOWNLOAD_CHUNK_SIZE):
                        f.write(chunk)
            os.replace(tmp_path, local_full_path)
            return 1, 'Success.'
        except requests.exceptions.RequestException as e:
            # Same status-tuple convention as the OSS methods above.
            print("oss download error: ", e)
            return 0, f'{e}'
        finally:
            # Only present when the transfer was interrupted before the rename.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)