File size: 14,217 Bytes
d5d20be
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
import os
import pymongo
import datetime
import time
from .cloudService import GetConfig
local_path = os.path.dirname(__file__)


class DBUtils(GetConfig):
    """
    从安全的角度出发,将一些默认配置文件上传至COS中,接下来使用COS和它的子类的时候,在第一次使用时需要输入Cuny给的id和key
    用于连接数据库等对象
    当然,在db_default_download = False的时候,如果在运行路径下已经有配置文件了,
    那么就不用再次下载了,也不用输入id和key
    事实上这只需要运行一次,因为配置文件将会被下载至源码文件夹中
    如果要自定义路径,请在继承的子类中编写__init__函数,将service_path定向到指定路径
    """
    __BASE_DIR: dict = None
    __PARAMS_DIR: dict = None
    db_base_path: str = f"{local_path}/conf/base_config.json"
    db_params_path: str = f"{local_path}/conf/params.json"
    db_default_download: bool = False

    @property
    def base_config(self):
        if self.__BASE_DIR is None:
            self.__BASE_DIR = self.load_json(self.db_base_path, self.db_default_download)
        return self.__BASE_DIR

    @property
    def db_config(self):
        return self.base_config["database_config"]

    @property
    def params_config(self):
        if self.__PARAMS_DIR is None:
            self.__PARAMS_DIR = self.load_json(self.db_params_path, self.db_default_download)
        return self.__PARAMS_DIR

    @property
    def size_dir(self):
        return self.params_config["size_config"]

    @property
    def func_dir(self):
        return self.params_config["func_config"]

    @property
    def wx_config(self):
        return self.base_config["wx_config"]

    def get_dbClient(self):
        return pymongo.MongoClient(self.db_config["connect_url"])

    @staticmethod
    def get_time(yyyymmdd=None, delta_date=0):
        """
        给出当前的时间
        :param yyyymmdd: 以yyyymmdd给出的日期时间
        :param delta_date: 获取减去delta_day后的时间,默认为0就是当天
        时间格式:yyyy_mm_dd
        """
        if yyyymmdd is None:
            now_time = (datetime.datetime.now() - datetime.timedelta(delta_date)).strftime("%Y-%m-%d")
            return now_time
        # 输入了yyyymmdd的数据和delta_date,通过这两个数据返回距离yyyymmdd   delta_date天的时间
        pre_time = datetime.datetime(int(yyyymmdd[0:4]), int(yyyymmdd[4:6]), int(yyyymmdd[6:8]))
        return (pre_time - datetime.timedelta(delta_date)).strftime("%Y-%m-%d")

    # 获得时间戳
    def get_timestamp(self, date_time:str=None) -> int:
        """
        输入的日期形式为:"2021-11-29 16:39:45.999"
        真正必须输入的是前十个字符,及精确到日期,后面的时间可以不输入,不输入则默认置零
        """
        def standardDateTime(dt:str) -> str:
            """
            规范化时间字符串
            """
            if len(dt) < 10:
                raise ValueError("你必须至少输入准确到天的日期!比如:2021-11-29")
            elif len(dt) == 10:
                return dt + " 00:00:00.0"
            else:
                try:
                    date, time = dt.split(" ")
                except ValueError:
                    raise ValueError("你只能也必须在日期与具体时间之间增加一个空格,其他地方不能出现空格!")
                while len(time) < 10:
                    if len(time) in (2, 5):
                        time += ":"
                    elif len(time) == 8:
                        time += "."
                    else:
                        time += "0"
                return date + " " + time
        if date_time is None:
            # 默认返回当前时间(str), date_time精确到毫秒
            date_time = datetime.datetime.now()
        # 转换成时间戳
        else:
            date_time = standardDateTime(dt=date_time)
            date_time = datetime.datetime.strptime(date_time, "%Y-%m-%d %H:%M:%S.%f")
        timestamp_ms = int(time.mktime(date_time.timetuple()) * 1000.0 + date_time.microsecond / 1000.0)
        return timestamp_ms

    @staticmethod
    def get_standardTime(yyyy_mm_dd: str):
        return yyyy_mm_dd[0:4] + yyyy_mm_dd[5:7] + yyyy_mm_dd[8:10]

    def find_oneDay_data(self, db_name: str, collection_name: str, date: str = None) -> dict:
        """
        获取指定天数的数据,如果date is None,就自动寻找距今最近的有数据的那一天的数据
        """
        df = None  # 应该被返回的数据
        collection = self.get_dbClient()[db_name][collection_name]
        if date is None:  # 自动寻找前几天的数据,最多三十天
            for delta_date in range(1, 31):
                date_yyyymmdd = self.get_standardTime(self.get_time(delta_date=delta_date))
                filter_ = {"date": date_yyyymmdd}
                df = collection.find_one(filter=filter_)
                if df is not None:
                    del df["_id"]
                    break
        else:
            filter_ = {"date": date}
            df = collection.find_one(filter=filter_)
            if df is not None:
                del df["_id"]
        return df

    def find_daysData_byPeriod(self, date_period: tuple, db_name: str, col_name: str):
        # 给出一个指定的范围日期,返回相应的数据(日期的两头都会被寻找)
        # 这个函数我们默认数据库中的数据是连续的,即不会出现在 20211221 到 20211229 之间有一天没有数据的情况
        if len(date_period) != 2:
            raise ValueError("date_period数据结构:(开始日期,截止日期)")
        start, end = date_period  # yyyymmdd
        delta_date = int(end) - int(start)
        if delta_date < 0:
            raise ValueError("传入的日期有误!")
        collection = self.get_dbClient()[db_name][col_name]
        date = start
        while int(date) <= int(end):
            yield collection.find_one(filter={"date": date})
            date = self.get_standardTime(self.get_time(date, -1))

    @staticmethod
    def find_biggest_valueDict(dict_: dict):
        # 寻找字典中数值最大的字段,要求输入的字典的字段值全为数字
        while len(dict_) > 0:
            max_value = 0
            p = None
            for key in dict_:
                if dict_[key] > max_value:
                    p = key
                    max_value = dict_[key]
            yield p, max_value
            del dict_[p]

    def copy_andAdd_dict(self, dict_base, dict_):
        # 深度拷贝字典,将后者赋值给前者
        # 如果后者的键名在前者已经存在,则直接相加。这就要求两者的数据是数值型
        for key in dict_:
            if key not in dict_base:
                dict_base[key] = dict_[key]
            else:
                if isinstance(dict_[key], int) or isinstance(dict_[key], float):
                    dict_base[key] = round(dict_[key] + dict_base[key], 2)
                else:
                    dict_base[key] = self.copy_andAdd_dict(dict_base[key], dict_[key])
        return dict_base

    @staticmethod
    def compare_data(dict1: dict, dict2: dict, suffix: str, save: int, **kwargs):
        """
        有两个字典,并且通过kwargs会传输一个新的字典,根据字典中的键值我们进行比对,处理成相应的数据格式
        并且在dict1中,生成一个新的键值,为kwargs中的元素+suffix
        save:保留几位小数
        """
        new_dict = dict1.copy()
        for key in kwargs:
            try:
                if kwargs[key] not in dict2 or int(dict2[kwargs[key]]) == -1 or float(dict1[kwargs[key]]) <= 0.0:
                    # 数据不存在
                    data_new = 5002
                else:
                    try:
                        data_new = round(
                            ((float(dict1[kwargs[key]]) - float(dict2[kwargs[key]])) / float(dict2[kwargs[key]])) * 100
                            , save)
                    except ZeroDivisionError:
                        data_new = 5002
                    if data_new == 0.0:
                        data_new = 0
            except TypeError as e:
                print(e)
                data_new = 5002  # 如果没有之前的数据,默认返回0
            new_dict[kwargs[key] + suffix] = data_new
        return new_dict

    @staticmethod
    def sum_dictList_byKey(dictList: list, **kwargs) -> dict:
        """
        有一个列表,列表中的元素为字典,并且所有字典都有一个键值为key的字段,字段值为数字
        我们将每一个字典的key字段提取后相加,得到该字段值之和.
        """
        sum_num = {}
        if kwargs is None:
            raise ImportError("Please input at least ONE key")
        for key in kwargs:
            sum_num[kwargs[key]] = 0
        for dict_ in dictList:
            if not isinstance(dict_, dict):
                raise TypeError("object is not DICT!")
            for key in kwargs:
                sum_num[kwargs[key]] += dict_[kwargs[key]]
        return sum_num

    @staticmethod
    def sum_2ListDict(list_dict1: list, list_dict2: list, key_name, data_name):
        """
        有两个列表,列表内的元素为字典,我们根据key所对应的键值寻找列表中键值相同的两个元素,将他们的data对应的键值相加
        生成新的列表字典(其余键值被删除)
        key仅在一个列表中存在,则直接加入新的列表字典
        """
        sum_list = []

        def find_sameKey(kn, key_, ld: list) -> int:
            for dic_ in ld:
                if dic_[kn] == key_:
                    post_ = ld.index(dic_)
                    return post_
            return -1

        for dic in list_dict1:
            key = dic[key_name]  # 键名
            post = find_sameKey(key_name, key, list_dict2)  # 在list2中寻找相同的位置
            data = dic[data_name] + list_dict2[post][data_name] if post != -1 else dic[data_name]
            sum_list.append({key_name: key, data_name: data})
        return sum_list

    @staticmethod
    def find_biggest_dictList(dictList: list, key: str = "key", data: str = "value"):
        """
        有一个列表,里面每一个元素都是一个字典
        这些字典有一些共通性质,那就是里面都有一个key键名和一个data键名,后者的键值必须是数字
        我们根据data键值的大小进行生成,每一次返回列表中data键值最大的数和它的key键值
        """
        while len(dictList) > 0:
            point = 0
            biggest_num = int(dictList[0][data])
            biggest_key = dictList[0][key]
            for i in range(len(dictList)):
                num = int(dictList[i][data])
                if num > biggest_num:
                    point = i
                    biggest_num = int(dictList[i][data])
                    biggest_key = dictList[i][key]
            yield str(biggest_key), biggest_num
            del dictList[point]

    def get_share_data(self, date_yyyymmdd: str):
        # 获得用户界面情况
        visitPage = self.find_oneDay_data(date=date_yyyymmdd,
                                          db_name="cuny-user-analysis",
                                          collection_name="daily-userVisitPage")
        if visitPage is not None:
            # 这一部分没有得到数据是可以容忍的.不用抛出模态框错误
            # 获得昨日用户分享情况
            sum_num = self.sum_dictList_byKey(dictList=visitPage["data_list"],
                                              key1="page_share_pv",
                                              key2="page_share_uv")
        else:
            # 此时将分享次数等置为-1
            sum_num = {"page_share_pv": -1, "page_share_uv": -1}
        return sum_num

    @staticmethod
    def compare_date(date1_yyyymmdd: str, date2_yyyymmdd: str):
        # 如果date1是date2的昨天,那么就返回True
        date1 = int(date1_yyyymmdd)
        date2 = int(date2_yyyymmdd)
        return True if date2 - date1 == 1 else False

    def change_time(self, date_yyyymmdd: str, mode: int):
        # 将yyyymmdd的数据分开为相应的数据形式
        if mode == 1:
            if self.compare_date(date_yyyymmdd, self.get_standardTime(self.get_time(delta_date=0))) is False:
                return date_yyyymmdd[0:4] + "年" + date_yyyymmdd[4:6] + "月" + date_yyyymmdd[6:8] + "日"
            else:
                return "昨日"
        elif mode == 2:
            date = date_yyyymmdd[0:4] + "." + date_yyyymmdd[4:6] + "." + date_yyyymmdd[6:8]
            if self.compare_date(date_yyyymmdd, self.get_standardTime(self.get_time(delta_date=0))) is True:
                return date + "~" + date + " | 昨日"
            else:
                return date + "~" + date

    @staticmethod
    def changeList_dict2List_list(dl: list, order: list):
        """
        列表内是一个个字典,本函数将字典拆解,以order的形式排列键值为列表
        考虑到一些格式的问题,这里我采用生成器的形式封装
        """
        for dic in dl:
            # dic是列表内的字典元素
            tmp = []
            for key_name in order:
                key = dic[key_name]
                tmp.append(key)
            yield tmp

    def dict_mapping(self, dict_name: str, id_: str):
        """
        进行字典映射,输入字典名称和键名,返回具体的键值
        如果不存在,则原路返回键名
        """
        try:
            return getattr(self, dict_name)[id_]
        except KeyError:
            return id_
        except AttributeError:
            print(f"[WARNING]: 本对象内部不存在{dict_name}!")
            return id_

    @staticmethod
    def dictAddKey(dic: dict, dic_tmp: dict, **kwargs):
        """
        往字典中加入参数,可迭代
        """
        for key in kwargs:
            dic[key] = dic_tmp[key]
        return dic


if __name__ == "__main__":
    dbu = DBUtils()