Skip to content

Commit

Permalink
sorter: Stabilize Unified Sorter (#1210) (#1231)
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
ti-srebot authored Dec 24, 2020
1 parent caf4db8 commit 174e806
Show file tree
Hide file tree
Showing 36 changed files with 2,155 additions and 191 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,5 @@ docker/logs

# Binary file when running integration test
integration/integration

*.tmp
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ endif
ARCH := "`uname -s`"
LINUX := "Linux"
MAC := "Darwin"
PACKAGE_LIST := go list ./...| grep -vE 'vendor|proto|ticdc\/tests|integration'
PACKAGE_LIST := go list ./...| grep -vE 'vendor|proto|ticdc\/tests|integration|testing_utils'
PACKAGES := $$($(PACKAGE_LIST))
PACKAGE_DIRECTORIES := $(PACKAGE_LIST) | sed 's|github.com/pingcap/$(PROJECT)/||'
FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor|kv_gen|proto')
TEST_FILES := $$(find . -name '*_test.go' -type f | grep -vE 'vendor|kv_gen|integration')
TEST_FILES := $$(find . -name '*_test.go' -type f | grep -vE 'vendor|kv_gen|integration|testing_utils')
CDC_PKG := github.com/pingcap/ticdc
FAILPOINT_DIR := $$(for p in $(PACKAGES); do echo $${p\#"github.com/pingcap/$(PROJECT)/"}|grep -v "github.com/pingcap/$(PROJECT)"; done)
FAILPOINT := bin/failpoint-ctl
Expand Down
2 changes: 1 addition & 1 deletion cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (info *ChangeFeedInfo) Unmarshal(data []byte) error {
func (info *ChangeFeedInfo) VerifyAndFix() error {
defaultConfig := config.GetDefaultReplicaConfig()
if info.Engine == "" {
info.Engine = SortInMemory
info.Engine = SortUnified
}
if info.Config.Filter == nil {
info.Config.Filter = defaultConfig.Filter
Expand Down
2 changes: 1 addition & 1 deletion cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (s *configSuite) TestVerifyAndFix(c *check.C) {

err := info.VerifyAndFix()
c.Assert(err, check.IsNil)
c.Assert(info.Engine, check.Equals, SortInMemory)
c.Assert(info.Engine, check.Equals, SortUnified)

marshalConfig1, err := info.Config.Marshal()
c.Assert(err, check.IsNil)
Expand Down
108 changes: 99 additions & 9 deletions cdc/puller/sorter/backend_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"os"
"path/filepath"
"reflect"
"runtime/debug"
"sync"
Expand All @@ -29,9 +30,14 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/config"
cerrors "github.com/pingcap/ticdc/pkg/errors"
"go.uber.org/zap"
)

const (
// backgroundJobInterval is the period of the ticker that drives the
// background goroutine started by newBackEndPool.
backgroundJobInterval = time.Second * 5
)

var (
pool *backEndPool // this is the singleton instance of backEndPool
poolMu sync.Mutex // this mutex is for delayed initialization of `pool` only
Expand All @@ -44,24 +50,39 @@ type backEndPool struct {
memPressure int32
cache [256]unsafe.Pointer
dir string
filePrefix string

// cancelCh needs to be unbuffered to prevent races
cancelCh chan struct{}
// cancelRWLock protects cache against races when the backEnd is exiting
cancelRWLock sync.RWMutex
isTerminating bool
}

func newBackEndPool(dir string, captureAddr string) *backEndPool {
ret := &backEndPool{
memoryUseEstimate: 0,
fileNameCounter: 0,
dir: dir,
cancelCh: make(chan struct{}),
filePrefix: fmt.Sprintf("%s/sort-%d-", dir, os.Getpid()),
}

go func() {
ticker := time.NewTicker(5 * time.Second)
ticker := time.NewTicker(backgroundJobInterval)
defer ticker.Stop()

metricSorterInMemoryDataSizeGauge := sorterInMemoryDataSizeGauge.WithLabelValues(captureAddr)
metricSorterOnDiskDataSizeGauge := sorterOnDiskDataSizeGauge.WithLabelValues(captureAddr)
metricSorterOpenFileCountGauge := sorterOpenFileCountGauge.WithLabelValues(captureAddr)

for {
<-ticker.C
select {
case <-ret.cancelCh:
log.Info("Unified Sorter backEnd is being cancelled")
return
case <-ticker.C:
}

metricSorterInMemoryDataSizeGauge.Set(float64(atomic.LoadInt64(&ret.memoryUseEstimate)))
metricSorterOnDiskDataSizeGauge.Set(float64(atomic.LoadInt64(&ret.onDiskDataSize)))
Expand All @@ -79,9 +100,10 @@ func newBackEndPool(dir string, captureAddr string) *backEndPool {

memPressure := m.Used * 100 / m.Total
atomic.StoreInt32(&ret.memPressure, int32(memPressure))
if memPressure > 50 {
log.Debug("unified sorter: high memory pressure", zap.Uint64("memPressure", memPressure),
zap.Int64("usedBySorter", atomic.LoadInt64(&ret.memoryUseEstimate)))

if memPressure := ret.memoryPressure(); memPressure > 50 {
log.Debug("unified sorter: high memory pressure", zap.Int32("memPressure", memPressure),
zap.Int64("usedBySorter", ret.sorterMemoryUsage()))
// Increase GC frequency to avoid unnecessary OOM
debug.SetGCPercent(10)
} else {
Expand Down Expand Up @@ -117,13 +139,20 @@ func newBackEndPool(dir string, captureAddr string) *backEndPool {

func (p *backEndPool) alloc(ctx context.Context) (backEnd, error) {
sorterConfig := config.GetSorterConfig()
if atomic.LoadInt64(&p.memoryUseEstimate) < int64(sorterConfig.MaxMemoryConsumption) &&
atomic.LoadInt32(&p.memPressure) < int32(sorterConfig.MaxMemoryPressure) {
if p.sorterMemoryUsage() < int64(sorterConfig.MaxMemoryConsumption) &&
p.memoryPressure() < int32(sorterConfig.MaxMemoryPressure) {

ret := newMemoryBackEnd()
return ret, nil
}

p.cancelRWLock.RLock()
defer p.cancelRWLock.RUnlock()

if p.isTerminating {
return nil, cerrors.ErrUnifiedSorterBackendTerminating.GenWithStackByArgs()
}

for i := range p.cache {
ptr := &p.cache[i]
ret := atomic.SwapPointer(ptr, nil)
Expand All @@ -132,8 +161,10 @@ func (p *backEndPool) alloc(ctx context.Context) (backEnd, error) {
}
}

fname := fmt.Sprintf("%s/sort-%d-%d", p.dir, os.Getpid(), atomic.AddUint64(&p.fileNameCounter, 1))
log.Debug("Unified Sorter: trying to create file backEnd", zap.String("filename", fname))
fname := fmt.Sprintf("%s%d.tmp", p.filePrefix, atomic.AddUint64(&p.fileNameCounter, 1))
log.Debug("Unified Sorter: trying to create file backEnd",
zap.String("filename", fname),
zap.String("table", tableNameFromCtx(ctx)))

ret, err := newFileBackEnd(fname, &msgPackGenSerde{})
if err != nil {
Expand All @@ -155,6 +186,13 @@ func (p *backEndPool) dealloc(backEnd backEnd) error {
failpoint.Return(nil)
}
})
p.cancelRWLock.RLock()
defer p.cancelRWLock.RUnlock()

if p.isTerminating {
return cerrors.ErrUnifiedSorterBackendTerminating.GenWithStackByArgs()
}

for i := range p.cache {
ptr := &p.cache[i]
if atomic.CompareAndSwapPointer(ptr, nil, unsafe.Pointer(b)) {
Expand All @@ -173,3 +211,55 @@ func (p *backEndPool) dealloc(backEnd backEnd) error {
}
return nil
}

// terminate shuts the pool down and deletes the temporary files it owns.
//
// It stops the background goroutine, marks the pool as terminating so that
// the file-backed paths of alloc and dealloc start returning
// ErrUnifiedSorterBackendTerminating, frees every backEnd still held in the
// cache, and finally removes any remaining file whose name starts with
// p.filePrefix.
//
// terminate must be called at most once: cancelCh is closed on return, so a
// second call would panic when sending on the closed channel.
func (p *backEndPool) terminate() {
	// cancelCh is unbuffered, so this send completes only once the background
	// goroutine has received the signal and is about to return.
	p.cancelCh <- struct{}{}
	defer close(p.cancelCh)
	// the background goroutine can be considered terminated here

	p.cancelRWLock.Lock()
	defer p.cancelRWLock.Unlock()
	p.isTerminating = true

	// Holding the write lock with isTerminating set keeps alloc and dealloc
	// away from p.cache, so it can be accessed without atomics from now on.
	for i := range p.cache {
		backend := (*fileBackEnd)(p.cache[i])
		if backend == nil {
			continue
		}
		// Clear the slot so the cache never keeps a pointer to a freed backEnd.
		p.cache[i] = nil
		if err := backend.free(); err != nil {
			// Best effort: the prefix-based sweep below gets another chance to
			// delete the file, but the failure should not pass unnoticed.
			log.Warn("Unified Sorter clean-up failed: failed to free cached backEnd",
				zap.String("file-name", backend.fileName),
				zap.Error(err))
		}
	}

	if p.filePrefix == "" {
		// This should not happen. But to prevent accidents in production, we add this anyway:
		// with an empty prefix the glob below would match unrelated files.
		log.Panic("Empty filePrefix, please report a bug")
	}

	files, err := filepath.Glob(p.filePrefix + "*")
	if err != nil {
		log.Warn("Unified Sorter clean-up failed", zap.Error(err))
		// Nothing to sweep when the pattern could not be evaluated.
		return
	}
	for _, file := range files {
		if err := os.RemoveAll(file); err != nil {
			log.Warn("Unified Sorter clean-up failed: failed to remove", zap.String("file-name", file), zap.Error(err))
		}
	}
}

// sorterMemoryUsage returns the pool's memoryUseEstimate, loaded atomically.
//
// The "memoryUsageInjectPoint" failpoint marker lets tests override the
// result: when that failpoint is enabled, the injected int is converted to
// int64 and returned instead of the real estimate.
func (p *backEndPool) sorterMemoryUsage() int64 {
failpoint.Inject("memoryUsageInjectPoint", func(val failpoint.Value) {
failpoint.Return(int64(val.(int)))
})
return atomic.LoadInt64(&p.memoryUseEstimate)
}

// memoryPressure returns the pool's memPressure value, loaded atomically.
//
// The "memoryPressureInjectPoint" failpoint marker lets tests override the
// result: when that failpoint is enabled, the injected int is converted to
// int32 and returned instead of the stored value.
func (p *backEndPool) memoryPressure() int32 {
failpoint.Inject("memoryPressureInjectPoint", func(val failpoint.Value) {
failpoint.Return(int32(val.(int)))
})
return atomic.LoadInt32(&p.memPressure)
}
153 changes: 153 additions & 0 deletions cdc/puller/sorter/backend_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package sorter

import (
"context"
"os"
"strconv"
"testing"
"time"

"github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/util/testleak"
)

// TestSuite is the `go test` entry point; it hands control to gocheck.
func TestSuite(t *testing.T) {
	check.TestingT(t)
}

// backendPoolSuite groups the gocheck tests for backEndPool; it holds no state.
type backendPoolSuite struct{}

// Register the suite with gocheck.
var _ = check.Suite(&backendPoolSuite{})

// TestBasicFunction checks which kind of backEnd alloc hands out under
// injected memory conditions, and that the files behind deallocated file
// backEnds are gone after the background job has had time to run.
//
// The failpoints enabled here are disabled again on exit so they cannot leak
// into other tests of the package.
func (s *backendPoolSuite) TestBasicFunction(c *check.C) {
	defer testleak.AfterTest(c)()

	const (
		pressureFailpoint = "github.com/pingcap/ticdc/cdc/puller/sorter/memoryPressureInjectPoint"
		usageFailpoint    = "github.com/pingcap/ticdc/cdc/puller/sorter/memoryUsageInjectPoint"
	)

	err := os.MkdirAll("/tmp/sorter", 0o755)
	c.Assert(err, check.IsNil)

	config.SetSorterConfig(&config.SorterConfig{
		MaxMemoryPressure:    90,                      // 90%
		MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, // 16G
	})

	// Injected pressure (100) exceeds MaxMemoryPressure, so a file backEnd is expected.
	err = failpoint.Enable(pressureFailpoint, "return(100)")
	c.Assert(err, check.IsNil)
	defer func() {
		c.Check(failpoint.Disable(pressureFailpoint), check.IsNil)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
	defer cancel()

	backEndPool := newBackEndPool("/tmp/sorter", "")
	c.Assert(backEndPool, check.NotNil)
	defer backEndPool.terminate()

	backEnd, err := backEndPool.alloc(ctx)
	c.Assert(err, check.IsNil)
	c.Assert(backEnd, check.FitsTypeOf, &fileBackEnd{})
	fileName := backEnd.(*fileBackEnd).fileName
	c.Assert(fileName, check.Not(check.Equals), "")

	// No pressure, but injected usage (32G) exceeds MaxMemoryConsumption:
	// a file backEnd is still expected.
	err = failpoint.Enable(pressureFailpoint, "return(0)")
	c.Assert(err, check.IsNil)
	err = failpoint.Enable(usageFailpoint, "return(34359738368)")
	c.Assert(err, check.IsNil)
	defer func() {
		c.Check(failpoint.Disable(usageFailpoint), check.IsNil)
	}()

	backEnd1, err := backEndPool.alloc(ctx)
	c.Assert(err, check.IsNil)
	c.Assert(backEnd1, check.FitsTypeOf, &fileBackEnd{})
	fileName1 := backEnd1.(*fileBackEnd).fileName
	c.Assert(fileName1, check.Not(check.Equals), "")
	c.Assert(fileName1, check.Not(check.Equals), fileName)

	// Neither pressure nor usage: a memory backEnd is expected.
	err = failpoint.Enable(pressureFailpoint, "return(0)")
	c.Assert(err, check.IsNil)
	err = failpoint.Enable(usageFailpoint, "return(0)")
	c.Assert(err, check.IsNil)

	backEnd2, err := backEndPool.alloc(ctx)
	c.Assert(err, check.IsNil)
	c.Assert(backEnd2, check.FitsTypeOf, &memoryBackEnd{})

	err = backEndPool.dealloc(backEnd)
	c.Assert(err, check.IsNil)

	err = backEndPool.dealloc(backEnd1)
	c.Assert(err, check.IsNil)

	err = backEndPool.dealloc(backEnd2)
	c.Assert(err, check.IsNil)

	// Wait for more than one background tick before checking the files;
	// presumably the background job is what removes them.
	time.Sleep(backgroundJobInterval * 3 / 2)

	_, err = os.Stat(fileName)
	c.Assert(os.IsNotExist(err), check.IsTrue)

	_, err = os.Stat(fileName1)
	c.Assert(os.IsNotExist(err), check.IsTrue)
}

// TestCleanUp checks that terminate removes every file carrying the pool's
// filePrefix: both the files behind backEnds that were allocated and never
// deallocated, and stray files created directly on disk under that prefix.
//
// The failpoint enabled here is disabled again on exit so it cannot leak into
// other tests of the package.
func (s *backendPoolSuite) TestCleanUp(c *check.C) {
	defer testleak.AfterTest(c)()

	const pressureFailpoint = "github.com/pingcap/ticdc/cdc/puller/sorter/memoryPressureInjectPoint"

	err := os.MkdirAll("/tmp/sorter", 0o755)
	c.Assert(err, check.IsNil)

	config.SetSorterConfig(&config.SorterConfig{
		MaxMemoryPressure:    90,                      // 90%
		MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, // 16G
	})

	// Injected pressure (100) exceeds MaxMemoryPressure, forcing file backEnds.
	err = failpoint.Enable(pressureFailpoint, "return(100)")
	c.Assert(err, check.IsNil)
	defer func() {
		c.Check(failpoint.Disable(pressureFailpoint), check.IsNil)
	}()

	backEndPool := newBackEndPool("/tmp/sorter", "")
	c.Assert(backEndPool, check.NotNil)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
	defer cancel()

	var fileNames []string
	for i := 0; i < 20; i++ {
		backEnd, err := backEndPool.alloc(ctx)
		c.Assert(err, check.IsNil)
		c.Assert(backEnd, check.FitsTypeOf, &fileBackEnd{})

		fileName := backEnd.(*fileBackEnd).fileName
		_, err = os.Stat(fileName)
		c.Assert(err, check.IsNil)

		fileNames = append(fileNames, fileName)
	}

	prefix := backEndPool.filePrefix
	c.Assert(prefix, check.Not(check.Equals), "")

	// Files the pool never created itself but that share its prefix.
	for j := 100; j < 120; j++ {
		fileName := prefix + strconv.Itoa(j) + ".tmp"
		f, err := os.Create(fileName)
		c.Assert(err, check.IsNil)
		err = f.Close()
		c.Assert(err, check.IsNil)

		fileNames = append(fileNames, fileName)
	}

	backEndPool.terminate()

	for _, fileName := range fileNames {
		_, err = os.Stat(fileName)
		c.Assert(os.IsNotExist(err), check.IsTrue)
	}
}
3 changes: 2 additions & 1 deletion cdc/puller/sorter/file_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func newFileBackEnd(fileName string, serde serializerDeserializer) (*fileBackEnd
}

func (f *fileBackEnd) reader() (backEndReader, error) {
fd, err := os.OpenFile(f.fileName, os.O_RDONLY, 0o644)
fd, err := os.OpenFile(f.fileName, os.O_RDWR, 0o644)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -119,6 +119,7 @@ func (f *fileBackEnd) free() error {
}
})

log.Debug("Removing file", zap.String("file", f.fileName))
err := os.Remove(f.fileName)
if err != nil {
failpoint.Inject("sorterDebug", func() {
Expand Down
Loading

0 comments on commit 174e806

Please sign in to comment.