Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into add-output-old-va…
Browse files Browse the repository at this point in the history
…lue-config
  • Loading branch information
sdojjy committed Apr 19, 2024
2 parents 4dc1f1d + 0ffc857 commit 171311f
Show file tree
Hide file tree
Showing 60 changed files with 1,293 additions and 896 deletions.
99 changes: 99 additions & 0 deletions cdc/async/async_initializer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package async

import (
"context"
"sync"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/workerpool"
"go.uber.org/atomic"
"go.uber.org/zap"
)

// Initializer is a helper struct to initialize a changefeed asynchronously.
type Initializer struct {
// state related fields
initialized *atomic.Bool
initializing *atomic.Bool
initError *atomic.Error

// func to cancel the changefeed initialization
cancelInitialize context.CancelFunc
initialWaitGroup sync.WaitGroup

initFunc func(ctx context.Context) error
}

// NewInitializer creates a new initializer.
func NewInitializer(initFunc func(ctx context.Context) error) *Initializer {
return &Initializer{
initialized: atomic.NewBool(false),
initializing: atomic.NewBool(false),
initError: atomic.NewError(nil),
initFunc: initFunc,
}
}

// TryInitialize tries to initialize the module asynchronously.
// returns true if the module is already initialized or initialized successfully.
// returns false if the module is initializing or failed to initialize.
// returns error if the module failed to initialize.
// It will only initialize the module once.
// It's not thread-safe.
func (initializer *Initializer) TryInitialize(ctx context.Context, pool workerpool.AsyncPool) (bool, error) {
if initializer.initialized.Load() {
return true, nil
}
if initializer.initializing.CompareAndSwap(false, true) {
initialCtx, cancelInitialize := context.WithCancel(ctx)
initializer.initialWaitGroup.Add(1)
initializer.cancelInitialize = cancelInitialize
log.Info("submit async initializer task to the worker pool")
err := pool.Go(initialCtx, func() {
defer initializer.initialWaitGroup.Done()
if err := initializer.initFunc(initialCtx); err != nil {
initializer.initError.Store(errors.Trace(err))
} else {
initializer.initialized.Store(true)
}
})
if err != nil {
log.Error("failed to submit async initializer task to the worker pool", zap.Error(err))
initializer.initialWaitGroup.Done()
return false, errors.Trace(err)
}
}
if initializer.initError.Load() != nil {
return false, errors.Trace(initializer.initError.Load())
}
return initializer.initialized.Load(), nil
}

// Terminate terminates the initializer.
// It will cancel the initialization if it is initializing. and wait for the initialization to finish.
// It's not thread-safe.
func (initializer *Initializer) Terminate() {
if initializer.initializing.Load() {
if initializer.cancelInitialize != nil {
initializer.cancelInitialize()
}
initializer.initialWaitGroup.Wait()
}
initializer.initializing.Store(false)
initializer.initialized.Store(false)
initializer.initError.Store(nil)
}
124 changes: 124 additions & 0 deletions cdc/async/async_initializer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package async

import (
"context"
"sync"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc/vars"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

type fakePool struct {
f func()
submitErr error
submitTimes int
}

func (f *fakePool) Go(_ context.Context, fn func()) error {
f.f = fn
f.submitTimes++
return f.submitErr
}

func (f *fakePool) Run(_ context.Context) error {
return nil
}

func TestTryInitialize(t *testing.T) {
initializer := NewInitializer(func(ctx context.Context) error {
return nil
})
pool := &vars.NonAsyncPool{}
initialized, err := initializer.TryInitialize(context.Background(), pool)
require.Nil(t, err)
require.True(t, initialized)
// Try to initialize again
initialized, err = initializer.TryInitialize(context.Background(), pool)
require.Nil(t, err)
require.True(t, initialized)
// init failed
initializer = NewInitializer(func(ctx context.Context) error {
return errors.New("failed to init")
})
initialized, err = initializer.TryInitialize(context.Background(), pool)
require.NotNil(t, err)
require.False(t, initializer.initialized.Load())
require.True(t, initializer.initializing.Load())
require.False(t, initialized)
initialized, err = initializer.TryInitialize(context.Background(), pool)
require.NotNil(t, err)
require.False(t, initializer.initialized.Load())
require.True(t, initializer.initializing.Load())
require.False(t, initialized)

// test submit error
initializer = NewInitializer(func(ctx context.Context) error {
return nil
})
initialized, err = initializer.TryInitialize(context.Background(), &fakePool{submitErr: errors.New("submit error")})
require.NotNil(t, err)
require.False(t, initialized)
require.False(t, initializer.initialized.Load())
require.True(t, initializer.initializing.Load())
}

func TestTerminate(t *testing.T) {
initializer := NewInitializer(func(ctx context.Context) error {
return nil
})
pool := &vars.NonAsyncPool{}
initialized, err := initializer.TryInitialize(context.Background(), pool)
require.Nil(t, err)
require.True(t, initialized)
initializer.Terminate()
require.False(t, initializer.initialized.Load())
require.False(t, initializer.initializing.Load())

// test submit error
initializer = NewInitializer(func(ctx context.Context) error {
return nil
})
fpool := &fakePool{}
initialized, err = initializer.TryInitialize(context.Background(), fpool)
require.Nil(t, err)
require.False(t, initialized)
require.True(t, initializer.initializing.Load())
require.Equal(t, 1, fpool.submitTimes)

initialized, err = initializer.TryInitialize(context.Background(), fpool)
require.Nil(t, err)
require.False(t, initialized)
require.True(t, initializer.initializing.Load())
require.Equal(t, 1, fpool.submitTimes)

wg := sync.WaitGroup{}
wg.Add(1)
terminated := atomic.NewInt32(1)
go func() {
defer wg.Done()
initializer.Terminate()
require.Equal(t, int32(2), terminated.Swap(3))
}()
require.Equal(t, int32(1), terminated.Swap(2))
time.Sleep(1 * time.Second)
fpool.f()
wg.Wait()
require.Equal(t, int32(3), terminated.Load())
}
30 changes: 24 additions & 6 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
"github.com/pingcap/tiflow/pkg/workerpool"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/server/v3/mvcc"
Expand All @@ -49,7 +50,11 @@ import (
"golang.org/x/time/rate"
)

const cleanMetaDuration = 10 * time.Second
const (
cleanMetaDuration = 10 * time.Second
// changefeedAsyncInitWorkerCount is the size of the worker pool for changefeed initialization processing.
changefeedAsyncInitWorkerCount = 8
)

// Capture represents a Capture server, it monitors the changefeed
// information in etcd and schedules Task on it.
Expand Down Expand Up @@ -99,6 +104,9 @@ type captureImpl struct {

sortEngineFactory *factory.SortEngineFactory

// ChangefeedThreadPool is the thread pool for changefeed initialization
ChangefeedThreadPool workerpool.AsyncPool

// MessageServer is the receiver of the messages from the other nodes.
// It should be recreated each time the capture is restarted.
MessageServer *p2p.MessageServer
Expand Down Expand Up @@ -279,12 +287,14 @@ func (c *captureImpl) reset(ctx context.Context) (*vars.GlobalVars, error) {
messageClientConfig.AdvertisedAddr = advertiseAddr

c.MessageRouter = p2p.NewMessageRouterWithLocalClient(c.info.ID, c.config.Security, messageClientConfig)
c.ChangefeedThreadPool = workerpool.NewDefaultAsyncPool(changefeedAsyncInitWorkerCount)
globalVars := &vars.GlobalVars{
CaptureInfo: c.info,
EtcdClient: c.EtcdClient,
MessageServer: c.MessageServer,
MessageRouter: c.MessageRouter,
SortEngineFactory: c.sortEngineFactory,
CaptureInfo: c.info,
EtcdClient: c.EtcdClient,
MessageServer: c.MessageServer,
MessageRouter: c.MessageRouter,
SortEngineFactory: c.sortEngineFactory,
ChangefeedThreadPool: c.ChangefeedThreadPool,
}
c.processorManager = c.newProcessorManager(
c.info, c.upstreamManager, &c.liveness, c.config.Debug.Scheduler, globalVars)
Expand Down Expand Up @@ -416,6 +426,14 @@ func (c *captureImpl) run(stdCtx context.Context) error {
return c.MessageServer.Run(stdCtx, c.MessageRouter.GetLocalChannel())
})

poolCtx, cancelPool := context.WithCancel(stdCtx)
defer func() {
cancelPool()
log.Info("workerpool exited", zap.Error(err))
}()
g.Go(func() error {
return c.ChangefeedThreadPool.Run(poolCtx)
})
return errors.Trace(g.Wait())
}

Expand Down
Loading

0 comments on commit 171311f

Please sign in to comment.