Skip to content

Commit

Permalink
channelz: major cleanup / reorganization (#6969)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored Mar 15, 2024
1 parent a1033b1 commit 55cd7a6
Show file tree
Hide file tree
Showing 47 changed files with 2,467 additions and 2,791 deletions.
4 changes: 2 additions & 2 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,8 @@ type BuildOptions struct {
// implementations which do not communicate with a remote load balancer
// server can ignore this field.
Authority string
// ChannelzParentID is the parent ClientConn's channelz ID.
ChannelzParentID *channelz.Identifier
// ChannelzParent is the parent ClientConn's channelz channel.
ChannelzParent channelz.Identifier
// CustomUserAgent is the custom user agent set on the parent ClientConn.
// The balancer should set the same custom user agent if it creates a
// ClientConn.
Expand Down
2 changes: 1 addition & 1 deletion balancer/grpclb/grpclb_remote_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (lb *lbBalancer) newRemoteBalancerCCWrapper() error {
// Explicitly set pickfirst as the balancer.
dopts = append(dopts, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"pick_first"}`))
dopts = append(dopts, grpc.WithResolvers(lb.manualResolver))
dopts = append(dopts, grpc.WithChannelzParentID(lb.opt.ChannelzParentID))
dopts = append(dopts, grpc.WithChannelzParentID(lb.opt.ChannelzParent))

// Enable Keepalive for grpclb client.
dopts = append(dopts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
Expand Down
26 changes: 13 additions & 13 deletions balancer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
ccb := &ccBalancerWrapper{
cc: cc,
opts: balancer.BuildOptions{
DialCreds: cc.dopts.copts.TransportCredentials,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
Authority: cc.authority,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
DialCreds: cc.dopts.copts.TransportCredentials,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
Authority: cc.authority,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParent: cc.channelz,
Target: cc.parsedTarget,
},
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
Expand Down Expand Up @@ -155,14 +155,14 @@ func (ccb *ccBalancerWrapper) switchTo(name string) {
func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) {
builder := balancer.Get(name)
if builder == nil {
channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name)
channelz.Warningf(logger, ccb.cc.channelz, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name)
builder = newPickfirstBuilder()
} else {
channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name)
channelz.Infof(logger, ccb.cc.channelz, "Channel switches to new LB policy %q", name)
}

if err := ccb.balancer.SwitchTo(builder); err != nil {
channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err)
channelz.Errorf(logger, ccb.cc.channelz, "Channel failed to build new LB policy %q: %v", name, err)
return
}
ccb.curBalancerName = builder.Name()
Expand All @@ -175,7 +175,7 @@ func (ccb *ccBalancerWrapper) close() {
ccb.mu.Lock()
ccb.closed = true
ccb.mu.Unlock()
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing")
channelz.Info(logger, ccb.cc.channelz, "ccBalancerWrapper: closing")
ccb.serializer.Schedule(func(context.Context) {
if ccb.balancer == nil {
return
Expand Down Expand Up @@ -212,7 +212,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
}
ac, err := ccb.cc.newAddrConnLocked(addrs, opts)
if err != nil {
channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
channelz.Warningf(logger, ccb.cc.channelz, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
return nil, err
}
acbw := &acBalancerWrapper{
Expand Down Expand Up @@ -304,7 +304,7 @@ func (acbw *acBalancerWrapper) updateState(s connectivity.State, err error) {
}

func (acbw *acBalancerWrapper) String() string {
return fmt.Sprintf("SubConn(id:%d)", acbw.ac.channelzID.Int())
return fmt.Sprintf("SubConn(id:%d)", acbw.ac.channelz.ID)
}

func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
Expand Down
3 changes: 3 additions & 0 deletions channelz/service/func_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ func convertToPtypesDuration(sec int64, usec int64) *durpb.Duration {
}

func sockoptToProto(skopts *channelz.SocketOptionData) []*channelzpb.SocketOption {
if skopts == nil {
return nil
}
var opts []*channelzpb.SocketOption
if skopts.Linger != nil {
additional, err := anypb.New(&channelzpb.SocketOptionLinger{
Expand Down
Loading

0 comments on commit 55cd7a6

Please sign in to comment.