Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add: kubernetes registry and remote package unit test #400

Merged
merged 66 commits into from
Mar 16, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
456f58f
Release 1.2.0
hxmhlt Oct 30, 2019
daa1fc3
Release 1.2.0
hxmhlt Oct 30, 2019
fe43eec
Merge branch 'master' into 1.2.0-release
hxmhlt Oct 30, 2019
11878c4
Release 1.2.0
hxmhlt Oct 30, 2019
55ab09e
Fix kubernetes import block and make map param
invalid-email-address Dec 20, 2019
32e5758
Fix the kubernetes && etcd registry race-condition
sxllwx Jan 13, 2020
723aa68
Fix bool value return
sxllwx Jan 13, 2020
2ec84e5
Delete the unused check block
sxllwx Jan 13, 2020
daac5e1
Add apache license
sxllwx Jan 13, 2020
347124d
Fix test embed etcd-server workdir
sxllwx Jan 13, 2020
460fed6
Delete the etcd test-server workdir after ut
sxllwx Jan 13, 2020
156bf40
Fix etcd work-dir conflict
sxllwx Jan 13, 2020
04ee311
fix latest issue
invalid-email-address Jan 18, 2020
5975ca0
Fix registry concurrent close panic
sxllwx Jan 19, 2020
00b1fb1
Merge pull request #289 from sxllwx/k8s
flycash Jan 24, 2020
7a71b3a
Merge develop branch
sxllwx Mar 13, 2020
4e5debc
Add ut for remote/kubernetes
sxllwx Mar 13, 2020
60eaf55
Delete unused method
sxllwx Mar 13, 2020
34ebc72
adapte for new registry
sxllwx Mar 13, 2020
29c8888
Add ut for registry/kubernetes
sxllwx Mar 13, 2020
f29c788
Fix ci client close race condition
sxllwx Mar 14, 2020
f11f6fb
Fix remote/kubernetes unit-test race condition
sxllwx Mar 14, 2020
e5c3ff5
Fix nil init
sxllwx Mar 14, 2020
9af6c53
Fix latest comment
sxllwx Mar 14, 2020
ae667e9
Fix latest alex comment
sxllwx Mar 14, 2020
059f9b8
Add double check for RWMutex
sxllwx Mar 14, 2020
d7ae998
Fix registry package unit test cover
sxllwx Mar 15, 2020
3349096
Add test cover for remote/kubernetes
sxllwx Mar 15, 2020
dfa8267
sync watch unit goroutine
sxllwx Mar 15, 2020
c7474fa
Fix method name bug,and handle the del event in config-listener.
sxllwx Mar 15, 2020
d776f8e
Fix remote/kubernetes sendMsg locker
sxllwx Mar 15, 2020
0886257
delete unused http pprof suite
sxllwx Mar 15, 2020
bf8bf85
Rename watcher and store name
sxllwx Mar 15, 2020
bc536bb
Fix named err
sxllwx Mar 15, 2020
7b13b44
Fix named err
sxllwx Mar 15, 2020
b9bf7d9
Fix time gap
sxllwx Mar 15, 2020
aee4f90
delete unused select case
sxllwx Mar 15, 2020
06b0da8
Add more rich log
sxllwx Mar 15, 2020
83d3975
move handle-client-restart from remote to registry
sxllwx Mar 15, 2020
603c1b9
Fix wg bug, add(1) out of goroutine
sxllwx Mar 15, 2020
8773a7d
Fix interest url slice-> map
sxllwx Mar 15, 2020
59fe063
Fix lock scop
sxllwx Mar 15, 2020
168a978
Fix kubernetes registry configListener nil condition
sxllwx Mar 15, 2020
92f9ea2
Fix zookeeper wg bug
sxllwx Mar 15, 2020
6b04b96
Fix create-path and push to test
sxllwx Mar 15, 2020
25f366c
Fix listener slice -> map
sxllwx Mar 15, 2020
5b87f8e
Fix missing protocol scheme bug
sxllwx Mar 15, 2020
b7af875
Fix nil point, the make slice will import a nil object in slice
sxllwx Mar 15, 2020
8941796
Merge pull request #1 from apache/develop
sxllwx Mar 15, 2020
1ed3e91
Fix go.sum conflict
sxllwx Mar 15, 2020
a47cf73
Fix go.sum conflict
sxllwx Mar 15, 2020
46f5c9a
Fix CHANGE.md
sxllwx Mar 15, 2020
d7a37ba
Fix CHANGE.md
sxllwx Mar 15, 2020
b88cf66
Mod: split long line codes
AlexStocks Mar 16, 2020
35624ed
Imp: set the init len for a map
AlexStocks Mar 16, 2020
eeaa817
Fix latest comment
sxllwx Mar 16, 2020
5a60988
Fix ci flow block issue
sxllwx Mar 16, 2020
14ba4ff
etcdv3 unit test, adapte for windows
sxllwx Mar 16, 2020
ab7db8a
simple the client validate method
sxllwx Mar 16, 2020
cef0d5d
short test time-cost for watch set
sxllwx Mar 16, 2020
938d5ed
Add more rich unit-test
sxllwx Mar 16, 2020
52a9e28
etcdv3 unit test, adapte for windows
sxllwx Mar 16, 2020
0769966
Fix ut nil pointer issue
sxllwx Mar 16, 2020
2eeb732
Fix kubernetes registry ut block issue
sxllwx Mar 16, 2020
164bf89
Fix kubernetes registry ut block issue
sxllwx Mar 16, 2020
e48b698
Add new registry block time, wait the watch groutine start
sxllwx Mar 16, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Rename watcher and store name
  • Loading branch information
sxllwx committed Mar 15, 2020
commit bf8bf85dc37399a23d124a7e6809c5c76ee90d49
20 changes: 10 additions & 10 deletions remoting/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type Client struct {
ns string

// the memory store
store Store
store WatcherSet

// protect the wg && currentPod
lock sync.RWMutex
Expand Down Expand Up @@ -132,7 +132,7 @@ func newMockClient(namespace string, mockClientGenerator func() (kubernetes.Inte
ns: namespace,
rawClient: rawClient,
ctx: ctx,
store: newStore(ctx),
store: newWatcherSet(ctx),
cancel: cancel,
}

Expand Down Expand Up @@ -185,7 +185,7 @@ func newClient(namespace string) (*Client, error) {
cfg: cfg,
rawClient: rawClient,
ctx: ctx,
store: newStore(ctx),
store: newWatcherSet(ctx),
cancel: cancel,
}

Expand Down Expand Up @@ -415,10 +415,10 @@ func (c *Client) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) {

// unmarshalRecord
// unmarshal the kubernetes dubbo annotation value
func (c *Client) unmarshalRecord(record string) ([]*Object, error) {
func (c *Client) unmarshalRecord(record string) ([]*WatcherEvent, error) {

if len(record) == 0 {
// []*Object is nil.
// []*WatcherEvent is nil.
return nil, nil
}

Expand All @@ -427,7 +427,7 @@ func (c *Client) unmarshalRecord(record string) ([]*Object, error) {
return nil, perrors.WithMessagef(err, "decode record (%s)", record)
}

var out []*Object
var out []*WatcherEvent
if err := json.Unmarshal(rawMsg, &out); err != nil {
return nil, perrors.WithMessage(err, "decode json")
}
Expand All @@ -436,7 +436,7 @@ func (c *Client) unmarshalRecord(record string) ([]*Object, error) {

// marshalRecord
// marshal the kubernetes dubbo annotation value
func (c *Client) marshalRecord(ol []*Object) (string, error) {
func (c *Client) marshalRecord(ol []*WatcherEvent) (string, error) {

msg, err := json.Marshal(ol)
if err != nil {
Expand Down Expand Up @@ -552,7 +552,7 @@ func (c *Client) assembleDUBBOAnnotations(k, v string, currentPod *v1.Pod) (oldP
return
}

newAnnotations, err := c.marshalRecord(append(al, &Object{Key: k, Value: v}))
newAnnotations, err := c.marshalRecord(append(al, &WatcherEvent{Key: k, Value: v}))
if err != nil {
err = perrors.WithMessage(err, "marshal record")
return
Expand Down Expand Up @@ -605,7 +605,7 @@ func (c *Client) GetChildren(k string) ([]string, []string, error) {

// Watch
// watch on spec key
func (c *Client) Watch(k string) (<-chan *Object, <-chan struct{}, error) {
func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error) {

w, err := c.store.Watch(k, false)
if err != nil {
Expand All @@ -617,7 +617,7 @@ func (c *Client) Watch(k string) (<-chan *Object, <-chan struct{}, error) {

// Watch
// watch on spec prefix
func (c *Client) WatchWithPrefix(prefix string) (<-chan *Object, <-chan struct{}, error) {
func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan struct{}, error) {

w, err := c.store.Watch(prefix, true)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion remoting/kubernetes/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.

// return true mean the event type is DELETE
// return false mean the event type is CREATE || UPDATE
func (l *EventListener) handleEvents(event *Object, listeners ...remoting.DataListener) bool {
func (l *EventListener) handleEvents(event *WatcherEvent, listeners ...remoting.DataListener) bool {

logger.Infof("got a kubernetes-store event {type: %d, key: %s}", event.EventType, event.Key)

Expand Down
86 changes: 44 additions & 42 deletions remoting/kubernetes/store.go → remoting/kubernetes/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"strconv"
"strings"
"sync"
)

import (
perrors "github.com/pkg/errors"
)

Expand Down Expand Up @@ -57,9 +59,9 @@ func (e eventType) String() string {
}
}

// Object
// WatcherEvent
// object is element in store
type Object struct {
type WatcherEvent struct {
// event-type
EventType eventType `json:"-"`
// the dubbo-go should consume the key
Expand All @@ -68,14 +70,14 @@ type Object struct {
Value string `json:"v"`
}

// Watchable Store
type Store interface {
// Watchable WatcherSet
type WatcherSet interface {

// put the object to the store
Put(object *Object) error
Put(object *WatcherEvent) error
// if prefix is false,
// the len([]*Object) == 1
Get(key string, prefix bool) ([]*Object, error)
// the len([]*WatcherEvent) == 1
Get(key string, prefix bool) ([]*WatcherEvent, error)
// watch the spec key or key prefix
Watch(key string, prefix bool) (Watcher, error)
// check the store status
Expand All @@ -87,15 +89,15 @@ type Watcher interface {
// the watcher's id
ID() string
// result stream
ResultChan() <-chan *Object
ResultChan() <-chan *WatcherEvent
// Stop the watcher
stop()
// check the watcher status
done() <-chan struct{}
}

// the store
type storeImpl struct {
type watcherSetImpl struct {

// Client's ctx, client die, the store will die too
ctx context.Context
Expand All @@ -104,15 +106,15 @@ type storeImpl struct {
lock sync.RWMutex

// the key is dubbo-go interest meta
cache map[string]*Object
cache map[string]*WatcherEvent

currentWatcherId uint64
watchers map[uint64]*watcher
}

// wait exit
// closeWatchers
// when the store was closed
func (s *storeImpl) waitExit() {
func (s *watcherSetImpl) closeWatchers() {

select {
case <-s.ctx.Done():
Expand All @@ -133,21 +135,21 @@ func (s *storeImpl) waitExit() {

// Watch
// watch on spec key, with or without prefix
func (s *storeImpl) Watch(key string, prefix bool) (Watcher, error) {
func (s *watcherSetImpl) Watch(key string, prefix bool) (Watcher, error) {
return s.addWatcher(key, prefix)
}

// Done
// get the store status
func (s *storeImpl) Done() <-chan struct{} {
func (s *watcherSetImpl) Done() <-chan struct{} {
return s.ctx.Done()
}

// Put
// put the object to store
func (s *storeImpl) Put(object *Object) error {
func (s *watcherSetImpl) Put(object *WatcherEvent) error {

sendMsg := func(object *Object, w *watcher) {
sendMsg := func(object *WatcherEvent, w *watcher) {

select {
case <-w.done():
Expand Down Expand Up @@ -202,8 +204,7 @@ func (s *storeImpl) Put(object *Object) error {
}

// valid
// valid the client status should protected by lock
func (s *storeImpl) valid() error {
func (s *watcherSetImpl) valid() error {
select {
case <-s.ctx.Done():
return ErrStoreAlreadyStopped
Expand All @@ -213,34 +214,35 @@ func (s *storeImpl) valid() error {
}

// addWatcher
func (s *storeImpl) addWatcher(key string, prefix bool) (Watcher, error) {
func (s *watcherSetImpl) addWatcher(key string, prefix bool) (Watcher, error) {

if err := s.valid(); err != nil {
return nil, err
}

s.lock.Lock()
defer s.lock.Unlock()

// increase the watcher-id
s.currentWatcherId++

w := &watcher{
id: s.currentWatcherId,
store: s,
interested: struct {
key string
prefix bool
}{key: key, prefix: prefix},
ch: make(chan *Object, defaultWatcherChanSize),
ch: make(chan *WatcherEvent, defaultWatcherChanSize),
exit: make(chan struct{}),
}

s.lock.Lock()
defer s.lock.Unlock()

if err := s.valid(); err != nil {
return nil, err
}

s.watchers[s.currentWatcherId] = w
w.id = s.currentWatcherId
s.currentWatcherId = s.currentWatcherId + 1
return w, nil
}

// Get
// get elements from cache
func (s *storeImpl) Get(key string, prefix bool) ([]*Object, error) {
func (s *watcherSetImpl) Get(key string, prefix bool) ([]*WatcherEvent, error) {

s.lock.RLock()
defer s.lock.RUnlock()
Expand All @@ -252,14 +254,14 @@ func (s *storeImpl) Get(key string, prefix bool) ([]*Object, error) {
if !prefix {
for k, v := range s.cache {
if k == key {
return []*Object{v}, nil
return []*WatcherEvent{v}, nil
}
}
// object
return nil, ErrKVPairNotFound
}

var out []*Object
var out []*WatcherEvent

for k, v := range s.cache {
if strings.Contains(k, key) {
Expand All @@ -279,21 +281,21 @@ type watcher struct {
id uint64

// the underlay store
store *storeImpl
store *watcherSetImpl

// the interest topic
interested struct {
key string
prefix bool
}
ch chan *Object
ch chan *WatcherEvent

closeOnce sync.Once
exit chan struct{}
}

// ResultChan
func (w *watcher) ResultChan() <-chan *Object {
func (w *watcher) ResultChan() <-chan *WatcherEvent {
return w.ch
}

Expand All @@ -319,14 +321,14 @@ func (w *watcher) done() <-chan struct{} {
return w.exit
}

// newStore
// new store from parent context
func newStore(ctx context.Context) Store {
s := &storeImpl{
// newWatcherSet
// new watcher set from parent context
func newWatcherSet(ctx context.Context) WatcherSet {
s := &watcherSetImpl{
ctx: ctx,
cache: map[string]*Object{},
cache: map[string]*WatcherEvent{},
watchers: map[uint64]*watcher{},
}
go s.waitExit()
go s.closeWatchers()
return s
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestStore(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

s := newStore(ctx)
s := newWatcherSet(ctx)

wg := sync.WaitGroup{}

Expand Down Expand Up @@ -82,7 +82,7 @@ func TestStore(t *testing.T) {

for i := 0; i < 5; i++ {
go func(i int) {
if err := s.Put(&Object{
if err := s.Put(&WatcherEvent{
Key: "key-" + strconv.Itoa(i),
Value: strconv.Itoa(i),
}); err != nil {
Expand Down