From 8905019728832f1859360fbb43624230f9e56f6e Mon Sep 17 00:00:00 2001 From: jolestar Date: Fri, 26 May 2017 23:03:31 +0800 Subject: [PATCH 01/11] remove hidden node support. --- store/node.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/store/node.go b/store/node.go index 5dca13f..5b4cf4f 100644 --- a/store/node.go +++ b/store/node.go @@ -78,15 +78,6 @@ func (n *node) IsRoot() bool { return n.parent == nil } -// IsHidden function checks if the node is a hidden node. A hidden node -// will begin with '_' -// A hidden node will not be shown via get command under a directory -// For example if we have /foo/_hidden and /foo/notHidden, get "/foo" -// will only return /foo/notHidden -func (n *node) IsHidden() bool { - return n.Name[0] == '_' -} - // IsDir function checks whether the node is a dir. func (n *node) IsDir() bool { return n.Children != nil @@ -243,10 +234,6 @@ func (n *node) GetValue() interface{} { if n.IsDir() { values := make(map[string]interface{}) for k, node := range n.Children { - //skip hidden node. - if node.IsHidden() { - continue - } v := node.GetValue() m, isMap := v.(map[string]interface{}) // skip empty dir. From 2ea0d13dc6a3787d8c0d215c98760ef161868296 Mon Sep 17 00:00:00 2001 From: jolestar Date: Fri, 26 May 2017 23:04:44 +0800 Subject: [PATCH 02/11] traveller support draft. --- store/store.go | 9 +- store/traveller.go | 219 ++++++++++++++++++++++++++++++++++++++++ store/traveller_test.go | 87 ++++++++++++++++ 3 files changed, 313 insertions(+), 2 deletions(-) create mode 100644 store/traveller.go create mode 100644 store/traveller_test.go diff --git a/store/store.go b/store/store.go index c25b40b..9eb3805 100644 --- a/store/store.go +++ b/store/store.go @@ -32,6 +32,8 @@ type Store interface { Version() int64 // Destroy the store Destroy() + // Traveller + Traveller(rules []AccessRule) Traveller } type store struct { @@ -179,6 +181,10 @@ func (s *store) Destroy() { s.Root = nil } +func (s *store) Traveller(rules []AccessRule) Traveller { + return newNodeTraveller(s, rules) +} + // walk walks all the nodePath and apply the walkFunc on each directory func (s *store) walk(nodePath string, walkFunc func(prev *node, component string) *node) *node { components := strings.Split(nodePath, "/") @@ -188,9 +194,8 @@ func (s *store) walk(nodePath string, walkFunc func(prev *node, component string for i := 1; i < len(components); i++ { if len(components[i]) == 0 { // ignore empty string - return curr + continue } - curr = walkFunc(curr, components[i]) if curr == nil { return nil diff --git a/store/traveller.go b/store/traveller.go new file mode 100644 index 0000000..e961555 --- /dev/null +++ b/store/traveller.go @@ -0,0 +1,219 @@ +package store + +import ( + "encoding/json" + "strings" +) + +type AccessMode int +type EnterResult int + +const ( + AccessModeNil = AccessMode(-1) + AccessModeForbidden = AccessMode(0) + AccessModeRead = AccessMode(1) +) + +type AccessRule struct { + Path string + Mode AccessMode +} + +type Traveller interface { + // can Enter node + Enter(node string) bool + // Back to parent dir + Back() + + GetValue() interface{} +} + +type pathTree struct { + root *pathNode +} + +func (t *pathTree) Json() string { + b, _ := json.MarshalIndent(t.root, "", " ") + return string(b) +} + +type pathNode struct { + Name string + Mode AccessMode + parent *pathNode + Children []*pathNode +} + +func (n *pathNode) HasChildren() bool { + return len(n.Children) > 0 +} + +func (n *pathNode) GetChildren(name string, strict bool) *pathNode { + var wildcardNode *pathNode + for _, c := range n.Children { + if name == c.Name { + return c + } + if !strict && c.Name == "*" { + wildcardNode = c + } + } + return wildcardNode +} + +func newNodeTraveller(store *store, rules []AccessRule) Traveller { + return &nodeTraveller{store: store, tree: buildPathTree(rules)} +} + +func buildPathTree(rules []AccessRule) *pathTree { + root := &pathNode{ + Name: "/", + Mode: AccessModeNil, + parent: nil, + } + tree := &pathTree{ + root: root, + } + for _, rule := range rules { + p := rule.Path + curr := root + if p != "/" { + components := strings.Split(p, "/") + for _, component := range components { + if component == "" { + continue + } + child := curr.GetChildren(component, true) + if child == nil { + child = &pathNode{Name: component, Mode: AccessModeNil, parent: curr} + curr.Children = append(curr.Children, child) + } + curr = child + } + } + curr.Mode = rule.Mode + } + return tree +} + +type stackElement struct { + pathNode *pathNode + mode AccessMode +} + +type travellerStack struct { + backend []interface{} +} + +func (s *travellerStack) Push(v interface{}) { + s.backend = append(s.backend, v) +} + +func (s *travellerStack) Pop() interface{} { + l := len(s.backend) + if l == 0 { + return nil + } + e := s.backend[l-1] + s.backend = s.backend[:l-1] + return e +} + +type nodeTraveller struct { + store *store + tree *pathTree + currNode *node + currPathNode *pathNode + currMode AccessMode + stack travellerStack +} + +func (t *nodeTraveller) Enter(dir string) bool { + if dir == "/" { + if t.currNode == nil { + t.stack.Push(&stackElement{pathNode: t.currPathNode, mode: t.currMode}) + t.currNode = t.store.Root + t.currPathNode = t.tree.root + t.currMode = t.currPathNode.Mode + return true + } + return false + } + if t.currNode == nil { + return false + } + n := t.currNode.GetChild(dir) + if n == nil { + return false + } + //if !n.IsDir() { + // return EnterNotDir + //} + + var p *pathNode + if t.currPathNode != nil { + p = t.currPathNode.GetChildren(dir, false) + } + result := false + if p != nil { + // if p HasChildren, means exist other rule for future access + if p.HasChildren() || p.Mode >= AccessModeRead { + result = true + } + } else { + if t.currMode >= AccessModeRead { + result = true + } + } + + if result { + t.stack.Push(&stackElement{pathNode: t.currPathNode, mode: t.currMode}) + t.currNode = n + t.currPathNode = p + if t.currPathNode != nil && t.currPathNode.Mode != AccessModeNil { + t.currMode = t.currPathNode.Mode + } + + } + return result +} + +func (t *nodeTraveller) Back() { + if t.currNode == nil || t.currNode.IsRoot() { + panic("illegal status") + } + e := t.stack.Pop() + if e == nil { + panic("illegal status") + } + ele := e.(*stackElement) + t.currNode = t.currNode.parent + t.currMode = ele.mode + t.currPathNode = ele.pathNode +} + +func (t *nodeTraveller) GetValue() interface{} { + if t.currNode == nil { + panic("illegal status.") + } + if t.currNode.IsDir() { + values := make(map[string]interface{}) + for k, node := range t.currNode.Children { + eresult := t.Enter(node.Name) + if !eresult { + continue + } + v := t.GetValue() + t.Back() + m, isMap := v.(map[string]interface{}) + // skip empty dir. + if isMap && len(m) == 0 { + continue + } + values[k] = v + } + return values + } else { + return t.currNode.Value + } +} diff --git a/store/traveller_test.go b/store/traveller_test.go new file mode 100644 index 0000000..2c0af1b --- /dev/null +++ b/store/traveller_test.go @@ -0,0 +1,87 @@ +package store + +import ( + "fmt" + "github.com/stretchr/testify/assert" + "testing" + //"encoding/json" +) + +func TestNodeTraveller(t *testing.T) { + s := New() + data := map[string]interface{}{ + "clusters": map[string]interface{}{ + "cl-1": map[string]interface{}{ + "env": map[string]interface{}{ + "name": "app1", + "secret": "123456", + }, + "public_key": "public_key_val", + }, + "cl-2": map[string]interface{}{ + "env": map[string]interface{}{ + "name": "app2", + "secret": "1234567", + }, + "public_key": "public_key_val2", + }, + }, + } + s.Put("/", data) + + accessRules := []AccessRule{ + { + Path: "/", + Mode: AccessModeForbidden, + }, + { + Path: "/clusters", + Mode: AccessModeRead, + }, + { + Path: "/clusters/*/env", + Mode: AccessModeForbidden, + }, + { + Path: "/clusters/cl-1", + Mode: AccessModeRead, + }, + } + traveller := s.Traveller(accessRules) + nodeTraveller := traveller.(*nodeTraveller) + fmt.Println(nodeTraveller.tree.Json()) + + assert.True(t, traveller.Enter("/")) + assert.True(t, traveller.Enter("clusters")) + assert.True(t, traveller.Enter("cl-1")) + assert.True(t, traveller.Enter("env")) + //assert.True(t, EnterForbidden, traveller.Enter("env")) + + traveller = s.Traveller(accessRules) + assert.True(t, traveller.Enter("/")) + assert.True(t, traveller.Enter("clusters")) + assert.True(t, traveller.Enter("cl-2")) + assert.False(t, traveller.Enter("env")) + + traveller = s.Traveller(accessRules) + assert.True(t, traveller.Enter("/")) + assert.True(t, traveller.Enter("clusters")) + assert.True(t, traveller.Enter("cl-2")) + assert.True(t, traveller.Enter("public_key")) + + traveller = s.Traveller(accessRules) + traveller.Enter("/") + traveller.Enter("clusters") + //traveller.Enter("cl-2") + v := traveller.GetValue() + //j,_ := json.MarshalIndent(v, "", " ") + //fmt.Printf("%s", string(j)) + mVal, ok := v.(map[string]interface{}) + assert.True(t, ok) + cl1 := mVal["cl-1"].(map[string]interface{}) + cl2 := mVal["cl-2"].(map[string]interface{}) + + envM := cl1["env"].(map[string]interface{}) + assert.Equal(t, 2, len(envM)) + assert.Nil(t, cl2["env"]) +} From b70688c267a8b0a241b5b7608f6378a365a736c4 Mon Sep 17 00:00:00 2001 From: jolestar Date: Sat, 27 May 2017 12:32:01 +0800 Subject: [PATCH 03/11] refactor store node traveller. --- store/store.go | 2 +- store/traveller.go | 166 +++++++++++++++++++++++----------------- store/traveller_test.go | 92 +++++++++++++++++----- 3 files changed, 169 insertions(+), 91 deletions(-) diff --git a/store/store.go b/store/store.go index 9eb3805..1c12ed9 100644 --- a/store/store.go +++ b/store/store.go @@ -182,7 +182,7 @@ func (s *store) Destroy() { } func (s *store) Traveller(rules []AccessRule) Traveller { - return newNodeTraveller(s, rules) + return newTraveller(s, rules) } // walk walks all the nodePath and apply the walkFunc on each directory diff --git a/store/traveller.go b/store/traveller.go index e961555..a804a97 100644 --- a/store/traveller.go +++ b/store/traveller.go @@ -20,36 +20,31 @@ type AccessRule struct { } type Traveller interface { - // can Enter node - Enter(node string) bool - // Back to parent dir + // Enter path's node, return is success. + Enter(path string) bool + // Back to parent node Back() - + // BackStep back multi step + BackStep(step int) + // Back to root node + BackToRoot() + // GetValue get current node value, if node is dir, will return a map contains children's value, otherwise return node.Value GetValue() interface{} } -type pathTree struct { - root *pathNode -} - -func (t *pathTree) Json() string { - b, _ := json.MarshalIndent(t.root, "", " ") - return string(b) -} - -type pathNode struct { +type accessNode struct { Name string Mode AccessMode - parent *pathNode - Children []*pathNode + parent *accessNode + Children []*accessNode } -func (n *pathNode) HasChildren() bool { +func (n *accessNode) HasChildren() bool { return len(n.Children) > 0 } -func (n *pathNode) GetChildren(name string, strict bool) *pathNode { - var wildcardNode *pathNode +func (n *accessNode) GetChildren(name string, strict bool) *accessNode { + var wildcardNode *accessNode for _, c := range n.Children { if name == c.Name { return c @@ -61,17 +56,22 @@ func (n *pathNode) GetChildren(name string, strict bool) *pathNode { return wildcardNode } -func newNodeTraveller(store *store, rules []AccessRule) Traveller { - return &nodeTraveller{store: store, tree: buildPathTree(rules)} +type accessTree struct { + root *accessNode +} + +func (t *accessTree) Json() string { + b, _ := json.MarshalIndent(t.root, "", " ") + return string(b) } -func buildPathTree(rules []AccessRule) *pathTree { - root := &pathNode{ +func newAccessTree(rules []AccessRule) *accessTree { + root := &accessNode{ Name: "/", Mode: AccessModeNil, parent: nil, } - tree := &pathTree{ + tree := &accessTree{ root: root, } for _, rule := range rules { @@ -85,7 +85,7 @@ func buildPathTree(rules []AccessRule) *pathTree { } child := curr.GetChildren(component, true) if child == nil { - child = &pathNode{Name: component, Mode: AccessModeNil, parent: curr} + child = &accessNode{Name: component, Mode: AccessModeNil, parent: curr} curr.Children = append(curr.Children, child) } curr = child @@ -97,19 +97,19 @@ func buildPathTree(rules []AccessRule) *pathTree { } type stackElement struct { - pathNode *pathNode - mode AccessMode + node *accessNode + mode AccessMode } type travellerStack struct { - backend []interface{} + backend []*stackElement } -func (s *travellerStack) Push(v interface{}) { +func (s *travellerStack) Push(v *stackElement) { s.backend = append(s.backend, v) } -func (s *travellerStack) Pop() interface{} { +func (s *travellerStack) Pop() *stackElement { l := len(s.backend) if l == 0 { return nil @@ -119,45 +119,60 @@ func (s *travellerStack) Pop() interface{} { return e } +func (s *travellerStack) Clean() { + s.backend = []*stackElement{} +} + type nodeTraveller struct { - store *store - tree *pathTree - currNode *node - currPathNode *pathNode - currMode AccessMode - stack travellerStack -} - -func (t *nodeTraveller) Enter(dir string) bool { - if dir == "/" { - if t.currNode == nil { - t.stack.Push(&stackElement{pathNode: t.currPathNode, mode: t.currMode}) - t.currNode = t.store.Root - t.currPathNode = t.tree.root - t.currMode = t.currPathNode.Mode - return true + store *store + access *accessTree + currNode *node + currAccessNode *accessNode + currMode AccessMode + stack travellerStack +} + +func newTraveller(store *store, rules []AccessRule) Traveller { + accessTree := newAccessTree(rules) + return &nodeTraveller{store: store, access: accessTree, currNode: store.Root, currAccessNode: accessTree.root, currMode: accessTree.root.Mode} +} + +func (t *nodeTraveller) Enter(path string) bool { + if path == "/" { + return t.enter(path) + } else { + components := strings.Split(path, "/") + step := 0 + for _, component := range components { + if component == "" { + continue + } + if !t.enter(component) { + t.BackStep(step) + return false + } + step = step + 1 } - return false + return true } - if t.currNode == nil { - return false +} + +func (t *nodeTraveller) enter(node string) bool { + if node == "/" { + return true } - n := t.currNode.GetChild(dir) + n := t.currNode.GetChild(node) if n == nil { return false } - //if !n.IsDir() { - // return EnterNotDir - //} - - var p *pathNode - if t.currPathNode != nil { - p = t.currPathNode.GetChildren(dir, false) + var an *accessNode + if t.currAccessNode != nil { + an = t.currAccessNode.GetChildren(node, false) } result := false - if p != nil { - // if p HasChildren, means exist other rule for future access - if p.HasChildren() || p.Mode >= AccessModeRead { + if an != nil { + // if an HasChildren, means exist other rule for future access + if an.HasChildren() || an.Mode >= AccessModeRead { result = true } } else { @@ -167,11 +182,11 @@ func (t *nodeTraveller) Enter(dir string) bool { } if result { - t.stack.Push(&stackElement{pathNode: t.currPathNode, mode: t.currMode}) + t.stack.Push(&stackElement{node: t.currAccessNode, mode: t.currMode}) t.currNode = n - t.currPathNode = p - if t.currPathNode != nil && t.currPathNode.Mode != AccessModeNil { - t.currMode = t.currPathNode.Mode + t.currAccessNode = an + if t.currAccessNode != nil && t.currAccessNode.Mode != AccessModeNil { + t.currMode = t.currAccessNode.Mode } } @@ -179,17 +194,29 @@ func (t *nodeTraveller) Enter(dir string) bool { } func (t *nodeTraveller) Back() { - if t.currNode == nil || t.currNode.IsRoot() { + if t.currNode.IsRoot() { panic("illegal status") } e := t.stack.Pop() if e == nil { panic("illegal status") } - ele := e.(*stackElement) t.currNode = t.currNode.parent - t.currMode = ele.mode - t.currPathNode = ele.pathNode + t.currMode = e.mode + t.currAccessNode = e.node +} + +func (t *nodeTraveller) BackStep(step int) { + for i := 0; i < step; i++ { + t.Back() + } +} + +func (t *nodeTraveller) BackToRoot() { + t.stack.Clean() + t.currNode = t.store.Root + t.currAccessNode = t.access.root + t.currMode = t.currAccessNode.Mode } func (t *nodeTraveller) GetValue() interface{} { @@ -199,8 +226,7 @@ func (t *nodeTraveller) GetValue() interface{} { if t.currNode.IsDir() { values := make(map[string]interface{}) for k, node := range t.currNode.Children { - eresult := t.Enter(node.Name) - if !eresult { + if !t.Enter(node.Name) { continue } v := t.GetValue() diff --git a/store/traveller_test.go b/store/traveller_test.go index 2c0af1b..90b91fc 100644 --- a/store/traveller_test.go +++ b/store/traveller_test.go @@ -7,7 +7,71 @@ import ( //"encoding/json" ) -func TestNodeTraveller(t *testing.T) { +func TestTravellerStack(t *testing.T) { + stack := &travellerStack{} + + assert.Nil(t, stack.Pop()) + + one := &stackElement{node: nil, mode: AccessModeNil} + two := &stackElement{node: nil, mode: AccessModeForbidden} + three := &stackElement{node: nil, mode: AccessModeRead} + stack.Push(one) + stack.Push(two) + stack.Push(three) + + assert.Equal(t, three, stack.Pop()) + assert.Equal(t, two, stack.Pop()) + assert.Equal(t, one, stack.Pop()) + + assert.Nil(t, stack.Pop()) +} + +func TestTravellerEnter(t *testing.T) { + s := New() + data := map[string]interface{}{ + "clusters": map[string]interface{}{ + "cl-1": map[string]interface{}{ + "env": map[string]interface{}{ + "name": "app1", + "secret": "123456", + }, + "public_key": "public_key_val", + }, + "cl-2": map[string]interface{}{ + "env": map[string]interface{}{ + "name": "app2", + "secret": "1234567", + }, + "public_key": "public_key_val2", + }, + }, + } + s.Put("/", data) + + accessRules := []AccessRule{ + { + Path: "/", + Mode: AccessModeRead, + }, + } + + traveller := s.Traveller(accessRules) + assert.True(t, traveller.Enter("/clusters")) + assert.True(t, traveller.Enter("/cl-1/env")) + assert.True(t, traveller.Enter("name")) + assert.Equal(t, "app1", traveller.GetValue()) + + traveller.BackToRoot() + assert.True(t, traveller.Enter("/clusters/cl-1/env/secret")) + traveller.BackStep(2) + assert.True(t, traveller.Enter("public_key")) + assert.Equal(t, "public_key_val", traveller.GetValue()) + + traveller.BackToRoot() + assert.True(t, traveller.Enter("/")) +} + +func TestTraveller(t *testing.T) { s := New() data := map[string]interface{}{ "clusters": map[string]interface{}{ @@ -49,29 +113,17 @@ func TestNodeTraveller(t *testing.T) { } traveller := s.Traveller(accessRules) nodeTraveller := traveller.(*nodeTraveller) - fmt.Println(nodeTraveller.tree.Json()) + fmt.Println(nodeTraveller.access.Json()) - assert.True(t, traveller.Enter("/")) - assert.True(t, traveller.Enter("clusters")) - assert.True(t, traveller.Enter("cl-1")) - assert.True(t, traveller.Enter("env")) - //assert.True(t, EnterForbidden, traveller.Enter("env")) + assert.True(t, traveller.Enter("/clusters/cl-1/env")) + traveller.BackToRoot() - traveller = s.Traveller(accessRules) - assert.True(t, traveller.Enter("/")) - assert.True(t, traveller.Enter("clusters")) - assert.True(t, traveller.Enter("cl-2")) - assert.False(t, traveller.Enter("env")) + assert.False(t, traveller.Enter("/clusters/cl-2/env")) + assert.True(t, traveller.Enter("/clusters/cl-2/public_key")) - traveller = s.Traveller(accessRules) - assert.True(t, traveller.Enter("/")) - assert.True(t, traveller.Enter("clusters")) - assert.True(t, traveller.Enter("cl-2")) - assert.True(t, traveller.Enter("public_key")) + traveller.BackToRoot() - traveller = s.Traveller(accessRules) - traveller.Enter("/") - traveller.Enter("clusters") + traveller.Enter("/clusters") //traveller.Enter("cl-2") v := traveller.GetValue() //j,_ := json.MarshalIndent(v, "", " ") From 76d22cc6866e3351c9cb9d862595cea1598b25cd Mon Sep 17 00:00:00 2001 From: jolestar Date: Sat, 27 May 2017 15:02:55 +0800 Subject: [PATCH 04/11] traveller lock store. --- store/traveller.go | 35 +++++++++++++++++++++++++++++++++++ store/traveller_test.go | 4 ++++ 2 files changed, 39 insertions(+) diff --git a/store/traveller.go b/store/traveller.go index a804a97..eb638c1 100644 --- a/store/traveller.go +++ b/store/traveller.go @@ -30,6 +30,10 @@ type Traveller interface { BackToRoot() // GetValue get current node value, if node is dir, will return a map contains children's value, otherwise return node.Value GetValue() interface{} + // Close release traveller + Close() + // GetVersion get store version. + GetVersion() int64 } type accessNode struct { @@ -134,10 +138,14 @@ type nodeTraveller struct { func newTraveller(store *store, rules []AccessRule) Traveller { accessTree := newAccessTree(rules) + store.worldLock.RLock() return &nodeTraveller{store: store, access: accessTree, currNode: store.Root, currAccessNode: accessTree.root, currMode: accessTree.root.Mode} } func (t *nodeTraveller) Enter(path string) bool { + if t.store == nil { + panic("illegal status: access a closed traveller.") + } if path == "/" { return t.enter(path) } else { @@ -194,6 +202,9 @@ func (t *nodeTraveller) enter(node string) bool { } func (t *nodeTraveller) Back() { + if t.store == nil { + panic("illegal status: access a closed traveller.") + } if t.currNode.IsRoot() { panic("illegal status") } @@ -213,6 +224,9 @@ func (t *nodeTraveller) BackStep(step int) { } func (t *nodeTraveller) BackToRoot() { + if t.store == nil { + panic("illegal status: access a closed traveller.") + } t.stack.Clean() t.currNode = t.store.Root t.currAccessNode = t.access.root @@ -220,6 +234,9 @@ func (t *nodeTraveller) BackToRoot() { } func (t *nodeTraveller) GetValue() interface{} { + if t.store == nil { + panic("illegal status: access a closed traveller.") + } if t.currNode == nil { panic("illegal status.") } @@ -243,3 +260,21 @@ func (t *nodeTraveller) GetValue() interface{} { return t.currNode.Value } } + +func (t *nodeTraveller) GetVersion() int64 { + if t.store == nil { + panic("illegal status: access a closed traveller.") + } + return t.store.Version() +} + +func (t *nodeTraveller) Close(){ + if t.store == nil { + panic("illegal status: access a closed traveller.") + } + t.store = nil + t.access = nil + t.currAccessNode = nil + t.currNode = nil + t.store.worldLock.RUnlock() +} \ No newline at end of file diff --git a/store/traveller_test.go b/store/traveller_test.go index 90b91fc..d0b0d1c 100644 --- a/store/traveller_test.go +++ b/store/traveller_test.go @@ -56,6 +56,7 @@ func TestTravellerEnter(t *testing.T) { } traveller := s.Traveller(accessRules) + defer traveller.Close() assert.True(t, traveller.Enter("/clusters")) assert.True(t, traveller.Enter("/cl-1/env")) assert.True(t, traveller.Enter("name")) @@ -69,6 +70,7 @@ func TestTravellerEnter(t *testing.T) { traveller.BackToRoot() assert.True(t, traveller.Enter("/")) + } func TestTraveller(t *testing.T) { @@ -112,6 +114,8 @@ func TestTraveller(t *testing.T) { }, } traveller := s.Traveller(accessRules) + defer traveller.Close() + nodeTraveller := traveller.(*nodeTraveller) fmt.Println(nodeTraveller.access.Json()) From 64031d0441d70e27845ff62c26897774d4f26037 Mon Sep 17 00:00:00 2001 From: jolestar Date: Sat, 27 May 2017 17:20:01 +0800 Subject: [PATCH 05/11] gofmt vendor --- vendor/github.com/Sirupsen/logrus/entry.go | 4 ++-- .../github.com/Sirupsen/logrus/entry_test.go | 22 +++++++++---------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/vendor/github.com/Sirupsen/logrus/entry.go b/vendor/github.com/Sirupsen/logrus/entry.go index 2a98065..b066a30 100644 --- a/vendor/github.com/Sirupsen/logrus/entry.go +++ b/vendor/github.com/Sirupsen/logrus/entry.go @@ -32,8 +32,8 @@ func NewEntry(logger *Logger) *Entry { return &Entry{ Logger: logger, // Default is three fields, give a little extra room - Data: make(Fields, 5), - Level: logger.Level, + Data: make(Fields, 5), + Level: logger.Level, } } diff --git a/vendor/github.com/Sirupsen/logrus/entry_test.go b/vendor/github.com/Sirupsen/logrus/entry_test.go index f7de400..a963a78 100644 --- a/vendor/github.com/Sirupsen/logrus/entry_test.go +++ b/vendor/github.com/Sirupsen/logrus/entry_test.go @@ -53,15 +53,15 @@ func TestEntryPanicf(t *testing.T) { } func TestEntryLogLevel(t *testing.T) { - out := &bytes.Buffer{} - logger := New() - logger.Out = out - logger.Level = DebugLevel - entry := NewEntry(logger) - assert.Equal(t, DebugLevel, entry.Level) - entry.Level = WarnLevel - entry.Info("it should not be displayed") - assert.Equal(t, "", out.String()) - entry.Warn("it should be displayed") - assert.Contains(t, out.String(), "it should be displayed") + out := &bytes.Buffer{} + logger := New() + logger.Out = out + logger.Level = DebugLevel + entry := NewEntry(logger) + assert.Equal(t, DebugLevel, entry.Level) + entry.Level = WarnLevel + entry.Info("it should not be displayed") + assert.Equal(t, "", out.String()) + entry.Warn("it should be displayed") + assert.Contains(t, out.String(), "it should be displayed") } From 1fcee1c77059ecf7b8254ee908e7e12b77989b73 Mon Sep 17 00:00:00 2001 From: jolestar Date: Sat, 27 May 2017 18:26:55 +0800 Subject: [PATCH 06/11] add access controll to metad api. --- metad.go | 35 +++++++++- metad_test.go | 14 +++- metadata/metarepo.go | 134 +++++++++++++++++++++++++------------- metadata/metarepo_test.go | 42 +++++++----- store/access.go | 83 +++++++++++++++++++++++ store/store.go | 6 +- store/traveller.go | 100 +++------------------------- store/traveller_test.go | 4 +- 8 files changed, 258 insertions(+), 160 deletions(-) create mode 100644 store/access.go diff --git a/metad.go b/metad.go index fc0ce55..d50a7c2 100644 --- a/metad.go +++ b/metad.go @@ -23,6 +23,7 @@ import ( "github.com/yunify/metad/backends" "github.com/yunify/metad/log" "github.com/yunify/metad/metadata" + "github.com/yunify/metad/store" "github.com/yunify/metad/util/flatmap" yaml "gopkg.in/yaml.v2" ) @@ -84,7 +85,7 @@ func New(config *Config) (*Metad, error) { return nil, err } - metadataRepo := metadata.New(config.OnlySelf, storeClient) + metadataRepo := metadata.New(storeClient) return &Metad{config: config, metadataRepo: metadataRepo, router: mux.NewRouter(), manageRouter: mux.NewRouter()}, nil } @@ -138,6 +139,11 @@ func (m *Metad) initManageRouter() { data.HandleFunc("/{nodePath:.*}", m.manageWrapper(m.dataGet)).Methods("GET") data.HandleFunc("/{nodePath:.*}", m.manageWrapper(m.dataUpdate)).Methods("POST", "PUT") data.HandleFunc("/{nodePath:.*}", m.manageWrapper(m.dataDelete)).Methods("DELETE") + + rule := v1.PathPrefix("/rule").Subrouter() + rule.HandleFunc("/", m.manageWrapper(m.accessRuleGet)).Methods("GET") + rule.HandleFunc("/", m.manageWrapper(m.accessRuleUpdate)).Methods("POST", "PUT") + rule.HandleFunc("/", m.manageWrapper(m.accessRuleDelete)).Methods("DELETE") } func (m *Metad) Serve() { @@ -299,6 +305,33 @@ func (m *Metad) mappingDelete(ctx context.Context, req *http.Request) (interface } } +func (m *Metad) accessRuleGet(ctx context.Context, req *http.Request) (interface{}, *HttpError) { + return nil, nil +} + +func (m *Metad) accessRuleUpdate(ctx context.Context, req *http.Request) (interface{}, *HttpError) { + decoder := json.NewDecoder(req.Body) + var data map[string][]store.AccessRule + err := decoder.Decode(&data) + if err != nil { + return nil, NewHttpError(http.StatusBadRequest, fmt.Sprintf("invalid json format, error:%s", err.Error())) + } else { + err = m.metadataRepo.PutAccessRule(data) + if err != nil { + if log.IsDebugEnable() { + log.Debug("accessRuleUpdate data:%v, error:%s", data, err.Error()) + } + return nil, NewServerError(err) + } else { + return nil, nil + } + } +} + +func (m *Metad) accessRuleDelete(ctx context.Context, req *http.Request) (interface{}, *HttpError) { + return nil, nil +} + func contentType(req *http.Request) int { str := httputil.NegotiateContentType(req, []string{ "text/plain", diff --git a/metad_test.go b/metad_test.go index 611bc8c..7a68568 100644 --- a/metad_test.go +++ b/metad_test.go @@ -211,6 +211,8 @@ func TestMetadWatch(t *testing.T) { metad.Init() defer metad.Stop() + ip := "192.168.1.1" + remoteAddr := ip + ":1234" dataJson := ` { @@ -230,12 +232,21 @@ func TestMetadWatch(t *testing.T) { metad.manageRouter.ServeHTTP(w, req) assert.Equal(t, 200, w.Code) + ruleJson := ` + {"192.168.1.1":[{"path":"/","mode":1}] + } + ` + req = httptest.NewRequest("PUT", "/v1/rule/", strings.NewReader(ruleJson)) + w = httptest.NewRecorder() + metad.manageRouter.ServeHTTP(w, req) + assert.Equal(t, 200, w.Code) + time.Sleep(sleepTime) versions := make(chan int, 1) go func() { req := httptest.NewRequest("GET", "/nodes/1/ip?wait=true", nil) req.Header.Set("accept", "application/json") - + req.RemoteAddr = remoteAddr w := httptest.NewRecorder() metad.router.ServeHTTP(w, req) assert.Equal(t, 200, w.Code) @@ -262,6 +273,7 @@ func TestMetadWatch(t *testing.T) { //wait with prev_version should return immediately req = httptest.NewRequest("GET", fmt.Sprintf("/nodes/1/ip?wait=true&prev_version=%d", v), nil) req.Header.Set("accept", "application/json") + req.RemoteAddr = remoteAddr w = httptest.NewRecorder() metad.router.ServeHTTP(w, req) assert.Equal(t, 200, w.Code) diff --git a/metadata/metarepo.go b/metadata/metarepo.go index aa9298c..6f76eab 100644 --- a/metadata/metarepo.go +++ b/metadata/metarepo.go @@ -19,21 +19,21 @@ import ( const DEFAULT_WATCH_BUF_LEN = 100 type MetadataRepo struct { - onlySelf bool mapping store.Store storeClient backends.StoreClient data store.Store + accessTrees map[string]*store.AccessTree metaStopChan chan bool mappingStopChan chan bool timerPool *util.TimerPool } -func New(onlySelf bool, storeClient backends.StoreClient) *MetadataRepo { +func New(storeClient backends.StoreClient) *MetadataRepo { metadataRepo := MetadataRepo{ - onlySelf: onlySelf, mapping: store.New(), storeClient: storeClient, data: store.New(), + accessTrees: make(map[string]*store.AccessTree), metaStopChan: make(chan bool), mappingStopChan: make(chan bool), timerPool: util.NewTimerPool(100 * time.Millisecond), @@ -41,10 +41,6 @@ func New(onlySelf bool, storeClient backends.StoreClient) *MetadataRepo { return &metadataRepo } -func (r *MetadataRepo) SetOnlySelf(onlySelf bool) { - r.onlySelf = onlySelf -} - func (r *MetadataRepo) StartSync() { log.Info("Start Sync") r.startMetaSync() @@ -69,28 +65,55 @@ func (r *MetadataRepo) StopSync() { r.mapping.Destroy() } +func (r *MetadataRepo) getAccessTree(clientIP string) *store.AccessTree { + accessTree := r.accessTrees[clientIP] + //for compatible with old version, auto convert mapping to AccessRule + if accessTree == nil { + mappingData := r.GetMapping(path.Join("/", clientIP)) + if mappingData == nil { + if log.IsDebugEnable() { + log.Debug("Can not find mapping for %s", clientIP) + } + return nil + } + mapping, mok := mappingData.(map[string]interface{}) + if !mok { + log.Warning("Mapping for %s is not a map, result:%v", clientIP, mappingData) + return nil + } + flattenMapping := flatmap.Flatten(mapping) + rules := []store.AccessRule{} + for _, dataPath := range flattenMapping { + rules = append(rules, store.AccessRule{Path: dataPath, Mode: store.AccessModeRead}) + } + accessTree = store.NewAccessTree(rules) + } + return accessTree +} + func (r *MetadataRepo) Root(clientIP string, nodePath string) (currentVersion int64, val interface{}) { + if clientIP == "" { + panic(errors.New("clientIP must not be empty.")) + } nodePath = path.Join("/", nodePath) - if r.onlySelf { - currentVersion = r.data.Version() - if nodePath == "/" { - mapVal := make(map[string]interface{}) - selfVal := r.Self(clientIP, "/") - if selfVal != nil { + accessTree := r.getAccessTree(clientIP) + if accessTree == nil { + return + } + traveller := r.data.Traveller(accessTree) + defer traveller.Close() + if !traveller.Enter(nodePath) { + return + } + currentVersion = traveller.GetVersion() + val = traveller.GetValue() + if val != nil && nodePath == "/" { + selfVal := r.self(clientIP, "/", traveller) + if selfVal != nil { + mapVal, ok := val.(map[string]interface{}) + if ok { mapVal["self"] = selfVal } - val = mapVal - } - } else { - currentVersion, val = r.data.Get(nodePath) - if val != nil && nodePath == "/" { - selfVal := r.Self(clientIP, "/") - if selfVal != nil { - mapVal, ok := val.(map[string]interface{}) - if ok { - mapVal["self"] = selfVal - } - } } } return @@ -98,17 +121,8 @@ func (r *MetadataRepo) Root(clientIP string, nodePath string) (currentVersion in func (r *MetadataRepo) Watch(ctx context.Context, clientIP string, nodePath string) interface{} { nodePath = path.Join("/", nodePath) - - if r.onlySelf { - if nodePath == "/" { - return r.WatchSelf(ctx, clientIP, "/") - } else { - return nil - } - } else { - w := r.data.Watch(nodePath, DEFAULT_WATCH_BUF_LEN) - return r.changeToResult(w, ctx.Done()) - } + w := r.data.Watch(nodePath, DEFAULT_WATCH_BUF_LEN) + return r.changeToResult(w, ctx.Done()) } var TIMER_NIL *time.Timer = &time.Timer{C: nil} @@ -203,6 +217,17 @@ func (r *MetadataRepo) Self(clientIP string, nodePath string) interface{} { panic(errors.New("clientIP must not be empty.")) } nodePath = path.Join("/", nodePath) + + accessTree := r.getAccessTree(clientIP) + if accessTree == nil { + return nil + } + traveller := r.data.Traveller(accessTree) + defer traveller.Close() + return r.self(clientIP, nodePath, traveller) +} + +func (r *MetadataRepo) self(clientIP string, nodePath string, traveller store.Traveller) interface{} { mappingData := r.GetMapping(path.Join("/", clientIP)) if mappingData == nil { if log.IsDebugEnable() { @@ -215,16 +240,20 @@ func (r *MetadataRepo) Self(clientIP string, nodePath string) interface{} { log.Warning("Mapping for %s is not a map, result:%v", clientIP, mappingData) return nil } - return r.getMappingDatas(nodePath, mapping) + return r.getMappingDatas(nodePath, mapping, traveller) } -func (r *MetadataRepo) getMappingData(nodePath, link string) interface{} { +func (r *MetadataRepo) getMappingData(nodePath, link string, traveller store.Traveller) interface{} { nodePath = path.Join(link, nodePath) - _, val := r.data.Get(nodePath) - return val + if traveller.Enter(nodePath) { + val := traveller.GetValue() + traveller.BackToRoot() + return val + } + return nil } -func (r *MetadataRepo) getMappingDatas(nodePath string, mapping map[string]interface{}) interface{} { +func (r *MetadataRepo) getMappingDatas(nodePath string, mapping map[string]interface{}, traveller store.Traveller) interface{} { nodePath = path.Join("/", nodePath) paths := strings.Split(nodePath, "/")[1:] // trim first blank item // nodePath is "/" @@ -233,7 +262,7 @@ func (r *MetadataRepo) getMappingDatas(nodePath string, mapping map[string]inter for k, v := range mapping { submapping, isMap := v.(map[string]interface{}) if isMap { - val := r.getMappingDatas("/", submapping) + val := r.getMappingDatas("/", submapping, traveller) if val != nil { meta[k] = val } else { @@ -241,7 +270,7 @@ func (r *MetadataRepo) getMappingDatas(nodePath string, mapping map[string]inter } } else { subNodePath := fmt.Sprintf("%v", v) - val := r.getMappingData("/", subNodePath) + val := r.getMappingData("/", subNodePath, traveller) if val != nil { meta[k] = val } else { @@ -257,9 +286,9 @@ func (r *MetadataRepo) getMappingDatas(nodePath string, mapping map[string]inter if ok { submapping, isMap := elemValue.(map[string]interface{}) if isMap { - return r.getMappingDatas(path.Join(paths[1:]...), submapping) + return r.getMappingDatas(path.Join(paths[1:]...), submapping, traveller) } else { - return r.getMappingData(path.Join(paths[1:]...), fmt.Sprintf("%v", elemValue)) + return r.getMappingData(path.Join(paths[1:]...), fmt.Sprintf("%v", elemValue), traveller) } } else { if log.IsDebugEnable() { @@ -400,6 +429,21 @@ func (r *MetadataRepo) DataVersion() int64 { return r.data.Version() } +func (r *MetadataRepo) PutAccessRule(rulesMap map[string][]store.AccessRule) error { + for host, rules := range rulesMap { + r.accessTrees[host] = store.NewAccessTree(rules) + } + return nil +} + +func (r *MetadataRepo) DeleteAccessRule(host string) { + delete(r.accessTrees, host) +} + +func (r *MetadataRepo) GetAccessRule(host string) map[string][]store.AccessRule { + return nil +} + func checkSubs(subs []string) error { for _, sub := range subs { if strings.Index(sub, "/") >= 0 { diff --git a/metadata/metarepo_test.go b/metadata/metarepo_test.go index 903f4ae..fc54d6c 100644 --- a/metadata/metarepo_test.go +++ b/metadata/metarepo_test.go @@ -38,7 +38,7 @@ func TestMetarepoData(t *testing.T) { storeClient, err := backends.New(config) assert.NoError(t, err) - metarepo := New(false, storeClient) + metarepo := New(storeClient) metarepo.DeleteData("/") metarepo.StartSync() @@ -46,8 +46,14 @@ func TestMetarepoData(t *testing.T) { testData := FillTestData(metarepo) time.Sleep(sleepTime) ValidTestData(t, testData, metarepo.data) - - _, val := metarepo.Root("192.168.0.1", "/nodes/0") + clientIP := "192.168.0.1" + accessRule := map[string][]store.AccessRule{ + clientIP: { + {Path: "/", Mode: store.AccessModeRead}, + }, + } + metarepo.PutAccessRule(accessRule) + _, val := metarepo.Root(clientIP, "/nodes/0") assert.NotNil(t, val) mapVal, mok := val.(map[string]interface{}) @@ -95,7 +101,7 @@ func TestMetarepoMapping(t *testing.T) { storeClient, err := backends.New(config) assert.NoError(t, err) - metarepo := New(false, storeClient) + metarepo := New(storeClient) metarepo.DeleteData("/") metarepo.DeleteMapping("/") @@ -203,7 +209,7 @@ func TestMetarepoSelf(t *testing.T) { storeClient, err := backends.New(config) assert.NoError(t, err) - metarepo := New(false, storeClient) + metarepo := New(storeClient) metarepo.DeleteMapping("/") metarepo.DeleteData("/") @@ -294,7 +300,7 @@ func TestMetarepoRoot(t *testing.T) { storeClient, err := backends.New(config) assert.NoError(t, err) - metarepo := New(false, storeClient) + metarepo := New(storeClient) metarepo.DeleteMapping("/") metarepo.DeleteData("/") @@ -305,6 +311,14 @@ func TestMetarepoRoot(t *testing.T) { time.Sleep(sleepTime) ip := "192.168.1.0" + + accessRule := map[string][]store.AccessRule{ + ip: { + {Path: "/", Mode: store.AccessModeRead}, + }, + } + metarepo.PutAccessRule(accessRule) + mapping := make(map[string]interface{}) mapping["node"] = "/nodes/0" err = metarepo.PutMapping(ip, mapping, true) @@ -320,14 +334,6 @@ func TestMetarepoRoot(t *testing.T) { assert.NotNil(t, selfVal) assert.True(t, len(mapVal) > 1) - metarepo.SetOnlySelf(true) - - _, val = metarepo.Root(ip, "/") - mapVal = val.(map[string]interface{}) - selfVal = mapVal["self"] - assert.NotNil(t, selfVal) - assert.True(t, len(mapVal) == 1) - metarepo.DeleteData("/") metarepo.DeleteMapping("/") metarepo.StopSync() @@ -347,7 +353,7 @@ func TestWatch(t *testing.T) { storeClient, err := backends.New(config) assert.NoError(t, err) - metarepo := New(false, storeClient) + metarepo := New(storeClient) metarepo.DeleteMapping("/") metarepo.DeleteData("/") @@ -420,7 +426,7 @@ func TestWatchSelf(t *testing.T) { storeClient, err := backends.New(config) assert.NoError(t, err) - metarepo := New(false, storeClient) + metarepo := New(storeClient) metarepo.DeleteMapping("/") metarepo.DeleteData("/") @@ -504,7 +510,7 @@ func TestWatchCloseChan(t *testing.T) { storeClient, err := backends.New(config) assert.NoError(t, err) - metarepo := New(false, storeClient) + metarepo := New(storeClient) metarepo.StartSync() @@ -560,7 +566,7 @@ func TestSelfWatchNodeNotExist(t *testing.T) { storeClient, err := backends.New(config) assert.NoError(t, err) - metarepo := New(false, storeClient) + metarepo := New(storeClient) metarepo.StartSync() diff --git a/store/access.go b/store/access.go new file mode 100644 index 0000000..5e7473d --- /dev/null +++ b/store/access.go @@ -0,0 +1,83 @@ +package store + +import ( + "encoding/json" + "strings" +) + +type AccessMode int + +const ( + AccessModeNil = AccessMode(-1) + AccessModeForbidden = AccessMode(0) + AccessModeRead = AccessMode(1) +) + +type AccessRule struct { + Path string `json:"path"` + Mode AccessMode `json:"mode"` +} + +type AccessNode struct { + Name string + Mode AccessMode + parent *AccessNode + Children []*AccessNode +} + +func (n *AccessNode) HasChildren() bool { + return len(n.Children) > 0 +} + +func (n *AccessNode) GetChildren(name string, strict bool) *AccessNode { + var wildcardNode *AccessNode + for _, c := range n.Children { + if name == c.Name { + return c + } + if !strict && c.Name == "*" { + wildcardNode = c + } + } + return wildcardNode +} + +type AccessTree struct { + Root *AccessNode +} + +func (t *AccessTree) Json() string { + b, _ := json.MarshalIndent(t.Root, "", " ") + return string(b) +} + +func NewAccessTree(rules []AccessRule) *AccessTree { + root := &AccessNode{ + Name: "/", + Mode: AccessModeNil, + parent: nil, + } + tree := &AccessTree{ + Root: root, + } + for _, rule := range rules { + p := rule.Path + curr := root + if p != "/" { + components := strings.Split(p, "/") + for _, component := range components { + if component == "" { + continue + } + child := curr.GetChildren(component, true) + if child == nil { + child = &AccessNode{Name: component, Mode: AccessModeNil, parent: curr} + curr.Children = append(curr.Children, child) + } + curr = child + } + } + curr.Mode = rule.Mode + } + return tree +} diff --git a/store/store.go b/store/store.go index 1c12ed9..df267ae 100644 --- a/store/store.go +++ b/store/store.go @@ -33,7 +33,7 @@ type Store interface { // Destroy the store Destroy() // Traveller - Traveller(rules []AccessRule) Traveller + Traveller(accessTree *AccessTree) Traveller } type store struct { @@ -181,8 +181,8 @@ func (s *store) Destroy() { s.Root = nil } -func (s *store) Traveller(rules []AccessRule) Traveller { - return newTraveller(s, rules) +func (s *store) Traveller(accessTree *AccessTree) Traveller { + return newTraveller(s, accessTree) } // walk walks all the nodePath and apply the walkFunc on each directory diff --git a/store/traveller.go b/store/traveller.go index eb638c1..c3534ad 100644 --- a/store/traveller.go +++ b/store/traveller.go @@ -1,24 +1,9 @@ package store import ( - "encoding/json" "strings" ) -type AccessMode int -type EnterResult int - -const ( - AccessModeNil = AccessMode(-1) - AccessModeForbidden = AccessMode(0) - AccessModeRead = AccessMode(1) -) - -type AccessRule struct { - Path string - Mode AccessMode -} - type Traveller interface { // Enter path's node, return is success. Enter(path string) bool @@ -36,72 +21,8 @@ type Traveller interface { GetVersion() int64 } -type accessNode struct { - Name string - Mode AccessMode - parent *accessNode - Children []*accessNode -} - -func (n *accessNode) HasChildren() bool { - return len(n.Children) > 0 -} - -func (n *accessNode) GetChildren(name string, strict bool) *accessNode { - var wildcardNode *accessNode - for _, c := range n.Children { - if name == c.Name { - return c - } - if !strict && c.Name == "*" { - wildcardNode = c - } - } - return wildcardNode -} - -type accessTree struct { - root *accessNode -} - -func (t *accessTree) Json() string { - b, _ := json.MarshalIndent(t.root, "", " ") - return string(b) -} - -func newAccessTree(rules []AccessRule) *accessTree { - root := &accessNode{ - Name: "/", - Mode: AccessModeNil, - parent: nil, - } - tree := &accessTree{ - root: root, - } - for _, rule := range rules { - p := rule.Path - curr := root - if p != "/" { - components := strings.Split(p, "/") - for _, component := range components { - if component == "" { - continue - } - child := curr.GetChildren(component, true) - if child == nil { - child = &accessNode{Name: component, Mode: AccessModeNil, parent: curr} - curr.Children = append(curr.Children, child) - } - curr = child - } - } - curr.Mode = rule.Mode - } - return tree -} - type stackElement struct { - node *accessNode + node *AccessNode mode AccessMode } @@ -129,17 +50,16 @@ func (s *travellerStack) Clean() { type nodeTraveller struct { store *store - access *accessTree + access *AccessTree currNode *node - currAccessNode *accessNode + currAccessNode *AccessNode currMode AccessMode stack travellerStack } -func newTraveller(store *store, rules []AccessRule) Traveller { - accessTree := newAccessTree(rules) +func newTraveller(store *store, accessTree *AccessTree) Traveller { store.worldLock.RLock() - return &nodeTraveller{store: store, access: accessTree, currNode: store.Root, currAccessNode: accessTree.root, currMode: accessTree.root.Mode} + return &nodeTraveller{store: store, access: accessTree, currNode: store.Root, currAccessNode: accessTree.Root, currMode: accessTree.Root.Mode} } func (t *nodeTraveller) Enter(path string) bool { @@ -173,7 +93,7 @@ func (t *nodeTraveller) enter(node string) bool { if n == nil { return false } - var an *accessNode + var an *AccessNode if t.currAccessNode != nil { an = t.currAccessNode.GetChildren(node, false) } @@ -229,7 +149,7 @@ func (t *nodeTraveller) BackToRoot() { } t.stack.Clean() t.currNode = t.store.Root - t.currAccessNode = t.access.root + t.currAccessNode = t.access.Root t.currMode = t.currAccessNode.Mode } @@ -268,13 +188,13 @@ func (t *nodeTraveller) GetVersion() int64 { return t.store.Version() } -func (t *nodeTraveller) Close(){ +func (t *nodeTraveller) Close() { if t.store == nil { panic("illegal status: access a closed traveller.") } - t.store = nil t.access = nil t.currAccessNode = nil t.currNode = nil t.store.worldLock.RUnlock() -} \ No newline at end of file + t.store = nil +} diff --git a/store/traveller_test.go b/store/traveller_test.go index d0b0d1c..5903f02 100644 --- a/store/traveller_test.go +++ b/store/traveller_test.go @@ -55,7 +55,7 @@ func TestTravellerEnter(t *testing.T) { }, } - traveller := s.Traveller(accessRules) + traveller := s.Traveller(NewAccessTree(accessRules)) defer traveller.Close() assert.True(t, traveller.Enter("/clusters")) assert.True(t, traveller.Enter("/cl-1/env")) @@ -113,7 +113,7 @@ func TestTraveller(t *testing.T) { Mode: AccessModeRead, }, } - traveller := s.Traveller(accessRules) + traveller := s.Traveller(NewAccessTree(accessRules)) defer traveller.Close() nodeTraveller := traveller.(*nodeTraveller) From 7464b4107df7ff10b6194ade69d2af7b591d4d49 Mon Sep 17 00:00:00 2001 From: jolestar Date: Wed, 31 May 2017 20:11:32 +0800 Subject: [PATCH 07/11] add access store client impl. --- backends/client.go | 5 ++ backends/client_test.go | 78 ++++++++++++++++++++ backends/etcdv3/client.go | 112 ++++++++++++++++++++++++----- backends/etcdv3/client_test.go | 2 +- backends/local/client.go | 48 ++++++++++++- metad.go | 14 +++- metad_test.go | 83 ++++++++++++++++++++++ metadata/metarepo.go | 57 +++++++++------ metadata/metarepo_test.go | 12 ++-- store/access.go | 125 +++++++++++++++++++++++++++++++-- store/store.go | 4 +- store/traveller.go | 15 ++-- util/helper.go | 12 ++++ 13 files changed, 500 insertions(+), 67 deletions(-) diff --git a/backends/client.go b/backends/client.go index 692566d..4d55b26 100644 --- a/backends/client.go +++ b/backends/client.go @@ -25,6 +25,11 @@ type StoreClient interface { PutMapping(nodePath string, mapping interface{}, replace bool) error DeleteMapping(nodePath string, dir bool) error SyncMapping(mapping store.Store, stopChan chan bool) + + GetAccessRule() (map[string][]store.AccessRule, error) + PutAccessRule(rules map[string][]store.AccessRule) error + DeleteAccessRule(hosts []string) error + SyncAccessRule(accessStore store.AccessStore, stopChan chan bool) } // New is used to create a storage client based on our configuration. diff --git a/backends/client_test.go b/backends/client_test.go index ca56afc..bddb2b3 100644 --- a/backends/client_test.go +++ b/backends/client_test.go @@ -461,6 +461,84 @@ func TestMappingSync(t *testing.T) { } } +func TestAccessRule(t *testing.T) { + for _, backend := range backendNodes { + stopChan := make(chan bool) + defer func() { + stopChan <- true + }() + storeClient := NewTestClient(backend) + + accessStore := store.NewAccessStore() + storeClient.SyncAccessRule(accessStore, stopChan) + + rules := map[string][]store.AccessRule{ + "192.168.1.1": { + {Path: "/clusters", Mode: store.AccessModeForbidden}, + {Path: "/clusters/cl-1", Mode: store.AccessModeRead}, + }, + "192.168.1.2": { + {Path: "/clusters", Mode: store.AccessModeForbidden}, + {Path: "/clusters/cl-2", Mode: store.AccessModeRead}, + }, + } + var rulesGet map[string][]store.AccessRule + err := storeClient.PutAccessRule(rules) + assert.NoError(t, err) + + rulesGet, err = storeClient.GetAccessRule() + assert.NoError(t, err) + assert.Equal(t, rules, rulesGet) + + time.Sleep(1000 * time.Millisecond) + assert.NotNil(t, accessStore.Get("192.168.1.1")) + + err = storeClient.DeleteAccessRule([]string{"192.168.1.2"}) + assert.NoError(t, err) + + rulesGet, err = storeClient.GetAccessRule() + assert.NoError(t, err) + _, ok := rulesGet["192.168.1.2"] + assert.False(t, ok) + + rules2 := map[string][]store.AccessRule{ + "192.168.1.3": { + {Path: "/clusters", Mode: store.AccessModeForbidden}, + {Path: "/clusters/cl-3", Mode: store.AccessModeRead}, + }, + } + err = storeClient.PutAccessRule(rules2) + assert.NoError(t, err) + + time.Sleep(1000 * time.Millisecond) + + assert.Nil(t, accessStore.Get("192.168.1.2")) + assert.NotNil(t, accessStore.Get("192.168.1.3")) + + err = storeClient.DeleteAccessRule([]string{"192.168.1.1", "192.168.1.2", "192.168.1.3"}) + assert.NoError(t, err) + } +} + +func NewTestClient(backend string) StoreClient { + prefix := fmt.Sprintf("/prefix%v", rand.Intn(1000)) + group := fmt.Sprintf("/group%v", rand.Intn(1000)) + println("Test backend: ", backend) + nodes := GetDefaultBackends(backend) + + config := Config{ + Backend: backend, + BackendNodes: nodes, + Prefix: prefix, + Group: group, + } + storeClient, err := New(config) + if err != nil { + panic(err) + } + return storeClient +} + func FillTestData(storeClient StoreClient) map[string]string { testData := make(map[string]interface{}) for i := 0; i < 5; i++ { diff --git a/backends/etcdv3/client.go b/backends/etcdv3/client.go index 4c21d23..3c7ecdd 100644 --- a/backends/etcdv3/client.go +++ b/backends/etcdv3/client.go @@ -19,6 +19,7 @@ import ( ) const SELF_MAPPING_PATH = "/_metad/mapping" +const RULE_PATH = "/_metad/rule" var ( //see github.com/coreos/etcd/etcdserver/api/v3rpc/key.go @@ -30,6 +31,7 @@ type Client struct { client *client.Client prefix string mappingPrefix string + rulePrefix string } // NewEtcdClient returns an *etcd.Client with a connection to named machines. @@ -79,7 +81,7 @@ func NewEtcdClient(group string, prefix string, machines []string, cert, key, ca if err != nil { return nil, err } - return &Client{c, prefix, path.Join(SELF_MAPPING_PATH, group)}, nil + return &Client{c, prefix, path.Join(SELF_MAPPING_PATH, group), path.Join(RULE_PATH, group)}, nil } // Get queries etcd for nodePath. @@ -105,7 +107,7 @@ func (c *Client) Delete(nodePath string, dir bool) error { func (c *Client) Sync(store store.Store, stopChan chan bool) { startedChan := make(chan bool) - go c.internalSync(c.prefix, store, stopChan, startedChan) + go c.internalSync(c.prefix, stopChan, startedChan, c.newInitStoreFunc(c.prefix, store), newProcessSyncChangeFunc(store)) <-startedChan } @@ -133,7 +135,70 @@ func (c *Client) DeleteMapping(nodePath string, dir bool) error { func (c *Client) SyncMapping(mapping store.Store, stopChan chan bool) { startedChan := make(chan bool) - go c.internalSync(c.mappingPrefix, mapping, stopChan, startedChan) + go c.internalSync(c.mappingPrefix, stopChan, startedChan, c.newInitStoreFunc(c.mappingPrefix, mapping), newProcessSyncChangeFunc(mapping)) + <-startedChan +} + +func (c *Client) GetAccessRule() (map[string][]store.AccessRule, error) { + result := make(map[string][]store.AccessRule) + m, err := c.internalGets(c.rulePrefix, "/") + if err != nil { + return nil, err + } + for k, v := range m { + rules, err := store.UnmarshalAccessRule(v) + if err != nil { + log.Error("Unexpect rule json value in etcd [%s]", v) + continue + } + _, host := path.Split(k) + result[host] = rules + } + return result, nil +} + +func (c *Client) PutAccessRule(rules map[string][]store.AccessRule) error { + values := make(map[string]string, len(rules)) + for k, v := range rules { + values[k] = store.MarshalAccessRule(v) + } + return c.internalPutValues(c.rulePrefix, "/", values, false) +} + +func (c *Client) DeleteAccessRule(hosts []string) error { + for _, host := range hosts { + err := c.internalDelete(c.rulePrefix, path.Join("/", host), false) + if err != nil { + return err + } + } + return nil +} + +func (c *Client) SyncAccessRule(accessStore store.AccessStore, stopChan chan bool) { + startedChan := make(chan bool) + go c.internalSync(c.rulePrefix, stopChan, startedChan, func() error { + val, err := c.GetAccessRule() + if err != nil { + return err + } + accessStore.Puts(val) + return nil + }, func(event *client.Event, nodePath, value string) { + _, host := path.Split(nodePath) + switch event.Type { + case mvccpb.PUT: + rules, err := store.UnmarshalAccessRule(value) + if err != nil { + log.Error("Unexpect rule json value in etcd [%s]", value) + } + accessStore.Put(host, rules) + case mvccpb.DELETE: + accessStore.Delete(host) + default: + log.Warning("Unknow watch event type: %s ", event.Type) + } + }) <-startedChan } @@ -182,7 +247,7 @@ func handleGetResp(prefix string, resp *client.GetResponse, vars map[string]stri return nil } -func (c *Client) internalSync(prefix string, store store.Store, stopChan chan bool, startedChan chan bool) { +func (c *Client) internalSync(prefix string, stopChan chan bool, startedChan chan bool, initStoreFunc func() error, processChangeFunc func(event *client.Event, nodePath, value string)) { var rev int64 = 0 init := false @@ -209,14 +274,13 @@ func (c *Client) internalSync(prefix string, store store.Store, stopChan chan bo }() for !init { - val, err := c.internalGets(prefix, "/") + err := initStoreFunc() if err != nil { - log.Error("GetValue from etcd nodePath:%s, error-type: %s, error: %s", prefix, reflect.TypeOf(err), err.Error()) + log.Error("Get init value from etcd nodePath:%s, error-type: %s, error: %s", prefix, reflect.TypeOf(err), err.Error()) time.Sleep(time.Duration(1000) * time.Millisecond) log.Info("Init store for prefix %s fail, retry.", prefix) continue } - store.PutBulk("/", val) log.Info("Init store for prefix %s success.", prefix) init = true go func() { @@ -224,24 +288,36 @@ func (c *Client) internalSync(prefix string, store store.Store, stopChan chan bo }() } for resp := range watchChan { - processSyncChange(prefix, store, &resp) + for _, event := range resp.Events { + nodePath := string(event.Kv.Key) + // avoid sync mapping config as metadata when prefix is "/" + if (prefix == "" || prefix == "/") && (strings.HasPrefix(nodePath, SELF_MAPPING_PATH) || strings.HasPrefix(nodePath, RULE_PATH)) { + continue + } + + nodePath = util.TrimPathPrefix(nodePath, prefix) + value := string(event.Kv.Value) + log.Debug("process sync change, event_type: %s, prefix: %v, nodePath:%v, value: %v ", event.Type, prefix, nodePath, value) + processChangeFunc(event, nodePath, value) + } rev = resp.Header.Revision } } } -func processSyncChange(prefix string, store store.Store, resp *client.WatchResponse) { - for _, event := range resp.Events { - nodePath := string(event.Kv.Key) - - // avoid sync mapping config as metadata when prefix is "/" - if (prefix == "" || prefix == "/") && strings.HasPrefix(nodePath, SELF_MAPPING_PATH) { - continue +func (c *Client) newInitStoreFunc(prefix string, store store.Store) func() error { + return func() error { + val, err := c.internalGets(prefix, "/") + if err != nil { + return err } + store.PutBulk("/", val) + return nil + } +} - nodePath = util.TrimPathPrefix(nodePath, prefix) - value := string(event.Kv.Value) - log.Debug("process sync change, event_type: %s, prefix: %v, nodePath:%v, value: %v ", event.Type, prefix, nodePath, value) +func newProcessSyncChangeFunc(store store.Store) func(event *client.Event, nodePath, value string) { + return func(event *client.Event, nodePath, value string) { switch event.Type { case mvccpb.PUT: store.Put(nodePath, value) diff --git a/backends/etcdv3/client_test.go b/backends/etcdv3/client_test.go index d1c7e41..887e2c2 100644 --- a/backends/etcdv3/client_test.go +++ b/backends/etcdv3/client_test.go @@ -32,7 +32,7 @@ func TestClientSyncStop(t *testing.T) { metastore := store.New() // expect internalSync not block after stopChan has signal startedChan := make(chan bool) - storeClient.internalSync(prefix, metastore, stopChan, startedChan) + storeClient.internalSync(prefix, stopChan, startedChan, storeClient.newInitStoreFunc(prefix, metastore), newProcessSyncChangeFunc(metastore)) initialized := <-startedChan log.Info(fmt.Sprint("sync status:", initialized)) diff --git a/backends/local/client.go b/backends/local/client.go index a0faaef..1411f3d 100644 --- a/backends/local/client.go +++ b/backends/local/client.go @@ -7,14 +7,17 @@ import ( // a backend just for test. type Client struct { - data store.Store - mapping store.Store + data store.Store + mapping store.Store + rules map[string][]store.AccessRule + accessStore store.AccessStore } func NewLocalClient() (*Client, error) { return &Client{ data: store.New(), mapping: store.New(), + rules: map[string][]store.AccessRule{}, }, nil } @@ -79,6 +82,47 @@ func (c *Client) SyncMapping(mapping store.Store, stopChan chan bool) { go c.internalSync("mapping", c.mapping, mapping, stopChan) } +func (c *Client) GetAccessRule() (map[string][]store.AccessRule, error) { + result := make(map[string][]store.AccessRule, len(c.rules)) + for k, v := range c.rules { + result[k] = v + } + return result, nil +} + +func (c *Client) PutAccessRule(rules map[string][]store.AccessRule) error { + for k, v := range rules { + c.rules[k] = v + if c.accessStore != nil { + c.accessStore.Put(k, v) + } + } + return nil +} + +func (c *Client) DeleteAccessRule(hosts []string) error { + for _, host := range hosts { + delete(c.rules, host) + if c.accessStore != nil { + c.accessStore.Delete(host) + } + } + return nil +} + +func (c *Client) SyncAccessRule(accessStore store.AccessStore, stopChan chan bool) { + c.accessStore = accessStore + for k, v := range c.rules { + c.accessStore.Put(k, v) + } + go func() { + select { + case <-stopChan: + c.accessStore = nil + } + }() +} + func (c *Client) internalSync(name string, from store.Store, to store.Store, stopChan chan bool) { w := from.Watch("/", 5000) _, meta := from.Get("/") diff --git a/metad.go b/metad.go index d50a7c2..9c37d1d 100644 --- a/metad.go +++ b/metad.go @@ -140,6 +140,10 @@ func (m *Metad) initManageRouter() { data.HandleFunc("/{nodePath:.*}", m.manageWrapper(m.dataUpdate)).Methods("POST", "PUT") data.HandleFunc("/{nodePath:.*}", m.manageWrapper(m.dataDelete)).Methods("DELETE") + v1.HandleFunc("/rule", m.manageWrapper(m.accessRuleGet)).Methods("GET") + v1.HandleFunc("/rule", m.manageWrapper(m.accessRuleUpdate)).Methods("POST", "PUT") + v1.HandleFunc("/rule", m.manageWrapper(m.accessRuleDelete)).Methods("DELETE") + rule := v1.PathPrefix("/rule").Subrouter() rule.HandleFunc("/", m.manageWrapper(m.accessRuleGet)).Methods("GET") rule.HandleFunc("/", m.manageWrapper(m.accessRuleUpdate)).Methods("POST", "PUT") @@ -306,7 +310,10 @@ func (m *Metad) mappingDelete(ctx context.Context, req *http.Request) (interface } func (m *Metad) accessRuleGet(ctx context.Context, req *http.Request) (interface{}, *HttpError) { - return nil, nil + hostsStr := req.FormValue("hosts") + hosts := strings.Split(hostsStr, ",") + val := m.metadataRepo.GetAccessRule(hosts) + return val, nil } func (m *Metad) accessRuleUpdate(ctx context.Context, req *http.Request) (interface{}, *HttpError) { @@ -329,7 +336,10 @@ func (m *Metad) accessRuleUpdate(ctx context.Context, req *http.Request) (interf } func (m *Metad) accessRuleDelete(ctx context.Context, req *http.Request) (interface{}, *HttpError) { - return nil, nil + hostsStr := req.FormValue("hosts") + hosts := strings.Split(hostsStr, ",") + err := m.metadataRepo.DeleteAccessRule(hosts) + return nil, NewServerError(err) } func contentType(req *http.Request) int { diff --git a/metad_test.go b/metad_test.go index 7a68568..f03be3f 100644 --- a/metad_test.go +++ b/metad_test.go @@ -436,6 +436,89 @@ func TestMetadMappingDelete(t *testing.T) { getAndCheckMapping(metad, t, ip, false) } +func TestMetadAccessRule(t *testing.T) { + config := &Config{ + Backend: testBackend, + } + metad, err := New(config) + assert.NoError(t, err) + + metad.Init() + + defer metad.Stop() + + data := map[string]interface{}{ + "clusters": map[string]interface{}{ + "cl-1": map[string]interface{}{ + "name": "cl-1", + "env": map[string]interface{}{ + "username": "user1", + "secret": "123456", + }, + "public_key": "public_key_val", + }, + "cl-2": map[string]interface{}{ + "name": "cl-2", + "env": map[string]interface{}{ + "username": "user2", + "secret": "1234567", + }, + "public_key": "public_key_val2", + }, + }, + } + + b, _ := json.Marshal(data) + dataJson := string(b) + req := httptest.NewRequest("PUT", "/v1/data/", strings.NewReader(dataJson)) + w := httptest.NewRecorder() + metad.manageRouter.ServeHTTP(w, req) + assert.Equal(t, 200, w.Code) + mappingJson := ` + {"192.168.1.1":{"cluster":"/clusters/cl-1", "links":{"c2":"/clusters/cl-2"}}, + "192.168.1.2":{"cluster":"/clusters/cl-2"}}` + req = httptest.NewRequest("POST", "/v1/mapping", strings.NewReader(mappingJson)) + w = httptest.NewRecorder() + metad.manageRouter.ServeHTTP(w, req) + assert.Equal(t, 200, w.Code) + + ruleJson := ` + {"192.168.1.1":[{"path":"/","mode":0}, {"path":"/clusters/*/env","mode":0},{"path":"/clusters/cl-1","mode":1}, {"path":"/clusters/cl-2","mode":1}, {"path":"/clusters/cl-2/env/secret","mode":0}], + "192.168.1.2":[{"path":"/","mode":0}, {"path":"/clusters/*/env","mode":0},{"path":"/clusters/cl-2","mode":1}] + } + ` + req = httptest.NewRequest("POST", "/v1/rule", strings.NewReader(ruleJson)) + w = httptest.NewRecorder() + metad.manageRouter.ServeHTTP(w, req) + assert.Equal(t, 200, w.Code) + + time.Sleep(sleepTime) + + req = httptest.NewRequest("GET", "/", nil) + req.RemoteAddr = "192.168.1.1:1234" + req.Header.Set("accept", "application/json") + w = httptest.NewRecorder() + metad.router.ServeHTTP(w, req) + assert.Equal(t, 200, w.Code) + assert.Equal(t, "cl-1", util.GetMapValue(parse(w), "/self/cluster/name")) + // node1 can access cl-2 + assert.Equal(t, "cl-2", util.GetMapValue(parse(w), "/clusters/cl-2/name")) + assert.Equal(t, "user2", util.GetMapValue(parse(w), "/self/links/c2/env/username")) + //can not access cl-2 env/secret + assert.Equal(t, "", util.GetMapValue(parse(w), "/self/links/c2/env/secret")) + + req = httptest.NewRequest("GET", "/", nil) + req.RemoteAddr = "192.168.1.2:1234" + req.Header.Set("accept", "application/json") + w = httptest.NewRecorder() + metad.router.ServeHTTP(w, req) + assert.Equal(t, 200, w.Code) + assert.Equal(t, "cl-2", util.GetMapValue(parse(w), "/self/cluster/name")) + assert.Equal(t, "1234567", util.GetMapValue(parse(w), "/self/cluster/env/secret")) + // node2 can not access cl-1 + assert.Equal(t, "", util.GetMapValue(parse(w), "/clusters/cl-1/name")) +} + func getAndCheckMapping(metad *Metad, t *testing.T, ip string, exist bool) { req := httptest.NewRequest("GET", "/v1/mapping", nil) req.Header.Set("Accept", "application/json") diff --git a/metadata/metarepo.go b/metadata/metarepo.go index 6f76eab..f8b33f6 100644 --- a/metadata/metarepo.go +++ b/metadata/metarepo.go @@ -19,24 +19,26 @@ import ( const DEFAULT_WATCH_BUF_LEN = 100 type MetadataRepo struct { - mapping store.Store - storeClient backends.StoreClient - data store.Store - accessTrees map[string]*store.AccessTree - metaStopChan chan bool - mappingStopChan chan bool - timerPool *util.TimerPool + mapping store.Store + storeClient backends.StoreClient + data store.Store + accessStore store.AccessStore + metaStopChan chan bool + mappingStopChan chan bool + accessRuleStopChan chan bool + timerPool *util.TimerPool } func New(storeClient backends.StoreClient) *MetadataRepo { metadataRepo := MetadataRepo{ - mapping: store.New(), - storeClient: storeClient, - data: store.New(), - accessTrees: make(map[string]*store.AccessTree), - metaStopChan: make(chan bool), - mappingStopChan: make(chan bool), - timerPool: util.NewTimerPool(100 * time.Millisecond), + mapping: store.New(), + storeClient: storeClient, + data: store.New(), + accessStore: store.NewAccessStore(), + metaStopChan: make(chan bool), + mappingStopChan: make(chan bool), + accessRuleStopChan: make(chan bool), + timerPool: util.NewTimerPool(100 * time.Millisecond), } return &metadataRepo } @@ -45,6 +47,7 @@ func (r *MetadataRepo) StartSync() { log.Info("Start Sync") r.startMetaSync() r.startMappingSync() + r.startAccessRuleSync() } func (r *MetadataRepo) startMetaSync() { @@ -55,18 +58,23 @@ func (r *MetadataRepo) startMappingSync() { r.storeClient.SyncMapping(r.mapping, r.mappingStopChan) } +func (r *MetadataRepo) startAccessRuleSync() { + r.storeClient.SyncAccessRule(r.accessStore, r.accessRuleStopChan) +} + func (r *MetadataRepo) StopSync() { log.Info("Stop Sync") r.metaStopChan <- true r.mappingStopChan <- true + r.accessRuleStopChan <- true time.Sleep(1 * time.Second) r.data.Destroy() time.Sleep(1 * time.Second) r.mapping.Destroy() } -func (r *MetadataRepo) getAccessTree(clientIP string) *store.AccessTree { - accessTree := r.accessTrees[clientIP] +func (r *MetadataRepo) getAccessTree(clientIP string) store.AccessTree { + accessTree := r.accessStore.Get(clientIP) //for compatible with old version, auto convert mapping to AccessRule if accessTree == nil { mappingData := r.GetMapping(path.Join("/", clientIP)) @@ -430,18 +438,21 @@ func (r *MetadataRepo) DataVersion() int64 { } func (r *MetadataRepo) PutAccessRule(rulesMap map[string][]store.AccessRule) error { - for host, rules := range rulesMap { - r.accessTrees[host] = store.NewAccessTree(rules) + for _, v := range rulesMap { + err := store.CheckAccessRules(v) + if err != nil { + return err + } } - return nil + return r.storeClient.PutAccessRule(rulesMap) } -func (r *MetadataRepo) DeleteAccessRule(host string) { - delete(r.accessTrees, host) +func (r *MetadataRepo) DeleteAccessRule(hosts []string) error { + return r.storeClient.DeleteAccessRule(hosts) } -func (r *MetadataRepo) GetAccessRule(host string) map[string][]store.AccessRule { - return nil +func (r *MetadataRepo) GetAccessRule(hosts []string) map[string][]store.AccessRule { + return r.accessStore.GetAccessRule(hosts) } func checkSubs(subs []string) error { diff --git a/metadata/metarepo_test.go b/metadata/metarepo_test.go index fc54d6c..4d458a4 100644 --- a/metadata/metarepo_test.go +++ b/metadata/metarepo_test.go @@ -41,11 +41,6 @@ func TestMetarepoData(t *testing.T) { metarepo := New(storeClient) metarepo.DeleteData("/") - metarepo.StartSync() - - testData := FillTestData(metarepo) - time.Sleep(sleepTime) - ValidTestData(t, testData, metarepo.data) clientIP := "192.168.0.1" accessRule := map[string][]store.AccessRule{ clientIP: { @@ -53,6 +48,13 @@ func TestMetarepoData(t *testing.T) { }, } metarepo.PutAccessRule(accessRule) + + metarepo.StartSync() + + testData := FillTestData(metarepo) + time.Sleep(sleepTime) + ValidTestData(t, testData, metarepo.data) + _, val := metarepo.Root(clientIP, "/nodes/0") assert.NotNil(t, val) diff --git a/store/access.go b/store/access.go index 5e7473d..b4a872d 100644 --- a/store/access.go +++ b/store/access.go @@ -2,22 +2,57 @@ package store import ( "encoding/json" + "fmt" "strings" + "sync" ) type AccessMode int const ( + begin = AccessMode(-2) AccessModeNil = AccessMode(-1) AccessModeForbidden = AccessMode(0) AccessModeRead = AccessMode(1) + end = AccessMode(2) ) +func CheckAccessMode(mode AccessMode) bool { + if mode <= begin || mode >= end { + return false + } + return true +} + +func CheckAccessRules(rules []AccessRule) error { + keys := make(map[string]*struct{}, len(rules)) + for _, r := range rules { + if !CheckAccessMode(r.Mode) { + return fmt.Errorf("Invalid AccessMode [%v]", r.Mode) + } + if _, ok := keys[r.Path]; ok { + return fmt.Errorf("AccessRule path [%s] repeat define.", r.Path) + } + } + return nil +} + type AccessRule struct { Path string `json:"path"` Mode AccessMode `json:"mode"` } +func MarshalAccessRule(rules []AccessRule) string { + b, _ := json.Marshal(rules) + return string(b) +} + +func UnmarshalAccessRule(data string) ([]AccessRule, error) { + rules := []AccessRule{} + err := json.Unmarshal([]byte(data), &rules) + return rules, err +} + type AccessNode struct { Name string Mode AccessMode @@ -25,11 +60,11 @@ type AccessNode struct { Children []*AccessNode } -func (n *AccessNode) HasChildren() bool { +func (n *AccessNode) HasChild() bool { return len(n.Children) > 0 } -func (n *AccessNode) GetChildren(name string, strict bool) *AccessNode { +func (n *AccessNode) GetChild(name string, strict bool) *AccessNode { var wildcardNode *AccessNode for _, c := range n.Children { if name == c.Name { @@ -42,22 +77,98 @@ func (n *AccessNode) GetChildren(name string, strict bool) *AccessNode { return wildcardNode } -type AccessTree struct { +type AccessStore interface { + Get(host string) AccessTree + GetAccessRule(hosts []string) map[string][]AccessRule + Put(host string, rules []AccessRule) + Puts(rules map[string][]AccessRule) + Delete(host string) +} + +func NewAccessStore() AccessStore { + return &accessStore{m: make(map[string]AccessTree)} +} + +type accessStore struct { + m map[string]AccessTree + lock sync.RWMutex +} + +func (s *accessStore) Delete(host string) { + s.lock.Lock() + delete(s.m, host) + s.lock.Unlock() +} + +func (s *accessStore) Get(host string) AccessTree { + s.lock.RLock() + defer s.lock.RUnlock() + return s.m[host] +} + +func (s *accessStore) Put(host string, rules []AccessRule) { + s.lock.Lock() + s.m[host] = NewAccessTree(rules) + s.lock.Unlock() +} + +func (s *accessStore) Puts(rules map[string][]AccessRule) { + s.lock.Lock() + for k, v := range rules { + s.m[k] = NewAccessTree(v) + } + s.lock.Unlock() +} + +func (s *accessStore) GetAccessRule(hosts []string) map[string][]AccessRule { + s.lock.RLock() + defer s.lock.RUnlock() + result := map[string][]AccessRule{} + if len(hosts) == 0 { + for k, v := range s.m { + result[k] = v.ToAccessRule() + } + } else { + for _, host := range hosts { + t, ok := s.m[host] + if ok { + result[host] = t.ToAccessRule() + } + } + } + return result +} + +type AccessTree interface { + GetRoot() *AccessNode + ToAccessRule() []AccessRule + Json() string +} + +type accessTree struct { Root *AccessNode } -func (t *AccessTree) Json() string { +func (t *accessTree) GetRoot() *AccessNode { + return t.Root +} + +func (t *accessTree) ToAccessRule() []AccessRule { + return nil +} + +func (t *accessTree) Json() string { b, _ := json.MarshalIndent(t.Root, "", " ") return string(b) } -func NewAccessTree(rules []AccessRule) *AccessTree { +func NewAccessTree(rules []AccessRule) AccessTree { root := &AccessNode{ Name: "/", Mode: AccessModeNil, parent: nil, } - tree := &AccessTree{ + tree := &accessTree{ Root: root, } for _, rule := range rules { @@ -69,7 +180,7 @@ func NewAccessTree(rules []AccessRule) *AccessTree { if component == "" { continue } - child := curr.GetChildren(component, true) + child := curr.GetChild(component, true) if child == nil { child = &AccessNode{Name: component, Mode: AccessModeNil, parent: curr} curr.Children = append(curr.Children, child) diff --git a/store/store.go b/store/store.go index df267ae..288274e 100644 --- a/store/store.go +++ b/store/store.go @@ -33,7 +33,7 @@ type Store interface { // Destroy the store Destroy() // Traveller - Traveller(accessTree *AccessTree) Traveller + Traveller(accessTree AccessTree) Traveller } type store struct { @@ -181,7 +181,7 @@ func (s *store) Destroy() { s.Root = nil } -func (s *store) Traveller(accessTree *AccessTree) Traveller { +func (s *store) Traveller(accessTree AccessTree) Traveller { return newTraveller(s, accessTree) } diff --git a/store/traveller.go b/store/traveller.go index c3534ad..96880f6 100644 --- a/store/traveller.go +++ b/store/traveller.go @@ -50,16 +50,17 @@ func (s *travellerStack) Clean() { type nodeTraveller struct { store *store - access *AccessTree + access AccessTree currNode *node currAccessNode *AccessNode currMode AccessMode stack travellerStack } -func newTraveller(store *store, accessTree *AccessTree) Traveller { +func newTraveller(store *store, accessTree AccessTree) Traveller { store.worldLock.RLock() - return &nodeTraveller{store: store, access: accessTree, currNode: store.Root, currAccessNode: accessTree.Root, currMode: accessTree.Root.Mode} + currAccessNode := accessTree.GetRoot() + return &nodeTraveller{store: store, access: accessTree, currNode: store.Root, currAccessNode: currAccessNode, currMode: currAccessNode.Mode} } func (t *nodeTraveller) Enter(path string) bool { @@ -95,12 +96,12 @@ func (t *nodeTraveller) enter(node string) bool { } var an *AccessNode if t.currAccessNode != nil { - an = t.currAccessNode.GetChildren(node, false) + an = t.currAccessNode.GetChild(node, false) } result := false if an != nil { - // if an HasChildren, means exist other rule for future access - if an.HasChildren() || an.Mode >= AccessModeRead { + // if an HasChild, means exist other rule for future access + if an.HasChild() || an.Mode >= AccessModeRead { result = true } } else { @@ -149,7 +150,7 @@ func (t *nodeTraveller) BackToRoot() { } t.stack.Clean() t.currNode = t.store.Root - t.currAccessNode = t.access.Root + t.currAccessNode = t.access.GetRoot() t.currMode = t.currAccessNode.Mode } diff --git a/util/helper.go b/util/helper.go index 0cfe2b0..254bc93 100644 --- a/util/helper.go +++ b/util/helper.go @@ -3,6 +3,7 @@ package util import ( "github.com/yunify/metad/util/flatmap" "path" + "strconv" "strings" ) @@ -47,3 +48,14 @@ func GetMapValue(m interface{}, nodePath string) string { v := fm[nodePath] return v } + +func ParseInt(value string, defaultValue int) int { + if value == "" { + return defaultValue + } + result, err := strconv.Atoi(value) + if err != nil { + result = defaultValue + } + return result +} From 8a4269e693e0e7f2d6b1156d19ecb02c8daa5933 Mon Sep 17 00:00:00 2001 From: jolestar Date: Fri, 2 Jun 2017 16:30:38 +0800 Subject: [PATCH 08/11] add more test. --- metad_test.go | 58 +++++------ metadata/metarepo_test.go | 201 ++++++++++++++++---------------------- store/access.go | 47 ++++++--- store/access_test.go | 58 +++++++++++ store/traveller.go | 6 +- 5 files changed, 201 insertions(+), 169 deletions(-) create mode 100644 store/access_test.go diff --git a/metad_test.go b/metad_test.go index f03be3f..d13b46c 100644 --- a/metad_test.go +++ b/metad_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/yunify/metad/log" "github.com/yunify/metad/util" + "math/rand" ) var ( @@ -21,16 +22,11 @@ var ( func init() { log.SetLevel("debug") + rand.Seed(time.Now().UnixNano()) } func TestMetad(t *testing.T) { - config := &Config{ - Backend: testBackend, - } - metad, err := New(config) - assert.NoError(t, err) - - metad.Init() + metad := NewTestMetad() defer metad.Stop() @@ -202,13 +198,7 @@ func TestMetad(t *testing.T) { } func TestMetadWatch(t *testing.T) { - config := &Config{ - Backend: testBackend, - } - metad, err := New(config) - assert.NoError(t, err) - - metad.Init() + metad := NewTestMetad() defer metad.Stop() ip := "192.168.1.1" @@ -283,13 +273,7 @@ func TestMetadWatch(t *testing.T) { } func TestMetadWatchSelf(t *testing.T) { - config := &Config{ - Backend: testBackend, - } - metad, err := New(config) - assert.NoError(t, err) - - metad.Init() + metad := NewTestMetad() defer metad.Stop() @@ -371,13 +355,7 @@ func TestMetadWatchSelf(t *testing.T) { } func TestMetadMappingDelete(t *testing.T) { - config := &Config{ - Backend: testBackend, - } - metad, err := New(config) - assert.NoError(t, err) - - metad.Init() + metad := NewTestMetad() defer metad.Stop() @@ -437,14 +415,7 @@ func TestMetadMappingDelete(t *testing.T) { } func TestMetadAccessRule(t *testing.T) { - config := &Config{ - Backend: testBackend, - } - metad, err := New(config) - assert.NoError(t, err) - - metad.Init() - + metad := NewTestMetad() defer metad.Stop() data := map[string]interface{}{ @@ -519,6 +490,21 @@ func TestMetadAccessRule(t *testing.T) { assert.Equal(t, "", util.GetMapValue(parse(w), "/clusters/cl-1/name")) } +func NewTestMetad() *Metad { + group := fmt.Sprintf("/group%v", rand.Intn(10000)) + config := &Config{ + Backend: testBackend, + Group: group, + } + metad, err := New(config) + if err != nil { + panic(err) + } + + metad.Init() + return metad +} + func getAndCheckMapping(metad *Metad, t *testing.T, ip string, exist bool) { req := httptest.NewRequest("GET", "/v1/mapping", nil) req.Header.Set("Accept", "application/json") diff --git a/metadata/metarepo_test.go b/metadata/metarepo_test.go index 4d458a4..78de6a8 100644 --- a/metadata/metarepo_test.go +++ b/metadata/metarepo_test.go @@ -26,19 +26,7 @@ var ( func TestMetarepoData(t *testing.T) { - prefix := fmt.Sprintf("/prefix%v", rand.Intn(1000)) - - nodes := backends.GetDefaultBackends(backend) - - config := backends.Config{ - Backend: backend, - BackendNodes: nodes, - Prefix: prefix, - } - storeClient, err := backends.New(config) - assert.NoError(t, err) - - metarepo := New(storeClient) + metarepo := NewTestMetarepo() metarepo.DeleteData("/") clientIP := "192.168.0.1" @@ -72,7 +60,7 @@ func TestMetarepoData(t *testing.T) { subs := []string{"1", "3", "noexistkey"} //test batch delete - err = metarepo.DeleteData("nodes", subs...) + err := metarepo.DeleteData("nodes", subs...) time.Sleep(sleepTime) assert.NoError(t, err) @@ -90,20 +78,7 @@ func TestMetarepoData(t *testing.T) { func TestMetarepoMapping(t *testing.T) { - prefix := fmt.Sprintf("/prefix%v", rand.Intn(1000)) - group := fmt.Sprintf("/group%v", rand.Intn(1000)) - nodes := backends.GetDefaultBackends(backend) - - config := backends.Config{ - Backend: backend, - BackendNodes: nodes, - Prefix: prefix, - Group: group, - } - storeClient, err := backends.New(config) - assert.NoError(t, err) - - metarepo := New(storeClient) + metarepo := NewTestMetarepo() metarepo.DeleteData("/") metarepo.DeleteMapping("/") @@ -120,7 +95,7 @@ func TestMetarepoMapping(t *testing.T) { mappings[ip] = mapping } // batch update - err = metarepo.PutMapping("/", mappings, true) + err := metarepo.PutMapping("/", mappings, true) assert.NoError(t, err) time.Sleep(sleepTime) @@ -198,20 +173,7 @@ func TestMetarepoMapping(t *testing.T) { } func TestMetarepoSelf(t *testing.T) { - prefix := fmt.Sprintf("/prefix%v", rand.Intn(1000)) - group := fmt.Sprintf("/group%v", rand.Intn(1000)) - nodes := backends.GetDefaultBackends(backend) - - config := backends.Config{ - Backend: backend, - BackendNodes: nodes, - Prefix: prefix, - Group: group, - } - storeClient, err := backends.New(config) - assert.NoError(t, err) - - metarepo := New(storeClient) + metarepo := NewTestMetarepo() metarepo.DeleteMapping("/") metarepo.DeleteData("/") @@ -233,7 +195,7 @@ func TestMetarepoSelf(t *testing.T) { mappings[ip] = mapping } // batch update - err = metarepo.PutMapping("/", mappings, true) + err := metarepo.PutMapping("/", mappings, true) assert.NoError(t, err) time.Sleep(sleepTime) @@ -289,20 +251,7 @@ func TestMetarepoSelf(t *testing.T) { func TestMetarepoRoot(t *testing.T) { - prefix := fmt.Sprintf("/prefix%v", rand.Intn(1000)) - group := fmt.Sprintf("/group%v", rand.Intn(1000)) - nodes := backends.GetDefaultBackends(backend) - - config := backends.Config{ - Backend: backend, - BackendNodes: nodes, - Prefix: prefix, - Group: group, - } - storeClient, err := backends.New(config) - assert.NoError(t, err) - - metarepo := New(storeClient) + metarepo := NewTestMetarepo() metarepo.DeleteMapping("/") metarepo.DeleteData("/") @@ -323,7 +272,7 @@ func TestMetarepoRoot(t *testing.T) { mapping := make(map[string]interface{}) mapping["node"] = "/nodes/0" - err = metarepo.PutMapping(ip, mapping, true) + err := metarepo.PutMapping(ip, mapping, true) assert.NoError(t, err) time.Sleep(sleepTime) @@ -342,20 +291,7 @@ func TestMetarepoRoot(t *testing.T) { } func TestWatch(t *testing.T) { - prefix := fmt.Sprintf("/prefix%v", rand.Intn(1000)) - group := fmt.Sprintf("/group%v", rand.Intn(1000)) - nodes := backends.GetDefaultBackends(backend) - - config := backends.Config{ - Backend: backend, - BackendNodes: nodes, - Prefix: prefix, - Group: group, - } - storeClient, err := backends.New(config) - assert.NoError(t, err) - - metarepo := New(storeClient) + metarepo := NewTestMetarepo() metarepo.DeleteMapping("/") metarepo.DeleteData("/") @@ -415,20 +351,7 @@ func TestWatch(t *testing.T) { } func TestWatchSelf(t *testing.T) { - prefix := fmt.Sprintf("/prefix%v", rand.Intn(1000)) - group := fmt.Sprintf("/group%v", rand.Intn(1000)) - nodes := backends.GetDefaultBackends(backend) - - config := backends.Config{ - Backend: backend, - BackendNodes: nodes, - Prefix: prefix, - Group: group, - } - storeClient, err := backends.New(config) - assert.NoError(t, err) - - metarepo := New(storeClient) + metarepo := NewTestMetarepo() metarepo.DeleteMapping("/") metarepo.DeleteData("/") @@ -437,7 +360,7 @@ func TestWatchSelf(t *testing.T) { ip := "192.168.1.1" - err = metarepo.PutMapping(ip, map[string]interface{}{ + err := metarepo.PutMapping(ip, map[string]interface{}{ "node": "/nodes/1", }, true) assert.NoError(t, err) @@ -499,26 +422,13 @@ func TestWatchSelf(t *testing.T) { } func TestWatchCloseChan(t *testing.T) { - prefix := fmt.Sprintf("/prefix%v", rand.Intn(1000)) - group := fmt.Sprintf("/group%v", rand.Intn(1000)) - nodes := backends.GetDefaultBackends(backend) - - config := backends.Config{ - Backend: backend, - BackendNodes: nodes, - Prefix: prefix, - Group: group, - } - storeClient, err := backends.New(config) - assert.NoError(t, err) - - metarepo := New(storeClient) + metarepo := NewTestMetarepo() metarepo.StartSync() ip := "192.168.1.1" - err = metarepo.PutMapping(ip, map[string]interface{}{ + err := metarepo.PutMapping(ip, map[string]interface{}{ "node": "/nodes/1", }, true) assert.NoError(t, err) @@ -555,20 +465,7 @@ func TestWatchCloseChan(t *testing.T) { // TestSelfWatchNodeNotExist // create a mapping with a node not exist, then update the node func TestSelfWatchNodeNotExist(t *testing.T) { - prefix := fmt.Sprintf("/prefix%v", rand.Intn(1000)) - group := fmt.Sprintf("/group%v", rand.Intn(1000)) - nodes := backends.GetDefaultBackends(backend) - - config := backends.Config{ - Backend: backend, - BackendNodes: nodes, - Prefix: prefix, - Group: group, - } - storeClient, err := backends.New(config) - assert.NoError(t, err) - - metarepo := New(storeClient) + metarepo := NewTestMetarepo() metarepo.StartSync() @@ -576,7 +473,7 @@ func TestSelfWatchNodeNotExist(t *testing.T) { ip := "192.168.1.1" - err = metarepo.PutMapping(ip, map[string]interface{}{ + err := metarepo.PutMapping(ip, map[string]interface{}{ "host": "/hosts/i-local", "cmd": "/cmd/i-local", }, true) @@ -606,6 +503,74 @@ func TestSelfWatchNodeNotExist(t *testing.T) { metarepo.StopSync() } +func TestAccessRule(t *testing.T) { + metarepo := NewTestMetarepo() + metarepo.StartSync() + + data := map[string]interface{}{ + "clusters": map[string]interface{}{ + "cl-1": map[string]interface{}{ + "name": "cl-1", + "env": map[string]interface{}{ + "username": "user1", + "secret": "123456", + }, + "public_key": "public_key_val", + }, + "cl-2": map[string]interface{}{ + "name": "cl-2", + "env": map[string]interface{}{ + "username": "user2", + "secret": "1234567", + }, + "public_key": "public_key_val2", + }, + }, + } + + err := metarepo.PutData("/", data, true) + assert.NoError(t, err) + + ip := "192.168.1.1" + rules := map[string][]store.AccessRule{ + ip: { + {Path: "/", Mode: store.AccessModeRead}, + }, + } + + err = metarepo.PutAccessRule(rules) + assert.NoError(t, err) + + time.Sleep(sleepTime) + + rulesGet := metarepo.GetAccessRule([]string{ip}) + assert.Equal(t, rules, rulesGet) + + _, dataGet := metarepo.Root(ip, "/") + assert.Equal(t, data, dataGet) + + metarepo.StopSync() +} + +func NewTestMetarepo() *MetadataRepo { + prefix := fmt.Sprintf("/prefix%v", rand.Intn(10000)) + group := fmt.Sprintf("/group%v", rand.Intn(10000)) + nodes := backends.GetDefaultBackends(backend) + + config := backends.Config{ + Backend: backend, + BackendNodes: nodes, + Prefix: prefix, + Group: group, + } + storeClient, err := backends.New(config) + if err != nil { + panic(err) + } + + return New(storeClient) +} + func FillTestData(metarepo *MetadataRepo) map[string]string { nodes := make(map[string]interface{}) for i := 0; i < maxNode; i++ { diff --git a/store/access.go b/store/access.go index b4a872d..ad6a402 100644 --- a/store/access.go +++ b/store/access.go @@ -3,6 +3,7 @@ package store import ( "encoding/json" "fmt" + "path" "strings" "sync" ) @@ -53,19 +54,27 @@ func UnmarshalAccessRule(data string) ([]AccessRule, error) { return rules, err } -type AccessNode struct { +type accessNode struct { Name string Mode AccessMode - parent *AccessNode - Children []*AccessNode + parent *accessNode + Children []*accessNode } -func (n *AccessNode) HasChild() bool { +func (n *accessNode) Path() string { + if n.parent == nil { + return n.Name + } else { + return path.Join(n.parent.Path(), n.Name) + } +} + +func (n *accessNode) HasChild() bool { return len(n.Children) > 0 } -func (n *AccessNode) GetChild(name string, strict bool) *AccessNode { - var wildcardNode *AccessNode +func (n *accessNode) GetChild(name string, strict bool) *accessNode { + var wildcardNode *accessNode for _, c := range n.Children { if name == c.Name { return c @@ -140,21 +149,35 @@ func (s *accessStore) GetAccessRule(hosts []string) map[string][]AccessRule { } type AccessTree interface { - GetRoot() *AccessNode + GetRoot() *accessNode ToAccessRule() []AccessRule Json() string } type accessTree struct { - Root *AccessNode + Root *accessNode } -func (t *accessTree) GetRoot() *AccessNode { +func (t *accessTree) GetRoot() *accessNode { return t.Root } func (t *accessTree) ToAccessRule() []AccessRule { - return nil + rules := []AccessRule{} + rules = t.toAccessRule(t.Root, rules) + return rules +} + +func (t *accessTree) toAccessRule(node *accessNode, rules []AccessRule) []AccessRule { + if node.Mode != AccessModeNil { + rules = append(rules, AccessRule{Path: node.Path(), Mode: node.Mode}) + } + if node.HasChild() { + for _, child := range node.Children { + rules = t.toAccessRule(child, rules) + } + } + return rules } func (t *accessTree) Json() string { @@ -163,7 +186,7 @@ func (t *accessTree) Json() string { } func NewAccessTree(rules []AccessRule) AccessTree { - root := &AccessNode{ + root := &accessNode{ Name: "/", Mode: AccessModeNil, parent: nil, @@ -182,7 +205,7 @@ func NewAccessTree(rules []AccessRule) AccessTree { } child := curr.GetChild(component, true) if child == nil { - child = &AccessNode{Name: component, Mode: AccessModeNil, parent: curr} + child = &accessNode{Name: component, Mode: AccessModeNil, parent: curr} curr.Children = append(curr.Children, child) } curr = child diff --git a/store/access_test.go b/store/access_test.go new file mode 100644 index 0000000..17fe9c8 --- /dev/null +++ b/store/access_test.go @@ -0,0 +1,58 @@ +package store + +import ( + "encoding/json" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestAccessStore(t *testing.T) { + accessStore := NewAccessStore() + ip := "192.168.1.1" + rules := []AccessRule{ + {Path: "/", Mode: AccessModeForbidden}, + {Path: "/clusters", Mode: AccessModeRead}, + {Path: "/clusters/cl-1/env/secret", Mode: AccessModeForbidden}, + } + accessStore.Put(ip, rules) + + tree := accessStore.Get(ip) + assert.NotNil(t, tree) + + ip2 := "192.168.1.2" + accessStore.Puts(map[string][]AccessRule{ + ip2: rules, + }) + rulesGet := accessStore.GetAccessRule([]string{ip}) + rules1 := rulesGet[ip] + assert.Equal(t, rules, rules1) + + rulesGet2 := accessStore.GetAccessRule([]string{}) + assert.Equal(t, rules, rulesGet2[ip]) + assert.Equal(t, rules, rulesGet2[ip2]) + + accessStore.Delete(ip) + rulesGet3 := accessStore.GetAccessRule([]string{}) + assert.Equal(t, 1, len(rulesGet3)) +} + +func TestAccessTree(t *testing.T) { + rules := []AccessRule{ + {Path: "/", Mode: AccessModeForbidden}, + {Path: "/clusters", Mode: AccessModeRead}, + {Path: "/clusters/*/env", Mode: AccessModeForbidden}, + {Path: "/clusters/cl-1/env/secret", Mode: AccessModeRead}, + } + tree := NewAccessTree(rules) + jsonStr := tree.Json() + jsonMap := map[string]interface{}{} + err := json.Unmarshal([]byte(jsonStr), &jsonMap) + assert.NoError(t, err) + root := tree.GetRoot() + assert.Equal(t, AccessModeForbidden, root.Mode) + assert.Equal(t, AccessModeRead, root.GetChild("clusters", true).Mode) + assert.Equal(t, AccessModeForbidden, root.GetChild("clusters", true). + GetChild("cl-2", false).GetChild("env", true).Mode) + assert.Equal(t, AccessModeRead, root.GetChild("clusters", true). + GetChild("cl-1", false).GetChild("env", true).GetChild("secret", true).Mode) +} diff --git a/store/traveller.go b/store/traveller.go index 96880f6..992ed57 100644 --- a/store/traveller.go +++ b/store/traveller.go @@ -22,7 +22,7 @@ type Traveller interface { } type stackElement struct { - node *AccessNode + node *accessNode mode AccessMode } @@ -52,7 +52,7 @@ type nodeTraveller struct { store *store access AccessTree currNode *node - currAccessNode *AccessNode + currAccessNode *accessNode currMode AccessMode stack travellerStack } @@ -94,7 +94,7 @@ func (t *nodeTraveller) enter(node string) bool { if n == nil { return false } - var an *AccessNode + var an *accessNode if t.currAccessNode != nil { an = t.currAccessNode.GetChild(node, false) } From bad6013c545c3aa390f3d3a6e14c957fce406d79 Mon Sep 17 00:00:00 2001 From: jolestar Date: Fri, 2 Jun 2017 16:52:03 +0800 Subject: [PATCH 09/11] fix get rule api bug. --- backends/etcdv3/client.go | 3 +++ metad.go | 10 ++++++++-- metad_test.go | 10 +++++++++- metadata/metarepo.go | 3 +++ store/access.go | 3 +++ store/access_test.go | 2 +- 6 files changed, 27 insertions(+), 4 deletions(-) diff --git a/backends/etcdv3/client.go b/backends/etcdv3/client.go index 3c7ecdd..70fafae 100644 --- a/backends/etcdv3/client.go +++ b/backends/etcdv3/client.go @@ -167,6 +167,9 @@ func (c *Client) PutAccessRule(rules map[string][]store.AccessRule) error { func (c *Client) DeleteAccessRule(hosts []string) error { for _, host := range hosts { + if host == "" { + continue + } err := c.internalDelete(c.rulePrefix, path.Join("/", host), false) if err != nil { return err diff --git a/metad.go b/metad.go index 9c37d1d..abc28d7 100644 --- a/metad.go +++ b/metad.go @@ -311,7 +311,10 @@ func (m *Metad) mappingDelete(ctx context.Context, req *http.Request) (interface func (m *Metad) accessRuleGet(ctx context.Context, req *http.Request) (interface{}, *HttpError) { hostsStr := req.FormValue("hosts") - hosts := strings.Split(hostsStr, ",") + var hosts []string + if hostsStr != "" { + hosts = strings.Split(hostsStr, ",") + } val := m.metadataRepo.GetAccessRule(hosts) return val, nil } @@ -337,7 +340,10 @@ func (m *Metad) accessRuleUpdate(ctx context.Context, req *http.Request) (interf func (m *Metad) accessRuleDelete(ctx context.Context, req *http.Request) (interface{}, *HttpError) { hostsStr := req.FormValue("hosts") - hosts := strings.Split(hostsStr, ",") + var hosts []string + if hostsStr != "" { + hosts = strings.Split(hostsStr, ",") + } err := m.metadataRepo.DeleteAccessRule(hosts) return nil, NewServerError(err) } diff --git a/metad_test.go b/metad_test.go index d13b46c..1198d8e 100644 --- a/metad_test.go +++ b/metad_test.go @@ -465,6 +465,14 @@ func TestMetadAccessRule(t *testing.T) { time.Sleep(sleepTime) + req = httptest.NewRequest("GET", "/v1/rule", nil) + req.Header.Set("accept", "application/json") + w = httptest.NewRecorder() + metad.manageRouter.ServeHTTP(w, req) + assert.Equal(t, 200, w.Code) + assert.Equal(t, "/", util.GetMapValue(parse(w), "/192.168.1.1/0/path")) + assert.Equal(t, "1", util.GetMapValue(parse(w), "/192.168.1.1/2/mode")) + req = httptest.NewRequest("GET", "/", nil) req.RemoteAddr = "192.168.1.1:1234" req.Header.Set("accept", "application/json") @@ -541,7 +549,7 @@ func parse(w *httptest.ResponseRecorder) interface{} { log.Debug("%s response %s", requestID, w.Body.String()) err := json.Unmarshal(w.Body.Bytes(), &result) if err != nil { - log.Error("json_err: %s", err.Error()) + panic(fmt.Errorf("json_err: %s", err.Error())) } return result } diff --git a/metadata/metarepo.go b/metadata/metarepo.go index f8b33f6..2a5a0da 100644 --- a/metadata/metarepo.go +++ b/metadata/metarepo.go @@ -448,6 +448,9 @@ func (r *MetadataRepo) PutAccessRule(rulesMap map[string][]store.AccessRule) err } func (r *MetadataRepo) DeleteAccessRule(hosts []string) error { + if len(hosts) == 0 { + return nil + } return r.storeClient.DeleteAccessRule(hosts) } diff --git a/store/access.go b/store/access.go index ad6a402..439f321 100644 --- a/store/access.go +++ b/store/access.go @@ -139,6 +139,9 @@ func (s *accessStore) GetAccessRule(hosts []string) map[string][]AccessRule { } } else { for _, host := range hosts { + if host == "" { + continue + } t, ok := s.m[host] if ok { result[host] = t.ToAccessRule() diff --git a/store/access_test.go b/store/access_test.go index 17fe9c8..77fbed6 100644 --- a/store/access_test.go +++ b/store/access_test.go @@ -27,7 +27,7 @@ func TestAccessStore(t *testing.T) { rules1 := rulesGet[ip] assert.Equal(t, rules, rules1) - rulesGet2 := accessStore.GetAccessRule([]string{}) + rulesGet2 := accessStore.GetAccessRule(nil) assert.Equal(t, rules, rulesGet2[ip]) assert.Equal(t, rules, rulesGet2[ip2]) From 23e51d90b3b706132eab261f0ff1b8e3a2a6ea98 Mon Sep 17 00:00:00 2001 From: jolestar Date: Fri, 2 Jun 2017 18:21:14 +0800 Subject: [PATCH 10/11] remove only self parameter. --- config.go | 5 ----- config_test.go | 1 - 2 files changed, 6 deletions(-) diff --git a/config.go b/config.go index 449aab6..4da6576 100644 --- a/config.go +++ b/config.go @@ -31,7 +31,6 @@ var ( pprof bool logLevel string enableXff bool - onlySelf bool prefix string listen string listenManage string @@ -55,7 +54,6 @@ type Config struct { PIDFile string `yaml:"pid_file"` EnableXff bool `yaml:"xff"` Prefix string `yaml:"prefix"` - OnlySelf bool `yaml:"only_self"` Listen string `yaml:"listen"` ListenManage string `yaml:"listen_manage"` BasicAuth bool `yaml:"basic_auth"` @@ -77,7 +75,6 @@ func init() { flag.StringVar(&pidFile, "pid_file", "", "PID to write to") flag.BoolVar(&enableXff, "xff", false, "X-Forwarded-For header support") flag.StringVar(&prefix, "prefix", "", "Backend key path prefix") - flag.BoolVar(&onlySelf, "only_self", false, "Only support self metadata query") flag.StringVar(&group, "group", "default", "The metad's group name, same group share same mapping config from backend") flag.StringVar(&listen, "listen", ":80", "Address to listen to (TCP)") flag.StringVar(&listenManage, "listen_manage", "127.0.0.1:9611", "Address to listen to for manage requests (TCP)") @@ -165,8 +162,6 @@ func setConfigFromFlag(config *Config, f *flag.Flag) { config.EnableXff = enableXff case "prefix": config.Prefix = prefix - case "only_self": - config.OnlySelf = onlySelf case "group": config.Group = group case "listen": diff --git a/config_test.go b/config_test.go index 002a593..9a7ba0a 100644 --- a/config_test.go +++ b/config_test.go @@ -15,7 +15,6 @@ func TestConfigFile(t *testing.T) { PIDFile: "/var/run/metad.pid", EnableXff: true, Prefix: "/users/uid1", - OnlySelf: true, Group: "default", Listen: ":8080", ListenManage: "127.0.0.1:9611", From 40c5feddc441341ffb576ab48482baaa5ef482a3 Mon Sep 17 00:00:00 2001 From: jolestar Date: Sat, 3 Jun 2017 17:25:46 +0800 Subject: [PATCH 11/11] refactor etcd client internalSync. --- backends/etcdv3/client.go | 69 +++++++++++++++++++++------------- backends/etcdv3/client_test.go | 47 +++++++++++++++++++++-- 2 files changed, 85 insertions(+), 31 deletions(-) diff --git a/backends/etcdv3/client.go b/backends/etcdv3/client.go index 70fafae..c12339d 100644 --- a/backends/etcdv3/client.go +++ b/backends/etcdv3/client.go @@ -15,6 +15,7 @@ import ( "path" "reflect" "strings" + "sync" "time" ) @@ -106,9 +107,10 @@ func (c *Client) Delete(nodePath string, dir bool) error { } func (c *Client) Sync(store store.Store, stopChan chan bool) { - startedChan := make(chan bool) - go c.internalSync(c.prefix, stopChan, startedChan, c.newInitStoreFunc(c.prefix, store), newProcessSyncChangeFunc(store)) - <-startedChan + initWG := &sync.WaitGroup{} + initWG.Add(1) + go c.internalSync(c.prefix, stopChan, initWG, c.newInitStoreFunc(c.prefix, store), newProcessSyncChangeFunc(store)) + initWG.Wait() } func (c *Client) GetMapping(nodePath string, dir bool) (interface{}, error) { @@ -134,9 +136,10 @@ func (c *Client) DeleteMapping(nodePath string, dir bool) error { } func (c *Client) SyncMapping(mapping store.Store, stopChan chan bool) { - startedChan := make(chan bool) - go c.internalSync(c.mappingPrefix, stopChan, startedChan, c.newInitStoreFunc(c.mappingPrefix, mapping), newProcessSyncChangeFunc(mapping)) - <-startedChan + initWG := &sync.WaitGroup{} + initWG.Add(1) + go c.internalSync(c.mappingPrefix, stopChan, initWG, c.newInitStoreFunc(c.mappingPrefix, mapping), newProcessSyncChangeFunc(mapping)) + initWG.Wait() } func (c *Client) GetAccessRule() (map[string][]store.AccessRule, error) { @@ -179,8 +182,9 @@ func (c *Client) DeleteAccessRule(hosts []string) error { } func (c *Client) SyncAccessRule(accessStore store.AccessStore, stopChan chan bool) { - startedChan := make(chan bool) - go c.internalSync(c.rulePrefix, stopChan, startedChan, func() error { + initWG := &sync.WaitGroup{} + initWG.Add(1) + go c.internalSync(c.rulePrefix, stopChan, initWG, func() error { val, err := c.GetAccessRule() if err != nil { return err @@ -202,7 +206,7 @@ func (c *Client) SyncAccessRule(accessStore store.AccessStore, stopChan chan boo log.Warning("Unknow watch event type: %s ", event.Type) } }) - <-startedChan + initWG.Wait() } func (c *Client) internalGets(prefix, nodePath string) (map[string]string, error) { @@ -250,33 +254,46 @@ func handleGetResp(prefix string, resp *client.GetResponse, vars map[string]stri return nil } -func (c *Client) internalSync(prefix string, stopChan chan bool, startedChan chan bool, initStoreFunc func() error, processChangeFunc func(event *client.Event, nodePath, value string)) { - +func (c *Client) internalSync(prefix string, stopChan chan bool, initWG *sync.WaitGroup, initStoreFunc func() error, processChangeFunc func(event *client.Event, nodePath, value string)) { var rev int64 = 0 init := false + stop := false cancelRoutine := make(chan bool) defer close(cancelRoutine) - for { + var ctx context.Context + var cancel context.CancelFunc + + go func() { select { + case <-stopChan: + log.Info("Sync %s stop.", prefix) + stop = true + if cancel != nil { + cancel() + } case <-cancelRoutine: return - default: } + }() - ctx, cancel := context.WithCancel(context.Background()) - watchChan := c.client.Watch(ctx, prefix, client.WithPrefix(), client.WithRev(rev)) - - go func() { - select { - case <-stopChan: - log.Info("Sync %s stop.", prefix) - cancel() - cancelRoutine <- true + for { + if stop { + if !init { + initWG.Done() } - }() - + return + } + ctx, cancel = context.WithCancel(context.Background()) + watchChan := c.client.Watch(ctx, prefix, client.WithPrefix(), client.WithRev(rev)) + if watchChan == nil { + continue + } for !init { + if stop { + initWG.Done() + return + } err := initStoreFunc() if err != nil { log.Error("Get init value from etcd nodePath:%s, error-type: %s, error: %s", prefix, reflect.TypeOf(err), err.Error()) @@ -286,9 +303,7 @@ func (c *Client) internalSync(prefix string, stopChan chan bool, startedChan cha } log.Info("Init store for prefix %s success.", prefix) init = true - go func() { - startedChan <- true - }() + initWG.Done() } for resp := range watchChan { for _, event := range resp.Events { diff --git a/backends/etcdv3/client_test.go b/backends/etcdv3/client_test.go index 887e2c2..40f0cc5 100644 --- a/backends/etcdv3/client_test.go +++ b/backends/etcdv3/client_test.go @@ -6,6 +6,7 @@ import ( "github.com/yunify/metad/log" "github.com/yunify/metad/store" "math/rand" + "sync" "testing" "time" ) @@ -31,9 +32,47 @@ func TestClientSyncStop(t *testing.T) { metastore := store.New() // expect internalSync not block after stopChan has signal - startedChan := make(chan bool) - storeClient.internalSync(prefix, stopChan, startedChan, storeClient.newInitStoreFunc(prefix, metastore), newProcessSyncChangeFunc(metastore)) - initialized := <-startedChan - log.Info(fmt.Sprint("sync status:", initialized)) + initWG := &sync.WaitGroup{} + initWG.Add(1) + doneWG := &sync.WaitGroup{} + doneWG.Add(1) + + go func() { + storeClient.internalSync(prefix, stopChan, initWG, storeClient.newInitStoreFunc(prefix, metastore), newProcessSyncChangeFunc(metastore)) + doneWG.Done() + }() + initWG.Wait() + doneWG.Wait() +} + +func TestClientSyncStopWhenInitError(t *testing.T) { + + prefix := fmt.Sprintf("/prefix%v", rand.Intn(1000)) + + stopChan := make(chan bool) + log.Info("prefix is %s", prefix) + nodes := []string{"http://127.0.0.1:2379"} + storeClient, err := NewEtcdClient("default", prefix, nodes, "", "", "", false, "", "") + assert.NoError(t, err) + go func() { + time.Sleep(3 * time.Second) + stopChan <- true + }() + + metastore := store.New() + // expect internalSync not block after stopChan has signal + initWG := &sync.WaitGroup{} + initWG.Add(1) + + doneWG := &sync.WaitGroup{} + doneWG.Add(1) + go func() { + storeClient.internalSync(prefix, stopChan, initWG, func() error { + return fmt.Errorf("always error") + }, newProcessSyncChangeFunc(metastore)) + doneWG.Done() + }() + initWG.Wait() + doneWG.Wait() }