This repository has been archived by the owner on Feb 9, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 92
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement new store API with k8s CRD backend (#1072)
Automatic merge from submit-queue Implement new store API with k8s CRD backend k8s custom resource definitions (CRDs) support is added although the use of CRDs are limited to the new API which isn't used in the actual code path yet. **Release note**: ```release-note NONE ```
- Loading branch information
1 parent
42cb484
commit 52d63b2
Showing
12 changed files
with
1,117 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") | ||
|
||
go_library( | ||
name = "go_default_library", | ||
srcs = [ | ||
"queue.go", | ||
"resource.go", | ||
"store.go", | ||
], | ||
visibility = ["//visibility:public"], | ||
deps = [ | ||
"//pkg/config/store:go_default_library", | ||
"@com_github_gogo_protobuf//jsonpb:go_default_library", | ||
"@com_github_gogo_protobuf//proto:go_default_library", | ||
"@com_github_golang_glog//:go_default_library", | ||
"@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library", | ||
"@io_k8s_apimachinery//pkg/runtime:go_default_library", | ||
"@io_k8s_apimachinery//pkg/runtime/schema:go_default_library", | ||
"@io_k8s_apimachinery//pkg/runtime/serializer:go_default_library", | ||
"@io_k8s_client_go//discovery:go_default_library", | ||
"@io_k8s_client_go//dynamic:go_default_library", | ||
"@io_k8s_client_go//plugin/pkg/client/auth/gcp:go_default_library", | ||
"@io_k8s_client_go//plugin/pkg/client/auth/oidc:go_default_library", | ||
"@io_k8s_client_go//rest:go_default_library", | ||
"@io_k8s_client_go//tools/cache:go_default_library", | ||
"@io_k8s_client_go//tools/clientcmd:go_default_library", | ||
], | ||
) | ||
|
||
go_test( | ||
name = "go_default_test", | ||
size = "medium", | ||
srcs = [ | ||
"queue_test.go", | ||
"resource_test.go", | ||
"store_test.go", | ||
], | ||
data = ["//testdata/kubernetes:kubeconfig"], | ||
library = ":go_default_library", | ||
deps = [ | ||
"//pkg/config/proto:go_default_library", | ||
"//pkg/config/store:go_default_library", | ||
"@com_github_gogo_protobuf//proto:go_default_library", | ||
"@com_github_golang_glog//:go_default_library", | ||
"@io_k8s_api//core/v1:go_default_library", | ||
"@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library", | ||
"@io_k8s_apimachinery//pkg/apis/meta/v1/unstructured:go_default_library", | ||
"@io_k8s_apimachinery//pkg/runtime/schema:go_default_library", | ||
"@io_k8s_client_go//kubernetes:go_default_library", | ||
"@io_k8s_client_go//plugin/pkg/client/auth/gcp:go_default_library", | ||
"@io_k8s_client_go//plugin/pkg/client/auth/oidc:go_default_library", | ||
"@io_k8s_client_go//rest:go_default_library", | ||
"@io_k8s_client_go//tools/clientcmd:go_default_library", | ||
], | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
// Copyright 2017 Istio Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package crd | ||
|
||
import ( | ||
"context" | ||
|
||
"istio.io/mixer/pkg/config/store" | ||
) | ||
|
||
type eventQueue struct { | ||
ctx context.Context | ||
cancel context.CancelFunc | ||
chout chan store.Event | ||
chin chan store.Event | ||
} | ||
|
||
func newQueue(ctx context.Context, cancel context.CancelFunc) *eventQueue { | ||
eq := &eventQueue{ | ||
ctx: ctx, | ||
cancel: cancel, | ||
chout: make(chan store.Event), | ||
chin: make(chan store.Event), | ||
} | ||
go eq.run() | ||
return eq | ||
} | ||
|
||
func (q *eventQueue) run() { | ||
loop: | ||
for { | ||
select { | ||
case <-q.ctx.Done(): | ||
break loop | ||
case ev := <-q.chin: | ||
evs := []store.Event{ev} | ||
for len(evs) > 0 { | ||
select { | ||
case <-q.ctx.Done(): | ||
break loop | ||
case ev := <-q.chin: | ||
evs = append(evs, ev) | ||
case q.chout <- evs[0]: | ||
evs = evs[1:] | ||
} | ||
} | ||
} | ||
} | ||
close(q.chout) | ||
} | ||
|
||
func (q *eventQueue) Send(t store.ChangeType, key store.Key) { | ||
select { | ||
case <-q.ctx.Done(): | ||
case q.chin <- store.Event{Key: key, Type: t}: | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
// Copyright 2017 Istio Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package crd | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"testing" | ||
"time" | ||
|
||
"istio.io/mixer/pkg/config/store" | ||
) | ||
|
||
func TestQueue(t *testing.T) { | ||
count := 10 | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
q := newQueue(ctx, cancel) | ||
defer q.cancel() | ||
donec := make(chan struct{}) | ||
evs := []store.Event{} | ||
go func() { | ||
for ev := range q.chout { | ||
evs = append(evs, ev) | ||
if len(evs) >= count { | ||
break | ||
} | ||
} | ||
close(donec) | ||
}() | ||
for i := 0; i < count; i++ { | ||
q.Send(store.Update, store.Key{Kind: "kind", Namespace: "ns", Name: fmt.Sprintf("%d", i)}) | ||
} | ||
<-donec | ||
if len(evs) != count { | ||
t.Errorf("Got %d Want %d", len(evs), count) | ||
} | ||
for i, ev := range evs { | ||
if ev.Name != fmt.Sprintf("%d", i) { | ||
t.Errorf("%d: Got name %s Want %d", i, ev.Name, i) | ||
} | ||
} | ||
} | ||
|
||
func TestQueueSync(t *testing.T) { | ||
count := 10 | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
q := newQueue(ctx, cancel) | ||
defer q.cancel() | ||
for i := 0; i < count; i++ { | ||
q.Send(store.Update, store.Key{Kind: "kind", Namespace: "ns", Name: fmt.Sprintf("%d", i)}) | ||
} | ||
for i := 0; i < count; i++ { | ||
ev := <-q.chout | ||
if ev.Name != fmt.Sprintf("%d", i) { | ||
t.Errorf("Got name %s Want %d", ev.Name, i) | ||
} | ||
} | ||
} | ||
|
||
func TestQueueCancel(t *testing.T) { | ||
count := 10 | ||
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(count)/2) | ||
q := newQueue(ctx, cancel) | ||
evs := []store.Event{} | ||
donec := make(chan struct{}) | ||
go func() { | ||
for ev := range q.chout { | ||
evs = append(evs, ev) | ||
} | ||
close(donec) | ||
}() | ||
for i := 0; i < count; i++ { | ||
q.Send(store.Update, store.Key{Kind: "kind", Namespace: "ns", Name: fmt.Sprintf("%d", i)}) | ||
time.Sleep(time.Millisecond) | ||
} | ||
<-donec | ||
if len(evs) > count/2 { | ||
t.Errorf("Got %d, Want <=%d", len(evs), count/2) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
// Copyright 2017 Istio Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package crd | ||
|
||
import ( | ||
"bytes" | ||
"encoding/json" | ||
|
||
"github.com/gogo/protobuf/jsonpb" | ||
"github.com/gogo/protobuf/proto" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/runtime" | ||
"k8s.io/apimachinery/pkg/runtime/schema" | ||
) | ||
|
||
//resource type represents the structure of a single custom resource. | ||
type resource struct { | ||
Kind string `json:"kind"` | ||
APIVersion string `json:"apiVersion"` | ||
metav1.ObjectMeta `json:"metadata,omitempty"` | ||
|
||
Spec map[string]interface{} `json:"spec,omitempty"` | ||
} | ||
|
||
func deepCopy(s interface{}) interface{} { | ||
switch x := s.(type) { | ||
case map[string]interface{}: | ||
clone := make(map[string]interface{}, len(x)) | ||
for k, v := range x { | ||
clone[k] = deepCopy(v) | ||
} | ||
return clone | ||
case []interface{}: | ||
clone := make([]interface{}, len(x)) | ||
for i, v := range x { | ||
clone[i] = deepCopy(v) | ||
} | ||
return clone | ||
default: | ||
return x | ||
} | ||
} | ||
|
||
func deepCopySpec(s1 map[string]interface{}, s2 map[string]interface{}) { | ||
for k, v := range s1 { | ||
s2[k] = deepCopy(v) | ||
} | ||
} | ||
|
||
// GetObjectKind implements runtime.Object interface. | ||
func (r *resource) GetObjectKind() schema.ObjectKind { | ||
return &metav1.TypeMeta{ | ||
Kind: r.Kind, | ||
APIVersion: apiVersion, | ||
} | ||
} | ||
|
||
// DeepCopyObject implements runtime.Object interface. | ||
func (r *resource) DeepCopyObject() runtime.Object { | ||
r2 := &resource{Kind: r.Kind} | ||
r.ObjectMeta.DeepCopyInto(&r2.ObjectMeta) | ||
deepCopySpec(r.Spec, r2.Spec) | ||
return r2 | ||
} | ||
|
||
// resourceList represents the data of listing custom resources. | ||
type resourceList struct { | ||
Kind string | ||
metav1.ListMeta `json:"metadata,omitempty"` | ||
Items []*resource `json:"items"` | ||
} | ||
|
||
// GetObjectKind implements runtime.Object interface. | ||
func (r *resourceList) GetObjectKind() schema.ObjectKind { | ||
return &metav1.TypeMeta{ | ||
Kind: r.Kind, | ||
APIVersion: apiVersion, | ||
} | ||
} | ||
|
||
// GetObjectKind implements runtime.Object interface. | ||
func (r *resourceList) DeepCopyObject() runtime.Object { | ||
r2 := &resourceList{ | ||
ListMeta: *r.ListMeta.DeepCopy(), | ||
Items: make([]*resource, len(r.Items)), | ||
} | ||
for i, item := range r.Items { | ||
r2.Items[i] = item.DeepCopyObject().(*resource) | ||
} | ||
return r2 | ||
} | ||
|
||
func convert(spec map[string]interface{}, pbSpec proto.Message) error { | ||
// This is inefficient; convert to a protobuf message through JSON. | ||
// TODO: use reflect. | ||
jsonData, err := json.Marshal(spec) | ||
if err != nil { | ||
return err | ||
} | ||
return jsonpb.Unmarshal(bytes.NewReader(jsonData), pbSpec) | ||
} | ||
|
||
func convertBack(pbSpec proto.Message, spec *map[string]interface{}) error { | ||
buf := bytes.NewBuffer(nil) | ||
if err := (&jsonpb.Marshaler{}).Marshal(buf, pbSpec); err != nil { | ||
return err | ||
} | ||
if err := json.Unmarshal(buf.Bytes(), spec); err != nil { | ||
return err | ||
} | ||
return nil | ||
} |
Oops, something went wrong.