File size: 3,887 Bytes
bfd6cec |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# -*- coding: utf-8 -*-
import os
import sys
import string
import time
import datetime
import oss2
import random
import requests
import shutil
import os
# When True, ossService rewrites the endpoint to its "-internal" variant.
use_internal_network = False

# OSS connection settings, read once from the process environment at import
# time. Each value is None when the corresponding variable is not set.
OSSKEY = os.environ.get('OSSAccessKeyId')
OSSPASSWD = os.environ.get('OSSAccessKeySecret')
OSSEndpoint = os.environ.get('OSSEndpoint')
OSSBucketName = os.environ.get('OSSBucketName')
OSSObjectName = os.environ.get('OSSObjectName')
def get_random_string():
    """Return a timestamped identifier with a random uppercase suffix.

    The result has the form ``YYYYMMDD-HHMMSS-ffffff-XXXXXX`` where the first
    three fields come from the current local time (date, time of day,
    microseconds) and ``XXXXXX`` is six random ASCII uppercase letters.

    Uses the ``random`` module, so the suffix is not suitable as a security
    token.
    """
    now = datetime.datetime.now()
    # A single strftime call yields date, time and the 6-digit microsecond
    # field already joined by dashes; no local named `time` is needed, so the
    # imported `time` module is no longer shadowed.
    stamp = now.strftime('%Y%m%d-%H%M%S-%f')
    suffix = ''.join(random.choices(string.ascii_uppercase, k=6))
    return stamp + "-" + suffix
class ossService():
    """Helper around one Aliyun OSS bucket: URL signing, upload and download.

    Connection settings are taken from the module-level ``OSSKEY``,
    ``OSSPASSWD``, ``OSSEndpoint``, ``OSSBucketName`` and ``OSSObjectName``
    values; ``use_internal_network`` selects the ``-internal`` endpoint.

    Methods return a tuple whose first element is a status flag
    (``1`` success, ``0`` failure) and whose last element is a message.
    """

    _PUBLIC_SUFFIX = ".aliyuncs.com"
    _INTERNAL_SUFFIX = "-internal.aliyuncs.com"

    @staticmethod
    def _mask_secret(value, visible=3):
        """Return *value* with all but its last *visible* characters hidden."""
        if not value:
            return "<unset>"
        value = str(value)
        if len(value) <= visible:
            # Too short to reveal a tail without exposing the whole secret.
            return "******"
        return "******" + value[-visible:]

    @classmethod
    def _internal_endpoint(cls, endpoint):
        """Return *endpoint* with ``-internal`` inserted before ``.aliyuncs.com``.

        The endpoint is returned unchanged when it is empty/None, does not
        end with ``.aliyuncs.com``, or already is an internal endpoint.
        """
        if not endpoint or not endpoint.endswith(cls._PUBLIC_SUFFIX):
            return endpoint
        if endpoint.endswith(cls._INTERNAL_SUFFIX):
            return endpoint
        return endpoint[:-len(cls._PUBLIC_SUFFIX)] + cls._INTERNAL_SUFFIX

    def __init__(self):
        """Read the module-level settings, log them and open the bucket."""
        self.AccessKeyId = OSSKEY
        self.AccessKeySecret = OSSPASSWD
        self.Endpoint = OSSEndpoint
        if use_internal_network:
            # Internal network: use the "-internal" endpoint variant.
            self.Endpoint = self._internal_endpoint(OSSEndpoint)
        self.BucketName = OSSBucketName
        self.ObjectName = OSSObjectName
        self.Prefix = "oss://" + self.BucketName

        # Credentials are masked so that they never reach stdout/log files.
        print('AK:', self._mask_secret(self.AccessKeyId))
        print('SK:', self._mask_secret(self.AccessKeySecret))
        print('Endpoint:', self.Endpoint)
        print('BucketName:', self.BucketName)
        print('ObjectName:', self.ObjectName)
        print('Prefix:', self.Prefix)

        auth = oss2.Auth(self.AccessKeyId, self.AccessKeySecret)
        self.bucket = oss2.Bucket(auth, self.Endpoint, self.BucketName)

    def sign(self, oss_url, timeout=86400, params=None):
        """Create a signed GET URL for an object.

        Args:
            oss_url: Full object URL, e.g. ``oss://BucketName/ObjectName/xxx.mp4``.
            timeout: Expiry passed to ``bucket.sign_url`` (seconds, per the
                oss2 documentation).
            params: Optional query parameters, e.g. ``{'x-oss-process': style}``.

        Returns:
            ``(1, signed_url, 'Success.')`` on success, or
            ``(0, "", error_message)`` if anything raised.
        """
        try:
            # NOTE(review): the "oss://<bucket>/" prefix is stripped by length
            # only; the URL is not checked to actually start with it.
            oss_path = oss_url[len("oss://" + self.BucketName + "/"):]
            signed_url = self.bucket.sign_url(
                'GET', oss_path, timeout, slash_safe=True, params=params
            )
            return 1, signed_url, 'Success.'
        except Exception as e:
            print("sign error: {}".format(e))
            return 0, "", f'{e}'

    def uploadOssFile(self, oss_full_path, local_full_path):
        """Upload a local file and return a signed URL for it.

        Args:
            oss_full_path: Object key inside the bucket, e.g. ``ObjectName/xxx.mp4``.
            local_full_path: Path of the local file to upload.

        Returns:
            The ``(status, signed_url, message)`` tuple from :meth:`sign`
            (signed with a 3600 expiry), or ``(0, "", error_message)`` when
            the upload raises ``oss2.exceptions.OssError``. Other exceptions
            from the upload call propagate.
        """
        try:
            self.bucket.put_object_from_file(oss_full_path, local_full_path)
            status, sign_url, message = self.sign(
                self.Prefix + "/" + oss_full_path, timeout=3600
            )
            return status, sign_url, message
        except oss2.exceptions.OssError as e:
            print("oss upload error: ", e)
            return 0, "", f'{e}'

    def downloadOssFile(self, oss_full_path, local_full_path):
        """Download an object from the bucket to a local file.

        Args:
            oss_full_path: Object key inside the bucket, e.g. ``ObjectName/xxx.mp4``.
            local_full_path: Destination path on the local filesystem.

        Returns:
            ``(1, 'Success.')`` on success, or ``(0, error_message)`` when
            ``oss2.exceptions.OssError`` is raised.
        """
        try:
            self.bucket.get_object_to_file(oss_full_path, local_full_path)
            return 1, 'Success.'
        except oss2.exceptions.OssError as e:
            print("oss download error: ", e)
            return 0, f'{e}'

    def downloadFile(self, file_full_url, local_full_path, timeout=60):
        """Download a file over HTTP(S) to a local path.

        Args:
            file_full_url: URL to fetch with a plain GET request.
            local_full_path: Destination path; written only on HTTP 200.
            timeout: Timeout handed to ``requests.get`` so that a stalled
                server cannot block the caller forever. Pass ``None`` to
                wait indefinitely.

        Returns:
            ``(1, 'Success.')`` when the response status is 200, otherwise
            ``(0, str(response))``. Exceptions raised by ``requests``
            (connection errors, timeouts) propagate to the caller.
        """
        response = requests.get(file_full_url, timeout=timeout)
        if response.status_code != 200:
            print("oss download error. ")
            return 0, f'{response}'
        with open(local_full_path, "wb") as f:
            f.write(response.content)
        return 1, 'Success.'
|