
Parsing and Analyzing MySQL Binlog with Python

Contents
  • Parsing and Analyzing MySQL Binlog with Python
    • 1. Purpose of the Analysis
    • 2. Code Logic
    • 3. Hands-on Analysis
  • Summary

    Parsing and Analyzing MySQL Binlog with Python

    1. Purpose of the Analysis

    The binlog records every data change in MySQL, and it is frequently used for point-in-time recovery, data flashback, and investigating various "strange" problems.

    For example: are there any large transactions? Which table gets updated the most? Are there transactions that were not committed in time? All of these questions can be answered by analyzing the binlog.

    2. Code Logic

    The first step in collecting the data is parsing the binlog file. A binlog is made up of events, such as GTID events, XID events, Table Map events, and DML (row) events; once we have all the events, we can extract a lot of information from them.

    Each event consists of two parts, an event header and an event body. The event header is stored in the following format:

    Field           Size
    --------------  -------
    timestamp       4 bytes
    type_code       1 byte
    server_id       4 bytes
    event_length    4 bytes
    next_position   4 bytes
    flags           2 bytes

    The header takes 19 bytes in total. From it we can learn the event type, the time it occurred, the event length, and the position where the next event starts.
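
    For instance, a minimal sketch of reading and unpacking one event header with struct could look like this (the file name mysql-bin.000001 is just a placeholder):

    import struct

    with open('mysql-bin.000001', 'rb') as f:          # placeholder binlog file name
        f.read(4)                                      # skip the 4-byte magic number b'\xfebin'
        header = f.read(19)                            # one event header is 19 bytes
        (timestamp, type_code, server_id,
         event_length, next_position, flags) = struct.unpack('<IBIIIH', header)  # little-endian, matches the table above
        print(type_code, event_length, next_position)  # enough to identify the event and jump to the next one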

    After reading the header information, we can jump to where the next event begins via next_position and read its header. If we come across a Table_map event, it means a DML transaction is being opened. So what exactly does a Table Map event store?


    From the Table Map event we can obtain the database and table that the DML operates on, so we can pinpoint which table a DML statement touches. When a transaction starts, the binlog first records a Table_map event; if multiple tables are involved there will be multiple Table_map events. These are followed by the DML row events, and finally an XID event that marks the transaction commit.

    The script obtains the start of a transaction by parsing the Query_event, the tables involved by parsing the Table_map events, and the end of the transaction from the XID event.
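
    Putting this together, the event sequence of a single row-format transaction in the binlog looks roughly like this (an illustrative sketch; db1.t1 and db1.t2 are placeholder table names):

    # QUERY_EVENT          "BEGIN"    -> transaction start: position, time, database
    # TABLE_MAP_EVENT      db1.t1     -> one Table_map event per table involved
    # WRITE_ROWS_EVENT_V2             -> the row changes themselves (an INSERT here)
    # TABLE_MAP_EVENT      db1.t2
    # UPDATE_ROWS_EVENT_V2            -> an UPDATE on the second table
    # XID_EVENT                       -> transaction commit: end position, end time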

    3. Hands-on Analysis

    Let's get straight to the code.

    The pandas module needs to be installed. It makes data processing very convenient; if you have never used it, it is well worth learning, and it can noticeably improve your productivity.
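
    If pandas is not installed yet, it can be installed with pip first:

    pip3 install pandas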

    # -*- coding: utf-8 -*-
    import sys
    import math
    import time
    import struct
    import argparse
    import pandas as pd
    from datetime import datetime
    
    binlog_quer_event_stern = 4                 # trailing 4 bytes at the end of a Query_event (checksum)
    binlog_event_fix_part = 13                  # fixed (13-byte) part of a Query_event body
    table_map_event_fix_length = 8              # fixed (8-byte) part of a Table_map event body
    BINLOG_FILE_HEADER = b'\xFE\x62\x69\x6E'    # binlog magic number: 0xfe 'b' 'i' 'n'
    binlog_event_header_len = 19                # every event header is 19 bytes
    
    
    class BinlogEvent:
        UNKNOWN_EVENT = 0
        START_EVENT_V3 = 1
        QUERY_EVENT = 2
        STOP_EVENT = 3
        ROTATE_EVENT = 4
        INTVAR_EVENT = 5
        LOAD_EVENT = 6
        SLAVE_EVENT = 7
        CREATE_FILE_EVENT = 8
        APPEND_block_EVENT = 9
        EXEC_LOAD_EVENT = 10
        DELETE_FILE_EVENT = 11
        NEW_LOAD_EVENT = 12
        RAND_EVENT = 13
        USER_VAR_EVENT = 14
        FORMAT_DESCRIPTION_EVENT = 15
        XID_EVENT = 16
        BEGIN_LOAD_QUERY_EVENT = 17
        EXECUTE_LOAD_QUERY_EVENT = 18
        TABLE_MAP_EVENT = 19
        PRE_GA_WRITE_ROWS_EVENT = 20
        PRE_GA_UPDATE_ROWS_EVENT = 21
        PRE_GA_DELETE_ROWS_EVENT = 22
        WRITE_ROWS_EVENT = 23
        UPDATE_ROWS_EVENT = 24
        DELETE_ROWS_EVENT = 25
        INCIDENT_EVENT = 26
        HEARTBEAT_LOG_EVENT = 27
        IGNORABLE_LOG_EVENT = 28
        ROWS_QUERY_LOG_EVENT = 29
        WRITE_ROWS_EVENT_V2 = 30
        UPDATE_ROWS_EVENT_V2 = 31
        DELETE_ROWS_EVENT_V2 = 32
        GTID_LOG_EVENT = 33
        ANONYMOUS_GTID_LOG_EVENT = 34
        PREVIOUS_GTIDS_LOG_EVENT = 35
    
    
    class BinlogEventGet(object):
        def __init__(self, binlog_path, outfile_path):
            self.file_handle = open(binlog_path, 'rb')
            # Path where the analysis result file will be exported
            self.outfile_path = outfile_path
    
        def __del__(self):
            self.file_handle.close()
    
        def read_table_map_event(self, event_length, next_position):
            """
            fix_part = 8
                table_id : 6bytes
                Reserved : 2bytes
            variable_part:
                database_name_length : 1bytes
                database_name : database_name_length bytes + 1
                table_name_length : 1bytes
                table_name : table_name_length bytes + 1
                cloums_count : 1bytes
                colums_type_array : one byte per column
                mmetadata_lenth : 1bytes
                metadata : .....(only available in the variable length field,varchar:2bytes,text、blob:1bytes,time、timestamp、datetime: 1bytes
                                blob、float、decimal : 1bytes, char、enum、binary、set: 2bytes(column type id :1bytes metadatea: 1bytes))
                bit_filed : 1bytes
                crc : 4bytes
                .........
            :return:
            """
            self.read_bytes(table_map_event_fix_length)
            database_name_length, = struct.unpack('B', self.read_bytes(1))
            database_name, _a, = struct.unpack('{}ss'.format(database_name_length),
                                               self.read_bytes(database_name_length + 1))
    
            table_name_length, = struct.unpack('B', self.read_bytes(1))
            table_name, _a, = struct.unpack('{}ss'.format(table_name_length), self.read_bytes(table_name_length + 1))
    
            self.file_handle.seek(next_position, 0)
            return database_name, table_name
    
        def read_bytes(self, count):
            """
            Read a fixed number of bytes from the binlog file.
            :param count:
            :return:
            """
            return self.file_handle.read(count)
    
        def main(self):
            if not self.read_bytes(4) == BINLOG_FILE_HEADER:
                print("Error: Is not a standard binlog file format.")
                sys.exit(0)
    
            # Dictionary holding the information of the current transaction
            temp_transaction_dict = {
                'id': None,
                'db_name': None,
                'ld_table_name': None,
                'table_set': set(),
                'start_time': None,
                'end_time': None,
                'diff_second': None,
                'event_type': set(),
                'start_position': None,
                'end_position': None
            }
    
            tem_id = 0
            df = list()
            start_position, end_position = None, None
            print('loading.....')
            while True:
                type_code, event_length, timestamp, next_position = self.read_header()
    
                # Stop the loop when there are no more events to read
                if type_code is None:
                    break
    
                # Collect transaction information according to the event type
                if type_code == BinlogEvent.QUERY_EVENT:
                    thread_id, db_name, info = self.read_query_event(event_length)
    
                    if info == 'BEGIN':
                        temp_transaction_dict['start_position'] = next_position - event_length
                        temp_transaction_dict['start_time'] = timestamp
                        temp_transaction_dict['db_name'] = db_name
                        # print('Time:', timestamp, 'DB:', db_name, 'SQL:', info)
                    self.file_handle.seek(next_position, 0)
    
                elif type_code == BinlogEvent.TABLE_MAP_EVENT:
                    with_database, with_table = self.read_table_map_event(event_length, next_position)
                    # Record only the first table touched by the transaction
                    if temp_transaction_dict['ld_table_name'] is None:
                        temp_transaction_dict['ld_table_name'] = str(with_table.decode())
                    # The set of all tables involved in this transaction
                    temp_transaction_dict['table_set'].add(str(with_table.decode()))
    
                elif type_code in (BinlogEvent.WRITE_ROWS_EVENT, BinlogEvent.WRITE_ROWS_EVENT_V2):
                    # print('INSERT:', type_code, event_length, timestamp, next_position)
                    temp_transaction_dict['event_type'].add('INSERT')
                    self.file_handle.seek(event_length - binlog_event_header_len, 1)
    
                elif type_code in (BinlogEvent.UPDATE_ROWS_EVENT, BinlogEvent.UPDATE_ROWS_EVENT_V2):
                    # print('UPDATE:', type_code, event_length, timestamp, next_position)
                    temp_transaction_dict['event_type'].add('UPDATE')
                    self.file_handle.seek(event_length - binlog_event_header_len, 1)
    
                elif type_code in (BinlogEvent.DELETE_ROWS_EVENT, BinlogEvent.DELETE_ROWS_EVENT_V2):
                    # print('DELETE:', type_code, event_length, timestamp, next_position)
                    temp_transaction_dict['event_type'].add('DELETE')
                    self.file_handle.seek(event_length - binlog_event_header_len, 1)
    
                elif type_code == BinlogEvent.XID_EVENT:
                    # Fill in the end-of-transaction information
                    temp_transaction_dict['id'] = tem_id
                    temp_transaction_dict['end_time'] = timestamp
                    temp_transaction_dict['end_position'] = next_position
    
                    _start = datetime.strptime(temp_transaction_dict['start_time'], '%Y-%m-%d %H:%M:%S')
                    _end = datetime.strptime(temp_transaction_dict['end_time'], '%Y-%m-%d %H:%M:%S')
    
                    temp_transaction_dict['diff_second'] = (_end - _start).seconds
    
                    df.append(temp_transaction_dict)
    
                    # print(temp_transaction_dict)
                    # Reset the dictionary for the next transaction
                    temp_transaction_dict = {
                        'id': None,
                        'db_name': None,
                        'ld_table_name': None,
                        'table_set': set(),
                        'start_time': None,
                        'end_time': None,
                        'diff_second': None,
                        'event_type': set(),
                        'start_position': None,
                        'end_position': None
                    }
                    self.file_handle.seek(event_length - binlog_event_header_len, 1)
                    tem_id += 1
                else:
                    # Any other event type: simply skip over its body.
                    self.file_handle.seek(event_length - binlog_event_header_len, 1)
    
            outfile = pd.DataFrame(df)
            outfile['transaction_size_bytes'] = (outfile['end_position'] - outfile['start_position'])
            outfile["transaction_size"] = outfile["transaction_size_bytes"].map(lambda x: self.bit_conversion(x))
            outfile.to_csv(self.outfile_path, encoding='utf_8_sig')
            print('File Export directory: {}'.format(self.outfile_path))
            print('complete ok!')
    
        def read_header(self):
            """
            binlog_event_header_len = 19
            timestamp : 4bytes
            type_code : 1bytes
            server_id : 4bytes
            event_length : 4bytes
            next_position : 4bytes
            flags : 2bytes
            """
            read_byte = self.read_bytes(binlog_event_header_len)
    
            if read_byte and len(read_byte) == binlog_event_header_len:
                result = struct.unpack('=IBIIIH', read_byte)
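                # unpack order: timestamp, type_code, server_id, event_length, next_position, flags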
                type_code, event_length, timestamp, next_position = result[1], result[3], result[0], result[4]
    
                return type_code, event_length, time.strftime('%Y-%m-%d %H:%M:%S',
                                                              time.localtime(
                                                                  timestamp)), next_position
            else:
                return None, None, None, None
    
        def read_query_event(self, event_length=None):
            """
            fix_part = 13:
                    thread_id : 4bytes
                    execute_seconds : 4bytes
                    database_length : 1bytes
                    error_code : 2bytes
                    variable_block_length : 2bytes
                variable_part :
                    variable_block_length = fix_part.variable_block_length
                    database_name = fix_part.database_length
                    sql_statement = event_header.event_length - 19 - 13 - variable_block_length - database_length - 4
            """
            read_byte = self.read_bytes(binlog_event_fix_part)
            fix_result = struct.unpack('=IIBHH', read_byte)
            thread_id = fix_result[0]
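            # fix_result = (thread_id, execute_seconds, database_length, error_code, variable_block_length)
            # skip the status-variable block, then read the database name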
            self.read_bytes(fix_result[4])
            read_byte = self.read_bytes(fix_result[2])
            database_name, = struct.unpack('{}s'.format(fix_result[2]), read_byte)
            statement_length = event_length - binlog_event_fix_part - binlog_event_header_len \
                               - fix_result[4] - fix_result[2] - binlog_quer_event_stern
            read_byte = self.read_bytes(statement_length)
            _a, sql_statement, = struct.unpack('1s{}s'.format(statement_length - 1), read_byte)
            return thread_id, database_name.decode(), sql_statement.decode()
    
        @staticmethod
        def bit_conversion(size, dot=2):
            size = float(size)
            if 0 <= size < 1:
                human_size = str(round(size / 0.125, dot)) + ' b'
            elif 1 <= size < 1024:
                human_size = str(round(size, dot)) + ' B'
            elif math.pow(1024, 1) <= size < math.pow(1024, 2):
                human_size = str(round(size / math.pow(1024, 1), dot)) + ' KB'
            elif math.pow(1024, 2) <= size < math.pow(1024, 3):
                human_size = str(round(size / math.pow(1024, 2), dot)) + ' MB'
            elif math.pow(1024, 3) <= size < math.pow(1024, 4):
                human_size = str(round(size / math.pow(1024, 3), dot)) + ' GB'
            elif math.pow(1024, 4) <= size < math.pow(1024, 5):
                human_size = str(round(size / math.pow(1024, 4), dot)) + ' TB'
            elif math.pow(1024, 5) <= size < math.pow(1024, 6):
                human_size = str(round(size / math.pow(1024, 5), dot)) + ' PB'
            elif math.pow(1024, 6) <= size < math.pow(1024, 7):
                human_size = str(round(size / math.pow(1024, 6), dot)) + ' EB'
            elif math.pow(1024, 7) <= size < math.pow(1024, 8):
                human_size = str(round(size / math.pow(1024, 7), dot)) + ' ZB'
            elif math.pow(1024, 8) <= size < math.pow(1024, 9):
                human_size = str(round(size / math.pow(1024, 8), dot)) + ' YB'
            elif math.pow(1024, 9) <= size < math.pow(1024, 10):
                human_size = str(round(size / math.pow(1024, 9), dot)) + ' BB'
            elif math.pow(1024, 10) <= size < math.pow(1024, 11):
                human_size = str(round(size / math.pow(1024, 10), dot)) + ' NB'
            elif math.pow(1024, 11) <= size < math.pow(1024, 12):
                human_size = str(round(size / math.pow(1024, 11), dot)) + ' DB'
            elif math.pow(1024, 12) <= size:
                human_size = str(round(size / math.pow(1024, 12), dot)) + ' CB'
            else:
                raise ValueError('bit_conversion Error')
            return human_size
    
    
    if __name__ == '__main__':
        parser = argparse.ArgumentParser(description='A piece of binlog analysis code.')
        parser.add_argument('--binlog', type=str, help='Binlog file path.', default=None)
        parser.add_argument('--outfile', type=str, help='Analyze the file export directory.', default=None)
        args = parser.parse_args()
    
        if not args.binlog or not args.outfile:
            parser.print_help()
            sys.exit(0)
    
        binlog_show = BinlogEventGet(args.binlog, args.outfile)
        binlog_show.main()

    ➜  Desktop python3 BinlogShow.py --help
    usage: BinlogShow.py [-h] [--binlog BINLOG] [--outfile OUTFILE]
    
    A piece of binlog analysis code.
    
    optional arguments:
      -h, --help         show this help message and exit
      --binlog BINLOG    Binlog file path.
      --outfile OUTFILE  Analyze the file export directory.
    ➜  Desktop

    Just specify the binlog file path and the export path for the analysis file.

    ➜  Desktop python3 BinlogShow.py --binlog=/Users/cooh/Desktop/mysql-bin.009549 --outfile=/Users/cooh/Desktop/binlogshow.csv
    loading.....
    File Export directory: /Users/cooh/Desktop/binlogshow.csv
    complete ok!

    When the run completes you get the information parsed out of the binlog; based on this file, we can write a bit of analysis code.
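
    For example, a minimal analysis sketch over the exported CSV might look like this (the column names come from the dictionary in the script above; the file path is the one used in the run):

    import pandas as pd

    df = pd.read_csv('/Users/cooh/Desktop/binlogshow.csv')

    # Top 10 largest transactions by size in bytes
    cols = ['db_name', 'ld_table_name', 'transaction_size', 'start_time', 'diff_second']
    print(df.sort_values('transaction_size_bytes', ascending=False)[cols].head(10))

    # Which table appears most often as the first table of a transaction
    print(df['ld_table_name'].value_counts().head(10))

    # Transactions that took more than 1 second between BEGIN and COMMIT
    print(df[df['diff_second'] > 1][['ld_table_name', 'start_time', 'end_time', 'diff_second']])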


    Summary

    The above is based on my personal experience. I hope it can serve as a reference, and I hope you will continue to support 编程客栈 (www.devze.com).
