# -*- coding: utf-8 -*-
"""Flashback helper for MySQL row-based binlogs.

Reads the text produced by mysqlbinlog, stores every transaction together
with its reverse ("undo") SQL in a table, and can replay the undo SQL.
"""
__author__ = 'xinysu'
__date__ = '2017/6/15 10:30'

import re
import os
import sys
import datetime
import time
import logging
import importlib

# NOTE(review): reloading `logging` presumably discards handlers installed by
# an interactive host (IDE/notebook) so that basicConfig below takes effect --
# confirm before removing.
importlib.reload(logging)
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(message)s ')

import pymysql
from pymysql.cursors import DictCursor
- 19
# Help text printed for `--help`, for a call without options, and for an
# unknown option.  The \033[...m sequences are ANSI colour codes.
usage = '''\nusage: python [script's path] [option]
ALL options need to assign:
\033[1;33;40m
-h    : host, the database host,which database will store the results after analysis
-u    : user, the db user
-p    : password, the db user's password
-P    : port, the db port

-f    : file path, the binlog file
-t    : table name, the table name to store the results after analysis , {dbname}.{tbname},
        when you want to store in `test` db and the table name is `tbevent`,then this parameter
        is test.tbevent
\033[1;34;40m
-oh   : online host, the database host,which database have the online table schema
-ou   : online user, the db user
-op   : online password, the db user's password
-oP   : online port, the db port
\033[1;32;40m
-a    : action,
        0 just analyse the binlog file ,and store sql in table;
        1 after execute self.dotype=0, execute the undo_sql in the table
\033[0m
--help: help document
\033[1;35;40m
Example:
analysize binlog:
python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent
                       -oh=192.168.9.244 -oP=3310 -ou=root -op=***
                       -a=0

flash back:
python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent
                       -oh=192.168.9.244 -oP=3310 -ou=root -op=***
                       -a=1
\033[0m
'''
- 56
class flashback:
    """Analyse mysqlbinlog text output and build/execute flashback (undo) SQL.

    Two MySQL connections are used:

    * ``mysqlconn`` / ``cur``      -- where the analysed events are stored
      (table ``self.tbevent``).
    * ``on_mysqlconn`` / ``on_cur`` -- the "online" server; it supplies the
      column names (``desc``) and is the target of the undo SQL.

    Action ``'0'`` parses the binlog file and stores one row per transaction
    (forward SQL plus undo SQL); action ``'1'`` executes the stored undo SQL
    newest-first.

    NOTE(review): WHERE/SET column separators are assigned by column
    position, so every row image is presumably expected to list all columns
    starting at @1 (binlog_row_image=FULL) -- confirm.
    """

    # mysqlbinlog --verbose prints each column image as '###   @<n>=<value>'.
    _FIELD_PREFIX = '###   @'
    # Directory for spilled oversized transactions and for undo_file().
    _UNDO_DIR = '/tmp/flashback_undosql/'
    # Transactions whose SQL text is longer than this are written to a file.
    _MAX_INLINE_SQL = 500000000

    # Command-line option -> (attribute name, converter).
    _OPTIONS = {
        '-h': ('host', str),
        '-u': ('user', str),
        '-p': ('password', str),
        '-P': ('port', int),
        '-f': ('fpath', str),
        '-t': ('tbevent', str),
        '-oh': ('on_host', str),
        '-ou': ('on_user', str),
        '-op': ('on_password', str),
        '-oP': ('on_port', int),
        '-a': ('action', str),
    }

    # Textual escapes found in the binlog dump -> replacement text.
    # '\x27' (single quote) is deliberately absent; it is handled when the
    # SQL is quoted.  '\x3f' maps to backslash + '?', as the original did.
    _ESCAPES = {
        '\\x07': '\a', '\\x08': '\b', '\\x0c': '\f', '\\x0a': '\n',
        '\\x0d': '\r', '\\x09': '\t', '\\x0b': '\v', '\\x5c': '\\',
        '\\x22': '"', '\\x3f': '\\?', '\\x00': '\0',
    }
    _ESCAPE_RE = re.compile('|'.join(re.escape(key) for key in _ESCAPES))

    def __init__(self):
        """Parse ``sys.argv`` and open both database connections.

        Raises whatever ``pymysql.connect`` raises; exits via ``sys.exit``
        when the command line is empty, asks for help or is invalid.
        """
        # Connection settings of the server that stores the analysed events.
        self.host = ''
        self.user = ''
        self.password = ''
        self.port = 3306      # int: PyMySQL rejects a str port
        self.fpath = ''
        self.tbevent = ''

        # Connection settings of the online server (schema source / undo target).
        self.on_host = ''
        self.on_user = ''
        self.on_password = ''
        self.on_port = 3306

        # '0': analyse the binlog and store SQL; '1': execute the stored undo SQL.
        self.action = '0'

        # Per-transaction parsing state.
        self.dml_sql = ''
        self.undo_sql = ''
        self.tbfield_where = []
        self.tbfield_set = []
        self.begin_time = ''
        self.db_name = ''
        self.tb_name = ''
        self.end_time = ''
        self.end_pos = ''
        self.sqltype = 0

        self._get_db()

        # Local import: only needed here, keeps the file's import block untouched.
        from pymysql.constants import CLIENT

        logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tbevent={}'.format(self.host, self.user, self.port, self.fpath, self.tbevent))
        self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port, charset='utf8')
        self.cur = self.mysqlconn.cursor(cursor=DictCursor)
        logging.info('MySQL which userd to store binlog event connection is ok')

        # The online schema must match the schema the binlog was written with.
        logging.info('assign values to online mysql parameters is done:host={},user={},password=***,port={}'.format(self.on_host, self.on_user, self.on_port))
        try:
            # A stored undo_sql holds several ';'-separated statements, so the
            # connection that executes it needs MULTI_STATEMENTS.
            self.on_mysqlconn = pymysql.connect(host=self.on_host, user=self.on_user, password=self.on_password, port=self.on_port, charset='utf8', client_flag=CLIENT.MULTI_STATEMENTS)
        except Exception:
            # Do not leak the first connection when the second one fails.
            self.cur.close()
            self.mysqlconn.close()
            raise
        self.on_cur = self.on_mysqlconn.cursor(cursor=DictCursor)
        logging.info('MySQL which userd to analyse online table schema connection is ok')

        logging.info('\033[33mMySQL connection is ok\033[0m')

    def _get_db(self):
        """Read the ``-x=value`` options from ``sys.argv`` into attributes.

        Prints the usage text and exits when no option is given, when the
        first option is ``--help``, or when an option is unknown or has no
        ``=``.  ``-P`` / ``-oP`` are converted with ``int`` (ValueError on
        bad input).
        """
        logging.info('begin to assign values to parameters')
        args = sys.argv[1:]
        if not args:
            print(usage)
            sys.exit(1)
        if args[0] == '--help':
            print(usage)
            sys.exit()

        for arg in args:
            # partition keeps any '=' inside the value (e.g. in passwords).
            key, sep, value = arg.partition('=')
            if not sep or key not in self._OPTIONS:
                print(usage)
                sys.exit(1)
            attr, convert = self._OPTIONS[key]
            setattr(self, attr, convert(value))

    def create_tab(self):
        """Create the event table if needed and empty it."""
        logging.info('creating table {} to store binlog event'.format(self.tbevent))
        create_tb_sql = '''
        CREATE TABLE IF NOT EXISTS {}(
            auto_id INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,
            binlog_name VARCHAR(100) NOT NULL COMMENT 'the binlog file path and name',
            dml_start_time DATETIME NOT NULL COMMENT 'when to start this transaction ',
            dml_end_time DATETIME NOT NULL COMMENT 'when to finish this transaction ',
            end_log_pos BIGINT NOT NULL COMMENT 'the log position for finish this transaction',
            db_name VARCHAR(100) NOT NULL COMMENT 'which database happened this transaction ',
            table_name VARCHAR(200) NOT NULL COMMENT 'which table happened this transaction ',
            sqltype INT NOT NULL COMMENT '1 is insert,2 is update,3 is delete',
            dml_sql LONGTEXT NULL COMMENT 'what sql excuted',
            undo_sql LONGTEXT NULL COMMENT 'rollback sql, this sql used for flashback',
            PRIMARY KEY (auto_id),
            INDEX sqltype(sqltype),
            INDEX dml_start_time (dml_start_time),
            INDEX dml_end_time (dml_end_time),
            INDEX end_log_pos (end_log_pos),
            INDEX db_name (db_name),
            INDEX table_name (table_name)
        )
        COLLATE='utf8_general_ci' ENGINE=InnoDB
        '''.format(self.tbevent)
        # One statement per execute(): this connection is opened without
        # MULTI_STATEMENTS, so a combined "CREATE ...; TRUNCATE ..." fails.
        self.cur.execute(create_tb_sql)
        self.cur.execute('TRUNCATE TABLE {}'.format(self.tbevent))
        logging.info('created table {} '.format(self.tbevent))

    def tbschema(self, dbname, tbname):
        """Load the column names of ``dbname.tbname`` from the online server.

        Fills ``tbfield_set`` / ``tbfield_where`` with one entry per column,
        in table order.  Every entry but the first is prefixed with a marker
        that is later turned into ',' or 'and' depending on whether the
        clause ends up as SET or WHERE.
        """
        self.on_cur.execute('desc `{}`.`{}`'.format(dbname, tbname))
        quoted = ['`' + col['Field'] + '`' for col in self.on_cur.fetchall()]

        self.tbfield_where = quoted[:1] + ['/*where*/ and /*where*/' + name for name in quoted[1:]]
        self.tbfield_set = quoted[:1] + ['/*set*/ , /*set*/' + name for name in quoted[1:]]

    def rowrecord(self, bl_line):
        """Handle a ``Table_map:`` header line.

        Extracts the event time and the db/table name, then refreshes the
        column lists through ``tbschema``.  Returns ``None`` on success (or
        when the line is not a Table_map line) and the string
        ``'funtion rowrecord error'`` when parsing or the schema lookup fails.
        """
        if 'Table_map:' not in bl_line:
            return None
        try:
            server_at = bl_line.index('server')
            map_at = bl_line.index('Table_map')

            # '#170615 10:30:00' -> '2017-06-15 10:30:00'
            stamp = bl_line[:server_at].rstrip(' ').replace('#', '20')
            self.begin_time = stamp[0:4] + '-' + stamp[4:6] + '-' + stamp[6:]

            full_name = bl_line[map_at:].split(' ')[1].replace('`', '')
            self.db_name, _, self.tb_name = full_name.partition('.')

            self.tbschema(self.db_name, self.tb_name)
        except Exception:
            logging.exception('funtion rowrecord error')
            return 'funtion rowrecord error'
        return None

    @staticmethod
    def _build_undo_sql(dml_sql):
        """Return the reverse of the accumulated row statements.

        INSERT <-> DELETE are swapped, SET <-> WHERE are swapped in UPDATEs,
        the column separators are resolved, and the statements are emitted
        in reverse order, ';'-separated.
        NOTE(review): the SET/WHERE swap is a plain text replace and also
        hits those words inside column values.
        """
        undo = (dml_sql
                .replace(' INSERT INTO', ';DELETE FROM_su')
                .replace(' UPDATE ', ';UPDATE')
                .replace(' DELETE FROM', ';INSERT INTO')
                .replace(';DELETE FROM_su', ';DELETE FROM')
                .replace('WHERE', 'WHERE_marksu')
                .replace('SET', 'WHERE')
                .replace('WHERE_marksu', 'SET')
                .replace('/*set*/ , /*set*/', ' and ')
                .replace('/*where*/ and /*where*/', ' , '))

        # Every statement after the first starts on a line beginning ' ;<VERB>'.
        statements = []
        current = []
        for line in undo.splitlines():
            if line.startswith((' ;UPDATE', ' ;INSERT', ' ;DELETE')):
                statements.append(''.join(current))
                current = []
            current.append(line)
        statements.append(''.join(current))

        reversed_sql = ''.join(reversed(statements))
        # Drop the leading ';' and terminate the last statement.
        return reversed_sql.lstrip()[1:] + ';'

    @staticmethod
    def _quote_for_literal(sql):
        """Escape ``sql`` for embedding in a single-quoted SQL literal.

        Quotes are doubled; a textual '\\x27' becomes four quotes so that it
        is still an escaped quote after the literal has been parsed once.
        """
        return sql.replace("'", "''").replace('\\x27', "''''")

    def _spill_undo_sql(self, undo_sql):
        """Write an oversized transaction's undo SQL to a file; return its path."""
        os.makedirs(self._UNDO_DIR, exist_ok=True)
        path = self._UNDO_DIR + str(self.end_pos) + '.sql'
        with open(path, 'w') as w_f:
            w_f.write('begin;' + '\n')
            # The file is executed directly, so it gets the plain SQL: only
            # '\x27' is turned into an escaped quote, nothing is doubled.
            w_f.write(undo_sql.replace('\\x27', "''") + '\n')
            w_f.write('commit;' + '\n')
        return path

    def dml_tran(self, bl_line):
        """Finish the current transaction on an ``Xid =`` line and store it.

        Builds the undo SQL from the accumulated ``self.dml_sql``, inserts
        one row into the event table and commits.  Errors are logged, not
        raised.  ``self.dml_sql`` / ``self.undo_sql`` are always reset so a
        failed transaction cannot leak into the next one.
        """
        if 'Xid =' not in bl_line:
            return
        try:
            raw_dml = self.dml_sql
            if not raw_dml.strip():
                # Nothing was collected (no row events) -- nothing to undo.
                logging.debug('skip transaction without row events')
                return

            server_at = bl_line.index('server')
            pos_at = bl_line.index('end_log_pos')
            stamp = bl_line[:server_at].rstrip(' ').replace('#', '20')
            self.end_time = stamp[0:4] + '-' + stamp[4:6] + '-' + stamp[6:]
            self.end_pos = int(bl_line[pos_at:].split(' ')[1])

            undo_sql = self._build_undo_sql(raw_dml)
            dml_sql = raw_dml.replace('/*set*/ , /*set*/', ' , ').replace('/*where*/ and /*where*/', ' and ')

            if dml_sql.startswith(' INSERT INTO '):
                self.sqltype = 1
            elif dml_sql.startswith(' UPDATE '):
                self.sqltype = 2
            elif dml_sql.startswith(' DELETE '):
                self.sqltype = 3

            # Turn the dump's textual escapes into the real characters.
            dml_sql = self.esc_code(dml_sql)
            undo_sql = self.esc_code(undo_sql)

            if len(dml_sql) > self._MAX_INLINE_SQL:
                undo_path = self._spill_undo_sql(undo_sql)
                logging.info("the size of this transaction is more than 500Mb ,the file location : {}".format(undo_path))
                dml_sql = ''
                undo_sql = undo_path      # the table row points at the file
            else:
                dml_sql = self._quote_for_literal(dml_sql)
                undo_sql = self._quote_for_literal(undo_sql)

            # NOTE(review): the statement is string-built on purpose; the
            # quoting above is tailored to it.  Switching to bound parameters
            # changes how backslashes are stored -- review before changing.
            insert_sql = "INSERT INTO {}(binlog_name,dml_start_time,dml_end_time,end_log_pos,db_name,table_name,sqltype,dml_sql,undo_sql) select '{}','{}','{}','{}','{}','{}',{},'{}','{}'".format(
                self.tbevent, self.fpath, self.begin_time, self.end_time, self.end_pos,
                self.db_name, self.tb_name, self.sqltype, dml_sql, undo_sql)

            self.cur.execute(insert_sql)
            self.mysqlconn.commit()
        except Exception:
            logging.exception('funtion dml_tran error')
        finally:
            self.dml_sql = ''
            self.undo_sql = ''

    def _convert_field_line(self, bline, sqlcomma):
        """Rewrite one ``###   @N=value ...`` line as ``<column>=value``.

        ``sqlcomma`` is 1 inside a SET clause, 2 inside a WHERE clause; for
        any other value the ``###   @N`` head is kept.  TIMESTAMP(0) values
        are wrapped in ``from_unixtime()``; for negative numbers only the
        signed value is kept (the text after the first space is dropped).
        """
        prefix_len = len(self._FIELD_PREFIX)
        col_no, _, value = bline[prefix_len:].partition('=')
        tokens = value.split(' ')

        if tokens[1:3] == ['/*', 'TIMESTAMP(0)']:
            # Keep the newline: statement boundaries are detected per line.
            value = 'from_unixtime(' + tokens[0] + ')\n'
        elif value.startswith('-'):
            value = tokens[0].rstrip('\n') + '\n'

        if sqlcomma == 1:
            head = self.tbfield_set[int(col_no) - 1]
        elif sqlcomma == 2:
            head = self.tbfield_where[int(col_no) - 1]
        else:
            head = bline[:prefix_len] + col_no
        return head + '=' + value

    def analyse_binlog(self):
        """Parse the binlog text file and store every transaction.

        Returns ``None`` on success and ``'function do error'`` when an
        exception interrupted the analysis (the traceback is logged).
        """
        try:
            sqlcomma = 0          # 1: inside SET, 2: inside WHERE
            pending = []          # SQL fragments of the current transaction
            self.create_tab()

            with open(self.fpath, 'r') as binlog_file:
                logging.info('\033[36mbegining to analyze the binlog file ,this may be take a long time !!!\033[0m')
                logging.info('\033[36manalyzing...\033[0m')
                for bline in binlog_file:
                    if 'Table_map:' in bline:
                        self.rowrecord(bline)
                        bline = ''
                    elif bline.rstrip() == '### SET':
                        bline = bline[3:]
                        sqlcomma = 1
                    elif bline.rstrip() == '### WHERE':
                        bline = bline[3:]
                        sqlcomma = 2
                    elif bline.startswith(self._FIELD_PREFIX):
                        bline = self._convert_field_line(bline, sqlcomma)
                    elif bline.startswith(('### DELETE', '### INSERT', '### UPDATE')):
                        bline = bline[3:]
                    elif 'Xid =' in bline:
                        # Hand the collected text over, then start afresh.
                        self.dml_sql = self.dml_sql + ''.join(pending)
                        pending = []
                        self.dml_tran(bline)
                        bline = ''
                    else:
                        bline = ''

                    if bline.rstrip('\n') != '':
                        pending.append(bline + ' ')

            # Keep whatever follows the last Xid visible in self.dml_sql.
            self.dml_sql = self.dml_sql + ''.join(pending)
        except Exception:
            logging.exception('function do error')
            return 'function do error'
        return None

    def esc_code(self, sql):
        """Replace the dump's textual escapes (``\\x0a`` ...) in ``sql``.

        Done in a single pass so that a replacement result is never
        re-interpreted as another escape.
        """
        escapes = self._ESCAPES
        return self._ESCAPE_RE.sub(lambda match: escapes[match.group(0)], sql)

    def binlogdesc(self):
        """Log how many insert/update/delete transactions were stored."""
        countsql = 'select sqltype , count(*) numbers from {} group by sqltype order by sqltype '.format(self.tbevent)
        print(countsql)
        self.cur.execute(countsql)

        counts = {1: 0, 2: 0, 3: 0}   # sqltype -> number of rows
        for row in self.cur.fetchall():
            if row['sqltype'] in counts:
                counts[row['sqltype']] = row['numbers']
        logging.info('\033[1;35mTotal transactions number is {}: {} inserts, {} updates, {} deletes !\033[0m(all number is accurate, the other is approximate value) \033[0m'.format(counts[1] + counts[2] + counts[3], counts[1], counts[2], counts[3]))

    def _count_events(self):
        """Return the number of rows in the event table."""
        self.cur.execute("select count(*) table_rows from {}".format(self.tbevent))
        tran_num = 1
        for row in self.cur.fetchall():
            tran_num = row['table_rows']
        return tran_num

    def _iter_undo_batches(self, tran_num, number):
        """Yield the stored rows newest-first, ``number`` auto_ids per batch.

        The row count is used as the highest auto_id, i.e. auto_id is
        assumed to run 1..tran_num (the table is truncated by create_tab).
        """
        done = 0
        while done <= tran_num:
            logging.info('doing batch : {} '.format(int(done / number) + 1))
            query = 'select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;'.format(self.tbevent, tran_num - (done + number), tran_num - done)
            self.cur.execute(query)
            yield self.cur.fetchall()
            done += number

    def undosql(self, number):
        """Execute the stored undo SQL on the online server, newest first.

        Rows are fetched ``number`` at a time so that not all transactions
        are held in memory at once; a single huge transaction is still
        loaded whole.  Each row is committed on its own; a failing row is
        rolled back, logged, and skipped.

        Raises:
            ValueError: if ``number`` is smaller than 1.
        """
        if number < 1:
            raise ValueError('number must be >= 1')

        tran_num = self._count_events()
        logging.info('\033[32mThere has {} transactions ,need {} batchs ,each batche doing {} transactions \033[0m'.format(tran_num, int(tran_num / number) + 1, number))

        for undo_rows in self._iter_undo_batches(tran_num, number):
            for u_row in undo_rows:
                sql = u_row['undo_sql']
                if not sql:
                    continue
                if sql.startswith(self._UNDO_DIR):
                    # Oversized transaction: the row holds a file path, not SQL.
                    logging.warning('auto_id: {} is stored in file {} ,source it manually'.format(u_row['auto_id'], sql))
                    continue
                try:
                    self.on_cur.execute(sql)
                    # Drain every statement's result so errors surface here.
                    while self.on_cur.nextset():
                        pass
                    self.on_mysqlconn.commit()
                except Exception:
                    logging.exception('undo failed, auto_id: {}'.format(u_row['auto_id']))
                    # Do not let a half-applied row ride on the next commit.
                    self.on_mysqlconn.rollback()

    def undo_file(self, number):
        """Write the stored undo SQL, newest first, to one SQL file.

        Alternative to ``undosql``: the file can be sourced manually.  Rows
        are fetched ``number`` at a time.

        Raises:
            ValueError: if ``number`` is smaller than 1.
        """
        if number < 1:
            raise ValueError('number must be >= 1')

        tran_num = self._count_events()
        logging.info('copy undo_sql to undo file on : /tmp/flashback_undosql/undo_file_flashback.sql')
        logging.info('\033[32mThere has {} transactions ,need {} batchs to copy ,each batche doing {} transactions \033[0m'.format(tran_num, int(tran_num / number) + 1, number))

        os.makedirs(self._UNDO_DIR, exist_ok=True)
        with open(self._UNDO_DIR + 'undo_file_flashback.sql', 'w') as w_f:
            for undo_rows in self._iter_undo_batches(tran_num, number):
                for u_row in undo_rows:
                    try:
                        w_f.write('begin;' + '\n')
                        w_f.write('# auto_id' + str(u_row['auto_id']) + '\n')
                        w_f.write(u_row['undo_sql'] + '\n')
                        w_f.write('commit;' + '\n')
                    except Exception:
                        logging.exception('auto_id {}'.format(u_row['auto_id']))

    def do(self):
        """Run the action selected with ``-a`` ('0' analyse, '1' undo)."""
        action = str(self.action)
        if action == '0':
            if self.analyse_binlog() is None:
                logging.info('\033[36mfinished to analyze the binlog file !!!\033[0m')
            else:
                logging.error('analysis of the binlog file failed')
        elif action == '1':
            self.undosql(10000)
        else:
            logging.error('unknown action {} ,expected 0 or 1'.format(self.action))

    def closeconn(self):
        """Close both cursors and both connections."""
        self.cur.close()
        self.on_cur.close()
        self.mysqlconn.close()
        self.on_mysqlconn.close()
        logging.info('release all db connections')
        logging.info('\033[33mAll done,check the {} which stored binlog event on host {} , port {} \033[0m'.format(self.tbevent, self.host, self.port))

def main():
    """Build the flashback helper from the command line and run its action."""
    p = flashback()
    try:
        p.do()
    finally:
        # Release the database connections even when the action fails.
        p.closeconn()


if __name__ == "__main__":
    main()
# Source: http://www.cnblogs.com/xinysu/p/7052923.html