Skip to content

rowexec: upgrade hyperloglog for table stats processors #138561

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -487,5 +487,6 @@ replace golang.org/x/time => github.com/cockroachdb/x-time v0.3.1-0.202305251236

replace github.com/gogo/protobuf => github.com/cockroachdb/gogoproto v1.3.3-0.20241216150617-2358cdb156a1

// TODO(138546): hard-code the version that we use for table stats.
// TODO(yuzefovich): remove this version once compatibility with 24.3 is no
// longer needed.
replace github.com/axiomhq/hyperloglog/000 => github.com/axiomhq/hyperloglog v0.0.0-20181223111420-4b99d0c2c99e
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1938,6 +1938,7 @@ GO_TARGETS = [
"//pkg/sql/execinfrapb:execinfrapb_test",
"//pkg/sql/execstats:execstats",
"//pkg/sql/execstats:execstats_test",
"//pkg/sql/execversion:execversion",
"//pkg/sql/exprutil:evalexpr",
"//pkg/sql/exprutil:exprutil",
"//pkg/sql/exprutil:exprutil_test",
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ go_library(
"//pkg/sql/execinfra/execreleasable",
"//pkg/sql/execinfrapb",
"//pkg/sql/execstats",
"//pkg/sql/execversion",
"//pkg/sql/exprutil",
"//pkg/sql/faketreeeval",
"//pkg/sql/flowinfra",
Expand Down
41 changes: 6 additions & 35 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2439,8 +2439,7 @@ func (ex *connExecutor) execCmd() (retErr error) {
}
copyRes := ex.clientComm.CreateCopyOutResult(copyCmd, pos)
res = copyRes
stmtCtx := withStatement(ctx, copyTo)
ev, payload = ex.execCopyOut(stmtCtx, copyCmd, copyRes)
ev, payload = ex.execCopyOut(ctx, copyCmd, copyRes)
return nil
}

Expand Down Expand Up @@ -2516,8 +2515,7 @@ func (ex *connExecutor) execCmd() (retErr error) {
case PrepareStmt:
ex.curStmtAST = tcmd.AST
res = ex.clientComm.CreatePrepareResult(pos)
stmtCtx := withStatement(ctx, ex.curStmtAST)
ev, payload = ex.execPrepare(stmtCtx, tcmd)
ev, payload = ex.execPrepare(ctx, tcmd)
case DescribeStmt:
descRes := ex.clientComm.CreateDescribeResult(pos)
res = descRes
Expand Down Expand Up @@ -2566,8 +2564,7 @@ func (ex *connExecutor) execCmd() (retErr error) {

copyRes := ex.clientComm.CreateCopyInResult(tcmd, pos)
res = copyRes
stmtCtx := withStatement(ctx, tcmd.Stmt)
ev, payload = ex.execCopyIn(stmtCtx, tcmd, copyRes)
ev, payload = ex.execCopyIn(ctx, tcmd, copyRes)

// Note: we write to ex.statsCollector.phaseTimes, instead of ex.phaseTimes,
// because:
Expand All @@ -2582,8 +2579,7 @@ func (ex *connExecutor) execCmd() (retErr error) {
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionEndParse, tcmd.ParseEnd)
copyRes := ex.clientComm.CreateCopyOutResult(tcmd, pos)
res = copyRes
stmtCtx := withStatement(ctx, tcmd.Stmt)
ev, payload = ex.execCopyOut(stmtCtx, tcmd, copyRes)
ev, payload = ex.execCopyOut(ctx, tcmd, copyRes)

// Note: we write to ex.statsCollector.phaseTimes, instead of ex.phaseTimes,
// because:
Expand Down Expand Up @@ -3952,7 +3948,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
err := func() error {
ex.mu.Lock()
defer ex.mu.Unlock()
return ex.machine.ApplyWithPayload(withStatement(ex.Ctx(), ex.curStmtAST), ev, payload)
return ex.machine.ApplyWithPayload(ex.Ctx(), ev, payload)
}()
if err != nil {
if errors.HasType(err, (*fsm.TransitionNotFoundError)(nil)) {
Expand Down Expand Up @@ -4738,23 +4734,6 @@ func (ps connExPrepStmtsAccessor) DeleteAll(ctx context.Context) {
)
}

var contextStatementKey = ctxutil.RegisterFastValueKey()

// withStatement adds a SQL statement to the provided context. The statement
// will then be included in crash reports which use that context.
func withStatement(ctx context.Context, stmt tree.Statement) context.Context {
return ctxutil.WithFastValue(ctx, contextStatementKey, stmt)
}

// statementFromCtx returns the statement value from a context, or nil if unset.
func statementFromCtx(ctx context.Context) tree.Statement {
stmt := ctxutil.FastValue(ctx, contextStatementKey)
if stmt == nil {
return nil
}
return stmt.(tree.Statement)
}

var contextPlanGistKey = ctxutil.RegisterFastValueKey()

func withPlanGist(ctx context.Context, gist string) context.Context {
Expand All @@ -4773,15 +4752,7 @@ func planGistFromCtx(ctx context.Context) string {
}

func init() {
// Register a function to include the anonymized statement in crash reports.
logcrash.RegisterTagFn("statement", func(ctx context.Context) string {
stmt := statementFromCtx(ctx)
if stmt == nil {
return ""
}
// Anonymize the statement for reporting.
return anonymizeStmtAndConstants(stmt, nil /* VirtualTabler */, nil /* ClientNoticeSender */)
})
// Register a function to include the plan gist in crash reports.
logcrash.RegisterTagFn("gist", func(ctx context.Context) string {
return planGistFromCtx(ctx)
})
Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,6 @@ func (ex *connExecutor) execStmtInOpenState(
ctx, sp = tracing.ChildSpan(ctx, "sql query")
// TODO(andrei): Consider adding the placeholders as tags too.
sp.SetTag("statement", attribute.StringValue(parserStmt.SQL))
ctx = withStatement(ctx, ast)
defer sp.Finish()

makeErrEvent := func(err error) (fsm.Event, fsm.EventPayload, error) {
Expand Down Expand Up @@ -1224,7 +1223,6 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
ctx, sp = tracing.ChildSpan(ctx, "sql query")
// TODO(andrei): Consider adding the placeholders as tags too.
sp.SetTag("statement", attribute.StringValue(parserStmt.SQL))
ctx = withStatement(ctx, vars.ast)
if portal.isPausable() {
portal.pauseInfo.execStmtInOpenState.spCtx = ctx
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/distsql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//pkg/sql/execinfra",
"//pkg/sql/execinfra/execopnode",
"//pkg/sql/execinfrapb",
"//pkg/sql/execversion",
"//pkg/sql/faketreeeval",
"//pkg/sql/flowinfra",
"//pkg/sql/rowflow",
Expand Down Expand Up @@ -81,6 +82,7 @@ go_test(
"//pkg/sql/execinfra",
"//pkg/sql/execinfra/execagg",
"//pkg/sql/execinfrapb",
"//pkg/sql/execversion",
"//pkg/sql/flowinfra",
"//pkg/sql/randgen",
"//pkg/sql/rowenc",
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execversion"
"github.com/cockroachdb/cockroach/pkg/sql/faketreeeval"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/rowflow"
Expand Down Expand Up @@ -207,14 +208,15 @@ func (ds *ServerImpl) setupFlow(
}
}()

if req.Version < execinfra.MinAcceptedVersion || req.Version > execinfra.Version {
if req.Version < execversion.MinAccepted || req.Version > execversion.Latest {
err := errors.Errorf(
"version mismatch in flow request: %d; this node accepts %d through %d",
req.Version, execinfra.MinAcceptedVersion, execinfra.Version,
req.Version, execversion.MinAccepted, execversion.Latest,
)
log.Warningf(ctx, "%v", err)
return ctx, nil, nil, err
}
ctx = execversion.WithVersion(ctx, req.Version)

const opName = "flow"
if parentSpan == nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/distsql/setup_flow_after_drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execversion"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -47,7 +47,7 @@ func TestSetupFlowAfterDrain(t *testing.T) {
)

// We create some flow; it doesn't matter what.
req := execinfrapb.SetupFlowRequest{Version: execinfra.Version}
req := execinfrapb.SetupFlowRequest{Version: execversion.Latest}
req.Flow = execinfrapb.FlowSpec{
Processors: []execinfrapb.ProcessorSpec{
{
Expand Down
8 changes: 7 additions & 1 deletion pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execversion"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
Expand Down Expand Up @@ -405,9 +407,13 @@ func (dsp *DistSQLPlanner) setupFlows(
if len(statementSQL) > setupFlowRequestStmtMaxLength {
statementSQL = statementSQL[:setupFlowRequestStmtMaxLength]
}
execVersion := execversion.V24_3
if dsp.st.Version.IsActive(ctx, clusterversion.V25_1) {
execVersion = execversion.V25_1
}
setupReq := execinfrapb.SetupFlowRequest{
LeafTxnInputState: leafInputState,
Version: execinfra.Version,
Version: execVersion,
TraceKV: evalCtx.Tracing.KVTracingEnabled(),
CollectStats: planCtx.collectExecStats,
StatementSQL: statementSQL,
Expand Down
3 changes: 0 additions & 3 deletions pkg/sql/execinfra/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ go_library(
"server_config.go",
"testutils.go",
"utils.go",
"version.go",
":gen-consumerstatus-stringer", # keep
":gen-procstate-stringer", # keep
],
Expand Down Expand Up @@ -90,7 +89,6 @@ go_test(
srcs = [
"base_test.go",
"main_test.go",
"version_test.go",
],
embed = [":execinfra"],
deps = [
Expand All @@ -106,7 +104,6 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/randutil",
"@com_github_stretchr_testify//require",
],
)

Expand Down
Loading
Loading