sql: prevent use of leaf txn with volatile routines
This patch disallows leaf transactions during execution for two cases:
1. The plan contains a volatile routine.
2. The plan contains a routine with a PLpgSQL exception handler.

This is necessary because volatile routines use transaction stepping, and
exception handlers use internal savepoints. Both of these cases are
incompatible with concurrent leaf transactions and may cause internal
errors if not handled. Note that (2) currently implies (1), but the
restriction for (1) may be relaxed in the future, so we explicitly prevent
both cases to be defensive.

Fixes #112188

Release note (bug fix): Fixed a rare bug that could result in an internal
error when executing a volatile UDF in a query along with a lookup join.
DrewKimball committed Mar 2, 2024
1 parent 96ccd91 commit 2130596
Showing 9 changed files with 227 additions and 71 deletions.
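
Conceptually, the fix threads a new plan flag from the optimizer's execbuilder down to the DistSQL planner, which then refuses LeafTxn concurrency (and the parallelism that depends on it). The following standalone Go sketch illustrates that flow in simplified form; the types and helpers below are illustrative stand-ins, not the actual pkg/sql API.

```go
package main

import "fmt"

// planFlags is a simplified bitset modeling the plan flags touched by this
// commit. The names mirror pkg/sql/plan.go and pkg/sql/opt/exec/factory.go,
// but everything in this sketch is illustrative.
type planFlags uint32

const (
	planFlagContainsMutation planFlags = 1 << iota
	planFlagContainsLocking
	planFlagMustUseRootTxn
)

func (pf planFlags) isSet(flag planFlags) bool { return pf&flag != 0 }

// routine captures the two properties that force RootTxn use: volatility
// (transaction stepping) and an exception block (internal savepoints).
type routine struct {
	volatile       bool
	exceptionBlock bool
}

// buildFlags mimics the execbuilder change: mark the plan whenever a routine
// is volatile or has an exception handler.
func buildFlags(routines []routine) planFlags {
	var flags planFlags
	for _, r := range routines {
		if r.volatile || r.exceptionBlock {
			flags |= planFlagMustUseRootTxn
		}
	}
	return flags
}

// canUseLeafTxn mimics the gating in DistSQLPlanner.Run: any flag that
// requires the RootTxn disables LeafTxn concurrency (and hence the Streamer
// and parallel local scans).
func canUseLeafTxn(flags planFlags) bool {
	return !flags.isSet(planFlagMustUseRootTxn) &&
		!flags.isSet(planFlagContainsMutation) &&
		!flags.isSet(planFlagContainsLocking)
}

func main() {
	flags := buildFlags([]routine{{volatile: true}})
	fmt.Println("can use LeafTxn:", canUseLeafTxn(flags)) // prints: can use LeafTxn: false
}
```

In the actual diff below, the flag is set in execbuilder's buildCall and buildUDF, copied from exec.PlanFlagMustUseRootTxn into planFlagMustUseRootTxn in constructPlan, and consulted both in NewPlanningCtxWithOracle and in DistSQLPlanner.Run.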
2 changes: 2 additions & 0 deletions pkg/sql/distsql_physical_planner.go
@@ -4857,6 +4857,7 @@ func (dsp *DistSQLPlanner) NewPlanningCtxWithOracle(
if !distribute {
if planner == nil ||
evalCtx.SessionData().Internal ||
planner.curPlan.flags.IsSet(planFlagMustUseRootTxn) ||
planner.curPlan.flags.IsSet(planFlagContainsMutation) ||
planner.curPlan.flags.IsSet(planFlagContainsLocking) {
// Don't parallelize the scans if we have a local plan if
@@ -4867,6 +4868,7 @@ func (dsp *DistSQLPlanner) NewPlanningCtxWithOracle(
// and returning early in this function allows us to avoid the race
// on dsp.spanResolver in fakedist logic test configs without adding
// any synchronization (see #116039);
// - a node (e.g. volatile routine) explicitly requires the RootTxn.
// - the plan contains a mutation operation - we currently don't
// support any parallelism when mutations are present;
// - the plan uses locking (see #94290).
148 changes: 77 additions & 71 deletions pkg/sql/distsql_running.go
@@ -701,84 +701,80 @@ func (dsp *DistSQLPlanner) Run(
localState.Collection = planCtx.planner.Descriptors()
}

// noMutations indicates whether we know for sure that the plan doesn't have
// any mutations. If we don't have the access to the planner (which can be
// the case not on the main query execution path, i.e. BulkIO, CDC, etc),
// then we are ignorant of the details of the execution plan, so we choose
// to be on the safe side and mark 'noMutations' as 'false'.
noMutations := planCtx.planner != nil && !planCtx.planner.curPlan.flags.IsSet(planFlagContainsMutation)

if txn == nil {
// Txn can be nil in some cases, like BulkIO flows. In such a case, we
// cannot create a LeafTxn, so we cannot parallelize scans.
planCtx.parallelizeScansIfLocal = false
} else {
if planCtx.isLocal && noMutations && planCtx.parallelizeScansIfLocal {
// canUseLeafTxn indicates whether the plan can be executed with a concurrent
// LeafTxn. It is set to false if the plan contains an expression that must
// use the RootTxn.
//
// Txn and/or the planner can be nil, which can happen when this isn't the
// main query execution path (e.g. BulkIO, CDC, etc). In such cases, we
// cannot create a LeafTxn.
canUseLeafTxn := txn != nil && planCtx.planner != nil
if planCtx.planner != nil {
// Some plan nodes (e.g. volatile UDFs) explicitly require RootTxn use.
canUseLeafTxn = canUseLeafTxn && !planCtx.planner.curPlan.flags.IsSet(planFlagMustUseRootTxn)
// Mutations use the RootTxn, and so they disallow LeafTxns.
// TODO(yuzefovich): we could be smarter here and allow the usage of
// the RootTxn by the mutations while still using the Streamer (that
// gets a LeafTxn) iff the plan is such that there is no concurrency
// between the root and the leaf txns.
canUseLeafTxn = canUseLeafTxn && !planCtx.planner.curPlan.flags.IsSet(planFlagContainsMutation)
// At the moment, we disable the usage of the Streamer API for local plans
// when locking is used by any of the processors. This is the case since
// the lock spans propagation doesn't happen for the leaf txns which can
// result in excessive contention for future reads (since the acquired
// locks are not cleaned up properly when the txn commits).
// TODO(yuzefovich): fix the propagation of the lock spans with the leaf
// txns and remove this check. See #94290.
canUseLeafTxn = canUseLeafTxn && !planCtx.planner.curPlan.flags.IsSet(planFlagContainsLocking)
}
if canUseLeafTxn {
// We also currently disable concurrency whenever we have a wrapped
// planNode. This is done to prevent scenarios where some of planNodes will
// use the RootTxn (via the internal executor) which prohibits the usage of
// the LeafTxn for this flow.
//
// Note that we're disallowing concurrency (e.g. the Streamer API) in more
// cases than strictly necessary, since there are planNodes that don't use
// the txn at all. However, auditing each planNode implementation to see
// which are using the internal executor is error-prone, so we just disable
// the Streamer API for the "super-set" of problematic cases.
for _, p := range plan.Processors {
if p.Spec.Core.LocalPlanNode != nil {
canUseLeafTxn = false
break
}
}
}

if canUseLeafTxn {
if planCtx.isLocal && planCtx.parallelizeScansIfLocal {
// Even though we have a single flow on the gateway node, we might
// have decided to parallelize the scans. If that's the case, we
// will need to use the Leaf txn.
for _, flow := range flows {
localState.HasConcurrency = localState.HasConcurrency || execinfra.HasParallelProcessors(flow)
}
}
if noMutations {
// Even if planCtx.isLocal is false (which is the case when we think
// it's worth distributing the query), we need to go through the
// processors to figure out whether any of them have concurrency.
//
// However, the concurrency requires the usage of LeafTxns which is
// only acceptable if we don't have any mutations in the plan.
// TODO(yuzefovich): we could be smarter here and allow the usage of
// the RootTxn by the mutations while still using the Streamer (that
// gets a LeafTxn) iff the plan is such that there is no concurrency
// between the root and the leaf txns.
//
// At the moment of writing, this is only relevant whenever the
// Streamer API might be used by some of the processors. The
// Streamer internally can have concurrency, so it expects to be
// given a LeafTxn. In order for that LeafTxn to be created later,
// during the flow setup, we need to populate leafInputState below,
// so we tell the localState that there is concurrency.

// At the moment, we disable the usage of the Streamer API for local plans
// when locking is used by any of the processors. This is the case since
// the lock spans propagation doesn't happen for the leaf txns which can
// result in excessive contention for future reads (since the acquired
// locks are not cleaned up properly when the txn commits).
// TODO(yuzefovich): fix the propagation of the lock spans with the leaf
// txns and remove this check. See #94290.
containsLocking := planCtx.planner != nil && planCtx.planner.curPlan.flags.IsSet(planFlagContainsLocking)

// We also currently disable the usage of the Streamer API whenever
// we have a wrapped planNode. This is done to prevent scenarios
// where some of planNodes will use the RootTxn (via the internal
// executor) which prohibits the usage of the LeafTxn for this flow.
//
// Note that we're disallowing the Streamer API in more cases than
// strictly necessary (i.e. there are planNodes that don't use the
// txn at all), but auditing each planNode implementation to see
// which are using the internal executor is error-prone, so we just
// disable the Streamer API for the "super-set" of problematic
// cases.
mustUseRootTxn := func() bool {
for _, p := range plan.Processors {
if p.Spec.Core.LocalPlanNode != nil {
return true
}
}
return false
}()
if !containsLocking && !mustUseRootTxn {
if evalCtx.SessionData().StreamerEnabled {
for _, proc := range plan.Processors {
if jr := proc.Spec.Core.JoinReader; jr != nil {
// Both index and lookup joins, with and without
// ordering, are executed via the Streamer API that has
// concurrency.
localState.HasConcurrency = true
break
}
}
// Even if planCtx.isLocal is false (which is the case when we think
// it's worth distributing the query), we need to go through the
// processors to figure out whether any of them have concurrency.
// However, the concurrency requires the usage of LeafTxns, so we have
// to check whether LeafTxns are acceptable for the plan.
//
// At the moment of writing, this is only relevant whenever the
// Streamer API might be used by some of the processors. The
// Streamer internally can have concurrency, so it expects to be
// given a LeafTxn. In order for that LeafTxn to be created later,
// during the flow setup, we need to populate leafInputState below,
// so we tell the localState that there is concurrency.
if evalCtx.SessionData().StreamerEnabled {
for _, proc := range plan.Processors {
if jr := proc.Spec.Core.JoinReader; jr != nil {
// Both index and lookup joins, with and without
// ordering, are executed via the Streamer API that has
// concurrency.
localState.HasConcurrency = true
break
}
}
}
@@ -798,6 +794,16 @@ func (dsp *DistSQLPlanner) Run(
}
leafInputState = tis
}
} else {
// We cannot use a LeafTxn, so we cannot parallelize scans.
planCtx.parallelizeScansIfLocal = false
if buildutil.CrdbTestBuild {
if planCtx.planner != nil && localState.MustUseLeafTxn() {
recv.SetError(errors.AssertionFailedf(
"MustUseLeafTxn() returned true when canUseLeafTxn is false",
))
}
}
}

if !planCtx.skipDistSQLDiagramGeneration && log.ExpensiveLogEnabled(ctx, 2) {
3 changes: 3 additions & 0 deletions pkg/sql/exec_factory_util.go
@@ -106,6 +106,9 @@ func constructPlan(
if flags.IsSet(exec.PlanFlagCheckContainsLocking) {
res.flags.Set(planFlagCheckContainsLocking)
}
if flags.IsSet(exec.PlanFlagMustUseRootTxn) {
res.flags.Set(planFlagMustUseRootTxn)
}

return res, nil
}
7 changes: 7 additions & 0 deletions pkg/sql/opt/exec/execbuilder/relational.go
@@ -41,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree/treewindow"
"github.com/cockroachdb/cockroach/pkg/sql/sem/volatility"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/types"
@@ -3367,6 +3368,12 @@ func (b *Builder) buildCall(c *memo.CallExpr) (_ execPlan, outputCols colOrdMap,
}
}

// Volatile routines step the transaction, and routines with an exception
// block use internal savepoints.
if udf.Def.Volatility == volatility.Volatile || udf.Def.ExceptionBlock != nil {
b.flags.Set(exec.PlanFlagMustUseRootTxn)
}

// Create a tree.RoutinePlanFn that can plan the statements in the UDF body.
planGen := b.buildRoutinePlanGenerator(
udf.Def.Params,
6 changes: 6 additions & 0 deletions pkg/sql/opt/exec/execbuilder/scalar.go
@@ -953,6 +953,12 @@ func (b *Builder) buildUDF(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.Typ
}
}

// Volatile routines step the transaction, and routines with an exception
// block use internal savepoints.
if udf.Def.Volatility == volatility.Volatile || udf.Def.ExceptionBlock != nil {
b.flags.Set(exec.PlanFlagMustUseRootTxn)
}

if udf.Def.BlockState != nil {
b.initRoutineExceptionHandler(udf.Def.BlockState, udf.Def.ExceptionBlock)
}
110 changes: 110 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/udf
@@ -362,3 +362,113 @@ vectorized: true
columns: (f88259)
size: 1 column, 1 row
row 0, expr 0: f88259(333, 444)

# Regression test for #112188 - disable concurrency in the presence of a volatile
# routine.
subtest prevent_leaf_txn

statement ok
CREATE TABLE t1_112188 (a INT, b INT, c INT);

statement ok
CREATE TABLE t2_112188 (d INT PRIMARY KEY);

statement ok
CREATE FUNCTION fn_vol_112188(c INT) RETURNS t1_112188 LANGUAGE SQL AS $$
SELECT * FROM t1_112188 te
$$;

statement ok
CREATE FUNCTION fn_non_vol_112188(c INT) RETURNS t1_112188 IMMUTABLE LANGUAGE SQL AS $$
SELECT ROW(1, 2, 3);
$$;

# The streamer uses a leaf txn, so it should be disabled when the query
# has a volatile UDF.
query T
EXPLAIN ANALYZE
SELECT 1 FROM fn_vol_112188(1) AS tmp
INNER JOIN t2_112188 ON tmp.a = t2_112188.d;
----
planning time: 10µs
execution time: 100µs
distribution: <hidden>
vectorized: <hidden>
maximum memory usage: <hidden>
network usage: <hidden>
regions: <hidden>
isolation level: serializable
priority: normal
quality of service: regular
·
• render
└── • lookup join
│ nodes: <hidden>
│ regions: <hidden>
│ actual row count: 0
│ KV time: 0µs
│ KV contention time: 0µs
│ KV rows decoded: 0
│ KV bytes read: 0 B
│ KV gRPC calls: 0
│ estimated max memory allocated: 0 B
│ table: t2_112188@t2_112188_pkey
│ equality: (a) = (d)
│ equality cols are key
└── • project set
│ nodes: <hidden>
│ regions: <hidden>
│ actual row count: 0
│ estimated row count: 1
└── • emptyrow
nodes: <hidden>
regions: <hidden>
actual row count: 1

query T
EXPLAIN ANALYZE
SELECT 1 FROM fn_non_vol_112188(1) AS tmp
INNER JOIN t2_112188 ON tmp.a = t2_112188.d;
----
planning time: 10µs
execution time: 100µs
distribution: <hidden>
vectorized: <hidden>
maximum memory usage: <hidden>
network usage: <hidden>
regions: <hidden>
isolation level: serializable
priority: normal
quality of service: regular
·
• render
└── • lookup join (streamer)
│ nodes: <hidden>
│ regions: <hidden>
│ actual row count: 0
│ KV time: 0µs
│ KV contention time: 0µs
│ KV rows decoded: 0
│ KV bytes read: 0 B
│ KV gRPC calls: 0
│ estimated max memory allocated: 0 B
│ table: t2_112188@t2_112188_pkey
│ equality: (a) = (d)
│ equality cols are key
└── • project set
│ nodes: <hidden>
│ regions: <hidden>
│ actual row count: 1
│ estimated row count: 1
└── • emptyrow
nodes: <hidden>
regions: <hidden>
actual row count: 1

subtest end
5 changes: 5 additions & 0 deletions pkg/sql/opt/exec/factory.go
@@ -77,6 +77,11 @@ const (
// check plan uses locking. Typically this is set for plans with FK checks
// under read committed isolation.
PlanFlagCheckContainsLocking

// PlanFlagMustUseRootTxn is set if at least one node cannot be executed in
// the presence of concurrency. Note that PlanFlagMustUseRootTxn is not the
// only condition that forces use of the root transaction.
PlanFlagMustUseRootTxn
)

func (pf PlanFlags) IsSet(flag PlanFlags) bool {
6 changes: 6 additions & 0 deletions pkg/sql/plan.go
@@ -629,6 +629,12 @@ const (
// planFlagSessionMigration is set if the plan is being created during
// a session migration.
planFlagSessionMigration

// planFlagMustUseRootTxn is used to prevent usage of leaf transactions. This
// is necessary because some expressions cannot be executed in the presence of
// concurrency. Note that planFlagMustUseRootTxn is not the only condition
// that forces use of the root transaction.
planFlagMustUseRootTxn
)

func (pf planFlags) IsSet(flag planFlags) bool {
11 changes: 11 additions & 0 deletions pkg/sql/routine.go
@@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
@@ -267,6 +268,11 @@ func (g *routineGenerator) startInternal(ctx context.Context, txn *kv.Txn) (err
// snapshot to advance, because we do not support restoring the txn's prior
// external read snapshot after returning from the volatile function.
if g.expr.EnableStepping {
if buildutil.CrdbTestBuild {
if txn.Type() != kv.RootTxn {
return errors.AssertionFailedf("routine: cannot step LeafTxn")
}
}
if err := txn.Step(ctx, false /* allowReadTimestampStep */); err != nil {
return err
}
@@ -310,6 +316,11 @@ func (g *routineGenerator) handleException(ctx context.Context, err error) error
if err == nil || blockState == nil || blockState.ExceptionHandler == nil {
return err
}
if buildutil.CrdbTestBuild {
if g.p.Txn().Type() != kv.RootTxn {
return errors.AssertionFailedf("routine: cannot handle exception in LeafTxn")
}
}
caughtCode := pgerror.GetPGCode(err)
if caughtCode == pgcode.Uncategorized {
// It is not safe to catch an uncategorized error.
