Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (sc *SchemaChanger) makeFixedTimestampRunner(readAsOf hlc.Timestamp) histor
return runner
}

// makeFixedTimestampRunner creates a HistoricalTxnRunner suitable for use by the helpers.
// makeFixedTimestampInternalExecRunner creates a HistoricalInternalExecTxnRunner suitable for use by the helpers.
func (sc *SchemaChanger) makeFixedTimestampInternalExecRunner(
readAsOf hlc.Timestamp,
) descs.HistoricalInternalExecTxnRunner {
Expand Down Expand Up @@ -205,7 +205,7 @@ func (sc *SchemaChanger) fixedTimestampTxnWithExecutor(
return err
}
return retryable(ctx, txn)
})
}, WithBulkPri(true))
}

// runBackfill runs the backfill for the schema changer.
Expand Down Expand Up @@ -1024,6 +1024,7 @@ func (sc *SchemaChanger) distIndexBackfill(
var planCtx *PlanningCtx
// The txn is used to fetch a tableDesc, partition the spans and set the
// evalCtx ts all of which is during planning of the DistSQL flow.
// Can run with normal priority since creating the plan is relatively fast.
if err := sc.txn(ctx, func(
ctx context.Context, txn descs.Txn,
) error {
Expand Down Expand Up @@ -1355,6 +1356,8 @@ func (sc *SchemaChanger) distColumnBackfill(
// Make sure not to update todoSpans inside the transaction closure as it
// may not commit. Instead write the updated value for todoSpans to this
// variable and assign to todoSpans after committing.
// Can run with normal priority since creating the plan is relatively fast,
// and distSQL manages its own transactions.
var updatedTodoSpans []roachpb.Span
if err := sc.txn(ctx, func(
ctx context.Context, txn descs.Txn,
Expand Down
39 changes: 37 additions & 2 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2900,18 +2900,53 @@ type SchemaChangerTestingKnobs struct {
// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
func (*SchemaChangerTestingKnobs) ModuleTestingKnobs() {}

// txnOptions holds options for schema changer transactions.
type txnOptions struct {
// bulkPri would be set to true if the transaction must run with bulk priority
bulkPri bool
}

// TxnOption is an interface for configuring transaction options.
type TxnOption interface {
apply(*txnOptions)
}

// bulkPriOption implements TxnOption for bulk priority.
type bulkPriOption bool

func (b bulkPriOption) apply(t *txnOptions) {
t.bulkPri = bool(b)
}

// WithBulkPri returns a TxnOption that sets the bulk priority flag.
func WithBulkPri(b bool) TxnOption {
return bulkPriOption(b)
}

// txn is a convenient wrapper around descs.Txn().
func (sc *SchemaChanger) txn(ctx context.Context, f func(context.Context, descs.Txn) error) error {
func (sc *SchemaChanger) txn(
ctx context.Context, f func(context.Context, descs.Txn) error, opts ...TxnOption,
) error {
if fn := sc.testingKnobs.RunBeforeDescTxn; fn != nil {
if err := fn(sc.job.ID()); err != nil {
return err
}
}
options := txnOptions{}
for _, opt := range opts {
opt.apply(&options)
}
// Use normal priority by default unless specified. This prevents trivial
// schema changes like CREATE TABLE from being blocked during system overload.
priority := admissionpb.NormalPri
if options.bulkPri {
priority = admissionpb.BulkNormalPri
}
return sc.execCfg.InternalDB.DescsTxn(ctx, func(
ctx context.Context, txn descs.Txn,
) error {
return f(ctx, txn)
}, isql.WithPriority(admissionpb.BulkNormalPri))
}, isql.WithPriority(priority))
}

// createSchemaChangeEvalCtx creates an extendedEvalContext() to be used for backfills.
Expand Down