From 99426896d87458992035165be7938757b92b4f43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Mon, 4 Dec 2023 12:32:52 +0100 Subject: [PATCH] This is an automated cherry-pick of #10205 Signed-off-by: ti-chi-bot --- dm/syncer/syncer.go | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index da23e77bd17..d4c6ea960a1 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -2049,6 +2049,42 @@ func (s *Syncer) Run(ctx context.Context) (err error) { if err2 != nil { return err2 } +<<<<<<< HEAD +======= + case *replication.TransactionPayloadEvent: + for _, tpev := range ev.Events { + switch tpevt := tpev.Event.(type) { + case *replication.RowsEvent: + eventIndex++ + s.metricsProxies.Metrics.BinlogEventRowHistogram.Observe(float64(len(tpevt.Rows))) + ec.header.EventType = tpev.Header.EventType + sourceTable, err2 = s.handleRowsEvent(tpevt, ec) + if sourceTable != nil && err2 == nil && s.cfg.EnableGTID { + if _, ok := affectedSourceTables[sourceTable.Schema]; !ok { + affectedSourceTables[sourceTable.Schema] = make(map[string]struct{}) + } + affectedSourceTables[sourceTable.Schema][sourceTable.Name] = struct{}{} + } + case *replication.QueryEvent: + originSQL = strings.TrimSpace(string(tpevt.Query)) + err2 = s.ddlWorker.HandleQueryEvent(tpevt, ec, originSQL) + case *replication.XIDEvent: + eventType = "XID" + needContinue, err2 = funcCommit() + case *replication.TableMapEvent: + case *replication.FormatDescriptionEvent: + default: + s.tctx.L().Warn("unhandled event from transaction payload", zap.String("type", fmt.Sprintf("%T", tpevt))) + } + } + if needContinue { + continue + } + case *replication.TableMapEvent: + case *replication.FormatDescriptionEvent: + default: + s.tctx.L().Warn("unhandled event", zap.String("type", fmt.Sprintf("%T", ev))) +>>>>>>> 1fa2752c10 (syncer(dm): Improve logging of ignored TableMap/FormatDesc (#10205)) } if err2 != nil { if err := s.handleEventError(err2, startLocation, currentLocation, e.Header.EventType == replication.QUERY_EVENT, originSQL); err != nil {