Commit

This is an automated cherry-pick of #11339
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
asddongmen authored and ti-chi-bot committed Jul 4, 2024
1 parent f203805 commit a56e151
Showing 10 changed files with 35 additions and 193 deletions.
26 changes: 7 additions & 19 deletions cdc/processor/sinkmanager/manager.go
@@ -33,7 +33,11 @@ import (
"github.com/pingcap/tiflow/cdc/sink/dmlsink/factory"
tablesinkmetrics "github.com/pingcap/tiflow/cdc/sink/metrics/tablesink"
"github.com/pingcap/tiflow/cdc/sink/tablesink"
<<<<<<< HEAD
Check failure on line 36 in cdc/processor/sinkmanager/manager.go (GitHub Actions / Arm Build (ARM64) and Mac OS Build): missing import path
"github.com/pingcap/tiflow/pkg/config"
=======
pconfig "github.com/pingcap/tiflow/pkg/config"
>>>>>>> 695f93240c (processor: fix a bug that will cause processor Tick get stuck when downstream is Kafka (#11339))
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/spanz"
@@ -333,12 +337,6 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
}
}

func (m *SinkManager) needsStuckCheck() bool {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
return m.sinkFactory.f != nil && m.sinkFactory.f.Category() == factory.CategoryMQ
}

func (m *SinkManager) initSinkFactory() (chan error, uint64) {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
@@ -406,19 +404,6 @@ func (m *SinkManager) clearSinkFactory() {
}
}

func (m *SinkManager) putSinkFactoryError(err error, version uint64) (success bool) {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
if version == m.sinkFactory.version {
select {
case m.sinkFactory.errors <- err:
default:
}
return true
}
return false
}

func (m *SinkManager) startSinkWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool) {
for i := 0; i < sinkWorkerNum; i++ {
w := newSinkWorker(m.changefeedID, m.sourceManager,
@@ -1030,6 +1015,7 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
m.sinkMemQuota.Release(span, checkpointTs)
m.redoMemQuota.Release(span, checkpointTs)

<<<<<<< HEAD
advanceTimeoutInSec := util.GetOrZero(m.changefeedInfo.Config.Sink.AdvanceTimeoutInSec)
if advanceTimeoutInSec <= 0 {
advanceTimeoutInSec = config.DefaultAdvanceTimeoutInSec
@@ -1049,6 +1035,8 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
}
}

=======
>>>>>>> 695f93240c (processor: fix a bug that will cause processor Tick get stuck when downstream is Kafka (#11339))
var resolvedTs model.Ts
// If redo log is enabled, we have to use redo log's resolved ts to calculate processor's min resolved ts.
if m.redoDMLMgr != nil {
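
The commit referenced in the conflict markers above targets a bug where processor Tick gets stuck when the downstream is Kafka. The removed needsStuckCheck acquires m.sinkFactory.Lock(), the same mutex initSinkFactory takes, presumably while the Kafka sink is being (re)built, so a slow broker connection can leave GetTableStats, and the Tick path that reaches it per the commit title, waiting on that lock. A standalone, hypothetical sketch of that shape (none of these names come from the diff):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var factoryMu sync.Mutex // stands in for the m.sinkFactory mutex

	// "initSinkFactory": holds the lock while doing slow work, e.g. dialing Kafka.
	go func() {
		factoryMu.Lock()
		defer factoryMu.Unlock()
		time.Sleep(3 * time.Second)
	}()

	time.Sleep(100 * time.Millisecond) // let the slow holder grab the lock first

	// "GetTableStats -> needsStuckCheck": must wait for the slow holder to finish.
	start := time.Now()
	factoryMu.Lock()
	factoryMu.Unlock()
	fmt.Printf("stuck-check style lock acquisition waited %v\n", time.Since(start))
}
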
15 changes: 0 additions & 15 deletions cdc/processor/sinkmanager/manager_test.go
@@ -375,21 +375,6 @@ func TestSinkManagerRunWithErrors(t *testing.T) {
}
}

func TestSinkManagerNeedsStuckCheck(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
errCh := make(chan error, 16)
changefeedInfo := getChangefeedInfo()
manager, _, _ := CreateManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, errCh)
defer func() {
cancel()
manager.Close()
}()

require.False(t, manager.needsStuckCheck())
}

func TestSinkManagerRestartTableSinks(t *testing.T) {
failpoint.Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause", "return")
defer failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause")
19 changes: 0 additions & 19 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -453,25 +453,6 @@ func (t *tableSinkWrapper) cleanRangeEventCounts(upperBound engine.Position, min
return shouldClean
}

func (t *tableSinkWrapper) sinkMaybeStuck(stuckCheck time.Duration) (bool, uint64) {
t.getCheckpointTs()

t.tableSink.RLock()
defer t.tableSink.RUnlock()
t.tableSink.innerMu.Lock()
defer t.tableSink.innerMu.Unlock()

// What these conditions mean:
// 1. the table sink has been associated with a valid sink;
// 2. its checkpoint hasn't been advanced for a while;
version := t.tableSink.version
advanced := t.tableSink.advanced
if version > 0 && time.Since(advanced) > stuckCheck {
return true, version
}
return false, uint64(0)
}

func handleRowChangedEvents(
changefeed model.ChangeFeedID, span tablepb.Span, events ...*model.PolymorphicEvent,
) ([]*model.RowChangedEvent, uint64) {
62 changes: 0 additions & 62 deletions cdc/processor/sinkmanager/table_sink_wrapper_test.go
@@ -18,7 +18,6 @@ import (
"math"
"sync"
"testing"
"time"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
@@ -29,7 +28,6 @@ import (
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

type mockSink struct {
@@ -411,63 +409,3 @@ func TestTableSinkWrapperSinkVersion(t *testing.T) {
require.Nil(t, wrapper.tableSink.s)
require.Equal(t, wrapper.tableSink.version, uint64(0))
}

func TestTableSinkWrapperSinkInner(t *testing.T) {
t.Parallel()

innerTableSink := tablesink.New[*model.RowChangedEvent](
model.ChangeFeedID{}, tablepb.Span{}, model.Ts(0),
newMockSink(), &dmlsink.RowChangeEventAppender{},
pdutil.NewClock4Test(),
prometheus.NewCounter(prometheus.CounterOpts{}),
prometheus.NewHistogram(prometheus.HistogramOpts{}),
)
version := new(uint64)

wrapper := newTableSinkWrapper(
model.DefaultChangeFeedID("1"),
spanz.TableIDToComparableSpan(1),
func() (tablesink.TableSink, uint64) {
*version += 1
return innerTableSink, *version
},
tablepb.TableStatePrepared,
oracle.GoTimeToTS(time.Now()),
oracle.GoTimeToTS(time.Now().Add(10000*time.Second)),
func(_ context.Context) (model.Ts, error) { return math.MaxUint64, nil },
)

require.True(t, wrapper.initTableSink())

wrapper.closeAndClearTableSink()

// Shouldn't be stuck because version is 0.
require.Equal(t, wrapper.tableSink.version, uint64(0))
isStuck, _ := wrapper.sinkMaybeStuck(100 * time.Millisecond)
require.False(t, isStuck)

// Shouldn't be stuck because tableSink.advanced is just updated.
require.True(t, wrapper.initTableSink())
isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond)
require.False(t, isStuck)

// Shouldn't be stuck because upperbound hasn't been advanced.
time.Sleep(200 * time.Millisecond)
isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond)
require.False(t, isStuck)

// Shouldn't be stuck because `getCheckpointTs` will update tableSink.advanced.
nowTs := oracle.GoTimeToTS(time.Now())
wrapper.updateReceivedSorterResolvedTs(nowTs)
wrapper.barrierTs.Store(nowTs)
isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond)
require.False(t, isStuck)

time.Sleep(200 * time.Millisecond)
nowTs = oracle.GoTimeToTS(time.Now())
wrapper.updateReceivedSorterResolvedTs(nowTs)
wrapper.barrierTs.Store(nowTs)
wrapper.updateResolvedTs(model.NewResolvedTs(nowTs))
isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond)
require.True(t, isStuck)
}
1 change: 1 addition & 0 deletions pkg/config/sink.go
@@ -151,6 +151,7 @@ type SinkConfig struct {

// AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been
// advanced for this given duration, the sink will be canceled and re-established.
// Deprecated since v8.1.1
AdvanceTimeoutInSec *uint `toml:"advance-timeout-in-sec" json:"advance-timeout-in-sec,omitempty"`
}

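
The deprecation note exists because this commit removes the field's only consumer: the stuck-check block in GetTableStats, partially visible in the manager.go hunk above. As a reading aid, a minimal sketch of how the value used to be resolved, reconstructed from those removed lines; the helper name and its placement in pkg/config are assumptions, not part of the diff:

package config

import "time"

// advanceTimeout mirrors the removed manager.go logic: a nil or zero
// advance-timeout-in-sec falls back to the default before being turned into a
// duration (util.GetOrZero in the removed code just dereferences the pointer,
// treating nil as zero).
func advanceTimeout(cfg *SinkConfig) time.Duration {
	var secs uint
	if cfg.AdvanceTimeoutInSec != nil {
		secs = *cfg.AdvanceTimeoutInSec
	}
	if secs == 0 {
		secs = DefaultAdvanceTimeoutInSec
	}
	return time.Duration(secs) * time.Second
}
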
22 changes: 22 additions & 0 deletions pkg/sink/kafka/sarama_factory.go
@@ -15,12 +15,19 @@ package kafka

import (
"context"
"time"

<<<<<<< HEAD
Check failure on line 20 in pkg/sink/kafka/sarama_factory.go (GitHub Actions / Arm Build (ARM64) and Mac OS Build): missing import path
"github.com/Shopify/sarama"
=======
"github.com/IBM/sarama"
"github.com/pingcap/log"
>>>>>>> 695f93240c (processor: fix a bug that will cause processor Tick get stuck when downstream is Kafka (#11339))
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/util"
"github.com/rcrowley/go-metrics"
"go.uber.org/zap"
)

type saramaFactory struct {
@@ -43,17 +50,32 @@ func NewSaramaFactory(
}

func (f *saramaFactory) AdminClient(ctx context.Context) (ClusterAdminClient, error) {
start := time.Now()
config, err := NewSaramaConfig(ctx, f.option)
duration := time.Since(start).Seconds()
if duration > 2 {
log.Warn("new sarama config cost too much time", zap.Any("duration", duration), zap.Stringer("changefeedID", f.changefeedID))
}
if err != nil {
return nil, err
}

start = time.Now()
client, err := sarama.NewClient(f.option.BrokerEndpoints, config)
duration = time.Since(start).Seconds()
if duration > 2 {
log.Warn("new sarama client cost too much time", zap.Any("duration", duration), zap.Stringer("changefeedID", f.changefeedID))
}
if err != nil {
return nil, errors.Trace(err)
}

start = time.Now()
admin, err := sarama.NewClusterAdminFromClient(client)
duration = time.Since(start).Seconds()
if duration > 2 {
log.Warn("new sarama cluster admin cost too much time", zap.Any("duration", duration), zap.Stringer("changefeedID", f.changefeedID))
}
if err != nil {
return nil, errors.Trace(err)
}
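
The three timing blocks added to AdminClient repeat one pattern: time the call and warn if it took longer than two seconds. A hedged sketch of how that pattern could be factored into a helper; timeAndWarn and its signature are hypothetical, only the two-second threshold and the log fields mirror the added code:

package kafka

import (
	"time"

	"github.com/pingcap/log"
	"github.com/pingcap/tiflow/cdc/model"
	"go.uber.org/zap"
)

// timeAndWarn runs fn, measures how long it takes, and logs a warning when it
// exceeds two seconds, matching the threshold used in AdminClient above.
func timeAndWarn(changefeedID model.ChangeFeedID, step string, fn func() error) error {
	start := time.Now()
	err := fn()
	if d := time.Since(start).Seconds(); d > 2 {
		log.Warn(step+" cost too much time",
			zap.Float64("duration", d),
			zap.Stringer("changefeedID", changefeedID))
	}
	return err
}

With such a helper each step collapses to something like timeAndWarn(f.changefeedID, "new sarama config", func() error { ... }).
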

This file was deleted.

29 changes: 0 additions & 29 deletions tests/integration_tests/hang_sink_suicide/conf/diff_config.toml

This file was deleted.

47 changes: 0 additions & 47 deletions tests/integration_tests/hang_sink_suicide/run.sh

This file was deleted.

5 changes: 5 additions & 0 deletions tests/integration_tests/run_group.sh
@@ -10,8 +10,13 @@ group=$2
# Other tests that only support mysql: batch_update_to_no_batch ddl_reentrant
# changefeed_fast_fail changefeed_resume_with_checkpoint_ts sequence
# multi_cdc_cluster capture_suicide_while_balance_table
<<<<<<< HEAD
mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint hang_sink_suicide changefeed_dup_error_restart"
mysql_only_http="http_api http_api_tls api_v2"
=======
mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint syncpoint_check_ts server_config_compatibility changefeed_dup_error_restart"
mysql_only_http="http_api http_api_tls api_v2 http_api_tls_with_user_auth cli_tls_with_auth"
>>>>>>> 695f93240c (processor: fix a bug that will cause processor Tick get stuck when downstream is Kafka (#11339))
mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table"

kafka_only="kafka_big_messages kafka_compression kafka_messages kafka_sink_error_resume mq_sink_lost_callback"
