Skip to content

Commit

Permalink
sinkv2(ticdc): remove duplicate DDL statistics (pingcap#8072) (pingca…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Mar 1, 2023
1 parent 08ed958 commit e4c2306
Showing 1 changed file with 19 additions and 22 deletions.
41 changes: 19 additions & 22 deletions cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,35 +152,32 @@ func (m *mysqlDDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error
start := time.Now()
log.Info("Start exec DDL", zap.Any("DDL", ddl), zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID))
err := m.statistics.RecordDDLExecution(func() error {
tx, err := m.db.BeginTx(ctx, nil)
if err != nil {
return err
}

if shouldSwitchDB {
_, err = tx.ExecContext(ctx, "USE "+quotes.QuoteName(ddl.TableInfo.TableName.Schema)+";")
if err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Failed to rollback", zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID), zap.Error(err))
}
return err
}
}
tx, err := m.db.BeginTx(ctx, nil)
if err != nil {
return err
}

if _, err = tx.ExecContext(ctx, ddl.Query); err != nil {
if shouldSwitchDB {
_, err = tx.ExecContext(ctx, "USE "+quotes.QuoteName(ddl.TableInfo.TableName.Schema)+";")
if err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Failed to rollback", zap.String("sql", ddl.Query),
zap.String("namespace", m.id.Namespace),
log.Error("Failed to rollback", zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID), zap.Error(err))
}
return err
}
}

return errors.Trace(tx.Commit())
})
if err != nil {
if _, err = tx.ExecContext(ctx, ddl.Query); err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Failed to rollback", zap.String("sql", ddl.Query),
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID), zap.Error(err))
}
return err
}

if err = tx.Commit(); err != nil {
log.Error("Failed to exec DDL", zap.String("sql", ddl.Query),
zap.Duration("duration", time.Since(start)),
zap.String("namespace", m.id.Namespace),
Expand Down

0 comments on commit e4c2306

Please sign in to comment.