add snapshot grouping capability to MCP's snapshot package (istio#9041)
* add snapshot grouping capability to MCP's snapshot package

It is useful to organize MCP clients into groups for the purpose of
downstream configuration management. For example, multiple Pilot pods
may share the same desired set of networking API resources. The
snapshot mechanism is currently hardcoded to use the client identifier
as the grouping key, which prevents the use of more advanced grouping
schemes.

This PR introduces GroupIndex, which rationalizes the concept of
organizing downstream snapshots into groups (a sketch of a custom
grouping scheme follows below).

* address review comments
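
For illustration only (not part of this commit; the function name and
prefix rule are invented, and the usual strings, mcp, and snapshot
imports are assumed), a custom GroupIndexFn could derive the group from
the client ID so that, say, all Pilot pods share one snapshot:

	// Hypothetical GroupIndexFn: client IDs such as "pilot-7d4f9" all map
	// to the group "pilot", so every Pilot pod sees the same snapshot.
	func groupByIDPrefix(client *mcp.Client) string {
		id := client.GetId()
		if i := strings.Index(id, "-"); i > 0 {
			return id[:i] // group by the ID prefix, e.g. "pilot"
		}
		return snapshot.DefaultGroup // ungrouped clients use the default
	}

	// cache := snapshot.New(groupByIDPrefix)
	// cache.SetSnapshot("pilot", sn) // one snapshot for all Pilot pods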
ayj authored and istio-testing committed Oct 4, 2018
1 parent cfa1ba3 commit 285070b
Showing 8 changed files with 75 additions and 62 deletions.
5 changes: 2 additions & 3 deletions galley/pkg/runtime/processor.go
@@ -18,9 +18,9 @@ import (
	"github.com/pkg/errors"

	"istio.io/istio/galley/pkg/metadata"
-
	"istio.io/istio/galley/pkg/runtime/resource"
	"istio.io/istio/pkg/log"
+	"istio.io/istio/pkg/mcp/snapshot"
)

var scope = log.RegisterScope("runtime", "Galley runtime", 0)
@@ -179,6 +179,5 @@ func (p *Processor) processEvent(e resource.Event) bool {
func (p *Processor) publish() {
	sn := p.state.buildSnapshot()

-	// TODO: Set the appropriate name for publishing
-	p.distributor.SetSnapshot("", sn)
+	p.distributor.SetSnapshot(snapshot.DefaultGroup, sn)
}
6 changes: 3 additions & 3 deletions galley/pkg/runtime/processor_test.go
@@ -36,7 +36,7 @@ var emptyInfo = testSchema.Get("type.googleapis.com/google.protobuf.Empty")

func TestProcessor_Start(t *testing.T) {
	src := NewInMemorySource()
-	distributor := snapshot.New()
+	distributor := snapshot.New(snapshot.DefaultGroupIndex)
	p := NewProcessor(src, distributor)

	err := p.Start()
@@ -59,7 +59,7 @@ func (e *erroneousSource) Start() (chan resource.Event, error) {
func (e *erroneousSource) Stop() {}

func TestProcessor_Start_Error(t *testing.T) {
-	distributor := snapshot.New()
+	distributor := snapshot.New(snapshot.DefaultGroupIndex)
	p := NewProcessor(&erroneousSource{}, distributor)

	err := p.Start()
@@ -70,7 +70,7 @@ func TestProcessor_Start_Error(t *testing.T) {

func TestProcessor_Stop(t *testing.T) {
	src := NewInMemorySource()
-	distributor := snapshot.New()
+	distributor := snapshot.New(snapshot.DefaultGroupIndex)
	strategy := newPublishingStrategyWithDefaults()

	p := newProcessor(src, distributor, strategy, testSchema, nil)
2 changes: 1 addition & 1 deletion galley/pkg/server/server.go
@@ -99,7 +99,7 @@ func newServer(a *Args, p patchTable) (*Server, error) {
		}
	}

-	distributor := snapshot.New()
+	distributor := snapshot.New(snapshot.DefaultGroupIndex)
	s.processor = runtime.NewProcessor(src, distributor)

	var grpcOptions []grpc.ServerOption
4 changes: 3 additions & 1 deletion mixer/pkg/config/mcp/backend.go
@@ -24,6 +24,8 @@ import (
	"sync"
	"time"

+	"istio.io/istio/pkg/mcp/snapshot"
+
	"github.com/gogo/protobuf/proto"
	"google.golang.org/grpc"
	"k8s.io/apimachinery/pkg/runtime/schema"
@@ -42,7 +44,7 @@ import (
var scope = log.RegisterScope("mcp", "Mixer MCP client stack", 0)

const (
-	mixerNodeID           = ""
+	mixerNodeID           = snapshot.DefaultGroup
	eventChannelSize      = 4096
	requiredCertCheckFreq = 500 * time.Millisecond
)
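(Note: DefaultGroupIndex ignores the client ID entirely, as shown in
pkg/mcp/snapshot/snapshot.go below, so any node ID lands in the default
group; reusing snapshot.DefaultGroup here presumably just keeps Mixer's
client ID aligned with the server-side group name.)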
12 changes: 6 additions & 6 deletions pkg/mcp/configz/configz_test.go
@@ -56,7 +56,7 @@ func TestConfigZ(t *testing.T) {

	u := &updater{}
	clnt := mcp.NewAggregatedMeshConfigServiceClient(cc)
-	cl := client.New(clnt, []string{"type.googleapis.com/google.protobuf.Empty"}, u, "zoo", map[string]string{"foo": "bar"})
+	cl := client.New(clnt, []string{"type.googleapis.com/google.protobuf.Empty"}, u, snapshot.DefaultGroup, map[string]string{"foo": "bar"})

	ctx, cancel := context.WithCancel(context.Background())
	go cl.Run(ctx)
@@ -69,7 +69,7 @@ func TestConfigZ(t *testing.T) {

	// wait for client to make first watch request
	for {
-		if status := s.Cache.Status("zoo"); status != nil {
+		if status := s.Cache.Status(snapshot.DefaultGroup); status != nil {
			if status.Watches() > 0 {
				break
			}
@@ -86,11 +86,11 @@ func TestConfigZ(t *testing.T) {
		t.Fatalf("Setting an entry should not have failed: %v", err)
	}
	prevSnapshotTime := time.Now()
-	s.Cache.SetSnapshot("zoo", b.Build())
+	s.Cache.SetSnapshot(snapshot.DefaultGroup, b.Build())

	// wait for client to ACK the pushed snapshot
	for {
-		if status := s.Cache.Status("zoo"); status != nil {
+		if status := s.Cache.Status(snapshot.DefaultGroup); status != nil {
			if status.LastWatchRequestTime().After(prevSnapshotTime) {
				break
			}
@@ -108,7 +108,7 @@ func TestConfigZ(t *testing.T) {
func testConfigZWithNoRequest(t *testing.T, baseURL string) {
	// First, test configz, with no recent requests.
	data := request(t, baseURL+"/configz")
-	if !strings.Contains(data, "zoo") {
+	if !strings.Contains(data, snapshot.DefaultGroup) {
		t.Fatalf("Node id should have been displayed: %q", data)
	}
	if !strings.Contains(data, "foo") || !strings.Contains(data, "bar") {
@@ -143,7 +143,7 @@ func testConfigJWithOneRequest(t *testing.T, baseURL string) {
		t.Fatalf("Should have unmarshalled json: %v", err)
	}

-	if m["ID"] != "zoo" {
+	if m["ID"] != snapshot.DefaultGroup {
		t.Fatalf("Should have contained id: %v", data)
	}

72 changes: 43 additions & 29 deletions pkg/mcp/snapshot/snapshot.go
@@ -32,20 +32,35 @@ type Snapshot interface {
}

// Cache is a snapshot-based cache that maintains a single versioned
-// snapshot of responses per client. Cache consistently replies with the
+// snapshot of responses per group of clients. Cache consistently replies with the
// latest snapshot.
type Cache struct {
	mu         sync.RWMutex
	snapshots  map[string]Snapshot
	status     map[string]*StatusInfo
	watchCount int64
+
+	groupIndex GroupIndexFn
}

+// GroupIndexFn returns a stable group index for the given MCP client.
+type GroupIndexFn func(client *mcp.Client) string
+
+// DefaultGroup is the default group when using the DefaultGroupIndex() function.
+const DefaultGroup = "default"
+
+// DefaultGroupIndex provides a default GroupIndexFn function that
+// is usable for testing and simple deployments.
+func DefaultGroupIndex(_ *mcp.Client) string {
+	return DefaultGroup
+}
+
// New creates a new cache of resource snapshots.
-func New() *Cache {
+func New(groupIndex GroupIndexFn) *Cache {
	return &Cache{
-		snapshots: make(map[string]Snapshot),
-		status:    make(map[string]*StatusInfo),
+		snapshots:  make(map[string]Snapshot),
+		status:     make(map[string]*StatusInfo),
+		groupIndex: groupIndex,
	}
}

@@ -82,19 +97,18 @@ func (si *StatusInfo) LastWatchRequestTime() time.Time {

// Watch returns a watch for an MCP request.
func (c *Cache) Watch(request *mcp.MeshConfigRequest, responseC chan<- *server.WatchResponse) (*server.WatchResponse, server.CancelWatchFunc) { // nolint: lll
-	// TODO(ayj) - use hash of clients's ID to index map.
-	nodeID := request.Client.GetId()
+	group := c.groupIndex(request.Client)

	c.mu.Lock()
	defer c.mu.Unlock()

-	info, ok := c.status[nodeID]
+	info, ok := c.status[group]
	if !ok {
		info = &StatusInfo{
			client:  request.Client,
			watches: make(map[int64]*responseWatch),
		}
-		c.status[nodeID] = info
+		c.status[group] = info
	}

	// update last responseWatch request time
@@ -104,11 +118,11 @@ func (c *Cache) Watch(request *mcp.MeshConfigRequest, responseC chan<- *server.W

	// return an immediate response if a snapshot is available and the
	// requested version doesn't match.
-	if snapshot, ok := c.snapshots[nodeID]; ok {
+	if snapshot, ok := c.snapshots[group]; ok {
		version := snapshot.Version(request.TypeUrl)
-		scope.Debugf("Found snapshot for node: %q, with version: %q", nodeID, version)
+		scope.Debugf("Found snapshot for group: %q, with version: %q", group, version)
		if version != request.VersionInfo {
-			scope.Debugf("Responding to node %q with snapshot:\n%v\n", nodeID, snapshot)
+			scope.Debugf("Responding to group %q with snapshot:\n%v\n", group, snapshot)
			response := &server.WatchResponse{
				TypeURL: request.TypeUrl,
				Version: version,
@@ -122,8 +136,8 @@ func (c *Cache) Watch(request *mcp.MeshConfigRequest, responseC chan<- *server.W
	c.watchCount++
	watchID := c.watchCount

-	log.Infof("Watch(): created watch %d for %s from nodeID %q, version %q",
-		watchID, request.TypeUrl, nodeID, request.VersionInfo)
+	log.Infof("Watch(): created watch %d for %s from group %q, version %q",
+		watchID, request.TypeUrl, group, request.VersionInfo)

	info.mu.Lock()
	info.watches[watchID] = &responseWatch{request: request, responseC: responseC}
@@ -132,7 +146,7 @@ func (c *Cache) Watch(request *mcp.MeshConfigRequest, responseC chan<- *server.W
	cancel := func() {
		c.mu.Lock()
		defer c.mu.Unlock()
-		if info, ok := c.status[nodeID]; ok {
+		if info, ok := c.status[group]; ok {
			info.mu.Lock()
			delete(info.watches, watchID)
			info.mu.Unlock()
@@ -141,16 +155,16 @@ func (c *Cache) Watch(request *mcp.MeshConfigRequest, responseC chan<- *server.W
	return nil, cancel
}

-// SetSnapshot updates a snapshot for a client.
-func (c *Cache) SetSnapshot(node string, snapshot Snapshot) {
+// SetSnapshot updates a snapshot for a group.
+func (c *Cache) SetSnapshot(group string, snapshot Snapshot) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// update the existing entry
-	c.snapshots[node] = snapshot
+	c.snapshots[group] = snapshot

	// trigger existing watches for which version changed
-	if info, ok := c.status[node]; ok {
+	if info, ok := c.status[group]; ok {
		info.mu.Lock()
		for id, watch := range info.watches {
			version := snapshot.Version(watch.request.TypeUrl)
@@ -174,37 +188,37 @@ func (c *Cache) SetSnapshot(node string, snapshot Snapshot) {
	}
}

-// ClearSnapshot clears snapshot for a client. This does not cancel any open
+// ClearSnapshot clears snapshot for a group. This does not cancel any open
// watches already created (see ClearStatus).
-func (c *Cache) ClearSnapshot(node string) {
+func (c *Cache) ClearSnapshot(group string) {
	c.mu.Lock()
	defer c.mu.Unlock()

-	delete(c.snapshots, node)
+	delete(c.snapshots, group)
}

-// ClearStatus clears status for a client. This has the effect of canceling
-// any open watches opened against this client info.
-func (c *Cache) ClearStatus(node string) {
+// ClearStatus clears status for a group. This has the effect of canceling
+// any open watches opened against this group info.
+func (c *Cache) ClearStatus(group string) {
	c.mu.Lock()
	defer c.mu.Unlock()

-	if info, ok := c.status[node]; ok {
+	if info, ok := c.status[group]; ok {
		info.mu.Lock()
		for _, watch := range info.watches {
			// response channel may be shared
			watch.responseC <- nil
		}
		info.mu.Unlock()
	}
-	delete(c.status, node)
+	delete(c.status, group)
}

-// Status returns informational status for a client.
-func (c *Cache) Status(node string) *StatusInfo {
+// Status returns informational status for a group.
+func (c *Cache) Status(group string) *StatusInfo {
	c.mu.RLock()
	defer c.mu.RUnlock()
-	if info, ok := c.status[node]; ok {
+	if info, ok := c.status[group]; ok {
		return info
	}
	return nil
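
Putting the new API together, server-side usage has roughly this shape
(a sketch assembled from the calls shown in the diffs above; sn stands
for a previously built Snapshot, e.g. the b.Build() value in
configz_test.go):

	// Create a cache whose group index maps every client to "default".
	cache := snapshot.New(snapshot.DefaultGroupIndex)

	// Publish: clients in the group are notified of the new version.
	cache.SetSnapshot(snapshot.DefaultGroup, sn)

	// Inspect per-group status, as configz_test.go does above.
	if status := cache.Status(snapshot.DefaultGroup); status != nil {
		_ = status.Watches()
	}

	// Tear down: drop the snapshot, then cancel the group's open watches.
	cache.ClearSnapshot(snapshot.DefaultGroup)
	cache.ClearStatus(snapshot.DefaultGroup)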
