-
Notifications
You must be signed in to change notification settings - Fork 4.6k
xds: Add cluster endpoint watchers to depndency manager #8744
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #8744 +/- ##
==========================================
- Coverage 83.39% 83.35% -0.05%
==========================================
Files 419 419
Lines 32566 32861 +295
==========================================
+ Hits 27159 27390 +231
- Misses 4023 4067 +44
- Partials 1384 1404 +20
🚀 New features to boost your workflow:
|
|
Can you also please check the codecov report here: #8744 (comment) to see if there are any valid lines that are missing test coverage. Thanks. |
| } | ||
|
|
||
| func (dr *dnsResolver) ParseServiceConfig(string) *serviceconfig.ParseResult { | ||
| return &serviceconfig.ParseResult{Err: fmt.Errorf("service config not supported")} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand that this is existing behavior, but do you happen to know if this is what other languages do as well?
|
|
||
| // dnsResolverState watches updates for the given DNS hostname. It implements | ||
| // resolver.ClientConn interface to work with the DNS resolver. | ||
| type dnsResolver struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make an interface that both the endpointsWatcher and the dnsWatcher can satisfy, so that the dependency manager can treat them as equals?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if that will add any value , because in the populateCusterConfig we will anyway have to treat is separately
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If they have a common interface, why would we have to treat it separately?
| cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw"), | ||
| cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicy", "TelemetryLabels"), | ||
| cmpopts.IgnoreFields(xdsresource.EndpointsUpdate{}, "Raw"), | ||
| cmp.Transformer("ErrorsToString", func(in error) string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this transformation meant for cluster errors and endpoint errors? If so, would it make more sense to have a function with the below contents, and have transformers for the specific fields that we are interested in, that invoke the function instead of blindly invoking it on all fields of type error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, its meant for cluster errors and endpoint errors , and those are the only two error fields in XDSConfig struct. So I dont think it will apply to anything else. And even if we add any error fields later , we might need to do this transformation since we get wrapped errors from XDSConfig
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without having transformers attached to specific fields, it becomes the responsibility of the reader to figure out which fields are affected by the transformation. Either have a comment saying which fields or affected, or change the transformation to apply to specific fields.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a comment.
| // Safely access the first address string for comparison. | ||
| addrA := "" | ||
| if len(a.Addresses) > 0 { | ||
| addrA = a.Addresses[0].Addr | ||
| } | ||
|
|
||
| addrB := "" | ||
| if len(b.Addresses) > 0 { | ||
| addrB = b.Addresses[0].Addr | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about using a stable hash and computing a hash for every endpoint and determining the order based on that hash?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ping on this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| {ID: clients.Locality{ | ||
| Region: "region-1", | ||
| Zone: "zone-1", | ||
| SubZone: "subzone-1", | ||
| }, | ||
| Endpoints: []xdsresource.Endpoint{ | ||
| { | ||
| Addresses: []string{addr}, | ||
| HealthStatus: xdsresource.EndpointHealthStatusUnknown, | ||
| Weight: 1, | ||
| }, | ||
| }, | ||
| Weight: 1}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't seem to be in compliance with this style guidenline: go/go-style/decisions#literal-matching-braces
Please fix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't seem to be fixed yet.
| // Drain the updates because management server keeps sending the error | ||
| // updates repeatedly causing the update from dependency manager to be | ||
| // blocked if we don't drain it. | ||
| select { | ||
| case <-ctx.Done(): | ||
| case <-watcher.updateCh: | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this pattern is used in a bunch of tests, could we have a watcher type that can handle this instead of having the main test goroutine handle this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| clusterConfigs[clusterName] = &xdsresource.ClusterResult{Err: m.annotateErrorWithNodeID(fmt.Errorf("aggregate cluster graph exceeds max depth (%d)", aggregateClusterMaxDepth))} | ||
| return true, nil, m.annotateErrorWithNodeID(fmt.Errorf("aggregate cluster graph exceeds max depth (%d)", aggregateClusterMaxDepth)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be:
err := m.annotateErrorWithNodeID(fmt.Errorf("aggregate cluster graph exceeds max depth (%d)", aggregateClusterMaxDepth))}
clusterConfigs[clusterName] = &xdsresource.ClusterResult{Err: err}
return true, nil, errThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| // When the cluster update is not one of the three types of cluster updates. | ||
| clusterConfigs[clusterName] = &xdsresource.ClusterResult{Err: m.annotateErrorWithNodeID(fmt.Errorf("cluster type %v of cluster %s not supported", update.ClusterType, clusterName))} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is better handled as the default case in the switch above, and once we do that we won't even need the return statement here as every branch of the switch will have a return statement of its own.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And when we do that, we won't need this comment either.
| m.maybeSendUpdateLocked() | ||
| } | ||
|
|
||
| // RequestDNSReresolution calls all the the DNS resolvers ResolveNow. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/resolvers/resolver's/
| {ID: clients.Locality{ | ||
| Region: "region-1", | ||
| Zone: "zone-1", | ||
| SubZone: "subzone-1", | ||
| }, | ||
| Endpoints: []xdsresource.Endpoint{ | ||
| { | ||
| Addresses: []string{addr}, | ||
| HealthStatus: xdsresource.EndpointHealthStatusUnknown, | ||
| Weight: 1, | ||
| }, | ||
| }, | ||
| Weight: 1}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't seem to be fixed yet.
| // Safely access the first address string for comparison. | ||
| addrA := "" | ||
| if len(a.Addresses) > 0 { | ||
| addrA = a.Addresses[0].Addr | ||
| } | ||
|
|
||
| addrB := "" | ||
| if len(b.Addresses) > 0 { | ||
| addrB = b.Addresses[0].Addr | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ping on this.
| cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw"), | ||
| cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicy", "TelemetryLabels"), | ||
| cmpopts.IgnoreFields(xdsresource.EndpointsUpdate{}, "Raw"), | ||
| cmp.Transformer("ErrorsToString", func(in error) string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without having transformers attached to specific fields, it becomes the responsibility of the reader to figure out which fields are affected by the transformation. Either have a comment saying which fields or affected, or change the transformation to apply to specific fields.
| {ID: clients.Locality{ | ||
| Region: "region-1", | ||
| Zone: "zone-1", | ||
| SubZone: "subzone-1", | ||
| }, | ||
| Endpoints: []xdsresource.Endpoint{ | ||
| { | ||
| Addresses: []string{"localhost:8081"}, | ||
| HealthStatus: xdsresource.EndpointHealthStatusUnknown, | ||
| Weight: 1, | ||
| }, | ||
| }, | ||
| Weight: 1}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hasn't been fixed yet.
| updateCh: make(chan *xdsresource.XDSConfig), | ||
| errorCh: make(chan error), | ||
| } | ||
| ctx, cancel := context.WithTimeout(context.Background(), 50*defaultTestTimeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you are increasing it for testing, please change the value of the constant (in one place). This is very easy to miss in a review.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it! CHanged.
|
I made a small refactoring commit to split the I also added the |
This is part of A74 implementation which add CDS/EDS/DNS watchers to the dependency manager. It also adds a temporary flag that is disabled by default so that it is not used in the current RPC paths , but enabled in the dependency manager tests.
RELEASE NOTES: None