File size: 3,887 Bytes
bfd6cec
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# -*- coding: utf-8 -*-
import os
import sys
import string
import time
import datetime
import oss2
import random
import requests
import shutil
import os

use_internal_network = False

OSSKEY = os.getenv('OSSAccessKeyId')
OSSPASSWD = os.getenv('OSSAccessKeySecret')
OSSEndpoint = os.getenv('OSSEndpoint')
OSSBucketName = os.getenv('OSSBucketName')
OSSObjectName = os.getenv('OSSObjectName')

def get_random_string():
    now = datetime.datetime.now()
    date = now.strftime('%Y%m%d')
    time = now.strftime('%H%M%S')
    microsecond = now.strftime('%f')
    microsecond = microsecond[:6] # 取前6位,即微秒

    rand_num = ''.join([str(random.randint(0, 9)) for _ in range(6)])
    random_string = ''.join(random.choices(string.ascii_uppercase, k=6)) # ascii_lowercase
    return date + "-" + time + "-" + microsecond + "-" + random_string

class ossService():
    def __init__(self):
        # print(f'AK: ******{OSSAccessKeyId[-3:]}')
        # print(f'SK: ******{OSSAccessKeySecret[-3:]}')
        self.AccessKeyId = OSSKEY
        self.AccessKeySecret = OSSPASSWD
        self.Endpoint = OSSEndpoint
        if use_internal_network: #内网加-internal
            self.Endpoint = OSSEndpoint[:-len(".aliyuncs.com")] + "-internal.aliyuncs.com"
        self.BucketName = OSSBucketName # "vigen-invi" # "vigen-video"
        self.ObjectName = OSSObjectName # "video_generation" # "VideoGeneration"
        self.Prefix = "oss://" + self.BucketName

        print('AK:', self.AccessKeyId)
        print('SK:', self.AccessKeySecret)
        print('Endpoint:', self.Endpoint)
        print('BucketName:', self.BucketName)
        print('ObjectName:', self.ObjectName)
        print('Prefix:', self.Prefix)

        auth = oss2.Auth(self.AccessKeyId, self.AccessKeySecret)
        self.bucket = oss2.Bucket(auth, self.Endpoint, self.BucketName)

    # # oss_url: eg: oss://BucketName/ObjectName/xxx.mp4
    # def sign(self, oss_url, timeout=3600):
    #     try:
    #         oss_path = oss_url[len("oss://" + self.BucketName + "/"):]
    #         return 1, self.bucket.sign_url('GET', oss_path, timeout, slash_safe=True)
    #     except Exception as e:
    #         print("sign error: {}".format(e))
    #         return 0, ""
    
    # oss_url: eg: oss://BucketName/ObjectName/xxx.mp4
    # timeout: eg: 3600000
    # params: eg: {'x-oss-process': style}
    def sign(self, oss_url, timeout=86400, params=None):
        try:
            oss_path = oss_url[len("oss://" + self.BucketName + "/"):]
            return 1, self.bucket.sign_url('GET', oss_path, timeout, slash_safe=True, params=params), 'Success.'
        except Exception as e:
            print("sign error: {}".format(e))
            return 0, "", f'{e}'
        
    # eg: ObjectName/xxx.mp4
    def uploadOssFile(self, oss_full_path, local_full_path):
        try:
            self.bucket.put_object_from_file(oss_full_path, local_full_path)
            status,sign_url,message = self.sign(self.Prefix+"/"+oss_full_path, timeout=3600)
            return status,sign_url,message
        except oss2.exceptions.OssError as e:
            print("oss upload error: ", e)
            return 0, "", f'{e}'

    # eg: ObjectName/xxx.mp4
    def downloadOssFile(self, oss_full_path, local_full_path):
        try:
            self.bucket.get_object_to_file(oss_full_path, local_full_path)
            return 1, 'Success.'
        except oss2.exceptions.OssError as e:
            print("oss download error: ", e)
            return 0, f'{e}'

        
    def downloadFile(self, file_full_url, local_full_path):
        response = requests.get(file_full_url)
        if response.status_code == 200:
            with open(local_full_path, "wb") as f:
                f.write(response.content)
            return 1, 'Success.'
        else:
            print("oss download error. ")
            return 0, f'{response}'