Commit

Merge remote-tracking branch 'upstream/master' into add-output-old-value-config

# Conflicts:
#	pkg/sink/codec/open/open_protocol_encoder_test.go
sdojjy committed Apr 23, 2024
2 parents 4c5e279 + 5ebadff commit 0576660
Showing 63 changed files with 1,666 additions and 2,589 deletions.
cdc/entry/mounter_test.go (3 changes: 0 additions & 3 deletions)

@@ -11,9 +11,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//go:build intest
-// +build intest
-
 package entry
 
 import (
cdc/entry/schema_storage_test.go (3 changes: 0 additions & 3 deletions)

@@ -11,9 +11,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//go:build intest
-// +build intest
-
 package entry
 
 import (
cdc/entry/schema_test.go (3 changes: 0 additions & 3 deletions)

@@ -11,9 +11,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//go:build intest
-// +build intest
-
 package entry
 
 import (
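
The three test files above all drop the same pair of build-tag directives. As a minimal illustration (not part of this commit), this is what such a tag does to a file: with the directives present, the file is compiled only when the build or test run passes -tags intest, so removing them makes these tests build unconditionally.

//go:build intest
// +build intest

// This file is compiled only when the `intest` build tag is set, e.g.
// `go test -tags intest ./...`; without the tag, the toolchain skips it.
package entry
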
cdc/entry/schema_test_helper.go (6 changes: 3 additions & 3 deletions)

@@ -41,7 +41,7 @@ import (
 
 // SchemaTestHelper is a test helper for schema which creates an internal tidb instance to generate DDL jobs with meta information
 type SchemaTestHelper struct {
-	t *testing.T
+	t testing.TB
 	tk *testkit.TestKit
 	storage kv.Storage
 	domain *domain.Domain
@@ -54,7 +54,7 @@ type SchemaTestHelper struct {
 // NewSchemaTestHelperWithReplicaConfig creates a SchemaTestHelper
 // by using the given replica config.
 func NewSchemaTestHelperWithReplicaConfig(
-	t *testing.T, replicaConfig *config.ReplicaConfig,
+	t testing.TB, replicaConfig *config.ReplicaConfig,
 ) *SchemaTestHelper {
 	store, err := mockstore.NewMockStore()
 	require.NoError(t, err)
@@ -101,7 +101,7 @@ func NewSchemaTestHelperWithReplicaConfig(
 }
 
 // NewSchemaTestHelper creates a SchemaTestHelper
-func NewSchemaTestHelper(t *testing.T) *SchemaTestHelper {
+func NewSchemaTestHelper(t testing.TB) *SchemaTestHelper {
 	return NewSchemaTestHelperWithReplicaConfig(t, config.GetDefaultReplicaConfig())
 }
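
A note on the change above, with an illustrative sketch (not from this commit; the test names are hypothetical): widening the parameter from *testing.T to the testing.TB interface lets the same helper be driven from benchmarks as well as tests, because both *testing.T and *testing.B satisfy testing.TB.

package entry_test

import (
	"testing"

	"github.com/pingcap/tiflow/cdc/entry"
)

// Works from an ordinary test: *testing.T implements testing.TB.
func TestWithSchemaHelper(t *testing.T) {
	helper := entry.NewSchemaTestHelper(t)
	_ = helper // drive the embedded TiDB instance to generate DDL jobs, etc.
}

// Also works from a benchmark, which the old *testing.T signature did not
// allow: *testing.B implements testing.TB as well.
func BenchmarkWithSchemaHelper(b *testing.B) {
	helper := entry.NewSchemaTestHelper(b)
	_ = helper
}
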
cdc/kv/main_test.go (1 change: 1 addition & 0 deletions)

@@ -24,6 +24,7 @@ func TestMain(m *testing.M) {
 	opts := []goleak.Option{
 		goleak.IgnoreTopFunction("github.com/pingcap/tiflow/pkg/workerpool.(*worker).run"),
 		goleak.IgnoreTopFunction("sync.runtime_Semacquire"),
+		goleak.IgnoreTopFunction("github.com/pingcap/goleveldb/leveldb.(*DB).mpoolDrain"),
 	}
 
 	leakutil.SetUpLeakTest(m, opts...)
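
leakutil.SetUpLeakTest appears to be tiflow's wrapper around go.uber.org/goleak; the sketch below (illustrative, not from this repository) shows the equivalent wiring against goleak directly. A goroutine whose top stack frame matches an IgnoreTopFunction option, such as the long-lived leveldb memory-pool drainer whitelisted above, is treated as expected background work instead of being reported as a leak when the test binary exits.

package kv

import (
	"testing"

	"go.uber.org/goleak"
)

// goleak verifies, after all tests in the package have run, that no
// unexpected goroutines are still alive. Known long-lived goroutines are
// excluded by the function at the top of their stack.
func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m,
		goleak.IgnoreTopFunction("github.com/pingcap/goleveldb/leveldb.(*DB).mpoolDrain"),
	)
}
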
cdc/kv/region_state.go (127 changes: 12 additions & 115 deletions)

@@ -14,7 +14,6 @@
 package kv
 
 import (
-	"runtime"
 	"sync"
 
 	"github.com/pingcap/tiflow/cdc/kv/regionlock"
@@ -23,9 +22,6 @@ import (
 )
 
 const (
-	minRegionStateBucket = 4
-	maxRegionStateBucket = 16
-
 	stateNormal uint32 = 0
 	stateStopped uint32 = 1
 	stateRemoved uint32 = 2
@@ -45,12 +41,13 @@ type regionInfo struct {
 
 	// The table that the region belongs to.
 	subscribedTable *subscribedTable
-	lockedRange *regionlock.LockedRange
+	// The state of the locked range of the region.
+	lockedRangeState *regionlock.LockedRangeState
 }
 
 func (s regionInfo) isStoped() bool {
 	// lockedRange only nil when the region's subscribedTable is stopped.
-	return s.lockedRange == nil
+	return s.lockedRangeState == nil
 }
 
 func newRegionInfo(
@@ -68,7 +65,7 @@ func newRegionInfo(
 }
 
 func (s regionInfo) resolvedTs() uint64 {
-	return s.lockedRange.ResolvedTs.Load()
+	return s.lockedRangeState.ResolvedTs.Load()
 }
 
 type regionErrorInfo struct {
@@ -149,24 +146,24 @@ func (s *regionFeedState) takeError() (err error) {
 }
 
 func (s *regionFeedState) isInitialized() bool {
-	return s.region.lockedRange.Initialzied.Load()
+	return s.region.lockedRangeState.Initialzied.Load()
 }
 
 func (s *regionFeedState) setInitialized() {
-	s.region.lockedRange.Initialzied.Store(true)
+	s.region.lockedRangeState.Initialzied.Store(true)
 }
 
 func (s *regionFeedState) getRegionID() uint64 {
 	return s.region.verID.GetID()
 }
 
 func (s *regionFeedState) getLastResolvedTs() uint64 {
-	return s.region.lockedRange.ResolvedTs.Load()
+	return s.region.lockedRangeState.ResolvedTs.Load()
 }
 
 // updateResolvedTs update the resolved ts of the current region feed
 func (s *regionFeedState) updateResolvedTs(resolvedTs uint64) {
-	state := s.region.lockedRange
+	state := s.region.lockedRangeState
 	for {
 		last := state.ResolvedTs.Load()
 		if last > resolvedTs {
@@ -176,12 +173,13 @@ func (s *regionFeedState) updateResolvedTs(resolvedTs uint64) {
 			break
 		}
 	}
+
 	if s.region.subscribedTable != nil {
-		s.region.subscribedTable.postUpdateRegionResolvedTs(
+		// When resolvedTs is received, we need to try to resolve the lock of the region.
+		// Because the updated resolvedTs may less than the target resolvedTs we want advance to.
+		s.region.subscribedTable.tryResolveLock(
 			s.region.verID.GetID(),
-			s.region.verID.GetVer(),
-			state,
 			s.region.span,
 		)
 	}
 }
@@ -193,104 +191,3 @@ func (s *regionFeedState) getRegionInfo() regionInfo {
 func (s *regionFeedState) getRegionMeta() (uint64, tablepb.Span, string) {
 	return s.region.verID.GetID(), s.region.span, s.region.rpcCtx.Addr
 }
-
-type syncRegionFeedStateMap struct {
-	mu sync.RWMutex
-	// statesInternal is an internal field and must not be accessed from outside.
-	statesInternal map[uint64]*regionFeedState
-}
-
-func newSyncRegionFeedStateMap() *syncRegionFeedStateMap {
-	return &syncRegionFeedStateMap{
-		mu: sync.RWMutex{},
-		statesInternal: make(map[uint64]*regionFeedState),
-	}
-}
-
-func (m *syncRegionFeedStateMap) iter(fn func(requestID uint64, state *regionFeedState) bool) {
-	m.mu.Lock()
-	defer m.mu.Unlock()
-	for requestID, state := range m.statesInternal {
-		if !fn(requestID, state) {
-			break
-		}
-	}
-}
-
-func (m *syncRegionFeedStateMap) setByRequestID(requestID uint64, state *regionFeedState) {
-	m.mu.Lock()
-	defer m.mu.Unlock()
-	m.statesInternal[requestID] = state
-}
-
-func (m *syncRegionFeedStateMap) setByRegionID(regionID uint64, state *regionFeedState) {
-	m.mu.Lock()
-	defer m.mu.Unlock()
-	m.statesInternal[regionID] = state
-}
-
-func (m *syncRegionFeedStateMap) getByRegionID(regionID uint64) (*regionFeedState, bool) {
-	m.mu.RLock()
-	defer m.mu.RUnlock()
-	result, ok := m.statesInternal[regionID]
-	return result, ok
-}
-
-func (m *syncRegionFeedStateMap) delByRegionID(regionID uint64) {
-	m.mu.Lock()
-	defer m.mu.Unlock()
-	delete(m.statesInternal, regionID)
-}
-
-type regionStateManagerInterface interface {
-	getState(regionID uint64) (*regionFeedState, bool)
-	setState(regionID uint64, state *regionFeedState)
-	delState(regionID uint64)
-}
-
-// regionStateManager provides the get/put way like a sync.Map, and it is divided
-// into several buckets to reduce lock contention
-type regionStateManager struct {
-	bucket int
-	states []*syncRegionFeedStateMap
-}
-
-func newRegionStateManager(bucket int) *regionStateManager {
-	if bucket <= 0 {
-		bucket = runtime.NumCPU()
-		if bucket > maxRegionStateBucket {
-			bucket = maxRegionStateBucket
-		}
-		if bucket < minRegionStateBucket {
-			bucket = minRegionStateBucket
-		}
-	}
-	rsm := &regionStateManager{
-		bucket: bucket,
-		states: make([]*syncRegionFeedStateMap, bucket),
-	}
-	for i := range rsm.states {
-		rsm.states[i] = newSyncRegionFeedStateMap()
-	}
-	return rsm
-}
-
-func (rsm *regionStateManager) getBucket(regionID uint64) int {
-	return int(regionID) % rsm.bucket
-}
-
-func (rsm *regionStateManager) getState(regionID uint64) (*regionFeedState, bool) {
-	bucket := rsm.getBucket(regionID)
-	state, ok := rsm.states[bucket].getByRegionID(regionID)
-	return state, ok
-}
-
-func (rsm *regionStateManager) setState(regionID uint64, state *regionFeedState) {
-	bucket := rsm.getBucket(regionID)
-	rsm.states[bucket].setByRegionID(regionID, state)
-}
-
-func (rsm *regionStateManager) delState(regionID uint64) {
-	bucket := rsm.getBucket(regionID)
-	rsm.states[bucket].delByRegionID(regionID)
-}
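
One detail of updateResolvedTs above is hidden behind the collapsed hunk: the loop body that actually advances ResolvedTs. The sketch below is a generic illustration of that compare-and-swap pattern, with hypothetical names, not the code elided from this diff: the value only ever moves forward, and concurrent writers retry until either their larger value lands or a larger value is already present.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// advance moves ts forward to newTs, but never backward. If another
// goroutine stores a larger value between the Load and the CompareAndSwap,
// the CAS fails and the loop re-reads the current value before deciding again.
func advance(ts *atomic.Uint64, newTs uint64) {
	for {
		last := ts.Load()
		if last >= newTs {
			return // already at or past newTs; nothing to do
		}
		if ts.CompareAndSwap(last, newTs) {
			return // successfully advanced
		}
	}
}

func main() {
	var resolvedTs atomic.Uint64
	var wg sync.WaitGroup
	for _, v := range []uint64{42, 7, 99, 63} {
		wg.Add(1)
		go func(v uint64) {
			defer wg.Done()
			advance(&resolvedTs, v)
		}(v)
	}
	wg.Wait()
	fmt.Println(resolvedTs.Load()) // always 99, regardless of scheduling
}
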
(Diffs for the remaining changed files did not load in this view.)
