Skip to content

Commit d9ef65a

Browse files
authored
support filtering commands with RETRY field (#1011)
Signed-off-by: Yang Keao <yangkeao@chunibyo.icu>
1 parent ad31059 commit d9ef65a

File tree

6 files changed

+114
-27
lines changed

6 files changed

+114
-27
lines changed

cmd/replayer/main.go

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ func main() {
6060
serviceMode := rootCmd.PersistentFlags().Bool("service-mode", false, "run replayer in service mode")
6161
logLevel := rootCmd.PersistentFlags().String("log-level", "info", "the log level: debug, info, warn, error, dpanic, panic, fatal")
6262
startTime := rootCmd.PersistentFlags().Time("start-time", time.Now(), []string{time.RFC3339, time.RFC3339Nano}, "the time to start the replay. Format is RFC3339. Default is the current time.")
63+
filterCommandWithRetry := rootCmd.PersistentFlags().Bool("filter-command-with-retry", false, "filter out commands that are retries according to the audit log.")
6364

6465
rootCmd.RunE = func(cmd *cobra.Command, _ []string) error {
6566
// set up general managers
@@ -129,24 +130,25 @@ func main() {
129130
} else {
130131
// start replay
131132
replayCfg := replay.ReplayConfig{
132-
Input: *input,
133-
Speed: *speed,
134-
Username: *username,
135-
Password: *password,
136-
Format: *format,
137-
ReadOnly: *readonly,
138-
StartTime: *startTime,
139-
CommandStartTime: *cmdStartTime,
140-
CommandEndTime: *cmdEndTime,
141-
IgnoreErrs: *ignoreErrs,
142-
BufSize: *bufSize,
143-
PSCloseStrategy: replaycmd.PSCloseStrategy(*psCloseStrategy),
144-
DryRun: *dryRun,
145-
CheckPointFilePath: *checkPointFilePath,
146-
DynamicInput: *dynamicInput,
147-
ReplayerCount: *replayerCount,
148-
ReplayerIndex: *replayerIndex,
149-
OutputPath: *outputPath,
133+
Input: *input,
134+
Speed: *speed,
135+
Username: *username,
136+
Password: *password,
137+
Format: *format,
138+
ReadOnly: *readonly,
139+
StartTime: *startTime,
140+
CommandStartTime: *cmdStartTime,
141+
CommandEndTime: *cmdEndTime,
142+
IgnoreErrs: *ignoreErrs,
143+
BufSize: *bufSize,
144+
PSCloseStrategy: replaycmd.PSCloseStrategy(*psCloseStrategy),
145+
DryRun: *dryRun,
146+
CheckPointFilePath: *checkPointFilePath,
147+
DynamicInput: *dynamicInput,
148+
ReplayerCount: *replayerCount,
149+
ReplayerIndex: *replayerIndex,
150+
OutputPath: *outputPath,
151+
FilterCommandWithRetry: *filterCommandWithRetry,
150152
}
151153
if err := r.StartReplay(replayCfg); err != nil {
152154
cancel()

pkg/server/api/traffic.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ func (h *Server) TrafficReplay(c *gin.Context) {
156156
cfg.OutputPath = c.PostForm("outputpath")
157157
cfg.Addr = c.PostForm("addr")
158158
cfg.DryRun = strings.EqualFold(c.PostForm("dryrun"), "true")
159+
cfg.FilterCommandWithRetry = strings.EqualFold(c.PostForm("filtercommandwithretry"), "true")
159160
h.lg.Info("request: traffic replay", zap.Any("cfg", cfg))
160161

161162
if err := h.mgr.ReplayJobMgr.StartReplay(cfg); err != nil {

pkg/sqlreplay/cmd/audit_log_plugin.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ const (
2929
auditPluginKeyEvent = "EVENT"
3030
auditPluginKeyCostTime = "COST_TIME"
3131
auditPluginKeyPreparedStmtID = "PREPARED_STMT_ID"
32+
auditPluginKeyRetry = "RETRY"
3233

3334
auditPluginClassGeneral = "GENERAL"
3435
auditPluginClassTableAccess = "TABLE_ACCESS"
@@ -101,11 +102,12 @@ type AuditLogPluginDecoder struct {
101102
commandStartTime time.Time
102103
commandEndTime time.Time
103104
// pendingCmds contains the commands that has not been returned yet.
104-
pendingCmds []*Command
105-
psCloseStrategy PSCloseStrategy
106-
idAllocator *ConnIDAllocator
107-
dedup *DeDup
108-
lg *zap.Logger
105+
pendingCmds []*Command
106+
psCloseStrategy PSCloseStrategy
107+
filterCommandWithRetry bool
108+
idAllocator *ConnIDAllocator
109+
dedup *DeDup
110+
lg *zap.Logger
109111
}
110112

111113
// ConnIDAllocator allocates connection IDs for new connections.
@@ -446,9 +448,18 @@ func (decoder *AuditLogPluginDecoder) parseGeneralEvent(kvs map[string]string, s
446448
if err != nil {
447449
return nil, errors.Wrapf(err, "unquote sql failed: %s", kvs[auditPluginKeySQL])
448450
}
449-
// deduplicate DML and SELECT FOR UPDATE
450-
if decoder.isDuplicatedWrite(connInfo.lastCmd, kvs, cmdStr, sql, startTs, endTs) {
451-
return nil, nil
451+
if decoder.filterCommandWithRetry {
452+
if retryStr, ok := kvs[auditPluginKeyRetry]; ok {
453+
if retryStr == "true" {
454+
// skip the retried command
455+
return nil, nil
456+
}
457+
}
458+
} else {
459+
// deduplicate DML and SELECT FOR UPDATE
460+
if decoder.isDuplicatedWrite(connInfo.lastCmd, kvs, cmdStr, sql, startTs, endTs) {
461+
return nil, nil
462+
}
452463
}
453464
}
454465

@@ -641,3 +652,8 @@ func (decoder *AuditLogPluginDecoder) isDuplicatedWrite(lastCmd *Command, kvs ma
641652
decoder.dedup.Unlock()
642653
return true
643654
}
655+
656+
// EnableFilterCommandWithRetry enables filtering out commands that are retries according to the audit log.
657+
func (decoder *AuditLogPluginDecoder) EnableFilterCommandWithRetry() {
658+
decoder.filterCommandWithRetry = true
659+
}

pkg/sqlreplay/cmd/audit_log_plugin_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1657,3 +1657,65 @@ func decodeCmds(decoder CmdDecoder, reader LineReader) ([]*Command, error) {
16571657
}
16581658
return cmds, err
16591659
}
1660+
1661+
func TestFilterOutCommandsWithRetry(t *testing.T) {
1662+
tests := []struct {
1663+
name string
1664+
lines string
1665+
enable bool
1666+
cmdCount int
1667+
}{
1668+
{
1669+
name: "filter out retried query command",
1670+
lines: `[2025/09/18 17:48:20.614 +08:10] [INFO] [logger.go:77] [ID=17581889006155] [TIMESTAMP=2025/09/18 17:48:20.614 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1000] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="SELECT * FROM test"] [ROWS=1] [CONNECTION_ID=1001] [CLIENT_PORT=50112] [PID=542193] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [RETRY=true] [EVENT=COMPLETED]`,
1671+
enable: true,
1672+
cmdCount: 0,
1673+
},
1674+
{
1675+
name: "do not filter when this function is disabled",
1676+
lines: `[2025/09/18 17:48:20.614 +08:10] [INFO] [logger.go:77] [ID=17581889006155] [TIMESTAMP=2025/09/18 17:48:20.614 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1000] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="SELECT * FROM test"] [ROWS=1] [CONNECTION_ID=1001] [CLIENT_PORT=50112] [PID=542193] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [RETRY=true] [EVENT=COMPLETED]`,
1677+
enable: false,
1678+
cmdCount: 1,
1679+
},
1680+
{
1681+
name: "do not filter non-retried commands",
1682+
lines: `[2025/09/18 17:48:20.614 +08:10] [INFO] [logger.go:77] [ID=17581889006155] [TIMESTAMP=2025/09/18 17:48:20.614 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1000] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="SELECT * FROM test"] [ROWS=1] [CONNECTION_ID=1001] [CLIENT_PORT=50112] [PID=542193] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [EVENT=COMPLETED]`,
1683+
enable: true,
1684+
cmdCount: 1,
1685+
},
1686+
{
1687+
name: "mixed retried and non-retried commands",
1688+
lines: `[2025/09/18 17:48:20.614 +08:10] [INFO] [logger.go:77] [ID=17581889006155] [TIMESTAMP=2025/09/18 17:48:20.614 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1000] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="SELECT 1"] [ROWS=1] [CONNECTION_ID=1001] [CLIENT_PORT=50112] [PID=542193] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [EVENT=COMPLETED]
1689+
[2025/09/18 17:48:20.714 +08:10] [INFO] [logger.go:77] [ID=17581889006156] [TIMESTAMP=2025/09/18 17:48:20.714 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1000] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="SELECT 2"] [ROWS=1] [CONNECTION_ID=1001] [CLIENT_PORT=50112] [PID=542193] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [RETRY=true] [EVENT=COMPLETED]
1690+
[2025/09/18 17:48:20.814 +08:10] [INFO] [logger.go:77] [ID=17581889006157] [TIMESTAMP=2025/09/18 17:48:20.814 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1000] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="SELECT 3"] [ROWS=1] [CONNECTION_ID=1001] [CLIENT_PORT=50112] [PID=542193] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [EVENT=COMPLETED]`,
1691+
enable: true,
1692+
cmdCount: 2,
1693+
},
1694+
{
1695+
name: "retry flag with other value should not be filtered",
1696+
lines: `[2025/09/18 17:48:20.614 +08:10] [INFO] [logger.go:77] [ID=17581889006155] [TIMESTAMP=2025/09/18 17:48:20.614 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1000] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="SELECT * FROM test"] [ROWS=1] [CONNECTION_ID=1001] [CLIENT_PORT=50112] [PID=542193] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [RETRY=false] [EVENT=COMPLETED]`,
1697+
enable: true,
1698+
cmdCount: 1,
1699+
},
1700+
}
1701+
1702+
for _, tt := range tests {
1703+
t.Run(tt.name, func(t *testing.T) {
1704+
decoder := NewAuditLogPluginDecoder(NewDeDup(), zap.NewNop())
1705+
if tt.enable {
1706+
decoder.EnableFilterCommandWithRetry()
1707+
}
1708+
mr := mockReader{data: append([]byte(tt.lines), '\n')}
1709+
cmds := make([]*Command, 0, 4)
1710+
for {
1711+
cmd, err := decoder.Decode(&mr)
1712+
if cmd == nil {
1713+
require.ErrorIs(t, err, io.EOF)
1714+
break
1715+
}
1716+
cmds = append(cmds, cmd)
1717+
}
1718+
require.Equal(t, tt.cmdCount, len(cmds))
1719+
})
1720+
}
1721+
}

pkg/sqlreplay/replay/replay.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,8 @@ type ReplayConfig struct {
122122
OutputPath string
123123
// Addr is the downstream address to connect to
124124
Addr string
125+
// FilterCommandWithRetry indicates whether to filter out commands that are retries according to the audit log.
126+
FilterCommandWithRetry bool
125127
// the following fields are for testing
126128
readers []cmd.LineReader
127129
report report.Report
@@ -616,6 +618,10 @@ func (r *replay) constructDecoderForReader(ctx context.Context, reader cmd.LineR
616618
auditLogDecoder.SetPSCloseStrategy(r.cfg.PSCloseStrategy)
617619
auditLogDecoder.SetIDAllocator(idAllocator)
618620
auditLogDecoder.SetCommandEndTime(r.cfg.CommandEndTime)
621+
622+
if r.cfg.FilterCommandWithRetry {
623+
auditLogDecoder.EnableFilterCommandWithRetry()
624+
}
619625
}
620626

621627
var decoder decoder

pkg/sqlreplay/replay/replay_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -744,7 +744,7 @@ func TestDynamicInput(t *testing.T) {
744744
}
745745
}()
746746

747-
dirWatcherInterval := 10 * time.Millisecond
747+
dirWatcherInterval := 100 * time.Millisecond
748748
store.SetDirWatcherPollIntervalForTest(dirWatcherInterval)
749749

750750
auditLog := `[2025/09/08 21:16:29.585 +08:00] [INFO] [logger.go:77] [ID=17573373891] [TIMESTAMP=2025/09/06 16:16:29.585 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1057.834] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select 1"] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=52611] [PID=89967] [COMMAND=Query] [SQL_STATEMENTS=Set] [EXECUTE_PARAMS="[]"] [CURRENT_DB=] [EVENT=COMPLETED]

0 commit comments

Comments
 (0)