@@ -14,7 +14,7 @@ import (
1414)
1515
1616var (
17- expCreateTable = regexp .MustCompile ("(?i)^CREATE\\ sTABLE(\\ sIF\\ sNOT\\ sEXISTS)?\\ s`{0,1}(.*?)`{0,1}\\ .{0,1}`{0,1}([^`\\ .]+?)`{0,1}\\ s.*" )
17+ expCreateTable = regexp .MustCompile ("(?i)^CREATE\\ sTABLE(\\ sIF\\ sNOT\\ sEXISTS)?\\ s`{0,1}(.*?)`{0,1}\\ .{0,1}`{0,1}([^`\\ .]+?)`{0,1}\\ s.*" )
1818 expAlterTable = regexp .MustCompile ("(?i)^ALTER\\ sTABLE\\ s.*?`{0,1}(.*?)`{0,1}\\ .{0,1}`{0,1}([^`\\ .]+?)`{0,1}\\ s.*" )
1919 expRenameTable = regexp .MustCompile ("(?i)^RENAME\\ sTABLE\\ s.*?`{0,1}(.*?)`{0,1}\\ .{0,1}`{0,1}([^`\\ .]+?)`{0,1}\\ s{1,}TO\\ s.*?" )
2020 expDropTable = regexp .MustCompile ("(?i)^DROP\\ sTABLE(\\ sIF\\ sEXISTS){0,1}\\ s`{0,1}(.*?)`{0,1}\\ .{0,1}`{0,1}([^`\\ .]+?)`{0,1}(?:$|\\ s)" )
@@ -115,9 +115,9 @@ func (c *Canal) runSyncBinlog() error {
115115 }
116116 case * replication.QueryEvent :
117117 var (
118- mb [][]byte
118+ mb [][]byte
119119 schema []byte
120- table []byte
120+ table []byte
121121 )
122122 regexps := []regexp.Regexp {* expCreateTable , * expAlterTable , * expRenameTable , * expDropTable }
123123 for _ , reg := range regexps {
@@ -143,10 +143,14 @@ func (c *Canal) runSyncBinlog() error {
143143 force = true
144144 c .ClearTableCache (schema , table )
145145 log .Infof ("table structure changed, clear table cache: %s.%s\n " , schema , table )
146- if err = c .eventHandler .OnDDL ( pos , e ); err != nil {
146+ if err = c .eventHandler .OnTableChanged ( string ( schema ), string ( table ) ); err != nil {
147147 return errors .Trace (err )
148148 }
149149
150+ // Now we only handle Table Changed DDL, maybe we will support more later.
151+ if err = c .eventHandler .OnDDL (pos , e ); err != nil {
152+ return errors .Trace (err )
153+ }
150154 default :
151155 continue
152156 }
0 commit comments