Spaces:
Running
Running
File size: 14,217 Bytes
d5d20be |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 |
import os
import pymongo
import datetime
import time
from .cloudService import GetConfig
local_path = os.path.dirname(__file__)
class DBUtils(GetConfig):
"""
从安全的角度出发,将一些默认配置文件上传至COS中,接下来使用COS和它的子类的时候,在第一次使用时需要输入Cuny给的id和key
用于连接数据库等对象
当然,在db_default_download = False的时候,如果在运行路径下已经有配置文件了,
那么就不用再次下载了,也不用输入id和key
事实上这只需要运行一次,因为配置文件将会被下载至源码文件夹中
如果要自定义路径,请在继承的子类中编写__init__函数,将service_path定向到指定路径
"""
__BASE_DIR: dict = None
__PARAMS_DIR: dict = None
db_base_path: str = f"{local_path}/conf/base_config.json"
db_params_path: str = f"{local_path}/conf/params.json"
db_default_download: bool = False
@property
def base_config(self):
if self.__BASE_DIR is None:
self.__BASE_DIR = self.load_json(self.db_base_path, self.db_default_download)
return self.__BASE_DIR
@property
def db_config(self):
return self.base_config["database_config"]
@property
def params_config(self):
if self.__PARAMS_DIR is None:
self.__PARAMS_DIR = self.load_json(self.db_params_path, self.db_default_download)
return self.__PARAMS_DIR
@property
def size_dir(self):
return self.params_config["size_config"]
@property
def func_dir(self):
return self.params_config["func_config"]
@property
def wx_config(self):
return self.base_config["wx_config"]
def get_dbClient(self):
return pymongo.MongoClient(self.db_config["connect_url"])
@staticmethod
def get_time(yyyymmdd=None, delta_date=0):
"""
给出当前的时间
:param yyyymmdd: 以yyyymmdd给出的日期时间
:param delta_date: 获取减去delta_day后的时间,默认为0就是当天
时间格式:yyyy_mm_dd
"""
if yyyymmdd is None:
now_time = (datetime.datetime.now() - datetime.timedelta(delta_date)).strftime("%Y-%m-%d")
return now_time
# 输入了yyyymmdd的数据和delta_date,通过这两个数据返回距离yyyymmdd delta_date天的时间
pre_time = datetime.datetime(int(yyyymmdd[0:4]), int(yyyymmdd[4:6]), int(yyyymmdd[6:8]))
return (pre_time - datetime.timedelta(delta_date)).strftime("%Y-%m-%d")
# 获得时间戳
def get_timestamp(self, date_time:str=None) -> int:
"""
输入的日期形式为:"2021-11-29 16:39:45.999"
真正必须输入的是前十个字符,及精确到日期,后面的时间可以不输入,不输入则默认置零
"""
def standardDateTime(dt:str) -> str:
"""
规范化时间字符串
"""
if len(dt) < 10:
raise ValueError("你必须至少输入准确到天的日期!比如:2021-11-29")
elif len(dt) == 10:
return dt + " 00:00:00.0"
else:
try:
date, time = dt.split(" ")
except ValueError:
raise ValueError("你只能也必须在日期与具体时间之间增加一个空格,其他地方不能出现空格!")
while len(time) < 10:
if len(time) in (2, 5):
time += ":"
elif len(time) == 8:
time += "."
else:
time += "0"
return date + " " + time
if date_time is None:
# 默认返回当前时间(str), date_time精确到毫秒
date_time = datetime.datetime.now()
# 转换成时间戳
else:
date_time = standardDateTime(dt=date_time)
date_time = datetime.datetime.strptime(date_time, "%Y-%m-%d %H:%M:%S.%f")
timestamp_ms = int(time.mktime(date_time.timetuple()) * 1000.0 + date_time.microsecond / 1000.0)
return timestamp_ms
@staticmethod
def get_standardTime(yyyy_mm_dd: str):
return yyyy_mm_dd[0:4] + yyyy_mm_dd[5:7] + yyyy_mm_dd[8:10]
def find_oneDay_data(self, db_name: str, collection_name: str, date: str = None) -> dict:
"""
获取指定天数的数据,如果date is None,就自动寻找距今最近的有数据的那一天的数据
"""
df = None # 应该被返回的数据
collection = self.get_dbClient()[db_name][collection_name]
if date is None: # 自动寻找前几天的数据,最多三十天
for delta_date in range(1, 31):
date_yyyymmdd = self.get_standardTime(self.get_time(delta_date=delta_date))
filter_ = {"date": date_yyyymmdd}
df = collection.find_one(filter=filter_)
if df is not None:
del df["_id"]
break
else:
filter_ = {"date": date}
df = collection.find_one(filter=filter_)
if df is not None:
del df["_id"]
return df
def find_daysData_byPeriod(self, date_period: tuple, db_name: str, col_name: str):
# 给出一个指定的范围日期,返回相应的数据(日期的两头都会被寻找)
# 这个函数我们默认数据库中的数据是连续的,即不会出现在 20211221 到 20211229 之间有一天没有数据的情况
if len(date_period) != 2:
raise ValueError("date_period数据结构:(开始日期,截止日期)")
start, end = date_period # yyyymmdd
delta_date = int(end) - int(start)
if delta_date < 0:
raise ValueError("传入的日期有误!")
collection = self.get_dbClient()[db_name][col_name]
date = start
while int(date) <= int(end):
yield collection.find_one(filter={"date": date})
date = self.get_standardTime(self.get_time(date, -1))
@staticmethod
def find_biggest_valueDict(dict_: dict):
# 寻找字典中数值最大的字段,要求输入的字典的字段值全为数字
while len(dict_) > 0:
max_value = 0
p = None
for key in dict_:
if dict_[key] > max_value:
p = key
max_value = dict_[key]
yield p, max_value
del dict_[p]
def copy_andAdd_dict(self, dict_base, dict_):
# 深度拷贝字典,将后者赋值给前者
# 如果后者的键名在前者已经存在,则直接相加。这就要求两者的数据是数值型
for key in dict_:
if key not in dict_base:
dict_base[key] = dict_[key]
else:
if isinstance(dict_[key], int) or isinstance(dict_[key], float):
dict_base[key] = round(dict_[key] + dict_base[key], 2)
else:
dict_base[key] = self.copy_andAdd_dict(dict_base[key], dict_[key])
return dict_base
@staticmethod
def compare_data(dict1: dict, dict2: dict, suffix: str, save: int, **kwargs):
"""
有两个字典,并且通过kwargs会传输一个新的字典,根据字典中的键值我们进行比对,处理成相应的数据格式
并且在dict1中,生成一个新的键值,为kwargs中的元素+suffix
save:保留几位小数
"""
new_dict = dict1.copy()
for key in kwargs:
try:
if kwargs[key] not in dict2 or int(dict2[kwargs[key]]) == -1 or float(dict1[kwargs[key]]) <= 0.0:
# 数据不存在
data_new = 5002
else:
try:
data_new = round(
((float(dict1[kwargs[key]]) - float(dict2[kwargs[key]])) / float(dict2[kwargs[key]])) * 100
, save)
except ZeroDivisionError:
data_new = 5002
if data_new == 0.0:
data_new = 0
except TypeError as e:
print(e)
data_new = 5002 # 如果没有之前的数据,默认返回0
new_dict[kwargs[key] + suffix] = data_new
return new_dict
@staticmethod
def sum_dictList_byKey(dictList: list, **kwargs) -> dict:
"""
有一个列表,列表中的元素为字典,并且所有字典都有一个键值为key的字段,字段值为数字
我们将每一个字典的key字段提取后相加,得到该字段值之和.
"""
sum_num = {}
if kwargs is None:
raise ImportError("Please input at least ONE key")
for key in kwargs:
sum_num[kwargs[key]] = 0
for dict_ in dictList:
if not isinstance(dict_, dict):
raise TypeError("object is not DICT!")
for key in kwargs:
sum_num[kwargs[key]] += dict_[kwargs[key]]
return sum_num
@staticmethod
def sum_2ListDict(list_dict1: list, list_dict2: list, key_name, data_name):
"""
有两个列表,列表内的元素为字典,我们根据key所对应的键值寻找列表中键值相同的两个元素,将他们的data对应的键值相加
生成新的列表字典(其余键值被删除)
key仅在一个列表中存在,则直接加入新的列表字典
"""
sum_list = []
def find_sameKey(kn, key_, ld: list) -> int:
for dic_ in ld:
if dic_[kn] == key_:
post_ = ld.index(dic_)
return post_
return -1
for dic in list_dict1:
key = dic[key_name] # 键名
post = find_sameKey(key_name, key, list_dict2) # 在list2中寻找相同的位置
data = dic[data_name] + list_dict2[post][data_name] if post != -1 else dic[data_name]
sum_list.append({key_name: key, data_name: data})
return sum_list
@staticmethod
def find_biggest_dictList(dictList: list, key: str = "key", data: str = "value"):
"""
有一个列表,里面每一个元素都是一个字典
这些字典有一些共通性质,那就是里面都有一个key键名和一个data键名,后者的键值必须是数字
我们根据data键值的大小进行生成,每一次返回列表中data键值最大的数和它的key键值
"""
while len(dictList) > 0:
point = 0
biggest_num = int(dictList[0][data])
biggest_key = dictList[0][key]
for i in range(len(dictList)):
num = int(dictList[i][data])
if num > biggest_num:
point = i
biggest_num = int(dictList[i][data])
biggest_key = dictList[i][key]
yield str(biggest_key), biggest_num
del dictList[point]
def get_share_data(self, date_yyyymmdd: str):
# 获得用户界面情况
visitPage = self.find_oneDay_data(date=date_yyyymmdd,
db_name="cuny-user-analysis",
collection_name="daily-userVisitPage")
if visitPage is not None:
# 这一部分没有得到数据是可以容忍的.不用抛出模态框错误
# 获得昨日用户分享情况
sum_num = self.sum_dictList_byKey(dictList=visitPage["data_list"],
key1="page_share_pv",
key2="page_share_uv")
else:
# 此时将分享次数等置为-1
sum_num = {"page_share_pv": -1, "page_share_uv": -1}
return sum_num
@staticmethod
def compare_date(date1_yyyymmdd: str, date2_yyyymmdd: str):
# 如果date1是date2的昨天,那么就返回True
date1 = int(date1_yyyymmdd)
date2 = int(date2_yyyymmdd)
return True if date2 - date1 == 1 else False
def change_time(self, date_yyyymmdd: str, mode: int):
# 将yyyymmdd的数据分开为相应的数据形式
if mode == 1:
if self.compare_date(date_yyyymmdd, self.get_standardTime(self.get_time(delta_date=0))) is False:
return date_yyyymmdd[0:4] + "年" + date_yyyymmdd[4:6] + "月" + date_yyyymmdd[6:8] + "日"
else:
return "昨日"
elif mode == 2:
date = date_yyyymmdd[0:4] + "." + date_yyyymmdd[4:6] + "." + date_yyyymmdd[6:8]
if self.compare_date(date_yyyymmdd, self.get_standardTime(self.get_time(delta_date=0))) is True:
return date + "~" + date + " | 昨日"
else:
return date + "~" + date
@staticmethod
def changeList_dict2List_list(dl: list, order: list):
"""
列表内是一个个字典,本函数将字典拆解,以order的形式排列键值为列表
考虑到一些格式的问题,这里我采用生成器的形式封装
"""
for dic in dl:
# dic是列表内的字典元素
tmp = []
for key_name in order:
key = dic[key_name]
tmp.append(key)
yield tmp
def dict_mapping(self, dict_name: str, id_: str):
"""
进行字典映射,输入字典名称和键名,返回具体的键值
如果不存在,则原路返回键名
"""
try:
return getattr(self, dict_name)[id_]
except KeyError:
return id_
except AttributeError:
print(f"[WARNING]: 本对象内部不存在{dict_name}!")
return id_
@staticmethod
def dictAddKey(dic: dict, dic_tmp: dict, **kwargs):
"""
往字典中加入参数,可迭代
"""
for key in kwargs:
dic[key] = dic_tmp[key]
return dic
if __name__ == "__main__":
dbu = DBUtils() |