import datetime
import importlib
import logging
import os
import re
import sys
import time

import pymysql
from pymysql.cursors import DictCursor
# Reloading the logging module gives a fresh root logger with no handlers, so the
# basicConfig() call below is not silently skipped (basicConfig does nothing when
# the root logger already has handlers).
importlib.reload(logging)
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(levelname)s %(message)s ',
)

# Help text printed by queryanalyse._get_db() when no option or --help is given.
usage = '\n'.join([
    " usage: python [script's path] [option]",
    'ALL options need to assign:',
    '',
    '-h : host, the database host,which database will store the results after analysis',
    '-u : user, the db user',
    "-p : password, the db user's password",
    '-P : port, the db port',
    '-f : file path, the binlog file',
    '-tr : table name for record , the table name to store the row record',
    '-tt : table name for transaction, the table name to store transactions',
    'Example: python queryanalyse.py -h=127.0.0.1 -P=3310 -u=root -p=password '
    '-f=/tmp/stock_binlog.log -tt=flashback.tbtran -tr=flashback.tbrow',
    '',
    '',
])
- 27
class queryanalyse:
    """Load a text-decoded MySQL binlog into two MySQL tables and report on it.

    rowrecord() parses the binlog text file (presumably ``mysqlbinlog -v``
    output, since it looks for ``### INSERT INTO`` style lines) and stores one
    row per transaction in ``self.tbtran`` (begin/end time) and one row per
    changed table row in ``self.tbrow`` (DML type, transaction number, db and
    table name).  binlogdesc() prints statistics computed from those tables.
    """

    def __init__(self):
        """Read the command-line options and open the MySQL connection used to store results.

        May terminate the process through _get_db() (no arguments or ``--help``).
        """
        # Connection / input settings, filled in from the command line by _get_db().
        self.host = ''
        self.user = ''
        self.password = ''
        # pymysql expects an integer port; -P=... overrides this default.
        self.port = 3306
        self.fpath = ''
        self.tbrow = ''
        self.tbtran = ''

        self._get_db()
        logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tb_for_record={},tb_for_tran={}'.format(self.host, self.user, self.port, self.fpath, self.tbrow, self.tbtran))

        self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port, charset='utf8')
        self.cur = self.mysqlconn.cursor(cursor=DictCursor)
        logging.info('MySQL which userd to store binlog event connection is ok')

        # Header data of the transaction / table currently being parsed (set by rowrecord()).
        self.begin_time = ''
        self.end_time = ''
        self.db_name = ''
        self.tb_name = ''

    def _get_db(self):
        """Fill the connection/input settings from ``-opt=value`` command-line arguments.

        The password is taken in clear text from the command line.  The binlog file
        can be copied to a non-production server for analysis, so no special
        handling is done for it.

        Prints ``usage`` and exits with status 1 when no argument is given, or with
        status 0 for ``--help``.  An unrecognised option (or one without ``=``)
        prints ``usage`` and is otherwise ignored.
        """
        logging.info('begin to assign values to parameters')
        args = sys.argv[1:]
        if not args:
            print(usage)
            sys.exit(1)
        if args[0] == '--help':
            print(usage)
            sys.exit()

        # option flag -> (attribute to set, converter for the raw value)
        options = {
            '-h': ('host', str),
            '-u': ('user', str),
            '-P': ('port', int),
            '-f': ('fpath', str),
            '-tr': ('tbrow', str),
            '-tt': ('tbtran', str),
            '-p': ('password', str),
        }
        for arg in args:
            # Split on the first '=' only, so values such as passwords may contain '='.
            flag, sep, value = arg.partition('=')
            if sep and flag in options:
                attr, convert = options[flag]
                setattr(self, attr, convert(value))
            else:
                print(usage)

    def create_tab(self):
        """Create (if missing) and empty the transaction table and the row-record table.

        ``self.tbtran`` gets one row per transaction.  ``self.tbrow`` gets one row per
        changed table row and links to its transaction through ``tran_num``, so one
        transaction can own many row records.
        """
        logging.info('creating table ...')
        statements = (
            '''CREATE TABLE IF NOT EXISTS {} (
                `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
                `begin_time` datetime NOT NULL,
                `end_time` datetime NOT NULL,
                PRIMARY KEY (`auto_id`)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8'''.format(self.tbtran),
            '''CREATE TABLE IF NOT EXISTS {} (
                `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
                `sqltype` int(11) NOT NULL COMMENT '1 is insert,2 is update,3 is delete',
                `tran_num` int(11) NOT NULL COMMENT 'the transaction number',
                `dbname` varchar(50) NOT NULL,
                `tbname` varchar(50) NOT NULL,
                PRIMARY KEY (`auto_id`),
                KEY `sqltype` (`sqltype`),
                KEY `dbname` (`dbname`),
                KEY `tbname` (`tbname`)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8'''.format(self.tbrow),
            'truncate table {}'.format(self.tbtran),
            'truncate table {}'.format(self.tbrow),
        )
        # Run one statement per execute(): pymysql does not enable
        # CLIENT.MULTI_STATEMENTS by default, so a ';'-joined batch is rejected.
        for statement in statements:
            self.cur.execute(statement)
        logging.info('created table {} and {}'.format(self.tbrow, self.tbtran))

    @staticmethod
    def _header_time(bline):
        """Convert the ``#yymmdd hh:mm:ss`` prefix of an event header line to ``yyyy-mm-dd hh:mm:ss``.

        The prefix is everything before the word ``server`` on the line.  The year is
        completed by replacing ``#`` with ``20`` (assumes a 20xx timestamp).
        """
        stamp = bline[:bline.index('server')].rstrip(' ').replace('#', '20')
        return stamp[0:4] + '-' + stamp[4:6] + '-' + stamp[6:]

    @staticmethod
    def _pct(part, total):
        """Return ``int(part * 100 / total)``, or 0 when ``total`` is 0 (e.g. an empty binlog)."""
        return int(part * 100 / total) if total else 0

    def rowrecord(self):
        """Parse the binlog text file and store its transactions and row events.

        Both tables are (re)created and emptied first through create_tab().  A
        transaction ends at each line containing ``'Xid ='``.  Its row records and
        its transaction record are then inserted and committed together.

        Returns:
            None on success.  If any step fails, returns the string
            ``'funtion rowrecord error'`` instead of raising, after logging the
            traceback.
        """
        try:
            self.create_tab()

            row_sql = ('insert into {}(sqltype,tran_num,dbname,tbname) '
                       'VALUES (%s,%s,%s,%s)').format(self.tbrow)
            tran_sql = 'insert into {}(begin_time,end_time) VALUES (%s,%s)'.format(self.tbtran)

            tran_num = 1        # sequence number of the transaction being parsed
            pending_rows = []   # (sqltype, tran_num, dbname, tbname) rows of the current transaction
            tran_begin = None   # time of the current transaction's first Table_map event

            # Decoded row images may contain bytes that are not valid UTF-8.  Only the
            # event header lines are inspected, so undecodable bytes are replaced.
            with open(self.fpath, 'r', encoding='utf-8', errors='replace') as binlog_file:
                logging.info('begining to analyze the binlog file ,this may be take a long time !!!')
                logging.info('analyzing...')

                for bline in binlog_file:
                    if 'Table_map:' in bline:
                        if tran_begin is None:
                            tran_begin = self._header_time(bline)
                            self.begin_time = tran_begin
                        # "... Table_map: `db`.`tb` mapped to ..." -> "db.tb"
                        table = bline[bline.index('Table_map'):].split(' ')[1].replace('`', '')
                        parts = table.split('.')
                        self.db_name = parts[0]
                        self.tb_name = parts[1]

                    elif bline.startswith('### INSERT INTO'):
                        pending_rows.append((1, tran_num, self.db_name, self.tb_name))

                    elif bline.startswith('### UPDATE'):
                        pending_rows.append((2, tran_num, self.db_name, self.tb_name))

                    elif bline.startswith('### DELETE FROM'):
                        pending_rows.append((3, tran_num, self.db_name, self.tb_name))

                    elif 'Xid =' in bline:
                        self.end_time = self._header_time(bline)
                        if tran_begin is None:
                            # No Table_map in this transaction: use its own end time
                            # instead of the previous transaction's begin time.
                            tran_begin = self.end_time
                            self.begin_time = tran_begin
                        # Values are passed as parameters, not formatted into the SQL.
                        if pending_rows:
                            self.cur.executemany(row_sql, pending_rows)
                        self.cur.execute(tran_sql, (tran_begin, self.end_time))
                        self.mysqlconn.commit()

                        pending_rows = []
                        tran_begin = None
                        tran_num += 1
        except Exception:
            logging.exception('funtion rowrecord error')
            return 'funtion rowrecord error'

    def binlogdesc(self):
        """Print statistics computed from the transaction and row-record tables.

        The report covers transaction and changed-row totals, and the transaction
        duration histogram.  Binlog timestamps only have second resolution.  It also
        covers the changed-rows-per-transaction histogram, the INSERT/UPDATE/DELETE
        split, and the 10 tables with the most changed rows.  Percentages print as 0
        when the corresponding total is 0.
        """
        pct = self._pct
        logging.info('Analysed result printing...\n')

        # Total number of transactions and of changed rows.
        sql = "select 'tbtran' name,count(*) nums from {} union all select 'tbrow' name,count(*) nums from {};".format(self.tbtran, self.tbrow)
        self.cur.execute(sql)
        t_num = 0
        r_num = 0
        for row in self.cur.fetchall():
            if row['name'] == 'tbtran':
                t_num = row['nums']
            else:
                r_num = row['nums']
        print('This binlog file has {} transactions, {} rows are changed '.format(t_num, r_num))

        # Longest single transaction, plus a 5-bucket duration histogram.  cost_sec is
        # a whole number of seconds, so the x.1 lower bounds leave no gaps.
        sql = '''select
                count(case when cost_sec between 0 and 1 then 1 end ) cos_1,
                count(case when cost_sec between 1.1 and 5 then 1 end ) cos_5,
                count(case when cost_sec between 5.1 and 10 then 1 end ) cos_10,
                count(case when cost_sec between 10.1 and 30 then 1 end ) cos_30,
                count(case when cost_sec >30.1 then 1 end ) cos_more,
                max(cost_sec) cos_max
            from
            (
                select
                    auto_id,timestampdiff(second,begin_time,end_time) cost_sec
                from {}
            ) a;'''.format(self.tbtran)
        self.cur.execute(sql)
        for row in self.cur.fetchall():
            print('The most cost time : {} '.format(row['cos_max']))
            print('The distribution map of each transaction costed time: ')
            print('Cost time between 0 and 1 second : {} , {}%'.format(row['cos_1'], pct(row['cos_1'], t_num)))
            print('Cost time between 1.1 and 5 second : {} , {}%'.format(row['cos_5'], pct(row['cos_5'], t_num)))
            print('Cost time between 5.1 and 10 second : {} , {}%'.format(row['cos_10'], pct(row['cos_10'], t_num)))
            print('Cost time between 10.1 and 30 second : {} , {}%'.format(row['cos_30'], pct(row['cos_30'], t_num)))
            print('Cost time > 30.1 : {} , {}%\n'.format(row['cos_more'], pct(row['cos_more'], t_num)))

        # Largest number of rows changed by one transaction, plus a 5-bucket histogram.
        # The last bucket is "> 10000" so that exactly 10001 rows is not dropped.
        sql = '''select
                count(case when nums between 0 and 10 then 1 end ) row_1,
                count(case when nums between 11 and 100 then 1 end ) row_2,
                count(case when nums between 101 and 1000 then 1 end ) row_3,
                count(case when nums between 1001 and 10000 then 1 end ) row_4,
                count(case when nums >10000 then 1 end ) row_5,
                max(nums) row_max
            from
            (
                select
                    count(*) nums
                from {} group by tran_num
            ) a;'''.format(self.tbrow)
        self.cur.execute(sql)
        for row in self.cur.fetchall():
            print('The most changed rows in a single transaction: {} '.format(row['row_max']))
            print('The distribution map of each transaction changed rows : ')
            print('Changed rows between 1 and 10 : {} , {}%'.format(row['row_1'], pct(row['row_1'], t_num)))
            print('Changed rows between 11 and 100 : {} , {}%'.format(row['row_2'], pct(row['row_2'], t_num)))
            print('Changed rows between 101 and 1000 : {} , {}%'.format(row['row_3'], pct(row['row_3'], t_num)))
            print('Changed rows between 1001 and 10000 : {} , {}%'.format(row['row_4'], pct(row['row_4'], t_num)))
            print('Changed rows > 10000 : {} , {}%\n'.format(row['row_5'], pct(row['row_5'], t_num)))

        # Split of the changed rows by DML type (sqltype 1/2/3 = INSERT/UPDATE/DELETE).
        sql = 'select sqltype ,count(*) nums from {} group by sqltype ;'.format(self.tbrow)
        self.cur.execute(sql)
        templates = {
            1: 'INSERT rows :{} , {}% ',
            2: 'UPDATE rows :{} , {}% ',
            3: 'DELETE rows :{} , {}%\n ',
        }
        print('The distribution map of the {} changed rows : '.format(r_num))
        for row in self.cur.fetchall():
            template = templates.get(row['sqltype'])
            if template is not None:
                print(template.format(row['nums'], pct(row['nums'], r_num)))

        # The 10 tables with the most changed rows, i.e. the most frequently modified ones.
        sql = '''select
                dbname,tbname ,
                count(*) ALL_rows,
                count(*)*100/{} per,
                count(case when sqltype=1 then 1 end) INSERT_rows,
                count(case when sqltype=2 then 1 end) UPDATE_rows,
                count(case when sqltype=3 then 1 end) DELETE_rows
            from {}
            group by dbname,tbname
            order by ALL_rows desc
            limit 10;'''.format(r_num, self.tbrow)
        self.cur.execute(sql)
        rows = self.cur.fetchall()

        print('The top 10 tables of the {} changed rows : '.format(r_num))
        print('tablename'.ljust(50),
              '|', 'changed_rows'.center(15),
              '|', 'percent'.center(10),
              '|', 'insert_rows'.center(18),
              '|', 'update_rows'.center(18),
              '|', 'delete_rows'.center(18)
              )
        print('-------------------------------------------------------------------------------------------------------------------------------------------------')
        for row in rows:
            # ALL_rows >= 1 for every grouped table, so these divisions are safe.
            print((row['dbname'] + '.' + row['tbname']).ljust(50),
                  '|', str(row['ALL_rows']).rjust(15),
                  '|', (str(int(row['per'])) + '%').rjust(10),
                  '|', str(row['INSERT_rows']).rjust(10) + ' , ' + (str(int(row['INSERT_rows'] * 100 / row['ALL_rows'])) + '%').ljust(5),
                  '|', str(row['UPDATE_rows']).rjust(10) + ' , ' + (str(int(row['UPDATE_rows'] * 100 / row['ALL_rows'])) + '%').ljust(5),
                  '|', str(row['DELETE_rows']).rjust(10) + ' , ' + (str(int(row['DELETE_rows'] * 100 / row['ALL_rows'])) + '%').ljust(5),
                  )
        print('\n')

        logging.info('Finished to analyse the binlog file !!!')

    def closeconn(self):
        """Close the cursor and the MySQL connection opened in __init__()."""
        self.cur.close()
        self.mysqlconn.close()
        logging.info('release db connections\n')
- 289
def main():
    """Run the analysis from the command line and return a process exit status.

    Builds a queryanalyse (parses argv and connects to MySQL), loads the binlog
    with rowrecord(), then prints the report with binlogdesc().  The database
    connection is released even if an analysis step raises.

    Returns:
        0 on success, 1 if rowrecord() reported a failure.
    """
    analyser = queryanalyse()
    try:
        error = analyser.rowrecord()
        if error:
            # rowrecord() signals failure by returning a message rather than raising;
            # do not print a report built from a partially loaded binlog.
            logging.error(error)
            return 1
        analyser.binlogdesc()
        return 0
    finally:
        analyser.closeconn()


if __name__ == "__main__":
    sys.exit(main())
# Source: http://www.cnblogs.com/xinysu/p/6908722.html