Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: implement disk-based hash join #12067

Merged
merged 14 commits into from
Sep 24, 2019
Prev Previous commit
Next Next commit
fix benchmark
  • Loading branch information
SunRunAway committed Sep 21, 2019
commit 2849e80185d981ce35aa0e64eb23b77cefe2d966
18 changes: 13 additions & 5 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,11 +557,6 @@ func prepare4Join(testCase *hashJoinTestCase, innerExec, outerExec Executor) *Ha
for _, keyIdx := range testCase.keyIdx {
joinKeys = append(joinKeys, cols0[keyIdx])
}
if testCase.disk {
testCase.ctx.GetSessionVars().MemQuotaHashJoin = 1
} else {
testCase.ctx.GetSessionVars().MemQuotaHashJoin = 32 * 1024 * 1024 * 1024
}
e := &HashJoinExec{
baseExecutor: newBaseExecutor(testCase.ctx, joinSchema, stringutil.StringerStr("HashJoin"), innerExec, outerExec),
concurrency: uint(testCase.concurrency),
Expand All @@ -580,6 +575,13 @@ func prepare4Join(testCase *hashJoinTestCase, innerExec, outerExec Executor) *Ha
e.joiners[i] = newJoiner(testCase.ctx, e.joinType, true, defaultValues,
nil, lhsTypes, rhsTypes)
}
memLimit := int64(-1)
if testCase.disk {
memLimit = 1
}
t := memory.NewTracker(stringutil.StringerStr("root of prepare4Join"), memLimit)
t.SetActionOnExceed(nil)
e.ctx.GetSessionVars().StmtCtx.MemTracker = t
return e
}

Expand Down Expand Up @@ -628,6 +630,9 @@ func benchmarkHashJoinExecWithCase(b *testing.B, casTest *hashJoinTestCase) {
b.Fatal(err)
}
b.StopTimer()
if exec.rowContainer.alreadySpilled() != casTest.disk {
b.Fatal("wrong usage with disk")
}
}
}

Expand Down Expand Up @@ -706,6 +711,9 @@ func benchmarkBuildHashTableForList(b *testing.B, casTest *hashJoinTestCase) {
b.Fatal(err)
}
b.StopTimer()
if exec.rowContainer.alreadySpilled() != casTest.disk {
b.Fatal("wrong usage with disk")
}
}
}

Expand Down
34 changes: 15 additions & 19 deletions executor/hash_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,7 @@ func (s *pkgTestSuite) TestHashRowContainer(c *C) {
return h
}
s.testHashRowContainer(c, hashFuncCollision, false)
c.Check(h.count > 0, IsTrue)

c.Assert(h.count > 0, IsTrue)
}

func (s *pkgTestSuite) testHashRowContainer(c *C, hashFunc func() hash.Hash64, spill bool) {
Expand All @@ -151,19 +150,17 @@ func (s *pkgTestSuite) testHashRowContainer(c *C, hashFunc func() hash.Hash64, s
tracker.SetBytesLimit(1)
}
err = rowContainer.PutChunk(chk0)
c.Check(err, IsNil)
c.Assert(err, IsNil)
err = rowContainer.PutChunk(chk1)
c.Check(err, IsNil)

if spill {
c.Assert(rowContainer.alreadySpilled(), IsTrue)
c.Assert(rowContainer.alreadySpilledSafe(), IsTrue)
c.Check(rowContainer.GetMemTracker().BytesConsumed() == 0, Equals, true)
c.Check(rowContainer.GetDiskTracker().BytesConsumed() > 0, Equals, true)
} else {
c.Assert(rowContainer.alreadySpilled(), IsFalse)
c.Assert(rowContainer.alreadySpilledSafe(), IsFalse)
c.Check(rowContainer.GetMemTracker().BytesConsumed() > 0, Equals, true)
c.Assert(err, IsNil)

c.Assert(rowContainer.alreadySpilled(), Equals, spill)
c.Assert(rowContainer.alreadySpilledSafe(), Equals, spill)
c.Assert(rowContainer.GetMemTracker().BytesConsumed() == 0, Equals, spill)
c.Assert(rowContainer.GetMemTracker().BytesConsumed() > 0, Equals, !spill)
if rowContainer.alreadySpilled() {
c.Assert(rowContainer.GetDiskTracker(), NotNil)
c.Assert(rowContainer.GetDiskTracker().BytesConsumed() > 0, Equals, true)
}

probeChk, probeColType := initProbeChunk(2)
Expand All @@ -175,9 +172,8 @@ func (s *pkgTestSuite) testHashRowContainer(c *C, hashFunc func() hash.Hash64, s
probeCtx.hasNull = make([]bool, 1)
probeCtx.hashVals = append(hCtx.hashVals, hashFunc())
matched, err := rowContainer.GetMatchedRows(probeRow, probeCtx)
c.Check(err, IsNil)
if c.Check(len(matched), Equals, 2) {
c.Check(matched[0].GetDatumRow(colTypes), DeepEquals, chk0.GetRow(1).GetDatumRow(colTypes))
c.Check(matched[1].GetDatumRow(colTypes), DeepEquals, chk1.GetRow(1).GetDatumRow(colTypes))
}
c.Assert(err, IsNil)
c.Assert(len(matched), Equals, 2)
c.Assert(matched[0].GetDatumRow(colTypes), DeepEquals, chk0.GetRow(1).GetDatumRow(colTypes))
c.Assert(matched[1].GetDatumRow(colTypes), DeepEquals, chk1.GetRow(1).GetDatumRow(colTypes))
}
2 changes: 1 addition & 1 deletion executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (e *HashJoinExec) Open(ctx context.Context) error {
}

e.prepared = false
e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaHashJoin)
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

e.closeCh = make(chan struct{})
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,8 @@ type Concurrency struct {
type MemQuota struct {
// MemQuotaQuery defines the memory quota for a query.
MemQuotaQuery int64

// TODO: remove the per-executor quotas below eventually; there should be only one quota (MemQuotaQuery).
// MemQuotaHashJoin defines the memory quota for a hash join executor.
MemQuotaHashJoin int64
// MemQuotaMergeJoin defines the memory quota for a merge join executor.
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ const (
// "tidb_mem_quota_indexlookupjoin": control the memory quota of "IndexLookUpJoin".
// "tidb_mem_quota_nestedloopapply": control the memory quota of "NestedLoopApplyExec".
TIDBMemQuotaQuery = "tidb_mem_quota_query" // Bytes.
// TODO: remove them sometime, it should have only one Quota(TIDBMemQuotaQuery).
// TODO: remove the variables below eventually; there should be only one quota (TIDBMemQuotaQuery).
TIDBMemQuotaHashJoin = "tidb_mem_quota_hashjoin" // Bytes.
TIDBMemQuotaMergeJoin = "tidb_mem_quota_mergejoin" // Bytes.
TIDBMemQuotaSort = "tidb_mem_quota_sort" // Bytes.
Expand Down