Skip to content

Commit

Permalink
Merge pull request #14 from yunify/metadata_permision
Browse files Browse the repository at this point in the history
Metadata access rule
  • Loading branch information
jolestar authored Jun 8, 2017
2 parents acbd7e4 + 40c5fed commit 329e0bd
Show file tree
Hide file tree
Showing 20 changed files with 1,342 additions and 293 deletions.
5 changes: 5 additions & 0 deletions backends/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ type StoreClient interface {
PutMapping(nodePath string, mapping interface{}, replace bool) error
DeleteMapping(nodePath string, dir bool) error
SyncMapping(mapping store.Store, stopChan chan bool)

GetAccessRule() (map[string][]store.AccessRule, error)
PutAccessRule(rules map[string][]store.AccessRule) error
DeleteAccessRule(hosts []string) error
SyncAccessRule(accessStore store.AccessStore, stopChan chan bool)
}

// New is used to create a storage client based on our configuration.
Expand Down
78 changes: 78 additions & 0 deletions backends/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,84 @@ func TestMappingSync(t *testing.T) {
}
}

func TestAccessRule(t *testing.T) {
for _, backend := range backendNodes {
stopChan := make(chan bool)
defer func() {
stopChan <- true
}()
storeClient := NewTestClient(backend)

accessStore := store.NewAccessStore()
storeClient.SyncAccessRule(accessStore, stopChan)

rules := map[string][]store.AccessRule{
"192.168.1.1": {
{Path: "/clusters", Mode: store.AccessModeForbidden},
{Path: "/clusters/cl-1", Mode: store.AccessModeRead},
},
"192.168.1.2": {
{Path: "/clusters", Mode: store.AccessModeForbidden},
{Path: "/clusters/cl-2", Mode: store.AccessModeRead},
},
}
var rulesGet map[string][]store.AccessRule
err := storeClient.PutAccessRule(rules)
assert.NoError(t, err)

rulesGet, err = storeClient.GetAccessRule()
assert.NoError(t, err)
assert.Equal(t, rules, rulesGet)

time.Sleep(1000 * time.Millisecond)
assert.NotNil(t, accessStore.Get("192.168.1.1"))

err = storeClient.DeleteAccessRule([]string{"192.168.1.2"})
assert.NoError(t, err)

rulesGet, err = storeClient.GetAccessRule()
assert.NoError(t, err)
_, ok := rulesGet["192.168.1.2"]
assert.False(t, ok)

rules2 := map[string][]store.AccessRule{
"192.168.1.3": {
{Path: "/clusters", Mode: store.AccessModeForbidden},
{Path: "/clusters/cl-3", Mode: store.AccessModeRead},
},
}
err = storeClient.PutAccessRule(rules2)
assert.NoError(t, err)

time.Sleep(1000 * time.Millisecond)

assert.Nil(t, accessStore.Get("192.168.1.2"))
assert.NotNil(t, accessStore.Get("192.168.1.3"))

err = storeClient.DeleteAccessRule([]string{"192.168.1.1", "192.168.1.2", "192.168.1.3"})
assert.NoError(t, err)
}
}

func NewTestClient(backend string) StoreClient {
prefix := fmt.Sprintf("/prefix%v", rand.Intn(1000))
group := fmt.Sprintf("/group%v", rand.Intn(1000))
println("Test backend: ", backend)
nodes := GetDefaultBackends(backend)

config := Config{
Backend: backend,
BackendNodes: nodes,
Prefix: prefix,
Group: group,
}
storeClient, err := New(config)
if err != nil {
panic(err)
}
return storeClient
}

func FillTestData(storeClient StoreClient) map[string]string {
testData := make(map[string]interface{})
for i := 0; i < 5; i++ {
Expand Down
172 changes: 133 additions & 39 deletions backends/etcdv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import (
"path"
"reflect"
"strings"
"sync"
"time"
)

const SELF_MAPPING_PATH = "/_metad/mapping"
const RULE_PATH = "/_metad/rule"

var (
//see github.com/coreos/etcd/etcdserver/api/v3rpc/key.go
Expand All @@ -30,6 +32,7 @@ type Client struct {
client *client.Client
prefix string
mappingPrefix string
rulePrefix string
}

// NewEtcdClient returns an *etcd.Client with a connection to named machines.
Expand Down Expand Up @@ -79,7 +82,7 @@ func NewEtcdClient(group string, prefix string, machines []string, cert, key, ca
if err != nil {
return nil, err
}
return &Client{c, prefix, path.Join(SELF_MAPPING_PATH, group)}, nil
return &Client{c, prefix, path.Join(SELF_MAPPING_PATH, group), path.Join(RULE_PATH, group)}, nil
}

// Get queries etcd for nodePath.
Expand All @@ -104,9 +107,10 @@ func (c *Client) Delete(nodePath string, dir bool) error {
}

func (c *Client) Sync(store store.Store, stopChan chan bool) {
startedChan := make(chan bool)
go c.internalSync(c.prefix, store, stopChan, startedChan)
<-startedChan
initWG := &sync.WaitGroup{}
initWG.Add(1)
go c.internalSync(c.prefix, stopChan, initWG, c.newInitStoreFunc(c.prefix, store), newProcessSyncChangeFunc(store))
initWG.Wait()
}

func (c *Client) GetMapping(nodePath string, dir bool) (interface{}, error) {
Expand All @@ -132,9 +136,77 @@ func (c *Client) DeleteMapping(nodePath string, dir bool) error {
}

func (c *Client) SyncMapping(mapping store.Store, stopChan chan bool) {
startedChan := make(chan bool)
go c.internalSync(c.mappingPrefix, mapping, stopChan, startedChan)
<-startedChan
initWG := &sync.WaitGroup{}
initWG.Add(1)
go c.internalSync(c.mappingPrefix, stopChan, initWG, c.newInitStoreFunc(c.mappingPrefix, mapping), newProcessSyncChangeFunc(mapping))
initWG.Wait()
}

func (c *Client) GetAccessRule() (map[string][]store.AccessRule, error) {
result := make(map[string][]store.AccessRule)
m, err := c.internalGets(c.rulePrefix, "/")
if err != nil {
return nil, err
}
for k, v := range m {
rules, err := store.UnmarshalAccessRule(v)
if err != nil {
log.Error("Unexpect rule json value in etcd [%s]", v)
continue
}
_, host := path.Split(k)
result[host] = rules
}
return result, nil
}

func (c *Client) PutAccessRule(rules map[string][]store.AccessRule) error {
values := make(map[string]string, len(rules))
for k, v := range rules {
values[k] = store.MarshalAccessRule(v)
}
return c.internalPutValues(c.rulePrefix, "/", values, false)
}

func (c *Client) DeleteAccessRule(hosts []string) error {
for _, host := range hosts {
if host == "" {
continue
}
err := c.internalDelete(c.rulePrefix, path.Join("/", host), false)
if err != nil {
return err
}
}
return nil
}

func (c *Client) SyncAccessRule(accessStore store.AccessStore, stopChan chan bool) {
initWG := &sync.WaitGroup{}
initWG.Add(1)
go c.internalSync(c.rulePrefix, stopChan, initWG, func() error {
val, err := c.GetAccessRule()
if err != nil {
return err
}
accessStore.Puts(val)
return nil
}, func(event *client.Event, nodePath, value string) {
_, host := path.Split(nodePath)
switch event.Type {
case mvccpb.PUT:
rules, err := store.UnmarshalAccessRule(value)
if err != nil {
log.Error("Unexpect rule json value in etcd [%s]", value)
}
accessStore.Put(host, rules)
case mvccpb.DELETE:
accessStore.Delete(host)
default:
log.Warning("Unknow watch event type: %s ", event.Type)
}
})
initWG.Wait()
}

func (c *Client) internalGets(prefix, nodePath string) (map[string]string, error) {
Expand Down Expand Up @@ -182,66 +254,88 @@ func handleGetResp(prefix string, resp *client.GetResponse, vars map[string]stri
return nil
}

func (c *Client) internalSync(prefix string, store store.Store, stopChan chan bool, startedChan chan bool) {

func (c *Client) internalSync(prefix string, stopChan chan bool, initWG *sync.WaitGroup, initStoreFunc func() error, processChangeFunc func(event *client.Event, nodePath, value string)) {
var rev int64 = 0
init := false
stop := false
cancelRoutine := make(chan bool)
defer close(cancelRoutine)

for {
var ctx context.Context
var cancel context.CancelFunc

go func() {
select {
case <-stopChan:
log.Info("Sync %s stop.", prefix)
stop = true
if cancel != nil {
cancel()
}
case <-cancelRoutine:
return
default:
}
}()

ctx, cancel := context.WithCancel(context.Background())
watchChan := c.client.Watch(ctx, prefix, client.WithPrefix(), client.WithRev(rev))

go func() {
select {
case <-stopChan:
log.Info("Sync %s stop.", prefix)
cancel()
cancelRoutine <- true
for {
if stop {
if !init {
initWG.Done()
}
}()

return
}
ctx, cancel = context.WithCancel(context.Background())
watchChan := c.client.Watch(ctx, prefix, client.WithPrefix(), client.WithRev(rev))
if watchChan == nil {
continue
}
for !init {
val, err := c.internalGets(prefix, "/")
if stop {
initWG.Done()
return
}
err := initStoreFunc()
if err != nil {
log.Error("GetValue from etcd nodePath:%s, error-type: %s, error: %s", prefix, reflect.TypeOf(err), err.Error())
log.Error("Get init value from etcd nodePath:%s, error-type: %s, error: %s", prefix, reflect.TypeOf(err), err.Error())
time.Sleep(time.Duration(1000) * time.Millisecond)
log.Info("Init store for prefix %s fail, retry.", prefix)
continue
}
store.PutBulk("/", val)
log.Info("Init store for prefix %s success.", prefix)
init = true
go func() {
startedChan <- true
}()
initWG.Done()
}
for resp := range watchChan {
processSyncChange(prefix, store, &resp)
for _, event := range resp.Events {
nodePath := string(event.Kv.Key)
// avoid sync mapping config as metadata when prefix is "/"
if (prefix == "" || prefix == "/") && (strings.HasPrefix(nodePath, SELF_MAPPING_PATH) || strings.HasPrefix(nodePath, RULE_PATH)) {
continue
}

nodePath = util.TrimPathPrefix(nodePath, prefix)
value := string(event.Kv.Value)
log.Debug("process sync change, event_type: %s, prefix: %v, nodePath:%v, value: %v ", event.Type, prefix, nodePath, value)
processChangeFunc(event, nodePath, value)
}
rev = resp.Header.Revision
}
}
}

func processSyncChange(prefix string, store store.Store, resp *client.WatchResponse) {
for _, event := range resp.Events {
nodePath := string(event.Kv.Key)

// avoid sync mapping config as metadata when prefix is "/"
if (prefix == "" || prefix == "/") && strings.HasPrefix(nodePath, SELF_MAPPING_PATH) {
continue
func (c *Client) newInitStoreFunc(prefix string, store store.Store) func() error {
return func() error {
val, err := c.internalGets(prefix, "/")
if err != nil {
return err
}
store.PutBulk("/", val)
return nil
}
}

nodePath = util.TrimPathPrefix(nodePath, prefix)
value := string(event.Kv.Value)
log.Debug("process sync change, event_type: %s, prefix: %v, nodePath:%v, value: %v ", event.Type, prefix, nodePath, value)
func newProcessSyncChangeFunc(store store.Store) func(event *client.Event, nodePath, value string) {
return func(event *client.Event, nodePath, value string) {
switch event.Type {
case mvccpb.PUT:
store.Put(nodePath, value)
Expand Down
Loading

0 comments on commit 329e0bd

Please sign in to comment.