File size: 2,889 Bytes
df5cb06
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# -*- coding: utf-8 -*-
import os
import sys
import string
# import platform
import time
import datetime
import json
# import numpy as np
# import threading
# import cv2
# import PIL.Image as Image
# import ffmpeg
from io import BytesIO
# from queue import Queue
# import glob
import oss2
import random
import requests
import shutil
# import torch
# import ctypes

use_internal_network = False

# OSSAccessKeyId = os.getenv('OSSAccessKeyId', "")
# OSSAccessKeySecret = os.getenv('OSSAccessKeySecret', "")

def get_random_string():
    now = datetime.datetime.now()
    date = now.strftime('%Y%m%d')
    time = now.strftime('%H%M%S')
    microsecond = now.strftime('%f')
    microsecond = microsecond[:6] # 取前6位,即微秒

    rand_num = ''.join([str(random.randint(0, 9)) for _ in range(6)])
    random_string = ''.join(random.choices(string.ascii_uppercase, k=6)) # ascii_lowercase
    return date + "-" + time + "-" + microsecond + "-" + random_string

class ossService():
    def __init__(self, OSSAccessKeyId, OSSAccessKeySecret, Endpoint, BucketName, ObjectName):
        self.AccessKeyId = OSSAccessKeyId
        self.AccessKeySecret = OSSAccessKeySecret
        self.Endpoint = Endpoint
        self.BucketName = BucketName # "vigen-video"
        self.ObjectName = ObjectName # "VideoGeneration"
        self.Prefix = "oss://" + self.BucketName

        auth = oss2.Auth(self.AccessKeyId, self.AccessKeySecret)
        self.bucket = oss2.Bucket(auth, self.Endpoint, self.BucketName)


    # oss_url: eg: oss://BucketName/ObjectName/xxx.mp4
    def sign(self, oss_url, timeout=86400):
        try:
            oss_path = oss_url[len("oss://" + self.BucketName + "/"):]
            return 1, self.bucket.sign_url('GET', oss_path, timeout, slash_safe=True)
        except Exception as e:
            print("sign error: {}".format(e))
            return 0, ""

    def uploadOssFile(self, oss_full_path, local_full_path):
        try:
            self.bucket.put_object_from_file(oss_full_path, local_full_path)
            return self.sign(self.Prefix+"/"+oss_full_path, timeout=86400)
        except oss2.exceptions.OssError as e:
            print("oss upload error: ", e)
            return 0, ""

    def downloadOssFile(self, oss_full_path, local_full_path):
        status = 1
        try:
            self.bucket.get_object_to_file(oss_full_path, local_full_path)
        except oss2.exceptions.OssError as e:
            print("oss download error: ", e)
            status = 0
        return status

        
    def downloadFile(self, file_full_url, local_full_path):
        status = 1
        response = requests.get(file_full_url)
        if response.status_code == 200:
            with open(local_full_path, "wb") as f:
                f.write(response.content)
        else:
            print("oss download error. ")
            status = 0
        return status