# -*- coding: utf-8 -*-
import os
import sys
import string
# import platform
import time
import datetime
import json
# import numpy as np
# import threading
# import cv2
# import PIL.Image as Image
# import ffmpeg
from io import BytesIO
# from queue import Queue
# import glob
import oss2
import random
import requests
import shutil
# import torch
# import ctypes

use_internal_network = False
# OSSAccessKeyId = os.getenv('OSSAccessKeyId', "")
# OSSAccessKeySecret = os.getenv('OSSAccessKeySecret', "")


def get_random_string():
    """Return a timestamped random identifier.

    The result has the form ``YYYYMMDD-HHMMSS-ffffff-XXXXXX``: the current
    local date, time and microseconds, followed by six random uppercase
    ASCII letters.
    """
    # %f always expands to exactly six digits (microseconds), so the
    # timestamp needs no further slicing.
    timestamp = datetime.datetime.now().strftime('%Y%m%d-%H%M%S-%f')
    suffix = ''.join(random.choices(string.ascii_uppercase, k=6))
    return timestamp + "-" + suffix


class ossService():
    """Thin wrapper around an ``oss2.Bucket`` for signing, upload and download.

    Methods report the outcome with an integer status: 1 on success and
    0 on failure.
    """

    def __init__(self, OSSAccessKeyId, OSSAccessKeySecret, Endpoint,
                 BucketName, ObjectName):
        """Store the connection settings and open the bucket handle.

        Args:
            OSSAccessKeyId: access key id passed to ``oss2.Auth``.
            OSSAccessKeySecret: access key secret passed to ``oss2.Auth``.
            Endpoint: OSS endpoint passed to ``oss2.Bucket``.
            BucketName: bucket name, e.g. "vigen-video".
            ObjectName: object (folder) name, e.g. "VideoGeneration"; it is
                only stored here and not used by the methods of this class.
        """
        self.AccessKeyId = OSSAccessKeyId
        self.AccessKeySecret = OSSAccessKeySecret
        self.Endpoint = Endpoint
        self.BucketName = BucketName  # e.g. "vigen-video"
        self.ObjectName = ObjectName  # e.g. "VideoGeneration"
        self.Prefix = "oss://" + self.BucketName
        auth = oss2.Auth(self.AccessKeyId, self.AccessKeySecret)
        self.bucket = oss2.Bucket(auth, self.Endpoint, self.BucketName)

    # oss_url: eg: oss://BucketName/ObjectName/xxx.mp4
    def sign(self, oss_url, timeout=86400):
        """Create a signed GET URL for an ``oss://`` URL of this bucket.

        Args:
            oss_url: URL of the form ``oss://<BucketName>/<object key>``.
                The leading ``oss://<BucketName>/`` is removed by length,
                without checking that it is actually present.
            timeout: value passed to ``sign_url`` as the expiry
                (default 86400).

        Returns:
            ``(1, signed_url)`` on success, ``(0, "")`` if signing raised.
        """
        try:
            oss_path = oss_url[len("oss://" + self.BucketName + "/"):]
            return 1, self.bucket.sign_url('GET', oss_path, timeout,
                                           slash_safe=True)
        except Exception as e:
            print("sign error: {}".format(e))
            return 0, ""

    def uploadOssFile(self, oss_full_path, local_full_path):
        """Upload a local file to the bucket and return a signed URL for it.

        Args:
            oss_full_path: destination object key inside the bucket.
            local_full_path: path of the local file to upload.

        Returns:
            The result of ``sign`` for the uploaded object, i.e.
            ``(1, signed_url)`` or ``(0, "")``; also ``(0, "")`` when the
            upload raised ``oss2.exceptions.OssError``.
        """
        try:
            self.bucket.put_object_from_file(oss_full_path, local_full_path)
            return self.sign(self.Prefix + "/" + oss_full_path, timeout=86400)
        except oss2.exceptions.OssError as e:
            print("oss upload error: ", e)
            return 0, ""

    def downloadOssFile(self, oss_full_path, local_full_path):
        """Download an object from the bucket into a local file.

        Args:
            oss_full_path: object key inside the bucket.
            local_full_path: path of the local file to write.

        Returns:
            1 on success, 0 when ``oss2.exceptions.OssError`` was raised.
        """
        status = 1
        try:
            self.bucket.get_object_to_file(oss_full_path, local_full_path)
        except oss2.exceptions.OssError as e:
            print("oss download error: ", e)
            status = 0
        return status

    def downloadFile(self, file_full_url, local_full_path, timeout=60):
        """Download a file over HTTP(S) into a local file.

        Args:
            file_full_url: URL to fetch with ``requests.get``.
            local_full_path: path of the local file to write.
            timeout: timeout in seconds handed to ``requests.get``. Without
                one, an unresponsive server would block the call forever.

        Returns:
            1 when the server answered with HTTP 200 and the body was
            written, 0 on any other status code or on a request failure.
        """
        try:
            response = requests.get(file_full_url, timeout=timeout)
        except requests.exceptions.RequestException as e:
            # Report network failures through the status code, the same way
            # the other methods of this class report their errors.
            print("oss download error: ", e)
            return 0

        if response.status_code != 200:
            print("oss download error. ")
            return 0

        with open(local_full_path, "wb") as f:
            f.write(response.content)
        return 1