Dreamoving-Phantom / oss_utils.py
fangxia
first release
df5cb06
# -*- coding: utf-8 -*-
import os
import sys
import string
# import platform
import time
import datetime
import json
# import numpy as np
# import threading
# import cv2
# import PIL.Image as Image
# import ffmpeg
from io import BytesIO
# from queue import Queue
# import glob
import oss2
import random
import requests
import shutil
# import torch
# import ctypes
use_internal_network = False
# OSSAccessKeyId = os.getenv('OSSAccessKeyId', "")
# OSSAccessKeySecret = os.getenv('OSSAccessKeySecret', "")
def get_random_string():
now = datetime.datetime.now()
date = now.strftime('%Y%m%d')
time = now.strftime('%H%M%S')
microsecond = now.strftime('%f')
microsecond = microsecond[:6] # 取前6位,即微秒
rand_num = ''.join([str(random.randint(0, 9)) for _ in range(6)])
random_string = ''.join(random.choices(string.ascii_uppercase, k=6)) # ascii_lowercase
return date + "-" + time + "-" + microsecond + "-" + random_string
class ossService():
def __init__(self, OSSAccessKeyId, OSSAccessKeySecret, Endpoint, BucketName, ObjectName):
self.AccessKeyId = OSSAccessKeyId
self.AccessKeySecret = OSSAccessKeySecret
self.Endpoint = Endpoint
self.BucketName = BucketName # "vigen-video"
self.ObjectName = ObjectName # "VideoGeneration"
self.Prefix = "oss://" + self.BucketName
auth = oss2.Auth(self.AccessKeyId, self.AccessKeySecret)
self.bucket = oss2.Bucket(auth, self.Endpoint, self.BucketName)
# oss_url: eg: oss://BucketName/ObjectName/xxx.mp4
def sign(self, oss_url, timeout=86400):
try:
oss_path = oss_url[len("oss://" + self.BucketName + "/"):]
return 1, self.bucket.sign_url('GET', oss_path, timeout, slash_safe=True)
except Exception as e:
print("sign error: {}".format(e))
return 0, ""
def uploadOssFile(self, oss_full_path, local_full_path):
try:
self.bucket.put_object_from_file(oss_full_path, local_full_path)
return self.sign(self.Prefix+"/"+oss_full_path, timeout=86400)
except oss2.exceptions.OssError as e:
print("oss upload error: ", e)
return 0, ""
def downloadOssFile(self, oss_full_path, local_full_path):
status = 1
try:
self.bucket.get_object_to_file(oss_full_path, local_full_path)
except oss2.exceptions.OssError as e:
print("oss download error: ", e)
status = 0
return status
def downloadFile(self, file_full_url, local_full_path):
status = 1
response = requests.get(file_full_url)
if response.status_code == 200:
with open(local_full_path, "wb") as f:
f.write(response.content)
else:
print("oss download error. ")
status = 0
return status