Skip to content

Commit

Permalink
This is an automated cherry-pick of #8723
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
3AceShowHand authored and ti-chi-bot committed Apr 10, 2023
1 parent e221139 commit 1b92c10
Show file tree
Hide file tree
Showing 7 changed files with 933 additions and 0 deletions.
32 changes: 32 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,12 @@ type DDLEvent struct {
PreTableInfo *SimpleTableInfo `msg:"pre-table-info"`
Query string `msg:"query"`
Type model.ActionType `msg:"-"`
<<<<<<< HEAD
=======
Done bool `msg:"-"`
Charset string `msg:"-"`
Collate string `msg:"-"`
>>>>>>> 4ab802a50e (ddl(ticdc): add charset and collate to ddl event (#8723))
}

// RedoDDLEvent represents DDL event used in redo log persistent
Expand All @@ -465,7 +471,18 @@ func (d *DDLEvent) FromJob(job *model.Job, preTableInfo *TableInfo) {
d.CommitTs = job.BinlogInfo.FinishedTS
d.Query = job.Query
d.Type = job.Type
<<<<<<< HEAD
d.fillPreTableInfo(preTableInfo)
=======
d.PreTableInfo = preTableInfo
d.TableInfo = tableInfo

d.Charset = job.Charset
d.Collate = job.Collate
// rebuild the query if necessary
rebuildQuery()
}
>>>>>>> 4ab802a50e (ddl(ticdc): add charset and collate to ddl event (#8723))

switch d.Type {
case model.ActionRenameTables:
Expand All @@ -475,6 +492,7 @@ func (d *DDLEvent) FromJob(job *model.Job, preTableInfo *TableInfo) {
default:
}

<<<<<<< HEAD
// Fill TableInfo for the event.
if job.BinlogInfo.TableInfo != nil {
tableName := job.BinlogInfo.TableInfo.Name.O
Expand Down Expand Up @@ -505,6 +523,20 @@ func (d *DDLEvent) fillPreTableInfo(preTableInfo *TableInfo) {
d.PreTableInfo.ColumnInfo[i] = new(ColumnInfo)
d.PreTableInfo.ColumnInfo[i].FromTiColumnInfo(colInfo)
}
=======
d.StartTs = job.StartTS
d.CommitTs = job.BinlogInfo.FinishedTS
oldTableName := preTableInfo.Name.O
newTableName := tableInfo.Name.O
d.Query = fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`",
oldSchemaName, oldTableName, newSchemaName, newTableName)
d.Type = model.ActionRenameTable
d.PreTableInfo = preTableInfo
d.TableInfo = tableInfo

d.Charset = job.Charset
d.Collate = job.Collate
>>>>>>> 4ab802a50e (ddl(ticdc): add charset and collate to ddl event (#8723))
}

// SingleTableTxn represents a transaction which includes many row events in a single table
Expand Down
65 changes: 65 additions & 0 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,23 @@ func (s *ddlSinkImpl) emitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent)
// the DDL event is executing and not finished yet, return false
return false, nil
}
<<<<<<< HEAD
=======

query, err := s.addSpecialComment(ddl)
if err != nil {
log.Error("Add special comment failed",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID),
zap.Error(err),
zap.Any("ddl", ddl))
s.mu.Unlock()
return false, errors.Trace(err)
}
ddl.Query = query
s.mu.Unlock()

>>>>>>> 4ab802a50e (ddl(ticdc): add charset and collate to ddl event (#8723))
select {
case <-ctx.Done():
return false, errors.Trace(ctx.Err())
Expand Down Expand Up @@ -240,3 +257,51 @@ func (s *ddlSinkImpl) close(ctx context.Context) (err error) {
}
return nil
}
<<<<<<< HEAD
=======

func (s *ddlSinkImpl) isInitialized() bool {
return s.initialized.Load().(bool)
}

// addSpecialComment translate tidb feature to comment
func (s *ddlSinkImpl) addSpecialComment(ddl *model.DDLEvent) (string, error) {
stms, _, err := parser.New().Parse(ddl.Query, ddl.Charset, ddl.Collate)
if err != nil {
return "", errors.Trace(err)
}
if len(stms) != 1 {
log.Panic("invalid ddlQuery statement size",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID),
zap.String("ddlQuery", ddl.Query))
}
var sb strings.Builder
// translate TiDB feature to special comment
restoreFlags := format.RestoreTiDBSpecialComment
// escape the keyword
restoreFlags |= format.RestoreNameBackQuotes
// upper case keyword
restoreFlags |= format.RestoreKeyWordUppercase
// wrap string with single quote
restoreFlags |= format.RestoreStringSingleQuotes
// remove placement rule
restoreFlags |= format.SkipPlacementRuleForRestore
// force disable ttl
restoreFlags |= format.RestoreWithTTLEnableOff
if err = stms[0].Restore(format.NewRestoreCtx(restoreFlags, &sb)); err != nil {
return "", errors.Trace(err)
}

result := sb.String()
log.Info("add special comment to DDL",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID),
zap.String("DDL", ddl.Query),
zap.String("charset", ddl.Charset),
zap.String("collate", ddl.Collate),
zap.String("result", result))

return result, nil
}
>>>>>>> 4ab802a50e (ddl(ticdc): add charset and collate to ddl event (#8723))
Loading

0 comments on commit 1b92c10

Please sign in to comment.