Skip to content

Commit 63b11a9

Browse files
craig[bot]yuzefovich
andcommitted
Merge #26914
26914: distsql: add memory accounting to mergeJoiner r=yuzefovich a=yuzefovich Resolves: #26896. Adds accounting of memory allocated during computation of cartesion product of equal rows and adds the max allocated memory to the visual query plan stats. Release note: None Co-authored-by: yuzefovich <yahor@cockroachlabs.com>
2 parents f507674 + a7e79a7 commit 63b11a9

File tree

6 files changed

+105
-37
lines changed

6 files changed

+105
-37
lines changed

pkg/sql/distsqlrun/mergejoiner.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@ import (
1919
"errors"
2020
"sync"
2121

22+
"fmt"
23+
2224
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
2325
"github.com/cockroachdb/cockroach/pkg/util"
26+
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
2427
"github.com/cockroachdb/cockroach/pkg/util/tracing"
2528
"github.com/opentracing/opentracing-go"
2629
)
@@ -84,18 +87,25 @@ func newMergeJoiner(
8487
spec.Type, spec.OnExpr, leftEqCols, rightEqCols, 0, post, output,
8588
procStateOpts{
8689
inputsToDrain: []RowSource{leftSource, rightSource},
90+
trailingMetaCallback: func() []ProducerMetadata {
91+
m.close()
92+
return nil
93+
},
8794
},
8895
); err != nil {
8996
return nil, err
9097
}
9198

99+
m.memMonitor = newMonitor(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon, "mergejoiner-mem")
100+
92101
var err error
93102
m.streamMerger, err = makeStreamMerger(
94103
m.leftSource,
95104
convertToColumnOrdering(spec.LeftOrdering),
96105
m.rightSource,
97106
convertToColumnOrdering(spec.RightOrdering),
98107
spec.NullEquality,
108+
m.memMonitor,
99109
)
100110
if err != nil {
101111
return nil, err
@@ -248,10 +258,18 @@ func (m *mergeJoiner) ConsumerDone() {
248258
m.moveToDraining(nil /* err */)
249259
}
250260

261+
func (m *mergeJoiner) close() {
262+
if m.internalClose() {
263+
ctx := m.evalCtx.Ctx()
264+
m.streamMerger.close(ctx)
265+
m.memMonitor.Stop(ctx)
266+
}
267+
}
268+
251269
// ConsumerClosed is part of the RowSource interface.
252270
func (m *mergeJoiner) ConsumerClosed() {
253271
// The consumer is done, Next() will not be called again.
254-
m.internalClose()
272+
m.close()
255273
}
256274

257275
var _ DistSQLSpanStats = &MergeJoinerStats{}
@@ -267,15 +285,17 @@ func (mjs *MergeJoinerStats) Stats() map[string]string {
267285
for k, v := range rightInputStatsMap {
268286
statsMap[k] = v
269287
}
288+
statsMap[mergeJoinerTagPrefix+maxMemoryTagSuffix] = humanizeutil.IBytes(mjs.MaxAllocatedMem)
270289
return statsMap
271290
}
272291

273292
// StatsForQueryPlan implements the DistSQLSpanStats interface.
274293
func (mjs *MergeJoinerStats) StatsForQueryPlan() []string {
275-
return append(
294+
stats := append(
276295
mjs.LeftInputStats.StatsForQueryPlan("left "),
277296
mjs.RightInputStats.StatsForQueryPlan("right ")...,
278297
)
298+
return append(stats, fmt.Sprintf("%s: %s", maxMemoryQueryPlanSuffix, humanizeutil.IBytes(mjs.MaxAllocatedMem)))
279299
}
280300

281301
// outputStatsToTrace outputs the collected mergeJoiner stats to the trace. Will
@@ -295,6 +315,7 @@ func (m *mergeJoiner) outputStatsToTrace() {
295315
&MergeJoinerStats{
296316
LeftInputStats: lis,
297317
RightInputStats: ris,
318+
MaxAllocatedMem: m.memMonitor.MaximumBytes(),
298319
},
299320
)
300321
}

pkg/sql/distsqlrun/stats.pb.go

Lines changed: 59 additions & 31 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/sql/distsqlrun/stats.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ message DistinctStats {
5858
message MergeJoinerStats {
5959
InputStats left_input_stats = 1 [(gogoproto.nullable) = false];
6060
InputStats right_input_stats = 2 [(gogoproto.nullable) = false];
61+
int64 max_allocated_mem = 3;
6162
}
6263

6364
// SorterStats are the stats collected during a sorter run.

pkg/sql/distsqlrun/stream_group_accumulator.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919

2020
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2121
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
22+
"github.com/cockroachdb/cockroach/pkg/util/mon"
2223
"github.com/pkg/errors"
2324
)
2425

@@ -42,15 +43,18 @@ type streamGroupAccumulator struct {
4243
leftoverRow sqlbase.EncDatumRow
4344

4445
rowAlloc sqlbase.EncDatumRowAlloc
46+
47+
memAcc mon.BoundAccount
4548
}
4649

4750
func makeStreamGroupAccumulator(
48-
src RowSource, ordering sqlbase.ColumnOrdering,
51+
src RowSource, ordering sqlbase.ColumnOrdering, memMonitor *mon.BytesMonitor,
4952
) streamGroupAccumulator {
5053
return streamGroupAccumulator{
5154
src: src,
5255
types: src.OutputTypes(),
5356
ordering: ordering,
57+
memAcc: memMonitor.MakeBoundAccount(),
5458
}
5559
}
5660

@@ -84,6 +88,9 @@ func (s *streamGroupAccumulator) nextGroup(
8488
return s.curGroup, nil
8589
}
8690

91+
if err := s.memAcc.Grow(evalCtx.Ctx(), int64(row.Size())); err != nil {
92+
return nil, &ProducerMetadata{Err: err}
93+
}
8794
row = s.rowAlloc.CopyRow(row)
8895

8996
if len(s.curGroup) == 0 {
@@ -115,3 +122,7 @@ func (s *streamGroupAccumulator) nextGroup(
115122
}
116123
}
117124
}
125+
126+
func (s *streamGroupAccumulator) close(ctx context.Context) {
127+
s.memAcc.Close(ctx)
128+
}

pkg/sql/distsqlrun/stream_merger.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2121
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
2222
"github.com/cockroachdb/cockroach/pkg/util/encoding"
23+
"github.com/cockroachdb/cockroach/pkg/util/mon"
2324
"github.com/pkg/errors"
2425
)
2526

@@ -156,6 +157,11 @@ func CompareEncDatumRowForMerge(
156157
return 0, nil
157158
}
158159

160+
func (sm *streamMerger) close(ctx context.Context) {
161+
sm.left.close(ctx)
162+
sm.right.close(ctx)
163+
}
164+
159165
// makeStreamMerger creates a streamMerger, joining rows from leftSource with
160166
// rows from rightSource.
161167
//
@@ -166,6 +172,7 @@ func makeStreamMerger(
166172
rightSource RowSource,
167173
rightOrdering sqlbase.ColumnOrdering,
168174
nullEquality bool,
175+
memMonitor *mon.BytesMonitor,
169176
) (streamMerger, error) {
170177
if len(leftOrdering) != len(rightOrdering) {
171178
return streamMerger{}, errors.Errorf(
@@ -178,8 +185,8 @@ func makeStreamMerger(
178185
}
179186

180187
return streamMerger{
181-
left: makeStreamGroupAccumulator(leftSource, leftOrdering),
182-
right: makeStreamGroupAccumulator(rightSource, rightOrdering),
188+
left: makeStreamGroupAccumulator(leftSource, leftOrdering, memMonitor),
189+
right: makeStreamGroupAccumulator(rightSource, rightOrdering, memMonitor),
183190
nullEquality: nullEquality,
184191
}, nil
185192
}

pkg/sql/logictest/testdata/planner_test/distsql_auto_mode

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ INSERT INTO kw VALUES (1,1);
139139
query T
140140
SELECT "URL" FROM [EXPLAIN ANALYZE (DISTSQL) SELECT kv.k, avg(kw.k) FROM kv JOIN kw ON kv.k=kw.k GROUP BY kv.k]
141141
----
142-
https://cockroachdb.github.io/distsqlplan/decode.html?eJysk0Fv1DAQhe_8CmtOW2Fp46QnS0hZLqhQElQVJISiysRDsEjiaOzQVtX-dxT7sCRsFhb1tn6eb_zezOYJequxUB06kF9AQMVhIFujc5YmKRZc6QeQCQfTD6Of5IpDbQlBPoE3vkWQcKu-tniDSiNtE-Cg0SvThrYDmU7RY_7jJ3Age-8YodKSCeDgvGpb5k2HkiUOOJSjlywXUO052NEfHnReNQhS7Pn_mRIrpu6fwVS6aurgZewtaSTUMx_VRP6t5Eiy90gNvrWmR9qm82QtfvObXLy8eEWm-R5_Ag8yWwQN2h9pA7YsjeLaZHierg4nO2dju6YhbJS3tM3msfLJRRnnE5bBYVd8vivK27vi4_X1JhdTyt2nN5s8vfiHnXbqgXXYWXpko5taioS9M69XY1yeE-MG3WB7h8tdH-2cTAtG3WD8wzg7Uo0fyNbhmXgsAxcEjc7H2zQervpwFb6M32FxBpwu4fQknM3gZAlnJ-HLBVztX_wKAAD__5wEeH8=
142+
https://cockroachdb.github.io/distsqlplan/decode.html?eJysk0Fv1DAQhe_8CmtOW2FpY29PlpCyXFChbFBVkBCKKhMPwSKJo7FDW1X731Hsw5Jss7CIW_w8n_3ejPMEnTO40y16UF9AQMmhJ1eh945GKRVcmQdQGQfb9UMY5ZJD5QhBPUGwoUFQcKu_NniD2iCtM-BgMGjbxGN7sq2mx_zHT-BA7t4zQm0UE8DBB900LNgWFcs8cCiGoFguoNxzcEM4XOiDrhGU2PN_MyUWTN3_B1Ny0dTBy9A5MkhoJj7KkfxTyTPJ3iPV-NbZDmktp8ka_BZWuXh58Yps_T19Ao8ymwWN2lHaiM1Lk3hU2-oH1mLr6JENHo1iMmPv7OtDz3guF9u2OWeW27omrHVwtN5MA-ejvyJ1Lo6Jw3b3-W5X3N7tPl5fr3Ix5t9-erPK5cVfTPsok0iZlmJcnhPjBn3vOo_zV_Dsydk4ejQ1pqfk3UAVfiBXxWvSsohcFAz6kHZlWlx1cSv-M7_D4gxYzmF5Et5M4GwOb07ClzO43L_4FQAA__-EfYCR
143143

144144
# This query verifies stats collection for the hashJoiner, distinct and sorter.
145145
query T

0 commit comments

Comments
 (0)