-
Notifications
You must be signed in to change notification settings - Fork 253
upgrade grpc to v1.73 #1779
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
upgrade grpc to v1.73 #1779
Changes from all commits
5b694e3
435dc7b
9bd9fb0
6f135a0
9c7b37b
523a18a
2a2ed9e
61142a1
a9b4a61
01d2ef4
fcf6d0c
ba4ac6b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -61,11 +61,14 @@ import ( | |
| "go.uber.org/zap" | ||
| "google.golang.org/grpc" | ||
| "google.golang.org/grpc/connectivity" | ||
| "google.golang.org/grpc/mem" | ||
| "google.golang.org/grpc/metadata" | ||
| ) | ||
|
|
||
| // globalEncodedMsgDataPool is used to pool pre-encoded message data for batch commands. | ||
| var globalEncodedMsgDataPool = grpc.NewSharedBufferPool() | ||
| var globalEncodedMsgDataPool = mem.NewTieredBufferPool( | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| 256, 4<<10, 16<<10, 32<<10, 1<<20, // copied from defaultBufferPoolSizes | ||
| ) | ||
|
|
||
| type encodedBatchCmd struct { | ||
| // implement isBatchCommandsRequest_Request_Cmd | ||
|
|
@@ -111,15 +114,15 @@ func encodeRequestCmd(req *tikvpb.BatchCommandsRequest_Request) error { | |
| return nil | ||
| } | ||
| data := globalEncodedMsgDataPool.Get(req.Cmd.Size()) | ||
| n, err := req.Cmd.MarshalTo(data) | ||
| n, err := req.Cmd.MarshalTo(*data) | ||
| if err != nil { | ||
| globalEncodedMsgDataPool.Put(&data) | ||
| globalEncodedMsgDataPool.Put(data) | ||
| return errors.WithStack(err) | ||
| } else if n != len(data) { | ||
| globalEncodedMsgDataPool.Put(&data) | ||
| return errors.Errorf("unexpected marshaled size: got %d, want %d", n, len(data)) | ||
| } else if n != len(*data) { | ||
| globalEncodedMsgDataPool.Put(data) | ||
| return errors.Errorf("unexpected marshaled size: got %d, want %d", n, len(*data)) | ||
| } | ||
| req.Cmd = &encodedBatchCmd{data: &data} | ||
| req.Cmd = &encodedBatchCmd{data: data} | ||
| return nil | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,6 +33,7 @@ import ( | |
| "google.golang.org/grpc/encoding/gzip" | ||
| "google.golang.org/grpc/experimental" | ||
| "google.golang.org/grpc/keepalive" | ||
| "google.golang.org/grpc/mem" | ||
| ) | ||
|
|
||
| type connPool struct { | ||
|
|
@@ -76,6 +77,7 @@ func (a *connPool) monitoredDial(ctx context.Context, connName, target string, o | |
| conn = &monitoredConn{ | ||
| Name: connName, | ||
| } | ||
| //nolint:SA1019 | ||
| conn.ClientConn, err = grpc.DialContext(ctx, target, opts...) | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Keep using the old (deprecated) API to avoid potential TiFlash problems. |
||
| if err != nil { | ||
| return nil, err | ||
|
|
@@ -120,6 +122,9 @@ func (a *connPool) Init(addr string, security config.Security, idleNotify *uint3 | |
| callOptions = append(callOptions, grpc.UseCompressor(gzip.Name)) | ||
| } | ||
|
|
||
| // Don't remove this until we no longer use sharedBytes in tipb and kvproto. | ||
| callOptions = append(callOptions, grpc.ForceCodec(&legacyCodec{})) | ||
|
|
||
| opts = append([]grpc.DialOption{ | ||
| opt, | ||
| grpc.WithInitialWindowSize(cfg.TiKVClient.GrpcInitialWindowSize), | ||
|
|
@@ -141,8 +146,8 @@ func (a *connPool) Init(addr string, security config.Security, idleNotify *uint3 | |
| Timeout: cfg.TiKVClient.GetGrpcKeepAliveTimeout(), | ||
| }), | ||
| }, opts...) | ||
| if cfg.TiKVClient.GrpcSharedBufferPool { | ||
| opts = append(opts, experimental.WithRecvBufferPool(grpc.NewSharedBufferPool())) | ||
| if !cfg.TiKVClient.GrpcSharedBufferPool { | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After grpc-go 1.66, the default pool was changed from noop pool to |
||
| opts = append(opts, experimental.WithBufferPool(mem.NopBufferPool{})) | ||
| } | ||
| conn, err := a.monitoredDial( | ||
| ctx, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| // Copyright 2026 PingCAP, Inc. | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| package client | ||
|
|
||
| import ( | ||
| "fmt" | ||
|
|
||
| "google.golang.org/protobuf/proto" | ||
| "google.golang.org/protobuf/protoadapt" | ||
| ) | ||
|
|
||
| // legacyCodec copies the legacy proto Codec used in grpc-go < 1.66, see | ||
| // https://github.com/grpc/grpc-go/blob/v1.65.x/encoding/proto/proto.go | ||
| // tipb/kvproto rely on grpc-go passing a []byte into `Unmarshal` that can be | ||
| // owned by the callee, so we override the `Marshal` to reduce the copy overhead. | ||
| // But after grpc-go >= 1.66, it switches to CodecV2. And the buffer passed into | ||
| // `Unmarshal` is now from a shared pool, which will be returned to the pool right | ||
| // after `Unmarshal` returns, which breaks the assumption of tipb/kvproto. So we | ||
| // force the old Codec behavior until sharedBytes is gone. | ||
| type legacyCodec struct{} | ||
|
|
||
| func (legacyCodec) Marshal(v any) ([]byte, error) { | ||
| vv := messageV2Of(v) | ||
| if vv == nil { | ||
| return nil, fmt.Errorf("failed to marshal, message is %T, want proto.Message", v) | ||
| } | ||
|
|
||
| return proto.Marshal(vv) | ||
| } | ||
|
|
||
| func (legacyCodec) Unmarshal(data []byte, v any) error { | ||
| vv := messageV2Of(v) | ||
| if vv == nil { | ||
| return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v) | ||
| } | ||
|
|
||
| return proto.Unmarshal(data, vv) | ||
| } | ||
|
|
||
| func messageV2Of(v any) proto.Message { | ||
| switch v := v.(type) { | ||
| case protoadapt.MessageV1: | ||
| return protoadapt.MessageV2Of(v) | ||
| case protoadapt.MessageV2: | ||
| return v | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (legacyCodec) Name() string { | ||
| return "proto" | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about other go.mod in examples?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's OK to keep them, but we can upgrade them as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think keep same is better