File size: 4,519 Bytes
dc82c71 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
"""
@Time: 2022/11/03
@Author: LiuShu
@File: 数据库操作类库
"""
import pymysql
from utility.loggers import logger
from utility.utils import config
class Cur_db(object):
def __init__(self):
self.config = config
self.db_name = self.config['database']['DB']
def pymysql_cur(self, reback=5):
""" 连接数据库 """
try:
self.conn = pymysql.connect(host=self.config['database']['HOST'], user=self.config['database']['USER'],
password=self.config['database']['PWD'], db=self.db_name,
port=int(self.config['database']['PORT']),
charset='utf8')
except Exception as e:
if reback == 0:
logger.exception('Exception occurred.')
return
else:
logger.exception('Exception occurred.')
reback -= 1
return self.pymysql_cur(reback)
def get_db_name(self):
"""
:return:
"""
return self.db_name
def select(self, sql, params, reback=2):
""" 查询单条语句,并返回查询所有的结果 """
try:
cur = self.conn.cursor()
cur.execute(sql, params)
# 单条
res = cur.fetchone()
cur.close()
if res:
return res
return
except Exception as e:
logger.exception('Exception occurred.')
if reback > 0:
reback -= 1
return self.select(sql, reback)
else:
logger.info(str('*' * 100))
return
def _select(self, sql, reback=2):
try:
cur = self.conn.cursor()
cur.execute(sql)
# 单条
res = cur.fetchone()
cur.close()
if res:
return res[0]
return
except Exception as e:
logger.exception('Exception occurred.')
if reback > 0:
reback -= 1
return self.select(sql, reback)
else:
logger.info(str('*' * 100))
return
def selectMany(self, sql, reback=2):
try:
cur = self.conn.cursor()
cur.execute(sql)
res = cur.fetchall()
cur.close()
if res:
return res
logger.info(str(sql))
return
except Exception as e:
logger.exception('Exception occurred.')
if reback > 0:
reback -= 1
return self.selectMany(sql, reback)
else:
logger.info(str('*' * 100))
return
def insert(self, sql, params):
cur = self.conn.cursor()
cur.execute(sql, params)
self.conn.commit()
return
def _insert(self, sql):
cur = self.conn.cursor()
cur.execute(sql)
self.conn.commit()
def insert_batch(self, sql, data_list):
"""
将dataframe批量入库
:param sql: 插入语句
:return:
"""
cur = self.conn.cursor()
# 开启事务
self.conn.begin()
try:
cur.executemany(sql, data_list)
self.conn.commit()
cur.close()
self.conn.close()
return True
except:
# 万一失败了,要进行回滚操作
self.conn.rollback()
cur.close()
self.conn.close()
return False
def update(self, sql, params):
cur = self.conn.cursor()
cur.execute(sql, params)
self.conn.commit()
return
def _update(self, sql):
try:
cur = self.conn.cursor()
cur.execute(sql)
self.conn.commit()
except Exception as e:
logger.exception('Exception occurred.')
def close(self):
self.conn.close()
pass
if __name__ == '__main__':
db_con = Cur_db()
logger.info(str(db_con.config['database']['HOST']))
print(str(db_con.config['database']['HOST']))
db_con.pymysql_cur()
sql = "SELECT * FROM cargo"
res = db_con.selectMany(sql)
print(str(res))
db_con.close() |