0%

python3封装mysql操作类

类视图如下

本文来自:http://www.wmhfly.com/php/python3-mysql-class.html

我自己改造了一次,加入到自己的自动化接口测试框架里面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
  FIND_BY_SQL = "findBySql" # 根据sql查找
COUNT_BY_SQL = "countBySql" # 自定义sql 统计影响行数
INSERT = "insert" # 插入
UPDATE_BY_ATTR = "updateByAttr" # 更新数据
DELETE_BY_ATTR = "deleteByAttr" # 删除数据
FIND_BY_ATTR = "findByAttr" # 根据条件查询一条记录
FIND_ALL_BY_ATTR = "findAllByAttr" #根据条件查询多条记录
COUNT = "count" # 统计行
EXIST = "exist" # 是否存在该记录


import mysql.connector
import mysql.connector.errors
from common.customConst import Const
class MySQLet:
"""Connection to a MySQL"""
# def __init__(self,user='',password='',database='',charset=None,port=3306):
def __init__(self,**kwargs):
try:
self._conn = mysql.connector.connect(host=kwargs["host"], user=kwargs["user"], password=kwargs["password"],
charset=kwargs["charset"], database=kwargs["database"], port=kwargs["port"])
self.__cursor = None
print("连接数据库")
#set charset charset = ('latin1','latin1_general_ci')
except mysql.connector.errors.ProgrammingError as err:
print('mysql连接错误:' + err.msg)

# def findBySql(self, sql, params={}, limit=0, join='AND'):
def findBySql(self, **kwargs):
"""
自定义sql语句查找
limit = 是否需要返回多少行
params = dict(field=value)
join = 'AND | OR'
"""
cursor = self.__getCursor()
# sql = self.__joinWhere(kwargs["sql"], kwargs["params"], kwargs["join"])
if kwargs.get("join", 0) == 0: kwargs["join"] = "AND"
sql = self.__joinWhere(**kwargs)
cursor.execute(sql, tuple(kwargs["params"].values()))
rows = cursor.fetchmany(size=kwargs["limit"]) if kwargs["limit"] > 0 else cursor.fetchall()
result = [dict(zip(cursor.column_names,row)) for row in rows] if rows else None
return len(result)

# def countBySql(self,sql,params = {},join = 'AND'):
def countBySql(self, **kwargs):
"""自定义sql 统计影响行数"""
if kwargs.get("join", 0) == 0: kwargs["join"] = "AND"
cursor = self.__getCursor()
# sql = self.__joinWhere(kwargs["sql"], kwargs["params"], kwargs["join"])
sql = self.__joinWhere(**kwargs)
cursor.execute(sql, tuple(kwargs["params"].values()))
result = cursor.fetchall() # fetchone是一条记录, fetchall 所有记录
return len(result) if result else 0

# def insert(self,table,data):
def insert(self, **kwargs):
"""新增一条记录
table: 表名
data: dict 插入的数据
"""
fields = ','.join('`'+k+'`' for k in kwargs["data"].keys())
values = ','.join(("%s", ) * len(kwargs["data"]))
sql = 'INSERT INTO `%s` (%s) VALUES (%s)' % (kwargs["table"], fields, values)
cursor = self.__getCursor()
cursor.execute(sql, tuple(kwargs["data"].values()))
insert_id = cursor.lastrowid
self._conn.commit()
return insert_id

# def updateByAttr(self,table,data,params={},join='AND'):
def updateByAttr(self, **kwargs):
# """更新数据"""
if kwargs.get("params", 0) == 0:
kwargs["params"] = {}
if kwargs.get("join", 0) == 0:
kwargs["join"] = "AND"
fields = ','.join('`' + k + '`=%s' for k in kwargs["data"].keys())
values = list(kwargs["data"].values())


values.extend(list(kwargs["params"].values()))
sql = "UPDATE `%s` SET %s " % (kwargs["table"], fields)
kwargs["sql"] = sql
sql = self.__joinWhere(**kwargs)
cursor = self.__getCursor()
cursor.execute(sql, tuple(values))
self._conn.commit()
return cursor.rowcount


# def updateByPk(self,table,data,id,pk='id'):
def updateByPk(self, **kwargs):
"""根据主键更新,默认是id为主键"""
return self.updateByAttr(**kwargs)

# def deleteByAttr(self,table,params={},join='AND'):
def deleteByAttr(self, **kwargs):
"""删除数据"""
if kwargs.get("params", 0) == 0:
kwargs["params"] = {}
if kwargs.get("join", 0) == 0:
kwargs["join"] = "AND"
# fields = ','.join('`'+k+'`=%s' for k in kwargs["params"].keys())
sql = "DELETE FROM `%s` " % kwargs["table"]
kwargs["sql"] = sql
# sql = self.__joinWhere(sql, kwargs["params"], kwargs["join"])
sql = self.__joinWhere(**kwargs)
cursor = self.__getCursor()
cursor.execute(sql, tuple(kwargs["params"].values()))
self._conn.commit()
return cursor.rowcount

# def deleteByPk(self,table,id,pk='id'):
def deleteByPk(self, **kwargs):
"""根据主键删除,默认是id为主键"""
return self.deleteByAttr(**kwargs)

# def findByAttr(self,table,criteria = {}):
def findByAttr(self, **kwargs):
"""根據條件查找一條記錄"""
return self.__query(**kwargs)

# def findByPk(self,table,id,pk='id'):
def findByPk(self, **kwargs):
return self.findByAttr(**kwargs)

# def findAllByAttr(self,table,criteria={}, whole=true):
def findAllByAttr(self, **kwargs):
"""根據條件查找記錄"""
return self.__query(**kwargs)

# def count(self,table,params={},join='AND'):
def count(self, **kwargs):
"""根据条件统计行数"""
if kwargs.get("join", 0) == 0: kwargs["join"] = "AND"
sql = 'SELECT COUNT(*) FROM `%s`' % kwargs["table"]
# sql = self.__joinWhere(sql, kwargs["params"], kwargs["join"])
kwargs["sql"] = sql
sql = self.__joinWhere(**kwargs)
cursor = self.__getCursor()
cursor.execute(sql, tuple(kwargs["params"].values()))
result = cursor.fetchone()
return result[0] if result else 0

# def exist(self,table,params={},join='AND'):
def exist(self, **kwargs):
"""判断是否存在"""
return self.count(**kwargs) > 0

def close(self):
"""关闭游标和数据库连接"""
if self.__cursor is not None:
self.__cursor.close()
self._conn.close()

def __getCursor(self):
"""获取游标"""
if self.__cursor is None:
self.__cursor = self._conn.cursor()
return self.__cursor

# def __joinWhere(self,sql,params,join):
def __joinWhere(self, **kwargs):
"""转换params为where连接语句"""
if kwargs["params"]:
keys,_keys = self.__tParams(**kwargs)
where = ' AND '.join(k+'='+_k for k,_k in zip(keys,_keys)) if kwargs["join"] == 'AND' else ' OR '.join(k+'='+_k for k,_k in zip(keys,_keys))
kwargs["sql"]+=' WHERE ' + where
return kwargs["sql"]

# def __tParams(self,params):
def __tParams(self, **kwargs):
keys = ['`'+k+'`' for k in kwargs["params"].keys()]
_keys = ['%s' for k in kwargs["params"].keys()]
return keys,_keys

# def __query(self,table,criteria,whole=False):
def __query(self, **kwargs):
if kwargs.get("whole", False) == False or kwargs["whole"] is not True:
kwargs["whole"] = False
kwargs["criteria"]['limit'] = 1
# sql = self.__contact_sql(kwargs["table"], kwargs["criteria"])
sql = self.__contact_sql(**kwargs)
cursor = self.__getCursor()
cursor.execute(sql)
rows = cursor.fetchall() if kwargs["whole"] else cursor.fetchone()
result = [dict(zip(cursor.column_names, row)) for row in rows] if kwargs["whole"] else dict(zip(cursor.column_names, rows)) if rows else None
return result

# def __contact_sql(self,table,criteria):
def __contact_sql(self, **kwargs):
sql = 'SELECT '
if kwargs["criteria"] and type(kwargs["criteria"]) is dict:
#select fields
if 'select' in kwargs["criteria"]:
fields = kwargs["criteria"]['select'].split(',')
sql+= ','.join('`'+field+'`' for field in fields)
else:
sql+=' * '
#table
sql+=' FROM `%s`'% kwargs["table"]
#where
if 'where' in kwargs["criteria"]:
sql+=' WHERE '+ kwargs["criteria"]['where']
#group by
if 'group' in kwargs["criteria"]:
sql+=' GROUP BY '+ kwargs["criteria"]['group']
#having
if 'having' in kwargs["criteria"]:
sql+=' HAVING '+ kwargs["criteria"]['having']
#order by
if 'order' in kwargs["criteria"]:
sql+=' ORDER BY '+ kwargs["criteria"]['order']
#limit
if 'limit' in kwargs["criteria"]:
sql+=' LIMIT '+ str(kwargs["criteria"]['limit'])
#offset
if 'offset' in kwargs["criteria"]:
sql+=' OFFSET '+ str(kwargs["criteria"]['offset'])
else:
sql+=' * FROM `%s`'% kwargs["table"]
return sql
def findKeySql(self, key ,**kwargs):
sqlOperate = {
Const.COUNT: lambda: self.count(**kwargs),
Const.COUNT_BY_SQL: lambda: self.countBySql(**kwargs),
Const.DELETE_BY_ATTR: lambda: self.deleteByAttr(**kwargs),
Const.EXIST: lambda: self.exist(**kwargs),
Const.FIND_ALL_BY_ATTR: lambda: self.findAllByAttr(**kwargs),
Const.INSERT: lambda: self.insert(**kwargs),
Const.FIND_BY_ATTR: lambda: self.findByAttr(**kwargs),
Const.UPDATE_BY_ATTR: lambda: self.updateByAttr(**kwargs),
Const.FIND_BY_SQL: lambda: self.findBySql(**kwargs)

}
return sqlOperate[key]()


if __name__ == "__main__":
mysqlet = MySQLet(host="127.0.0.1", user="root", password="", charset="utf8", database="userinfo", port=3306)
# 根据字段统计count, join>>AND,OR,可以不传,默认为AND
print(mysqlet.findKeySql(Const.COUNT, table="info", params={"id": "11", "name": "666"}, join="OR"))
# 自定义sql语句统计count
print(mysqlet.findKeySql(Const.COUNT_BY_SQL, sql="select * from info", params={"name": "666"}, join="AND"))
#插入数据
print(mysqlet.findKeySql(Const.INSERT, table="info", data={"name":"333", "pwd": "111"}))
#根据字段删除,不传params参数,就是删除全部
print(mysqlet.findKeySql(Const.DELETE_BY_ATTR, table="info", params={"id": 20}))
# 查找是否存在该记录,不传params参数,就是查找全部.join同上
print(mysqlet.findKeySql(Const.EXIST, table="info", params={"id": 180},join='AND'))
#根据字段查找多条记录,whole不传就查一条记录,criteria里面可以传where,group by,having,order by,limt,offset
print(mysqlet.findKeySql(Const.FIND_ALL_BY_ATTR, table="info", criteria= {"where": "name=333"}, whole=True))
# 根据字段查一条记录,和上面的查多条记录参数基本一样,少了个whole参数
print(mysqlet.findKeySql(Const.FIND_BY_ATTR, table="info", criteria= {"where": "name=333"}))
# 根据字段更新数据库中的记录,join可以传AND,OR,不传默认取AND
print(mysqlet.findKeySql(Const.UPDATE_BY_ATTR, table="info",data={"name": "-09"}, params={"id": 18, "name": "333"}, join='AND'))
# 根据自定义sql语句查询记录,limit:0表示所有记录,join:AND|OR.不传取AND
print(mysqlet.findKeySql(Const.FIND_BY_SQL, sql="select * from info", params={"name": "333", "id": 18}, limit=0))