scexec: treat BatchTimestampBeforeGCError as permanent
This uses the same check and testing that was added in
a7cce24 for the materialized view
backfill.

Release note (bug fix): Fixed a bug where the error
"batch timestamp T must be after replica GC threshold" could occur
during a schema change backfill operation and cause the schema change
job to retry infinitely. This error is now treated as permanent and
causes the job to enter the failed state.
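
To make the retry-versus-fail distinction concrete, here is a minimal, self-contained Go sketch of the pattern this commit applies. The types and helpers below (gcThresholdError, permanentError, classify, runWithRetries) are illustrative stand-ins, not CockroachDB's actual kvpb/jobs APIs; the real change uses errors.HasType and jobs.MarkAsPermanentJobError, as shown in the exec_backfill.go diff below.

package main

import (
	"errors"
	"fmt"
)

// gcThresholdError is an illustrative stand-in for kvpb.BatchTimestampBeforeGCError.
type gcThresholdError struct{}

func (gcThresholdError) Error() string {
	return "batch timestamp must be after replica GC threshold"
}

// permanentError marks a wrapped error as non-retryable, similar in spirit to
// jobs.MarkAsPermanentJobError in the diff below.
type permanentError struct{ err error }

func (p permanentError) Error() string { return p.err.Error() }
func (p permanentError) Unwrap() error { return p.err }

// classify mirrors the check added in exec_backfill.go: a GC-threshold error
// can never succeed on retry, so it is marked permanent.
func classify(err error) error {
	var gcErr gcThresholdError
	if errors.As(err, &gcErr) {
		return permanentError{fmt.Errorf("unable to retry backfill: %w", err)}
	}
	return err
}

// runWithRetries retries an operation until it succeeds or hits a permanent error.
func runWithRetries(op func() error) error {
	for {
		err := classify(op())
		if err == nil {
			return nil
		}
		var perm permanentError
		if errors.As(err, &perm) {
			// Fail the job instead of retrying infinitely.
			return err
		}
		// Transient error: retry (backoff omitted for brevity).
	}
}

func main() {
	err := runWithRetries(func() error { return gcThresholdError{} })
	fmt.Println(err)
}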
rafiss committed Jan 16, 2025
1 parent f6e0097 commit ebad56f
Showing 2 changed files with 204 additions and 92 deletions.
277 changes: 186 additions & 91 deletions pkg/sql/backfill_protected_timestamp_test.go
@@ -8,6 +8,7 @@ package sql_test
import (
"context"
gosql "database/sql"
"fmt"
"regexp"
"strings"
"sync"
@@ -23,6 +24,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -220,10 +223,10 @@ func TestValidationWithProtectedTS(t *testing.T) {
}
}

// TestBackfillQueryWithProtectedTS backfills a query into a table and confirms
// that a protected timestamp is setup. It also confirms that if the protected
// timestamp is not ready in time we do not infinitely retry.
func TestBackfillQueryWithProtectedTS(t *testing.T) {
// TestBackfillWithProtectedTS runs operations that backfill into a table and
// confirms that a protected timestamp is setup. It also confirms that if the
// protected timestamp is not ready in time we do not infinitely retry.
func TestBackfillWithProtectedTS(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

@@ -244,26 +247,68 @@ func TestBackfillQueryWithProtectedTS(t *testing.T) {
SQLEvalContext: &eval.TestingKnobs{
ForceProductionValues: true,
},
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx2 context.Context, request *kvpb.BatchRequest) *kvpb.Error {
// Detect the first operation on the backfill operation, this is before
// the PTS is setup, so the query will fail.
if blockBackFillsForPTSFailure.Load() &&
request.Txn != nil &&
request.Txn.Name == "schemaChangerBackfill" {
SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{
RunBeforeBackfill: func() error {
// Cause the backfill to pause before adding the protected
// timestamp. This knob is for testing schema changes that
// are on the declarative schema changer.
if blockBackFillsForPTSFailure.Load() {
if !blockBackFillsForPTSFailure.Swap(false) {
return nil
}
backfillQueryWait <- struct{}{}
<-backfillQueryResume
}
return nil
},
},
DistSQL: &execinfra.TestingKnobs{
RunBeforeBackfillChunk: func(sp roachpb.Span) error {
// Cause the backfill to pause after it already began running
// and has installed a protected timestamp. This knob is for
// testing schema changes that use the index backfiller.
if blockBackFillsForPTSCheck.Load() {
_, prefix, err := s.Codec().DecodeTablePrefix(sp.Key)
if err != nil || prefix != tableID {
//nolint:returnerrcheck
return nil
}
if !blockBackFillsForPTSCheck.Swap(false) {
return nil
}
backfillQueryWait <- struct{}{}
<-backfillQueryResume
}
return nil
},
},
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
RunBeforeQueryBackfill: func() error {
// Cause the backfill to pause before adding the protected
// timestamp. This knob is for testing CREATE MATERIALIZED VIEW.
if blockBackFillsForPTSFailure.Load() {
if !blockBackFillsForPTSFailure.Swap(false) {
return nil
}
backfillQueryWait <- struct{}{}
<-backfillQueryResume
}
// Detect the first scan on table from the backfill, this is after the
// PTS has been set-up.
return nil
},
},
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, request *kvpb.BatchRequest) *kvpb.Error {
// Detect the first scan on table from the backfill, which is
// after the PTS has been set up. This knob is for testing CREATE
// MATERIALIZED VIEW.
if blockBackFillsForPTSCheck.Load() &&
request.Txn != nil &&
request.Txn.Name == "schemaChangerBackfill" &&
request.Requests[0].GetInner().Method() == kvpb.Scan {
scan := request.Requests[0].GetScan()
_, prefix, err := s.Codec().DecodeTablePrefix(scan.Key)
if err != nil || prefix != tableID {
//nolint:returnerrcheck
return nil
}
if !blockBackFillsForPTSCheck.Swap(false) {
@@ -287,7 +332,7 @@ func TestBackfillQueryWithProtectedTS(t *testing.T) {
rSys := sqlutils.MakeSQLRunner(systemSqlDb)

// Refreshes the in-memory protected timestamp state to asOf.
refreshTo := func(t *testing.T, tableKey roachpb.Key, asOf hlc.Timestamp) error {
refreshTo := func(ctx context.Context, tableKey roachpb.Key, asOf hlc.Timestamp) error {
store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
if err != nil {
return err
@@ -310,7 +355,7 @@ func TestBackfillQueryWithProtectedTS(t *testing.T) {
return repl.ReadProtectedTimestampsForTesting(ctx)
}
// Refresh forces the PTS cache to update to at least asOf.
refreshPTSCacheTo := func(t *testing.T, asOf hlc.Timestamp) error {
refreshPTSCacheTo := func(ctx context.Context, asOf hlc.Timestamp) error {
ptp := ts.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider
return ptp.Refresh(ctx, asOf)
}
@@ -322,87 +367,137 @@ func TestBackfillQueryWithProtectedTS(t *testing.T) {
} {
rSys.Exec(t, sql)
}
for _, sql := range []string{
"SET CLUSTER SETTING sql.stats.automatic_collection.enabled = false",
"ALTER DATABASE defaultdb CONFIGURE ZONE USING gc.ttlseconds = 1",
"CREATE TABLE t(n int)",
"ALTER TABLE t CONFIGURE ZONE USING range_min_bytes = 0, range_max_bytes = 67108864, gc.ttlseconds = 1",
"INSERT INTO t(n) SELECT * FROM generate_series(1, 500000)",
} {
r.Exec(t, sql)
}

getTableID := func() (tableID uint32) {
r.QueryRow(t, `SELECT table_id FROM crdb_internal.tables`+
` WHERE name = 't' AND database_name = current_database()`).Scan(&tableID)
return tableID
}
tableID = getTableID()
tableKey := ts.Codec().TablePrefix(tableID)
const initialRowCount = 500000
const rowsDeletedPerIteration = 200000
const rowsAddedPerIteration = 1

grp := ctxgroup.WithContext(ctx)
grp.Go(func() error {
// We are going to do this twice, first to cause a PTS related failure,
// and a second time for the successful case. The first time we will cause
// the GC to happen before the PTS is setup. The second time we will allow
// the PTS to be installed and then cause the GC.
for i := 0; i < 2; i++ {
<-backfillQueryWait
if _, err := db.ExecContext(ctx, "SET sql_safe_updates=off"); err != nil {
return err
}
if _, err := db.ExecContext(ctx, "BEGIN; DELETE FROM t LIMIT 250000; INSERT INTO t VALUES('9999999'); COMMIT"); err != nil {
return err
}
if err := refreshTo(t, tableKey, ts.Clock().Now()); err != nil {
return err
for _, tc := range []struct {
name string
backfillSchemaChange string
jobDescriptionPrefix string
postTestQuery string
expectedCount int
}{
{
name: "create materialized view",
backfillSchemaChange: "CREATE MATERIALIZED VIEW test AS (SELECT n from t)",
jobDescriptionPrefix: "CREATE MATERIALIZED VIEW",
postTestQuery: "SELECT count(*) FROM test",
expectedCount: initialRowCount - rowsDeletedPerIteration + rowsAddedPerIteration,
},
{
name: "create index",
backfillSchemaChange: "CREATE INDEX idx ON t(n)",
jobDescriptionPrefix: "CREATE INDEX idx",
postTestQuery: "SELECT count(*) FROM t@idx",
expectedCount: initialRowCount - 2*rowsDeletedPerIteration + 2*rowsAddedPerIteration,
},
} {
t.Run(tc.name, func(t *testing.T) {
for _, sql := range []string{
"SET CLUSTER SETTING sql.stats.automatic_collection.enabled = false",
"ALTER DATABASE defaultdb CONFIGURE ZONE USING gc.ttlseconds = 1",
"DROP TABLE IF EXISTS t CASCADE",
"CREATE TABLE t(n int)",
"ALTER TABLE t CONFIGURE ZONE USING range_min_bytes = 0, range_max_bytes = 67108864, gc.ttlseconds = 1",
fmt.Sprintf("INSERT INTO t(n) SELECT * FROM generate_series(1, %d)", initialRowCount),
} {
r.Exec(t, sql)
}
if err := refreshPTSCacheTo(t, ts.Clock().Now()); err != nil {
return err

getTableID := func() (tableID uint32) {
r.QueryRow(t, `SELECT 't'::regclass::oid`).Scan(&tableID)
return tableID
}
if _, err := db.ExecContext(ctx, `SELECT crdb_internal.kv_enqueue_replica(range_id, 'mvccGC', true)
tableID = getTableID()
tableKey := ts.Codec().TablePrefix(tableID)

grp := ctxgroup.WithContext(ctx)
grp.GoCtx(func(ctx context.Context) error {
// We are going to do this twice, first to cause a PTS related failure,
// and a second time for the successful case. The first time we will cause
// the GC to happen before the PTS is setup. The second time we will allow
// the PTS to be installed and then cause the GC.
for i := 0; i < 2; i++ {
<-backfillQueryWait
if _, err := db.ExecContext(ctx, "SET sql_safe_updates=off"); err != nil {
return err
}
if _, err := db.ExecContext(ctx, fmt.Sprintf(
"BEGIN; DELETE FROM t LIMIT %d; INSERT INTO t VALUES('9999999'); COMMIT",
rowsDeletedPerIteration,
)); err != nil {
return err
}
if err := refreshTo(ctx, tableKey, ts.Clock().Now()); err != nil {
return err
}
if err := refreshPTSCacheTo(ctx, ts.Clock().Now()); err != nil {
return err
}
if _, err := db.ExecContext(ctx, `
SELECT crdb_internal.kv_enqueue_replica(range_id, 'mvccGC', true)
FROM (SELECT range_id FROM [SHOW RANGES FROM TABLE t] ORDER BY start_key);`); err != nil {
return err
}
row := db.QueryRow("SELECT count(*) FROM system.protected_ts_records WHERE meta_type='jobs'")
var count int
if err := row.Scan(&count); err != nil {
return err
}
// First iteration is before the PTS is setup, so it will be 0. Second
// iteration the PTS should be setup.
expectedCount := i
if count != expectedCount {
return errors.AssertionFailedf("no protected timestamp was set up by the schema change job (expected %d, got : %d)", expectedCount, count)
}
backfillQueryResume <- struct{}{}
}
return nil
})
grp.Go(func() error {
// Backfill with the PTS being not setup early enough, which will
// lead to failure.
blockBackFillsForPTSFailure.Swap(true)
_, err := db.ExecContext(ctx, `CREATE MATERIALIZED VIEW test AS (SELECT n from t)`)
if err == nil || !testutils.IsError(err, "unable to retry backfill since fixed timestamp is before the GC timestamp") {
return errors.AssertionFailedf("expected error was not hit")
}
// Next backfill with the PTS being setup on time, which should always
// succeed.
blockBackFillsForPTSCheck.Swap(true)
_, err = db.ExecContext(ctx, `CREATE MATERIALIZED VIEW test AS (SELECT n from t)`)
return err
})
return err
}
row := db.QueryRowContext(ctx, "SELECT count(*) FROM system.protected_ts_records WHERE meta_type='jobs'")
var count int
if err := row.Scan(&count); err != nil {
return err
}
// First iteration is before the PTS is setup, so it will be 0. Second
// iteration the PTS should be setup.
expectedCount := i
if count != expectedCount {
return errors.AssertionFailedf("no protected timestamp was set up by the schema change job (expected %d, got : %d)", expectedCount, count)
}
backfillQueryResume <- struct{}{}
}
return nil
})
grp.GoCtx(func(ctx context.Context) error {
// Backfill with the PTS being not setup early enough, which will
// lead to failure.
blockBackFillsForPTSFailure.Swap(true)
_, err := db.ExecContext(ctx, tc.backfillSchemaChange)
if err == nil || !testutils.IsError(err, "unable to retry backfill since fixed timestamp is before the GC timestamp") {
return errors.AssertionFailedf("expected error was not hit")
}
testutils.SucceedsSoon(t, func() error {
// Wait until schema change is fully rolled back.
var status string
err = db.QueryRowContext(ctx, fmt.Sprintf(
"SELECT status FROM crdb_internal.jobs WHERE description LIKE '%s%%'",
tc.jobDescriptionPrefix,
)).Scan(&status)
if err != nil {
return err
}
if status != "failed" {
return errors.Newf("schema change not rolled back yet; status=%s", status)
}
return nil
})
// Next backfill with the PTS being setup on time, which should always
// succeed.
blockBackFillsForPTSCheck.Swap(true)
_, err = db.ExecContext(ctx, tc.backfillSchemaChange)
if err != nil {
return err
}
return nil
})

require.NoError(t, grp.Wait())
var rowCount int
res := r.QueryRow(t, `SELECT count(*) FROM test`)
res.Scan(&rowCount)
// Half the row count plus the one row inserted above.
const expectedCount = 250000 + 1
if rowCount != expectedCount {
t.Errorf("expected %d entries, got %d", expectedCount, rowCount)
require.NoError(t, grp.Wait())
var rowCount int
res := r.QueryRow(t, tc.postTestQuery)
res.Scan(&rowCount)
if rowCount != tc.expectedCount {
t.Errorf("expected %d entries, got %d", tc.expectedCount, rowCount)
}
require.Falsef(t, blockBackFillsForPTSFailure.Load(), "no backfill txn was detected in testing knob.")
require.Falsef(t, blockBackFillsForPTSCheck.Load(), "no backfill txn was detected in testing knob.")
})
}
require.NoError(t, db.Close())
require.Equalf(t, false, blockBackFillsForPTSFailure.Load(), "no backfill txn was detected in testing knob.")
}
19 changes: 18 additions & 1 deletion pkg/sql/schemachanger/scexec/exec_backfill.go
@@ -41,9 +41,26 @@ func executeBackfillOps(ctx context.Context, deps Dependencies, execute []scop.O
return err
}
if err := runBackfiller(ctx, deps, tracker, backfillProgresses, mergeProgresses, deps.TransactionalJobRegistry().CurrentJob(), tables); err != nil {
if errors.HasType(err, &kvpb.InsufficientSpaceError{}) {
if errors.HasType(err, (*kvpb.InsufficientSpaceError)(nil)) {
return jobs.MarkPauseRequestError(errors.UnwrapAll(err))
}
if errors.HasType(err, (*kvpb.BatchTimestampBeforeGCError)(nil)) {
// We will not ever move the timestamp forward so this will fail forever.
// Mark as a permanent error.
if scerrors.HasSchemaChangerUserError(err) {
// We need to unwrap this so that the PermanentJobError is marked
// at the correct level.
err = scerrors.UnwrapSchemaChangerUserError(err)
}
return scerrors.SchemaChangerUserError(
jobs.MarkAsPermanentJobError(
errors.Wrap(
err,
"unable to retry backfill since fixed timestamp is before the GC timestamp",
),
),
)
}
return err
}
return nil
