
Commit

update
leoppro committed Mar 19, 2020
1 parent 85c4d75 commit 79b9e4b
Showing 19 changed files with 615 additions and 252 deletions.
11 changes: 8 additions & 3 deletions Makefile
@@ -1,6 +1,6 @@
### Makefile for ticdc
.PHONY: build test check clean fmt cdc kafka_consumer coverage \
-    integration_test_build integration_test
+    integration_test_build integration_test integration_test_mysql integration_test_kafka

PROJECT=ticdc

@@ -86,8 +86,13 @@ integration_test_build: check_failpoint_ctl
    || { $(FAILPOINT_DISABLE); exit 1; }
    $(FAILPOINT_DISABLE)

-integration_test: check_third_party_binary
-    tests/run.sh $(CASE)
+integration_test: integration_test_mysql
+
+integration_test_mysql: check_third_party_binary
+    tests/run.sh $(CASE) mysql
+
+integration_test_kafka: check_third_party_binary
+    tests/run.sh $(CASE) kafka

fmt:
    @echo "gofmt (simplify)"
11 changes: 11 additions & 0 deletions cdc/owner.go
@@ -145,6 +145,10 @@ func (c *changeFeed) reAddTable(id, startTs uint64) {
    }
}

+func (c *changeFeed) run(ctx context.Context) error {
+    return c.sink.Run(ctx)
+}
+
func (c *changeFeed) addTable(sid, tid, startTs uint64, table entry.TableName) {
    if c.filter.ShouldIgnoreTable(table.Schema, table.Table) {
        return
@@ -654,6 +658,12 @@ func (o *ownerImpl) loadChangeFeeds(ctx context.Context) error {
            return errors.Annotatef(err, "create change feed %s", changeFeedID)
        }
        o.changeFeeds[changeFeedID] = newCf
+        go func() {
+            err := newCf.run(ctx)
+            if errors.Cause(err) != context.Canceled {
+                log.Error("run changefeed failed", zap.Error(err))
+            }
+        }()
    }

    for _, changefeed := range o.changeFeeds {
@@ -1020,6 +1030,7 @@ func (o *ownerImpl) Run(ctx context.Context, tickTime time.Duration) error {
            ownerChanged = true
            continue
        }
+        ctx := util.SetOwnerInCtx(ctx)
        if ownerChanged {
            // Do something initialize when the capture becomes an owner.
            ownerChanged = false
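Note: the added ctx := util.SetOwnerInCtx(ctx) line threads an "is owner" flag through the context so the sink (see cdc/sink/mq.go below) can check it with util.IsOwnerFromCtx. A minimal sketch of how such helpers are commonly written with context.WithValue follows; the actual pkg/util implementation is not part of this diff and may differ, and the key name here is an assumption.

package util

import "context"

type ctxKeyType string

// ctxKeyIsOwner is a hypothetical unexported key; the real key is not shown in this commit.
const ctxKeyIsOwner ctxKeyType = "isOwner"

// SetOwnerInCtx marks the context as coming from the owner capture.
func SetOwnerInCtx(ctx context.Context) context.Context {
    return context.WithValue(ctx, ctxKeyIsOwner, true)
}

// IsOwnerFromCtx reports whether SetOwnerInCtx was called on this context chain.
func IsOwnerFromCtx(ctx context.Context) bool {
    isOwner, ok := ctx.Value(ctxKeyIsOwner).(bool)
    return ok && isOwner
}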
117 changes: 90 additions & 27 deletions cdc/sink/mq.go
@@ -10,6 +10,8 @@ import (
"sync/atomic"
"time"

"golang.org/x/sync/errgroup"

"github.com/pingcap/ticdc/pkg/util"

"github.com/pingcap/log"
@@ -21,28 +23,38 @@ import (
)

type mqSink struct {
-    mqProducer   mqProducer.Producer
-    partitionNum int32
+    mqProducer       mqProducer.Producer
+    partitionNum     int32
+    lastSentMsgIndex uint64

-    sinkCheckpointTsCh chan uint64
-    globalResolvedTs   uint64
-    checkpointTs       uint64
-    filter             *util.Filter
+    sinkCheckpointTsCh chan struct {
+        ts    uint64
+        index uint64
+    }
+    globalResolvedTs uint64
+    checkpointTs     uint64
+    filter           *util.Filter

    changefeedID string

    count int64

+    errCh chan error
}

func newMqSink(mqProducer mqProducer.Producer, filter *util.Filter, opts map[string]string) *mqSink {
    partitionNum := mqProducer.GetPartitionNum()
    changefeedID := opts[OptChangefeedID]
    return &mqSink{
-        mqProducer:         mqProducer,
-        partitionNum:       partitionNum,
-        sinkCheckpointTsCh: make(chan uint64, 128),
-        filter:             filter,
-        changefeedID:       changefeedID,
+        mqProducer:   mqProducer,
+        partitionNum: partitionNum,
+        sinkCheckpointTsCh: make(chan struct {
+            ts    uint64
+            index uint64
+        }, 128),
+        filter:       filter,
+        changefeedID: changefeedID,
+        errCh:        make(chan error, 1),
    }
}

@@ -56,7 +68,12 @@ func (k *mqSink) EmitCheckpointEvent(ctx context.Context, ts uint64) error {
    if err != nil {
        return errors.Trace(err)
    }
-    err = k.mqProducer.BroadcastMessage(ctx, keyByte, nil)
+    _, err = k.mqProducer.BroadcastMessage(ctx, keyByte, nil, func(err error) {
+        if err != nil {
+            log.Error("failed to send checkpoint event to kafka", zap.Error(err), zap.Uint64("ts", ts))
+            return
+        }
+    })
    if err != nil {
        return errors.Trace(err)
    }
@@ -84,20 +101,28 @@ func (k *mqSink) EmitRowChangedEvent(ctx context.Context, rows ...*model.RowChangedEvent) error {
        if err != nil {
            return errors.Trace(err)
        }
-        err = k.mqProducer.SendMessage(ctx, keyByte, valueByte, partition)
+        k.lastSentMsgIndex, err = k.mqProducer.SendMessage(ctx, keyByte, valueByte, partition, func(err error) {
+            if err != nil {
+                log.Error("failed to send row changed event to kafka", zap.Error(err), zap.Reflect("row", row))
+                return
+            }
+            atomic.AddInt64(&k.count, 1)
+        })
        if err != nil {
            log.Error("send message failed", zap.ByteStrings("row", [][]byte{keyByte, valueByte}), zap.Int32("partition", partition))
            return errors.Trace(err)
        }
-        atomic.AddInt64(&k.count, 1)
    }
    if sinkCheckpointTs == 0 {
        return nil
    }
    // handle sink checkpoint ts
    select {
    case <-ctx.Done():
        return ctx.Err()
-    case k.sinkCheckpointTsCh <- sinkCheckpointTs:
+    case k.sinkCheckpointTsCh <- struct {
+        ts    uint64
+        index uint64
+    }{ts: sinkCheckpointTs, index: k.lastSentMsgIndex}:
    }
    return nil
}
@@ -147,11 +172,16 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
    if err != nil {
        return errors.Trace(err)
    }
-    err = k.mqProducer.BroadcastMessage(ctx, keyByte, valueByte)
+    _, err = k.mqProducer.BroadcastMessage(ctx, keyByte, valueByte, func(err error) {
+        if err != nil {
+            log.Error("failed to send DDL event to kafka", zap.Error(err), zap.String("ddl", ddl.Query))
+            return
+        }
+        atomic.AddInt64(&k.count, 1)
+    })
    if err != nil {
        return errors.Trace(err)
    }
-    atomic.AddInt64(&k.count, 1)
    return nil
}

@@ -160,24 +190,57 @@ func (k *mqSink) CheckpointTs() uint64 {
}

func (k *mqSink) Run(ctx context.Context) error {
+    wg, cctx := errgroup.WithContext(ctx)
+    log.Info("IsOwner", zap.Bool("b", util.IsOwnerFromCtx(ctx)))
+    if !util.IsOwnerFromCtx(ctx) {
+        wg.Go(func() error {
+            return k.run(cctx)
+        })
+    }
+    wg.Go(func() error {
+        return k.mqProducer.Run(cctx)
+    })
+    return wg.Wait()
+}
+
+func (k *mqSink) run(ctx context.Context) error {
+    closeSink := func() {
+        err := k.mqProducer.Close()
+        if err != nil {
+            log.Error("close MQ Producer failed", zap.Error(err))
+        }
+        close(k.errCh)
+    }
    for {
-        var sinkCheckpointTs uint64
+        var sinkCheckpoint struct {
+            ts    uint64
+            index uint64
+        }
        select {
        case <-ctx.Done():
-            err := k.mqProducer.Close()
-            if err != nil {
-                log.Error("close MQ Producer failed", zap.Error(err))
-            }
+            closeSink()
            return ctx.Err()
-        case sinkCheckpointTs = <-k.sinkCheckpointTsCh:
+        case err := <-k.errCh:
+            closeSink()
+            return err
+        case sinkCheckpoint = <-k.sinkCheckpointTsCh:
        }

+        // wait mq producer send message successfully
+        for sinkCheckpoint.index > k.mqProducer.MaxSuccessesIndex() {
+            log.Info("wait index", zap.Uint64("sinkCheckpoint.index", sinkCheckpoint.index))
+            time.Sleep(20 * time.Millisecond)
+        }
        globalResolvedTs := atomic.LoadUint64(&k.globalResolvedTs)
+        log.Info("global resolved", zap.Uint64("globalResolved", globalResolvedTs))
        // when local resolvedTS is fallback, we will postpone to pushing global resolvedTS
        // check if the global resolvedTS is postponed
-        if globalResolvedTs < sinkCheckpointTs {
-            sinkCheckpointTs = globalResolvedTs
+
+        log.Info("sinkCheckpoint.ts", zap.Uint64("sinkCheckpoint.ts", sinkCheckpoint.ts))
+        if globalResolvedTs < sinkCheckpoint.ts {
+            sinkCheckpoint.ts = globalResolvedTs
        }
-        atomic.StoreUint64(&k.checkpointTs, sinkCheckpointTs)
+        atomic.StoreUint64(&k.checkpointTs, sinkCheckpoint.ts)
    }
}

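Note: taken together, the mq.go changes decouple message production from checkpoint advancement. Every SendMessage/BroadcastMessage call now takes a completion callback and returns a message index, the sink records the index of the last message handed to the producer alongside each candidate checkpoint ts, and run() only publishes a checkpoint once MaxSuccessesIndex() has caught up to that index. The producer contract this implies is sketched below, reconstructed from the call sites in this diff; the real cdc/sink/mqProducer interface may differ in names and signatures, and the first return value of BroadcastMessage being an index is an assumption.

package mqProducer

import "context"

// Producer is a sketch of the contract mqSink relies on in this commit.
type Producer interface {
    // SendMessage queues a message for one partition and returns the index
    // assigned to it; cb runs once the send finally succeeds or fails.
    SendMessage(ctx context.Context, key, value []byte, partition int32, cb func(err error)) (uint64, error)
    // BroadcastMessage queues a message for every partition.
    BroadcastMessage(ctx context.Context, key, value []byte, cb func(err error)) (uint64, error)
    // MaxSuccessesIndex reports the highest index acknowledged so far;
    // mqSink.run polls it before moving checkpointTs forward.
    MaxSuccessesIndex() uint64
    GetPartitionNum() int32
    Run(ctx context.Context) error
    Close() error
}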
