Skip to content

Commit

Permalink
Merge branch 'master' into zixiong-fix-sorter-block
Browse files Browse the repository at this point in the history
  • Loading branch information
liuzix authored Jun 10, 2021
2 parents 9cad119 + ef9f56e commit eb87468
Show file tree
Hide file tree
Showing 19 changed files with 115 additions and 81 deletions.
45 changes: 24 additions & 21 deletions cdc/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,12 @@ func (s *Server) handleChangefeedAdmin(w http.ResponseWriter, req *http.Request)
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}
}
if s.captureV2 == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
} else {
if s.captureV2 == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}
}

err := req.ParseForm()
Expand Down Expand Up @@ -182,11 +183,12 @@ func (s *Server) handleRebalanceTrigger(w http.ResponseWriter, req *http.Request
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}
}
if s.captureV2 == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
} else {
if s.captureV2 == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}
}

err := req.ParseForm()
Expand Down Expand Up @@ -223,11 +225,12 @@ func (s *Server) handleMoveTable(w http.ResponseWriter, req *http.Request) {
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}
}
if s.captureV2 == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
} else {
if s.captureV2 == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}
}

err := req.ParseForm()
Expand Down Expand Up @@ -277,12 +280,12 @@ func (s *Server) handleChangefeedQuery(w http.ResponseWriter, req *http.Request)
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}
}

if s.captureV2 == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
} else {
if s.captureV2 == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}
}

err := req.ParseForm()
Expand Down
2 changes: 2 additions & 0 deletions cdc/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ func (s *Server) handleStatus(w http.ResponseWriter, req *http.Request) {
st.ID = s.captureV2.Info().ID
st.IsOwner = s.captureV2.IsOwner()
}
writeData(w, st)
return
}
s.ownerLock.RLock()
defer s.ownerLock.RUnlock()
Expand Down
2 changes: 1 addition & 1 deletion cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var (
// other components in TiCDC, including worker pool task chan size, mounter
// chan size etc.
// TODO: unified channel buffer mechanism
regionWorkerInputChanSize = 12800
regionWorkerInputChanSize = 128
regionWorkerLowWatermark = int(float64(regionWorkerInputChanSize) * 0.2)
regionWorkerHighWatermark = int(float64(regionWorkerInputChanSize) * 0.7)
)
Expand Down
60 changes: 32 additions & 28 deletions cdc/model/reactor_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type GlobalReactorState struct {
Owner map[string]struct{}
Captures map[CaptureID]*CaptureInfo
Changefeeds map[ChangeFeedID]*ChangefeedReactorState
pendingPatches []orchestrator.DataPatch
pendingPatches [][]orchestrator.DataPatch
}

// NewGlobalState creates a new global state
Expand Down Expand Up @@ -90,7 +90,7 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) erro
return errors.Trace(err)
}
if value == nil && !changefeedState.Exist() {
s.pendingPatches = append(s.pendingPatches, changefeedState.GetPatches()...)
s.pendingPatches = append(s.pendingPatches, changefeedState.getPatches())
delete(s.Changefeeds, k.ChangefeedID)
}
default:
Expand All @@ -100,38 +100,15 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) erro
}

// GetPatches implements the ReactorState interface
func (s *GlobalReactorState) GetPatches() []orchestrator.DataPatch {
func (s *GlobalReactorState) GetPatches() [][]orchestrator.DataPatch {
pendingPatches := s.pendingPatches
for _, changefeedState := range s.Changefeeds {
pendingPatches = append(pendingPatches, changefeedState.GetPatches()...)
pendingPatches = append(pendingPatches, changefeedState.getPatches())
}
s.pendingPatches = nil
return pendingPatches
}

// CheckCaptureAlive checks if the capture is alive, if the capture offline,
// the etcd worker will exit and throw the ErrLeaseExpired error.
func (s *GlobalReactorState) CheckCaptureAlive(captureID CaptureID) {
k := etcd.CDCKey{
Tp: etcd.CDCKeyTypeCapture,
CaptureID: captureID,
}
key := k.String()
patch := &orchestrator.SingleDataPatch{
Key: util.NewEtcdKey(key),
Func: func(v []byte) ([]byte, bool, error) {
// If v is empty, it means that the key-value pair of capture info is not exist.
// The key-value pair of capture info is written with lease,
// so if the capture info is not exist, the lease is expired
if len(v) == 0 {
return v, false, cerrors.ErrLeaseExpired.GenWithStackByArgs()
}
return v, false, nil
},
}
s.pendingPatches = append(s.pendingPatches, patch)
}

// ChangefeedReactorState represents a changefeed state which stores all key-value pairs of a changefeed in ETCD
type ChangefeedReactorState struct {
ID ChangeFeedID
Expand Down Expand Up @@ -250,12 +227,39 @@ func (s *ChangefeedReactorState) Active(captureID CaptureID) bool {
}

// GetPatches implements the ReactorState interface
func (s *ChangefeedReactorState) GetPatches() []orchestrator.DataPatch {
func (s *ChangefeedReactorState) GetPatches() [][]orchestrator.DataPatch {
return [][]orchestrator.DataPatch{s.getPatches()}
}

func (s *ChangefeedReactorState) getPatches() []orchestrator.DataPatch {
pendingPatches := s.pendingPatches
s.pendingPatches = nil
return pendingPatches
}

// CheckCaptureAlive checks if the capture is alive, if the capture offline,
// the etcd worker will exit and throw the ErrLeaseExpired error.
func (s *ChangefeedReactorState) CheckCaptureAlive(captureID CaptureID) {
k := etcd.CDCKey{
Tp: etcd.CDCKeyTypeCapture,
CaptureID: captureID,
}
key := k.String()
patch := &orchestrator.SingleDataPatch{
Key: util.NewEtcdKey(key),
Func: func(v []byte) ([]byte, bool, error) {
// If v is empty, it means that the key-value pair of capture info is not exist.
// The key-value pair of capture info is written with lease,
// so if the capture info is not exist, the lease is expired
if len(v) == 0 {
return v, false, cerrors.ErrLeaseExpired.GenWithStackByArgs()
}
return v, false, nil
},
}
s.pendingPatches = append(s.pendingPatches, patch)
}

// CheckChangefeedNormal checks if the changefeed state is runable,
// if the changefeed status is not runable, the etcd worker will skip all patch of this tick
// the processor should call this function every tick to make sure the changefeed is runable
Expand Down
2 changes: 1 addition & 1 deletion cdc/model/reactor_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var _ = check.Suite(&stateSuite{})

func (s *stateSuite) TestCheckCaptureAlive(c *check.C) {
defer testleak.AfterTest(c)()
state := NewGlobalState().(*GlobalReactorState)
state := NewChangefeedReactorState("test")
stateTester := orchestrator.NewReactorStateTester(c, state, nil)
state.CheckCaptureAlive("6bbc01c8-0605-4f86-a0f9-b3119109b225")
c.Assert(stateTester.ApplyPatches(), check.ErrorMatches, ".*[CDC:ErrLeaseExpired].*")
Expand Down
1 change: 1 addition & 0 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (c *changefeed) Tick(ctx cdcContext.Context, state *model.ChangefeedReactor
c.errCh <- errors.Trace(err)
return nil
})
state.CheckCaptureAlive(ctx.GlobalVars().CaptureInfo.ID)
if err := c.tick(ctx, state, captures); err != nil {
log.Error("an error occurred in Owner", zap.String("changefeedID", c.state.ID), zap.Error(err))
var code string
Expand Down
1 change: 1 addition & 0 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func createChangefeed4Test(ctx cdcContext.Context, c *check.C) (*changefeed, *mo
info = ctx.ChangefeedVars().Info
return info, true, nil
})
tester.MustUpdate("/tidb/cdc/capture/"+ctx.GlobalVars().CaptureInfo.ID, []byte(`{"id":"`+ctx.GlobalVars().CaptureInfo.ID+`","address":"127.0.0.1:8300"}`))
tester.MustApplyPatches()
captures := map[model.CaptureID]*model.CaptureInfo{ctx.GlobalVars().CaptureInfo.ID: ctx.GlobalVars().CaptureInfo}
return cf, state, captures, tester
Expand Down
3 changes: 1 addition & 2 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState)
ctx := stdCtx.(cdcContext.Context)
state := rawState.(*model.GlobalReactorState)
o.updateMetrics(state)
state.CheckCaptureAlive(ctx.GlobalVars().CaptureInfo.ID)
err = o.gcManager.updateGCSafePoint(ctx, state)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -259,7 +258,7 @@ func (o *Owner) handleJobs() {
case ownerJobTypeRebalance:
cfReactor.scheduler.Rebalance()
case ownerJobTypeDebugInfo:
panic("unimplemented") // TODO
// TODO: implement this function
}
close(job.done)
}
Expand Down
1 change: 0 additions & 1 deletion cdc/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func NewManager4Test(
func (m *Manager) Tick(stdCtx context.Context, state orchestrator.ReactorState) (nextState orchestrator.ReactorState, err error) {
ctx := stdCtx.(cdcContext.Context)
globalState := state.(*model.GlobalReactorState)
globalState.CheckCaptureAlive(ctx.GlobalVars().CaptureInfo.ID)
if err := m.handleCommand(); err != nil {
return state, err
}
Expand Down
1 change: 1 addition & 0 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func newProcessor4Test(ctx cdcContext.Context,
// The main logic of processor is in this function, including the calculation of many kinds of ts, maintain table pipeline, error handling, etc.
func (p *processor) Tick(ctx cdcContext.Context, state *model.ChangefeedReactorState) (orchestrator.ReactorState, error) {
p.changefeed = state
state.CheckCaptureAlive(ctx.GlobalVars().CaptureInfo.ID)
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: state.ID,
Info: state.Info,
Expand Down
1 change: 1 addition & 0 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func initProcessor4Test(ctx cdcContext.Context, c *check.C) (*processor, *orches
})
p.changefeed = model.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
return p, orchestrator.NewReactorStateTester(c, p.changefeed, map[string]string{
"/tidb/cdc/capture/" + ctx.GlobalVars().CaptureInfo.ID: `{"id":"` + ctx.GlobalVars().CaptureInfo.ID + `","address":"127.0.0.1:8300"}`,
"/tidb/cdc/changefeed/info/" + ctx.ChangefeedVars().ID: `{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":0,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":".","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"default"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`,
"/tidb/cdc/job/" + ctx.ChangefeedVars().ID: `{"resolved-ts":0,"checkpoint-ts":0,"admin-job-type":0}`,
"/tidb/cdc/task/status/" + ctx.GlobalVars().CaptureInfo.ID + "/" + ctx.ChangefeedVars().ID: `{"tables":{},"operation":null,"admin-job-type":0}`,
Expand Down
1 change: 1 addition & 0 deletions cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ func (s *Server) run(ctx context.Context) (err error) {
return err
}
s.capture = capture
s.etcdClient = &capture.etcdClient
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down
21 changes: 15 additions & 6 deletions pkg/orchestrator/etcd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,

watchCh := worker.client.Watch(ctx1, worker.prefix.String(), clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1))
var (
pendingPatches []DataPatch
pendingPatches [][]DataPatch
exiting bool
sessionDone <-chan struct{}
)
Expand Down Expand Up @@ -143,16 +143,13 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,

if len(pendingPatches) > 0 {
// Here we have some patches yet to be uploaded to Etcd.
err := worker.applyPatches(ctx, pendingPatches, session)
pendingPatches, err = worker.applyPatchGroups(ctx, pendingPatches)
if err != nil {
if cerrors.ErrEtcdTryAgain.Equal(errors.Cause(err)) {
continue
}
return errors.Trace(err)
}
// If we are here, all patches have been successfully applied to Etcd.
// `applyPatches` is all-or-none, so in case of success, we should clear all the pendingPatches.
pendingPatches = pendingPatches[:0]
} else {
if exiting {
// If exiting is true here, it means that the reactor returned `ErrReactorFinished` last tick, and all pending patches is applied.
Expand Down Expand Up @@ -232,7 +229,19 @@ func (worker *EtcdWorker) cloneRawState() map[util.EtcdKey][]byte {
return ret
}

func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch, session *concurrency.Session) error {
func (worker *EtcdWorker) applyPatchGroups(ctx context.Context, patchGroups [][]DataPatch) ([][]DataPatch, error) {
for len(patchGroups) > 0 {
patches := patchGroups[0]
err := worker.applyPatches(ctx, patches)
if err != nil {
return patchGroups, err
}
patchGroups = patchGroups[1:]
}
return patchGroups, nil
}

func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch) error {
state := worker.cloneRawState()
changedSet := make(map[util.EtcdKey]struct{})
for _, patch := range patches {
Expand Down
22 changes: 12 additions & 10 deletions pkg/orchestrator/etcd_worker_bank_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
type bankReactorState struct {
c *check.C
account []int
pendingPatch []DataPatch
pendingPatch [][]DataPatch
index int
notFirstTick bool
}
Expand All @@ -47,7 +47,7 @@ func (b *bankReactorState) Update(key util.EtcdKey, value []byte, isInit bool) e
return nil
}

func (b *bankReactorState) GetPatches() []DataPatch {
func (b *bankReactorState) GetPatches() [][]DataPatch {
pendingPatches := b.pendingPatch
b.pendingPatch = nil
return pendingPatches
Expand All @@ -70,8 +70,8 @@ func (b *bankReactorState) atoi(value string) int {
return i
}

func (b *bankReactorState) patchAccount(index int, fn func(int) int) {
b.pendingPatch = append(b.pendingPatch, &SingleDataPatch{
func (b *bankReactorState) patchAccount(index int, fn func(int) int) DataPatch {
return &SingleDataPatch{
Key: util.NewEtcdKey(fmt.Sprintf("%s%d", bankTestPrefix, index)),
Func: func(old []byte) (newValue []byte, changed bool, err error) {
oldMoney := b.atoi(string(old))
Expand All @@ -82,19 +82,21 @@ func (b *bankReactorState) patchAccount(index int, fn func(int) int) {
log.Debug("change money", zap.Int("account", index), zap.Int("from", oldMoney), zap.Int("to", newMoney))
return []byte(strconv.Itoa(newMoney)), true, nil
},
})
}
}

func (b *bankReactorState) TransferRandomly(transferNumber int) {
for i := 0; i < transferNumber; i++ {
accountA := rand.Intn(len(b.account))
accountB := rand.Intn(len(b.account))
transferMoney := rand.Intn(100)
b.patchAccount(accountA, func(money int) int {
return money - transferMoney
})
b.patchAccount(accountB, func(money int) int {
return money + transferMoney
b.pendingPatch = append(b.pendingPatch, []DataPatch{
b.patchAccount(accountA, func(money int) int {
return money - transferMoney
}),
b.patchAccount(accountB, func(money int) int {
return money + transferMoney
}),
})
log.Debug("transfer money", zap.Int("accountA", accountA), zap.Int("accountB", accountB), zap.Int("money", transferMoney))
}
Expand Down
Loading

0 comments on commit eb87468

Please sign in to comment.