Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/p2p(ticdc): fix default p2p config #11182

Merged
merged 3 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cdc/puller/multiplexing_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type tableProgress struct {
}

scheduled atomic.Bool
start time.Time
}

func (p *tableProgress) handleResolvedSpans(ctx context.Context, e *model.ResolvedSpans) (err error) {
Expand All @@ -95,7 +96,10 @@ func (p *tableProgress) handleResolvedSpans(ctx context.Context, e *model.Resolv
zap.String("namespace", p.changefeed.Namespace),
zap.String("changefeed", p.changefeed.ID),
zap.String("tableName", p.tableName),
zap.Uint64("resolvedTs", resolvedTs))
zap.Any("tableID", p.spans),
zap.Uint64("resolvedTs", resolvedTs),
zap.Duration("duration", time.Since(p.start)),
)
}
if resolvedTs > p.resolvedTs.Load() {
p.resolvedTs.Store(resolvedTs)
Expand Down Expand Up @@ -230,6 +234,7 @@ func (p *MultiplexingPuller) subscribe(

resolvedEventsCache: make(chan kv.MultiplexingEvent, tableResolvedTsBufferSize),
tsTracker: frontier.NewFrontier(0, spans...),
start: time.Now(),
}

progress.consume.f = func(
Expand Down
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/v3/agent/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (t *tableSpan) injectDispatchTableTask(task *dispatchTableTask) {
t.task = task
return
}
log.Debug("schedulerv3: table inject dispatch table task ignored,"+
log.Warn("schedulerv3: table inject dispatch table task ignored,"+
"since there is one not finished yet",
zap.String("namespace", t.changefeedID.Namespace),
zap.String("changefeed", t.changefeedID.ID),
Expand Down
18 changes: 18 additions & 0 deletions cdc/scheduler/internal/v3/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@
// FIXME it's an unbounded buffer, and may cause OOM!
msgBuf []*schedulepb.Message
}
lastPrintTime time.Time
ignoreCount int64
totalMsg int64
role Role
}

// Role of the transport user.
Expand All @@ -89,6 +93,7 @@
peerTopic: peerTopic,
messageServer: server,
messageRouter: router,
role: role,
}
var err error
trans.errCh, err = trans.messageServer.SyncAddHandler(
Expand All @@ -112,6 +117,7 @@
func (t *p2pTransport) Send(
ctx context.Context, msgs []*schedulepb.Message,
) error {
t.totalMsg += int64(len(msgs))
for i := range msgs {
value := msgs[i]
to := value.To
Expand All @@ -127,6 +133,18 @@
_, err := client.TrySendMessage(ctx, t.peerTopic, value)
if err != nil {
if cerror.ErrPeerMessageSendTryAgain.Equal(err) {
t.ignoreCount++
if time.Since(t.lastPrintTime) > 30*time.Second {
log.Warn("schedulerv3: message send failed since ignored, retry later",
zap.String("namespace", t.changefeed.Namespace),
zap.String("changefeed", t.changefeed.ID),
zap.String("to", to),
zap.Int64("ignoreCount", t.ignoreCount),
zap.Int64("totalMsg", t.totalMsg),
zap.Float64("ignoreRate", float64(t.ignoreCount)/float64(t.totalMsg)),
zap.String("role", string(t.role)),
)
}

Check warning on line 147 in cdc/scheduler/internal/v3/transport/transport.go

View check run for this annotation

Codecov / codecov/patch

cdc/scheduler/internal/v3/transport/transport.go#L136-L147

Added lines #L136 - L147 were not covered by tests
return nil
}
if cerror.ErrPeerMessageClientClosed.Equal(err) {
Expand Down
18 changes: 9 additions & 9 deletions pkg/cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,12 @@ func TestParseCfg(t *testing.T) {
// We expect the default configuration here.
Messages: &config.MessagesConfig{
ClientMaxBatchInterval: config.TomlDuration(time.Millisecond * 10),
ClientMaxBatchSize: 8 * 1024 * 1024,
ClientMaxBatchCount: 128,
ClientMaxBatchSize: 64 * 1024 * 1024,
ClientMaxBatchCount: 1024,
ClientRetryRateLimit: 1.0,
ServerMaxPendingMessageCount: 102400,
ServerAckInterval: config.TomlDuration(time.Millisecond * 100),
ServerWorkerPoolSize: 4,
ServerWorkerPoolSize: 8,
MaxRecvMsgSize: 256 * 1024 * 1024,
KeepAliveTimeout: config.TomlDuration(time.Second * 10),
KeepAliveTime: config.TomlDuration(time.Second * 30),
Expand Down Expand Up @@ -469,12 +469,12 @@ cert-allowed-cn = ["dd","ee"]
// We expect the default configuration here.
Messages: &config.MessagesConfig{
ClientMaxBatchInterval: config.TomlDuration(time.Millisecond * 10),
ClientMaxBatchSize: 8 * 1024 * 1024,
ClientMaxBatchCount: 128,
ClientMaxBatchSize: 64 * 1024 * 1024,
ClientMaxBatchCount: 1024,
ClientRetryRateLimit: 1.0,
ServerMaxPendingMessageCount: 102400,
ServerAckInterval: config.TomlDuration(time.Millisecond * 100),
ServerWorkerPoolSize: 4,
ServerWorkerPoolSize: 8,
MaxRecvMsgSize: 256 * 1024 * 1024,
KeepAliveTimeout: config.TomlDuration(time.Second * 10),
KeepAliveTime: config.TomlDuration(time.Second * 30),
Expand Down Expand Up @@ -534,12 +534,12 @@ unknown3 = 3
// We expect the default configuration here.
Messages: &config.MessagesConfig{
ClientMaxBatchInterval: config.TomlDuration(time.Millisecond * 10),
ClientMaxBatchSize: 8 * 1024 * 1024,
ClientMaxBatchCount: 128,
ClientMaxBatchSize: 64 * 1024 * 1024,
ClientMaxBatchCount: 1024,
ClientRetryRateLimit: 1.0,
ServerMaxPendingMessageCount: 102400,
ServerAckInterval: config.TomlDuration(time.Millisecond * 100),
ServerWorkerPoolSize: 4,
ServerWorkerPoolSize: 8,
MaxRecvMsgSize: 256 * 1024 * 1024,
KeepAliveTimeout: config.TomlDuration(time.Second * 10),
KeepAliveTime: config.TomlDuration(time.Second * 30),
Expand Down
6 changes: 3 additions & 3 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,12 @@ const (
},
"messages": {
"client-max-batch-interval": 10000000,
"client-max-batch-size": 8388608,
"client-max-batch-count": 128,
"client-max-batch-size": 67108864,
"client-max-batch-count": 1024,
"client-retry-rate-limit": 1,
"server-max-pending-message-count": 102400,
"server-ack-interval": 100000000,
"server-worker-pool-size": 4,
"server-worker-pool-size": 8,
"max-recv-msg-size": 268435456,
"keep-alive-time": 30000000000,
"keep-alive-timeout": 10000000000
Expand Down
8 changes: 4 additions & 4 deletions pkg/config/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ type MessagesConfig struct {
var defaultMessageConfig = &MessagesConfig{
// Note that ClientMaxBatchInterval may increase the checkpoint latency.
ClientMaxBatchInterval: TomlDuration(time.Millisecond * 10),
ClientMaxBatchSize: 8 * 1024 * 1024, // 8MB
ClientMaxBatchCount: 128,
ClientMaxBatchSize: 64 * 1024 * 1024, // 64MB
ClientMaxBatchCount: 1024,
ClientRetryRateLimit: 1.0, // Once per second
ServerMaxPendingMessageCount: 102400,
ServerAckInterval: TomlDuration(time.Millisecond * 100),
ServerWorkerPoolSize: 4,
ServerWorkerPoolSize: 8,
MaxRecvMsgSize: defaultMaxRecvMsgSize,
KeepAliveTime: TomlDuration(time.Second * 30),
KeepAliveTimeout: TomlDuration(time.Second * 10),
Expand All @@ -66,7 +66,7 @@ const (

// clientSendChannelSize represents the size of an internal channel used to buffer
// unsent messages.
clientSendChannelSize = 128
clientSendChannelSize = 1024

// clientDialTimeout represents the timeout given to gRPC to dial. 5 seconds seems reasonable
// because it is unlikely that the latency between TiCDC nodes is larger than 5 seconds.
Expand Down
Loading