File size: 4,519 Bytes
dc82c71
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""

@Time: 2022/11/03

@Author: LiuShu

@File: Database operation library (thin PyMySQL wrapper)

"""
import pymysql
from utility.loggers import logger
from utility.utils import config


class Cur_db(object):
    """Thin wrapper around a single PyMySQL connection.

    Connection settings are read from the ``database`` section of the
    project ``config`` mapping (keys ``HOST``, ``USER``, ``PWD``, ``DB`` and
    ``PORT``). Call :meth:`pymysql_cur` to open the connection before
    running any statement.
    """

    def __init__(self):
        self.config = config
        self.db_name = self.config['database']['DB']
        # Set by pymysql_cur(); stays None until a connection is opened.
        self.conn = None

    def pymysql_cur(self, reback=5):
        """Open the database connection, retrying when it fails.

        :param reback: number of extra attempts made after the first
            failure (so at most ``reback + 1`` attempts in total).
        :return: None. On success ``self.conn`` holds the new connection;
            when every attempt fails the errors are logged and
            ``self.conn`` is left as it was.
        """
        # A negative ``reback`` still gets exactly one attempt.
        for _ in range(max(reback, 0) + 1):
            try:
                db_conf = self.config['database']
                self.conn = pymysql.connect(host=db_conf['HOST'],
                                            user=db_conf['USER'],
                                            password=db_conf['PWD'],
                                            db=self.db_name,
                                            port=int(db_conf['PORT']),
                                            charset='utf8')
                return
            except Exception:
                logger.exception('Exception occurred.')

    def get_db_name(self):
        """Return the database name read from the configuration.

        :return: the value of ``config['database']['DB']``.
        """
        return self.db_name

    def _run_query(self, sql, params, reback, fetch_all=False, log_empty=False):
        """Execute a query and fetch its result, retrying on any error.

        :param sql: statement to execute.
        :param params: parameters handed to ``cursor.execute`` (may be None).
        :param reback: number of extra attempts after the first failure.
        :param fetch_all: fetch every row instead of only the first one.
        :param log_empty: log the statement when it yields no result.
        :return: the fetched result, or None when the result is empty or
            every attempt failed.
        """
        for _ in range(max(reback, 0) + 1):
            try:
                # The ``with`` block closes the cursor on success and failure.
                with self.conn.cursor() as cur:
                    cur.execute(sql, params)
                    res = cur.fetchall() if fetch_all else cur.fetchone()
            except Exception:
                logger.exception('Exception occurred.')
                continue
            if res:
                return res
            if log_empty:
                logger.info(str(sql))
            return None
        # Every attempt failed: leave a visible marker line in the log.
        logger.info(str('*' * 100))
        return None

    def select(self, sql, params, reback=2):
        """Run a parameterized query and return its first row.

        :param sql: statement containing parameter placeholders.
        :param params: parameters bound to the placeholders; the same
            parameters are reused on every retry.
        :param reback: number of extra attempts after the first failure.
        :return: the first row, or None when there is no row or every
            attempt failed.
        """
        return self._run_query(sql, params, reback)

    def _select(self, sql, reback=2):
        """Run a raw query and return the first column of its first row.

        Prefer :meth:`select` with parameters when the statement contains
        values that come from outside the program.

        :param sql: complete statement, executed without parameters.
        :param reback: number of extra attempts after the first failure.
        :return: the first column of the first row, or None when there is
            no row or every attempt failed.
        """
        row = self._run_query(sql, None, reback)
        return row[0] if row else None

    def selectMany(self, sql, reback=2):
        """Run a raw query and return all of its rows.

        :param sql: complete statement, executed without parameters.
        :param reback: number of extra attempts after the first failure.
        :return: all fetched rows, or None when the result is empty (the
            statement is then logged) or every attempt failed.
        """
        return self._run_query(sql, None, reback, fetch_all=True, log_empty=True)

    def insert(self, sql, params):
        """Execute a parameterized INSERT and commit it.

        Errors are not handled here; they propagate to the caller.

        :param sql: statement containing parameter placeholders.
        :param params: parameters bound to the placeholders.
        :return: None.
        """
        with self.conn.cursor() as cur:
            cur.execute(sql, params)
        self.conn.commit()
        return

    def _insert(self, sql):
        """Execute a raw INSERT statement and commit it.

        Errors are not handled here; they propagate to the caller.

        :param sql: complete statement, executed without parameters.
        :return: None.
        """
        with self.conn.cursor() as cur:
            cur.execute(sql)
        self.conn.commit()

    def insert_batch(self, sql, data_list):
        """Insert many rows inside one transaction, then close the connection.

        The connection is closed when this method returns, whether the
        batch succeeded or not, so :meth:`pymysql_cur` must be called again
        before the instance is reused.

        :param sql: INSERT statement containing parameter placeholders.
        :param data_list: sequence of parameter sets, passed to
            ``cursor.executemany``.
        :return: True when the batch was committed, False when it failed
            and was rolled back.
        """
        cur = self.conn.cursor()
        # Start an explicit transaction so the batch is all-or-nothing.
        self.conn.begin()
        try:
            cur.executemany(sql, data_list)
            self.conn.commit()
            return True
        except Exception:
            # Record why the batch failed before undoing it.
            logger.exception('Exception occurred.')
            self.conn.rollback()
            return False
        finally:
            cur.close()
            self.conn.close()

    def update(self, sql, params):
        """Execute a parameterized UPDATE and commit it.

        Errors are not handled here; they propagate to the caller.

        :param sql: statement containing parameter placeholders.
        :param params: parameters bound to the placeholders.
        :return: None.
        """
        with self.conn.cursor() as cur:
            cur.execute(sql, params)
        self.conn.commit()
        return

    def _update(self, sql):
        """Execute a raw UPDATE and commit it, logging instead of raising.

        :param sql: complete statement, executed without parameters.
        :return: None, also when the statement failed (the error is logged).
        """
        try:
            with self.conn.cursor() as cur:
                cur.execute(sql)
            self.conn.commit()
        except Exception:
            logger.exception('Exception occurred.')

    def close(self):
        """Close the connection if one was opened; otherwise do nothing."""
        conn = getattr(self, 'conn', None)
        if conn is not None:
            conn.close()


if __name__ == '__main__':
    # Manual smoke test: report the configured host, connect, dump the
    # ``cargo`` table and close the connection again.
    demo_db = Cur_db()
    host_text = str(demo_db.config['database']['HOST'])
    logger.info(host_text)
    print(host_text)
    demo_db.pymysql_cur()
    cargo_rows = demo_db.selectMany("SELECT * FROM cargo")
    print(str(cargo_rows))
    demo_db.close()