-
Notifications
You must be signed in to change notification settings - Fork 687
Open
Description
import base64
from datetime import date, datetime
import json
import traceback
import pymysql
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent,
TableMapEvent
)
def default(obj):
    """Serialize values that the standard ``json`` encoder rejects.

    Meant to be passed as the ``default=`` hook of ``json.dumps``.

    Args:
        obj: The value the JSON encoder could not serialize on its own.

    Returns:
        The ISO-8601 string for ``date``/``datetime`` values, or the base64
        encoding (as an ASCII ``str``) for ``bytes`` values.

    Raises:
        TypeError: If ``obj`` is of any other type, mirroring the error the
            stock encoder raises for unsupported objects.
    """
    # ``datetime`` is a subclass of ``date`` and both expose ``isoformat()``,
    # so a single check covers the two cases.
    if isinstance(obj, date):
        return obj.isoformat()
    if isinstance(obj, bytes):
        return base64.b64encode(obj).decode("ascii")
    # ``type(obj).__name__`` replaces the invalid ``obj.class.name`` spelling.
    raise TypeError(
        f"Object of type {type(obj).__name__} is not JSON serializable"
    )
class BinlogListener:
    """Read MySQL row events from the binlog and print each row as JSON.

    The listener wraps a ``BinLogStreamReader`` restricted to insert, update
    and delete row events. Every affected row is printed to stdout as one
    JSON object with the keys ``schema``, ``table``, ``action`` and ``data``.
    """

    def __init__(self, mysql_settings, server_id=101):
        """Store connection settings; no connection is opened yet.

        Args:
            mysql_settings: Connection settings handed unchanged to
                ``BinLogStreamReader(connection_settings=...)``.
            server_id: Replication server id passed to the reader. Defaults
                to 101, the value that was previously hard-coded.
                NOTE(review): presumably must be unique per replication
                client on a given MySQL server - confirm.
        """
        self.mysql_settings = mysql_settings
        self.server_id = server_id
        self.stream = None
        # Not read anywhere in the visible code; kept for compatibility.
        self.table_map = {}

    def start_stream(self):
        """Open a fresh binlog reader, closing any reader already open."""
        # Replacing ``self.stream`` without closing it would leak the old
        # reader's connection.
        self.stop_stream()
        self.stream = BinLogStreamReader(
            connection_settings=self.mysql_settings,
            server_id=self.server_id,
            only_events=[DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent],
            resume_stream=True,
            blocking=True,
        )

    def process_events(self):
        """Consume the stream and print one JSON object per changed row.

        Starts the stream on demand. Because the reader is created with
        ``blocking=True`` this call is expected to run until the iterator
        itself raises or the process is interrupted.

        A failure while handling one event is reported with a traceback and
        the loop moves on to the next event (best-effort processing).
        """
        if self.stream is None:
            self.start_stream()

        for binlogevent in self.stream:
            try:
                # Diagnostic dump of the raw event, kept from the original.
                binlogevent.dump()
                for row in binlogevent.rows:
                    event = {
                        "schema": binlogevent.schema,
                        "table": binlogevent.table,
                    }
                    # Diagnostic print of the partially built event.
                    print(event)
                    if isinstance(binlogevent, DeleteRowsEvent):
                        event["action"] = "delete"
                        event["data"] = row["values"]
                    elif isinstance(binlogevent, UpdateRowsEvent):
                        event["action"] = "update"
                        # Only the post-update image of the row is emitted.
                        event["data"] = row["after_values"]
                    elif isinstance(binlogevent, WriteRowsEvent):
                        event["action"] = "insert"
                        event["data"] = row["values"]
                    print(json.dumps(event, default=default))
            except Exception:
                # Deliberately broad: one malformed event must not stop the
                # listener; the traceback keeps the failure visible.
                traceback.print_exc()

    def stop_stream(self):
        """Close the reader if one is open; safe to call repeatedly."""
        if self.stream is not None:
            self.stream.close()
            self.stream = None
laminko
Metadata
Metadata
Assignees
Labels
No labels