Skip to content

Commit

Permalink
Cleanup: replace MessageName with TypeURL in MCP (istio#8207)
Browse files Browse the repository at this point in the history
* replace MessageName with TypeURL in MCP

* update mcp usage in mixer

* update mcp usage in galley
  • Loading branch information
lichuqiang authored and istio-testing committed Aug 27, 2018
1 parent c4ed228 commit 3d9c783
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 152 deletions.
6 changes: 3 additions & 3 deletions galley/tools/mcpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (

var (
serverAddr = flag.String("server", "127.0.0.1:9901", "The server address")
types = flag.String("types", "", "The types of resources to deploy")
types = flag.String("types", "", "The fully qualified type URLs of resources to deploy")
id = flag.String("id", "", "The node id for the client")
)

Expand All @@ -43,10 +43,10 @@ type updater struct {

// Update interface method implementation.
func (u *updater) Apply(ch *client.Change) error {
fmt.Printf("Incoming change: %v\n", ch.MessageName)
fmt.Printf("Incoming change: %v\n", ch.TypeURL)

for i, o := range ch.Objects {
fmt.Printf("%s[%d]\n", ch.MessageName, i)
fmt.Printf("%s[%d]\n", ch.TypeURL, i)

b, err := json.MarshalIndent(o, " ", " ")
if err != nil {
Expand Down
16 changes: 8 additions & 8 deletions mixer/pkg/config/mcp/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,10 @@ func (b *backend) Init(kinds []string) error {
}
b.mapping = m

messageNames := b.mapping.messageNames()
scope.Infof("Requesting following messages:")
for i, name := range messageNames {
scope.Infof(" [%d] %s", i, name)
typeURLs := b.mapping.typeURLs()
scope.Infof("Requesting following types:")
for i, url := range typeURLs {
scope.Infof(" [%d] %s", i, url)
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -183,7 +183,7 @@ func (b *backend) Init(kinds []string) error {
}

cl := mcp.NewAggregatedMeshConfigServiceClient(conn)
c := client.New(cl, messageNames, b, mixerNodeID, map[string]string{})
c := client.New(cl, typeURLs, b, mixerNodeID, map[string]string{})
configz.Register(c)

b.state = &state{
Expand Down Expand Up @@ -264,17 +264,17 @@ func (b *backend) Apply(change *client.Change) error {
defer b.callUpdateHook()

newTypeStates := make(map[string]map[store.Key]*store.BackEndResource)
typeURL := fmt.Sprintf("type.googleapis.com/%s", change.MessageName)
typeURL := change.TypeURL

scope.Debugf("Received update for: type:%s, count:%d", change.MessageName, len(change.Objects))
scope.Debugf("Received update for: type:%s, count:%d", typeURL, len(change.Objects))

for _, o := range change.Objects {
var kind string
var name string
var contents proto.Message

if scope.DebugEnabled() {
scope.Debugf("Processing incoming resource: %q @%s [%s]", o.Metadata.Name, o.Version, o.MessageName)
scope.Debugf("Processing incoming resource: %q @%s [%s]", o.Metadata.Name, o.Version, o.TypeURL)
}

// Demultiplex the resource, if it is a legacy type, and figure out its kind.
Expand Down
13 changes: 4 additions & 9 deletions mixer/pkg/config/mcp/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,13 @@ func constructMapping(allKinds []string, schema *kube.Schema) (*mapping, error)
}, nil
}

// messageNames returns all MessageNames that should be requested from the MCP server.
func (m *mapping) messageNames() []string {
messageName := func(s string) string {
idx := strings.LastIndex(s, "/")
return s[idx+1:]
}

// typeURLs returns all TypeURLs that should be requested from the MCP server.
func (m *mapping) typeURLs() []string {
result := make([]string, 0, len(m.typeURLsToKinds)+1)
for u := range m.typeURLsToKinds {
result = append(result, messageName(u))
result = append(result, u)
}
result = append(result, legacyMixerResourceMessageName)
result = append(result, legacyMixerResourceTypeURL)

return result
}
Expand Down
42 changes: 15 additions & 27 deletions pkg/mcp/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"io"
"sort"
"strings"
"sync"
"time"

Expand All @@ -34,22 +33,18 @@ import (
// try to re-establish the bi-directional grpc stream after this delay.
var reestablishStreamDelay = time.Second

const (
typeURLBase = "type.googleapis.com/"
)

// Object contains a decoded versioned object with metadata received from the server.
type Object struct {
MessageName string
Metadata *mcp.Metadata
Resource proto.Message
Version string
TypeURL string
Metadata *mcp.Metadata
Resource proto.Message
Version string
}

// Change is a collection of configuration objects of the same protobuf message type.
// Change is a collection of configuration objects of the same protobuf type.
type Change struct {
MessageName string
Objects []*Object
TypeURL string
Objects []*Object

// TODO(ayj) add incremental add/remove enum when the mcp protocol supports it.
}
Expand Down Expand Up @@ -145,7 +140,7 @@ func (r *recentRequestsJournal) snapshot() []RecentRequestInfo {
}

// New creates a new instance of the MCP client for the specified message types.
func New(mcpClient mcp.AggregatedMeshConfigServiceClient, supportedMessageNames []string, updater Updater, id string, metadata map[string]string) *Client { // nolint: lll
func New(mcpClient mcp.AggregatedMeshConfigServiceClient, supportedTypeURLs []string, updater Updater, id string, metadata map[string]string) *Client { // nolint: lll
clientInfo := &mcp.Client{
Id: id,
Metadata: &types.Struct{
Expand All @@ -159,8 +154,7 @@ func New(mcpClient mcp.AggregatedMeshConfigServiceClient, supportedMessageNames
}

state := make(map[string]*perTypeState)
for _, messageName := range supportedMessageNames {
typeURL := typeURLBase + messageName
for _, typeURL := range supportedTypeURLs {
state[typeURL] = &perTypeState{}
}

Expand Down Expand Up @@ -201,15 +195,9 @@ func (c *Client) handleResponse(response *mcp.MeshConfigResponse) *mcp.MeshConfi
return c.sendNACKRequest(response, "", errDetails)
}

responseMessageName := response.TypeUrl
// extract the message name from the fully qualified type_url.
if slash := strings.LastIndex(response.TypeUrl, "/"); slash >= 0 {
responseMessageName = response.TypeUrl[slash+1:]
}

change := &Change{
MessageName: responseMessageName,
Objects: make([]*Object, 0, len(response.Envelopes)),
TypeURL: response.TypeUrl,
Objects: make([]*Object, 0, len(response.Envelopes)),
}
for _, envelope := range response.Envelopes {
var dynamicAny types.DynamicAny
Expand All @@ -225,10 +213,10 @@ func (c *Client) handleResponse(response *mcp.MeshConfigResponse) *mcp.MeshConfi
}

object := &Object{
MessageName: responseMessageName,
Metadata: envelope.Metadata,
Resource: dynamicAny.Message,
Version: response.VersionInfo,
TypeURL: response.TypeUrl,
Metadata: envelope.Metadata,
Resource: dynamicAny.Message,
Version: response.VersionInfo,
}
change.Objects = append(change.Objects, object)
}
Expand Down
Loading

0 comments on commit 3d9c783

Please sign in to comment.