Skip to content

I can't retrieve the updated or written field names; all I get are things like UNKNOWN_COL0. #612

@xiaoyue9527

Description

@xiaoyue9527

import base64
from datetime import date, datetime
import json
import traceback
import pymysql
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent,
TableMapEvent
)

def default(obj):
    """Serialize values that ``json.dumps`` cannot encode natively.

    Intended to be passed as the ``default=`` hook of ``json.dumps``.

    Args:
        obj: The object ``json`` failed to serialize.

    Returns:
        An ISO-8601 string for ``date``/``datetime`` values, or a base64
        ASCII string for ``bytes``.

    Raises:
        TypeError: If ``obj`` is of any other type, mirroring what
            ``json.dumps`` itself raises for unsupported objects.
    """
    # datetime is a subclass of date; both expose isoformat().
    if isinstance(obj, (date, datetime)):
        return obj.isoformat()
    if isinstance(obj, bytes):
        # Raw bytes are not valid JSON text, so emit them base64-encoded.
        return base64.b64encode(obj).decode('ascii')
    raise TypeError(
        f"Object of type {type(obj).__name__} is not JSON serializable"
    )

class BinlogListener:
    """Print MySQL row-level binlog changes as JSON lines.

    Wraps a ``BinLogStreamReader`` and prints one JSON document per changed
    row, carrying the schema, table, action and row data.

    NOTE(review): the column names in the row data are produced by the
    replication library, not by this class. Placeholder names such as
    ``UNKNOWN_COL0`` presumably mean the server is not writing column
    metadata into the binlog (``binlog_row_metadata=FULL`` on MySQL 8.0+)
    -- confirm against the pymysqlreplication documentation.
    """

    def __init__(self, mysql_settings, server_id=101):
        """Store connection settings; no connection is opened here.

        Args:
            mysql_settings: Connection settings passed unchanged to
                ``BinLogStreamReader(connection_settings=...)``.
            server_id: Replication server id this listener announces.
                Defaults to 101, the value previously hard-coded.
        """
        self.mysql_settings = mysql_settings
        self.server_id = server_id
        self.stream = None
        self.table_map = {}

    def start_stream(self):
        """Open a blocking binlog stream limited to row change events."""
        events = [DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent]

        self.stream = BinLogStreamReader(
            connection_settings=self.mysql_settings,
            server_id=self.server_id,
            only_events=events,
            resume_stream=True,
            blocking=True,
        )

    @staticmethod
    def _describe_row(binlogevent, row):
        """Return the ``action``/``data`` entries for one row of an event.

        Returns an empty dict when the event is not a delete, update or
        write rows event, so the caller's output carries no action.
        """
        if isinstance(binlogevent, DeleteRowsEvent):
            return {"action": "delete", "data": row["values"]}
        if isinstance(binlogevent, UpdateRowsEvent):
            # Only the post-update image of the row is reported.
            return {"action": "update", "data": row["after_values"]}
        if isinstance(binlogevent, WriteRowsEvent):
            return {"action": "insert", "data": row["values"]}
        return {}

    def process_events(self):
        """Consume the stream and print each changed row as JSON.

        Starts the stream first if it is not already open. A failure while
        handling one event prints its traceback and moves on to the next
        event instead of stopping the listener.
        """
        if self.stream is None:
            self.start_stream()

        for binlogevent in self.stream:
            try:
                binlogevent.dump()
                for row in binlogevent.rows:
                    event = {
                        "schema": binlogevent.schema,
                        "table": binlogevent.table,
                    }
                    print(event)
                    event.update(self._describe_row(binlogevent, row))
                    print(json.dumps(event, default=default))
            except Exception:
                # Best-effort: report the failure and keep listening.
                traceback.print_exc()

    def stop_stream(self):
        """Close the stream if one is open; safe to call repeatedly."""
        if self.stream is not None:
            self.stream.close()
            self.stream = None

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions