Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 4 additions & 14 deletions xds/internal/clients/xdsclient/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,20 +253,10 @@ func decodeResponse(opts *DecodeOptions, rType *ResourceType, resp response) (ma
perResourceErrors := make(map[string]error) // Tracks resource validation errors, where we have a resource name.
ret := make(map[string]dataAndErrTuple) // Return result, a map from resource name to either resource data or error.
for _, r := range resp.resources {
// Unwrap and validate the resource, but preserve the original bytes for
// decoding. This is required for resource types that don't have a name
// field in the resource itself, but only have one in the wrapped
// resource.
inner, err := xdsresource.UnwrapResource(r)
if err != nil {
topLevelErrors = append(topLevelErrors, err)
continue
}
if _, ok := opts.Config.ResourceTypes[inner.GetTypeUrl()]; !ok || inner.GetTypeUrl() != resp.typeURL {
topLevelErrors = append(topLevelErrors, xdsresource.NewErrorf(xdsresource.ErrorTypeResourceTypeUnsupported, "unexpected resource type: %q ", inner.GetTypeUrl()))
continue
}
result, err := rType.Decoder.Decode(r.GetValue(), *opts)
result, err := rType.Decoder.Decode(AnyProto{
TypeURL: r.GetTypeUrl(),
Value: r.GetValue(),
}, *opts)

// Name field of the result is left unpopulated only when resource
// deserialization fails.
Expand Down
22 changes: 17 additions & 5 deletions xds/internal/clients/xdsclient/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"google.golang.org/grpc/xds/internal/clients/internal/pretty"
"google.golang.org/grpc/xds/internal/clients/xdsclient/internal/xdsresource"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
Expand Down Expand Up @@ -62,17 +63,24 @@ var (
}
)

func unmarshalListenerResource(r []byte) (string, listenerUpdate, error) {
func unmarshalListenerResource(rProto *anypb.Any) (string, listenerUpdate, error) {
rProto, err := xdsresource.UnwrapResource(rProto)
if err != nil {
return "", listenerUpdate{}, fmt.Errorf("failed to unwrap resource: %v", err)
}
if !xdsresource.IsListenerResource(rProto.GetTypeUrl()) {
return "", listenerUpdate{}, fmt.Errorf("unexpected listener resource type: %q", rProto.GetTypeUrl())
}
lis := &v3listenerpb.Listener{}
if err := proto.Unmarshal(r, lis); err != nil {
if err := proto.Unmarshal(rProto.GetValue(), lis); err != nil {
return "", listenerUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err)
}

lu, err := processListener(lis)
if err != nil {
return lis.GetName(), listenerUpdate{}, err
}
lu.Raw = r
lu.Raw = rProto.GetValue()
return lis.GetName(), *lu, nil
}

Expand Down Expand Up @@ -154,8 +162,12 @@ type listenerDecoder struct{}

// Decode deserializes and validates an xDS resource serialized inside the
// provided `Any` proto, as received from the xDS management server.
func (listenerDecoder) Decode(resource []byte, _ DecodeOptions) (*DecodeResult, error) {
name, listener, err := unmarshalListenerResource(resource)
func (listenerDecoder) Decode(resource AnyProto, _ DecodeOptions) (*DecodeResult, error) {
rProto := &anypb.Any{
TypeUrl: resource.TypeURL,
Value: resource.Value,
}
name, listener, err := unmarshalListenerResource(rProto)
switch {
case name == "":
// Name is unset only when protobuf deserialization fails.
Expand Down
18 changes: 15 additions & 3 deletions xds/internal/clients/xdsclient/resource_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,21 @@ type Decoder interface {
// Decode deserializes and validates an xDS resource as received from the
// xDS management server.
//
// If deserialization fails or resource validation fails, it returns a
// non-nil error. Otherwise, returns a fully populated DecodeResult.
Decode(resource []byte, options DecodeOptions) (*DecodeResult, error)
// The `resource` parameter may contain a value of the serialized wrapped
// resource (i.e. with the type URL
// `type.googleapis.com/envoy.service.discovery.v3.Resource`).
// Implementations are responsible for unwrapping the underlying resource if
// it is wrapped.
//
// If unmarshalling or validation fails, it returns a non-nil error.
// Otherwise, returns a fully populated DecodeResult.
Decode(resource AnyProto, options DecodeOptions) (*DecodeResult, error)
}

// AnyProto contains the type URL and serialized proto data of an xDS resource.
type AnyProto struct {
TypeURL string
Value []byte
}

// DecodeOptions wraps the options required by ResourceType implementations for
Expand Down
22 changes: 17 additions & 5 deletions xds/internal/clients/xdsclient/test/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"google.golang.org/grpc/xds/internal/clients/xdsclient"
"google.golang.org/grpc/xds/internal/clients/xdsclient/internal/xdsresource"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
Expand Down Expand Up @@ -73,17 +74,24 @@ var (
}
)

func unmarshalListenerResource(r []byte) (string, listenerUpdate, error) {
func unmarshalListenerResource(rProto *anypb.Any) (string, listenerUpdate, error) {
rProto, err := xdsresource.UnwrapResource(rProto)
if err != nil {
return "", listenerUpdate{}, fmt.Errorf("failed to unwrap resource: %v", err)
}
if !xdsresource.IsListenerResource(rProto.GetTypeUrl()) {
return "", listenerUpdate{}, fmt.Errorf("unexpected listener resource type: %q", rProto.GetTypeUrl())
}
lis := &v3listenerpb.Listener{}
if err := proto.Unmarshal(r, lis); err != nil {
if err := proto.Unmarshal(rProto.GetValue(), lis); err != nil {
return "", listenerUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err)
}

lu, err := processListener(lis)
if err != nil {
return lis.GetName(), listenerUpdate{}, err
}
lu.Raw = r
lu.Raw = rProto.GetValue()
return lis.GetName(), *lu, nil
}

Expand Down Expand Up @@ -165,8 +173,12 @@ type listenerDecoder struct{}

// Decode deserializes and validates an xDS resource serialized inside the
// provided `Any` proto, as received from the xDS management server.
func (listenerDecoder) Decode(resource []byte, _ xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) {
name, listener, err := unmarshalListenerResource(resource)
func (listenerDecoder) Decode(resource xdsclient.AnyProto, _ xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) {
rProto := &anypb.Any{
TypeUrl: resource.TypeURL,
Value: resource.Value,
}
name, listener, err := unmarshalListenerResource(rProto)
switch {
case name == "":
// Name is unset only when protobuf deserialization fails.
Expand Down
9 changes: 6 additions & 3 deletions xds/internal/xdsclient/xdsresource/resource_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,17 @@ type GenericResourceTypeDecoder struct {

// Decode deserialize and validate resource bytes of an xDS resource received
// from the xDS management server.
func (gd *GenericResourceTypeDecoder) Decode(resourceBytes []byte, gOpts xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) {
raw := &anypb.Any{TypeUrl: gd.ResourceType.TypeURL(), Value: resourceBytes}
func (gd *GenericResourceTypeDecoder) Decode(resource xdsclient.AnyProto, gOpts xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) {
rProto := &anypb.Any{
TypeUrl: resource.TypeURL,
Value: resource.Value,
}
opts := &DecodeOptions{BootstrapConfig: gd.BootstrapConfig}
if gOpts.ServerConfig != nil {
opts.ServerConfig = gd.ServerConfigMap[*gOpts.ServerConfig]
}

result, err := gd.ResourceType.Decode(opts, raw)
result, err := gd.ResourceType.Decode(opts, rProto)
if result == nil {
return nil, err
}
Expand Down