diff --git a/reader/mssql/mssql.go b/reader/mssql/mssql.go index fa1183448..e970fb827 100644 --- a/reader/mssql/mssql.go +++ b/reader/mssql/mssql.go @@ -469,7 +469,7 @@ func (r *MssqlReader) sendError(err error) { } func (r *MssqlReader) checkExit(idx int, db *sql.DB) (bool, int64) { - if len(r.offsetKey) <= 0 { + if idx >= len(r.offsets) || len(r.offsetKey) <= 0 { return true, -1 } rawSQL := strings.TrimSuffix(strings.TrimSpace(r.syncSQLs[idx]), ";") diff --git a/reader/mysql/mysql.go b/reader/mysql/mysql.go index 876ccc005..233c9633d 100644 --- a/reader/mysql/mysql.go +++ b/reader/mysql/mysql.go @@ -681,7 +681,7 @@ func (r *MysqlReader) getSQL(idx int, rawSQL string) string { } func (r *MysqlReader) checkExit(idx int, db *sql.DB) (bool, int64) { - if len(r.offsetKey) <= 0 { + if len(r.offsetKey) <= 0 || idx >= len(r.offsets) { return true, -1 } rawSQL := r.syncSQLs[idx] @@ -971,7 +971,7 @@ func (r *MysqlReader) execReadSql(curDB string, idx int, execSQL string, db *sql if maxOffset > 0 { r.offsets[idx] = maxOffset + 1 } - if exit { + if exit && !r.historyAll { var newOffsetIdx int64 exit, newOffsetIdx = r.checkExit(idx, db) if !exit { diff --git a/reader/postgres/postgres.go b/reader/postgres/postgres.go index 9a8cdb3a1..1780fb075 100644 --- a/reader/postgres/postgres.go +++ b/reader/postgres/postgres.go @@ -644,7 +644,7 @@ func (r *PostgresReader) getSQL(idx int, rawSQL string) string { } func (r *PostgresReader) checkExit(idx int, db *sql.DB) (bool, int64) { - if len(r.offsetKey) <= 0 && len(r.timestampKey) <= 0 { + if idx >= len(r.offsets) || (len(r.offsetKey) <= 0 && len(r.timestampKey) <= 0) { return true, -1 } rawSQL := r.syncSQLs[idx]