Spaces:
Paused
Paused
File size: 16,574 Bytes
cc30486 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 |
"""
焕影小程序功能服务端的基本工具函数,以类的形式封装
"""
try: # 加上这个try的原因在于本地环境和云函数端的import形式有所不同
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
except ImportError:
try:
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client
except ImportError:
raise ImportError("请下载腾讯云COS相关代码包:pip install cos-python-sdk-v5")
import requests
import datetime
import json
from .error import ProcessError
import os
local_path_ = os.path.dirname(__file__)
class GetConfig(object):
@staticmethod
def hy_sdk_client(Id:str, Key:str):
# 从cos中寻找文件
REGION: str = 'ap-beijing'
TOKEN = None
SCHEME: str = 'https'
BUCKET: str = 'hy-sdk-config-1305323352'
client_config = CosConfig(Region=REGION,
SecretId=Id,
SecretKey=Key,
Token=TOKEN,
Scheme=SCHEME)
return CosS3Client(client_config), BUCKET
def load_json(self, path:str, default_download=False):
try:
if os.path.isdir(path):
raise ProcessError("请输入具体的配置文件路径,而非文件夹!")
if default_download is True:
print(f"\033[34m 默认强制重新下载配置文件...\033[0m")
raise FileNotFoundError
with open(path) as f:
config = json.load(f)
return config
except FileNotFoundError:
dir_name = os.path.dirname(path)
try:
os.makedirs(dir_name)
except FileExistsError:
pass
base_name = os.path.basename(path)
print(f"\033[34m 正在从COS中下载配置文件...\033[0m")
print(f"\033[31m 请注意,接下来会在{dir_name}路径下生成文件{base_name}...\033[0m")
Id = input("请输入SecretId:")
Key = input("请输入SecretKey:")
client, bucket = self.hy_sdk_client(Id, Key)
data_bytes = client.get_object(Bucket=bucket,Key=base_name)["Body"].get_raw_stream().read()
data = json.loads(data_bytes.decode("utf-8"))
# data["SecretId"] = Id # 未来可以把这个加上
# data["SecretKey"] = Key
with open(path, "w") as f:
data_str = json.dumps(data, ensure_ascii=False)
# 如果 ensure_ascii 是 true (即默认值),输出保证将所有输入的非 ASCII 字符转义。
# 如果 ensure_ascii 是 false,这些字符会原样输出。
f.write(data_str)
f.close()
print(f"\033[32m 配置文件保存成功\033[0m")
return data
except json.decoder.JSONDecodeError:
print(f"\033[31m WARNING: 配置文件为空!\033[0m")
return {}
def load_file(self, cloud_path:str, local_path:str):
"""
从COS中下载文件到本地,本函数将会被默认执行的,在使用的时候建议加一些限制.
:param cloud_path: 云端的文件路径
:param local_path: 将云端文件保存在本地的路径
"""
if os.path.isdir(cloud_path):
raise ProcessError("请输入具体的云端文件路径,而非文件夹!")
if os.path.isdir(local_path):
raise ProcessError("请输入具体的本地文件路径,而非文件夹!")
dir_name = os.path.dirname(local_path)
base_name = os.path.basename(local_path)
try:
os.makedirs(dir_name)
except FileExistsError:
pass
cloud_name = os.path.basename(cloud_path)
print(f"\033[31m 请注意,接下来会在{dir_name}路径下生成文件{base_name}\033[0m")
Id = input("请输入SecretId:")
Key = input("请输入SecretKey:")
client, bucket = self.hy_sdk_client(Id, Key)
print(f"\033[34m 正在从COS中下载文件: {cloud_name}, 此过程可能耗费一些时间...\033[0m")
data_bytes = client.get_object(Bucket=bucket,Key=cloud_path)["Body"].get_raw_stream().read()
# data["SecretId"] = Id # 未来可以把这个加上
# data["SecretKey"] = Key
with open(local_path, "wb") as f:
# 如果 ensure_ascii 是 true (即默认值),输出保证将所有输入的非 ASCII 字符转义。
# 如果 ensure_ascii 是 false,这些字符会原样输出。
f.write(data_bytes)
f.close()
print(f"\033[32m 文件保存成功\033[0m")
class CosConf(GetConfig):
"""
从安全的角度出发,将一些默认配置文件上传至COS中,接下来使用COS和它的子类的时候,在第一次使用时需要输入Cuny给的id和key
用于连接cos存储桶,下载配置文件.
当然,在service_default_download = False的时候,如果在运行路径下已经有conf/service_config.json文件了,
那么就不用再次下载了,也不用输入id和key
事实上这只需要运行一次,因为配置文件将会被下载至源码文件夹中
如果要自定义路径,请在继承的子类中编写__init__函数,将service_path定向到指定路径
"""
def __init__(self) -> None:
# 下面这些参数是类的共享参数
self.__SECRET_ID: str = None # 服务的id
self.__SECRET_KEY: str = None # 服务的key
self.__REGION: str = None # 服务的存储桶地区
self.__TOKEN: str = None # 服务的token,目前一直是None
self.__SCHEME: str = None # 服务的访问协议,默认实际上是https
self.__BUCKET: str = None # 服务的存储桶
self.__SERVICE_CONFIG: dict = None # 服务的配置文件
self.service_path: str = f"{local_path_}/conf/service_config.json"
# 配置文件路径,默认是函数运行的路径下的conf文件夹
self.service_default_download = False # 是否在每次访问配置的时候都重新下载文件
@property
def service_config(self):
if self.__SERVICE_CONFIG is None or self.service_default_download is True:
self.__SERVICE_CONFIG = self.load_json(self.service_path, self.service_default_download)
return self.__SERVICE_CONFIG
@property
def client(self):
client_config = CosConfig(Region=self.region,
SecretId=self.secret_id,
SecretKey=self.secret_key,
Token=self.token,
Scheme=self.scheme)
return CosS3Client(client_config)
def get_key(self, key:str):
try:
data = self.service_config[key]
if data == "None":
return None
else:
return data
except KeyError:
print(f"\033[31m没有对应键值{key},默认返回None\033[0m")
return None
@property
def secret_id(self):
if self.__SECRET_ID is None:
self.__SECRET_ID = self.get_key("SECRET_ID")
return self.__SECRET_ID
@secret_id.setter
def secret_id(self, value:str):
self.__SECRET_ID = value
@property
def secret_key(self):
if self.__SECRET_KEY is None:
self.__SECRET_KEY = self.get_key("SECRET_KEY")
return self.__SECRET_KEY
@secret_key.setter
def secret_key(self, value:str):
self.__SECRET_KEY = value
@property
def region(self):
if self.__REGION is None:
self.__REGION = self.get_key("REGION")
return self.__REGION
@region.setter
def region(self, value:str):
self.__REGION = value
@property
def token(self):
# if self.__TOKEN is None:
# self.__TOKEN = self.get_key("TOKEN")
# 这里可以注释掉
return self.__TOKEN
@token.setter
def token(self, value:str):
self.__TOKEN= value
@property
def scheme(self):
if self.__SCHEME is None:
self.__SCHEME = self.get_key("SCHEME")
return self.__SCHEME
@scheme.setter
def scheme(self, value:str):
self.__SCHEME = value
@property
def bucket(self):
if self.__BUCKET is None:
self.__BUCKET = self.get_key("BUCKET")
return self.__BUCKET
@bucket.setter
def bucket(self, value):
self.__BUCKET = value
def downloadFile_COS(self, key, bucket:str=None, if_read:bool=False):
"""
从COS下载对象(二进制数据), 如果下载失败就返回None
"""
CosBucket = self.bucket if bucket is None else bucket
try:
# 将本类的Debug继承给抛弃了
# self.debug_print(f"Download from {CosBucket}", font_color="blue")
obj = self.client.get_object(
Bucket=CosBucket,
Key=key
)
if if_read is True:
data = obj["Body"].get_raw_stream().read() # byte
return data
else:
return obj
except Exception as e:
print(f"\033[31m下载失败! 错误描述:{e}\033[0m")
return None
def showFileList_COS_base(self, key, bucket, marker:str=""):
"""
返回cos存储桶内部的某个文件夹的内部名称
:param key: cos云端的存储路径
:param bucket: cos存储桶名称,如果没指定名称(None)就会寻找默认的存储桶
:param marker: 标记,用于记录上次查询到哪里了
ps:如果需要修改默认的存储桶配置,请在代码运行的时候加入代码 s.bucket = 存储桶名称 (s是对象实例)
返回的内容存储在response["Content"],不过返回的数据大小是有限制的,具体内容还是请看官方文档。
"""
response = self.client.list_objects(
Bucket=bucket,
Prefix=key,
Marker=marker
)
return response
def showFileList_COS(self, key, bucket:str=None)->list:
"""
实现查询存储桶中所有对象的操作,因为cos的sdk有返回数据包大小的限制,所以我们需要进行一定的改动
"""
marker = ""
file_list = []
CosBucket = self.bucket if bucket is None else bucket
while True: # 轮询
response = self.showFileList_COS_base(key, CosBucket, marker)
try:
file_list.extend(response["Contents"])
except KeyError as e:
print(e)
raise
if response['IsTruncated'] == 'false': # 接下来没有数据了,就退出
break
marker = response['NextMarker']
return file_list
def uploadFile_COS(self, buffer, key, bucket:str=None):
"""
从COS上传数据,需要注意的是必须得是二进制文件
"""
CosBucket = self.bucket if bucket is None else bucket
try:
self.client.put_object(
Bucket=CosBucket,
Body=buffer,
Key=key
)
return True
except Exception as e:
print(e)
return False
class FuncDiary(CosConf):
filter_dict = {"60a5e13da00e6e0001fd53c8": "Cuny",
"612c290f3a9af4000170faad": "守望平凡",
"614de96e1259260001506d6c": "林泽毅-焕影一新"}
def __init__(self, func_name: str, uid: str, error_conf_path: str = f"{local_path_}/conf/func_error_conf.json"):
"""
日志类的实例化
Args:
func_name: 功能名称,影响了日志投递的路径
"""
super().__init__()
# 配置文件路径,默认是函数运行的路径下的conf文件夹
self.service_path: str = os.path.join(os.path.dirname(error_conf_path), "service_config.json")
self.error_dict = self.load_json(path=error_conf_path)
self.__up: str = f"wx/invokeFunction_c/{datetime.datetime.now().strftime('%Y/%m/%d/%H')}/{func_name}/"
self.func_name: str = func_name
# 下面这个属性是的日志名称的前缀
self.__start_time = datetime.datetime.now().timestamp()
h_point = datetime.datetime.strptime(datetime.datetime.now().strftime('%Y/%m/%d/%H'), '%Y/%m/%d/%H')
h_point_timestamp = h_point.timestamp()
self.__prefix = int(self.__start_time - h_point_timestamp).__str__() + "_"
self.__uid = uid
self.__diary = None
def __str__(self):
return f"<{self.func_name}> DIARY for {self.__uid}"
@property
def content(self):
return self.__diary
@content.setter
def content(self, value: str):
if not isinstance(value, dict):
raise TypeError("content 只能是字典!")
if "status" in value:
raise KeyError("status字段已被默认占用,请在日志信息中更换字段名称!")
if self.__diary is None:
self.__diary = value
else:
raise PermissionError("为了减小日志对整体代码的影响,<content>只能被覆写一次!")
def uploadDiary_COS(self, status_id: str, suffix: str = "", bucket: str = "hy-hcy-data-logs-1306602019"):
if self.__diary is None:
self.__diary = {"status": self.error_dict[status_id]}
if status_id == "0000":
self.__up += f"True/{self.__uid}/"
else:
self.__up += f"False/{self.__uid}/"
interval = int(10 * (datetime.datetime.now().timestamp() - self.__start_time))
prefix = self.__prefix + status_id + "_" + interval.__str__()
self.__diary["status"] = self.error_dict[status_id]
name = prefix + "_" + suffix if len(suffix) != 0 else prefix
self.uploadFile_COS(buffer=json.dumps(self.__diary), key=self.__up + name, bucket=bucket)
print(f"{self}上传成功.")
class ResponseWebSocket(CosConf):
# 网关推送地址
__HOST:str = None
@property
def sendBackHost(self):
if self.__HOST is None:
self.__HOST = self.get_key("HOST")
return self.__HOST
@sendBackHost.setter
def sendBackHost(self, value):
self.__HOST = value
def sendMsg_toWebSocket(self, message,connectionID:str = None):
if connectionID is not None:
retmsg = {'websocket': {}}
retmsg['websocket']['action'] = "data send"
retmsg['websocket']['secConnectionID'] = connectionID
retmsg['websocket']['dataType'] = 'text'
retmsg['websocket']['data'] = json.dumps(message)
requests.post(self.sendBackHost, json=retmsg)
print("send success!")
else:
pass
@staticmethod
def create_Msg(status, msg):
"""
本方法用于创建一个用于发送到WebSocket客户端的数据
输入的信息部分,需要有如下几个参数:
1. id,固定为"return-result"
2. status,如果输入为1则status=true, 如果输入为-1则status=false
3. obj_key, 图片的云端路径, 这是输入的msg本身自带的
"""
msg['status'] = "false" if status == -1 else 'true' # 其实最好还是用bool
msg['id'] = "async-back-msg"
msg['type'] = "funcType"
msg["format"] = "imageType"
return msg
# 功能服务类
class Service(ResponseWebSocket):
"""
服务的主函数,封装了cos上传/下载功能以及与api网关的一键通讯
将类的实例变成一个可被调用的对象,在服务运行的时候,只需要运行该对象即可
当然,因为是类,所以支持继承和修改
"""
@classmethod
def process(cls, *args, **kwargs):
"""
处理函数,在使用的时候请将之重构
"""
pass
@classmethod
def __call__(cls, *args, **kwargs):
pass
|