
Commit

Merge pull request moby#1860 from fcrisciani/network-db-stabilization
Network db stabilization
mavenugo authored Aug 1, 2017
2 parents b919385 + 26db0b9 commit e85aeed
Showing 5 changed files with 80 additions and 107 deletions.
3 changes: 2 additions & 1 deletion networkdb/broadcast.go
@@ -114,7 +114,8 @@ type tableEventMessage struct {
}

func (m *tableEventMessage) Invalidates(other memberlist.Broadcast) bool {
return false
otherm := other.(*tableEventMessage)
return m.tname == otherm.tname && m.id == otherm.id && m.key == otherm.key
}

func (m *tableEventMessage) Message() []byte {
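
With this change, memberlist's broadcast queue can coalesce table events: a newly queued event supersedes an older queued event for the same (table, network, key) tuple instead of both being gossiped. Below is a minimal sketch of that mechanism, using memberlist's actual Broadcast interface and TransmitLimitedQueue, but with a hypothetical demoBroadcast type standing in for tableEventMessage:

package main

import (
	"fmt"

	"github.com/hashicorp/memberlist"
)

// demoBroadcast is a hypothetical stand-in for tableEventMessage,
// keyed the same way: table name, network ID, and entry key.
type demoBroadcast struct {
	tname, id, key string
	payload        []byte
}

// Invalidates mirrors the fix above: a queued broadcast for the same
// entry is superseded by this newer one.
func (b *demoBroadcast) Invalidates(other memberlist.Broadcast) bool {
	o, ok := other.(*demoBroadcast)
	return ok && b.tname == o.tname && b.id == o.id && b.key == o.key
}

func (b *demoBroadcast) Message() []byte { return b.payload }
func (b *demoBroadcast) Finished()       {}

func main() {
	q := &memberlist.TransmitLimitedQueue{
		NumNodes:       func() int { return 3 },
		RetransmitMult: 3,
	}
	q.QueueBroadcast(&demoBroadcast{tname: "t", id: "nid", key: "k", payload: []byte("v1")})
	// Same (table, network, key): the stale broadcast above is dropped.
	q.QueueBroadcast(&demoBroadcast{tname: "t", id: "nid", key: "k", payload: []byte("v2")})
	fmt.Println(q.NumQueued()) // 1
}
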
7 changes: 0 additions & 7 deletions networkdb/cluster.go
@@ -290,13 +290,6 @@ func (nDB *NetworkDB) reconnectNode() {
return
}

// Update all the local table state to a new time to
// force update on the node we are trying to rejoin, just in
// case that node still has these in deleting state. This is to
// facilitate fast convergence after recovering from a gossip
// failure.
nDB.updateLocalTableTime()

logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name)
nDB.bulkSync([]string{node.Name}, true)
}
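
The forced time bump can be dropped here because event ordering already rests on Lamport clocks; networkdb stamps its events with serf's LamportClock (nDB.networkClock, nDB.tableClock). A small illustration of the two operations the handlers in this PR rely on, Increment for local events and Witness for received timestamps:

package main

import (
	"fmt"

	"github.com/hashicorp/serf/serf"
)

func main() {
	var clock serf.LamportClock // the zero value is ready to use

	// Local events are stamped with Increment().
	fmt.Println(clock.Increment()) // 1

	// Witness() fast-forwards the local clock past a remote timestamp;
	// this is what nDB.networkClock.Witness(nEvent.LTime) does for
	// every received event.
	clock.Witness(serf.LamportTime(41))
	fmt.Println(clock.Time())      // 42
	fmt.Println(clock.Increment()) // 43: the next local event sorts after the remote one
}
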
49 changes: 25 additions & 24 deletions networkdb/delegate.go
@@ -133,25 +133,12 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
}

func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
var flushEntries bool
// Update our local clock if the received message has a newer
// time.
nDB.networkClock.Witness(nEvent.LTime)

nDB.Lock()
defer func() {
nDB.Unlock()
// When a node leaves a network upon the last task removal, clean up the
// local entries for this network & node combination. When the tasks
// on a network are removed we could have missed the gossip updates.
// Not doing this cleanup can leave stale entries because bulksyncs
// from the node will no longer include this network state.
//
// deleteNodeNetworkEntries takes nDB lock.
if flushEntries {
nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName)
}
}()
defer nDB.Unlock()

if nEvent.NodeName == nDB.config.NodeName {
return false
@@ -179,7 +166,12 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
n.leaving = nEvent.Type == NetworkEventTypeLeave
if n.leaving {
n.reapTime = reapInterval
flushEntries = true

// The remote node is leaving the network, but not the gossip cluster.
// Mark all its entries as deleted; this guarantees that if some node
// bulk-syncs with us, the deleted state of these entries will be
// propagated.
nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName)
}

if nEvent.Type == NetworkEventTypeLeave {
@@ -214,17 +206,22 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
nDB.RLock()
networks := nDB.networks[nDB.config.NodeName]
network, ok := networks[tEvent.NetworkID]
nDB.RUnlock()
if !ok || network.leaving {
return true
// Check if the owner of the event is still part of the network
nodes := nDB.networkNodes[tEvent.NetworkID]
var nodePresent bool
for _, node := range nodes {
if node == tEvent.NodeName {
nodePresent = true
break
}
}

e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
if err != nil && tEvent.Type == TableEventTypeDelete {
// If it is a delete event and we don't have the entry here, there is nothing to do.
nDB.RUnlock()
if !ok || network.leaving || !nodePresent {
// The local node has left the network, or the event's owner is no longer part of it, so do not propagate
return false
}

e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
if err == nil {
// We have the latest state. Ignore the event
// since it is stale.
@@ -249,6 +246,11 @@ nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e)
nDB.Unlock()

if err != nil && tEvent.Type == TableEventTypeDelete {
// Delete event for an entry we never had: don't notify local watchers
// (there is nothing to delete locally), but still return true so the tombstone is re-gossiped
return true
}

var op opType
switch tEvent.Type {
case TableEventTypeCreate:
@@ -289,8 +291,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
return
}

// Do not rebroadcast a bulk sync
if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast && !isBulkSync {
if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast {
var err error
buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
if err != nil {
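
Taken together, the delegate changes settle on a single rule for deletes: always record (or refresh) the tombstone and let it be re-gossiped, even for entries this node never saw, and notify local watchers only when there was real state to remove. A simplified sketch of that rule; the types, names, and the 30-minute reap window are illustrative, not the PR's actual code:

package main

import (
	"fmt"
	"time"
)

// tombstone is a simplified version of networkdb's table entry: a delete
// leaves the entry in deleting state until its reap window expires.
type tombstone struct {
	deleting bool
	reapTime time.Duration
}

// handleDelete sketches the rule: always record the tombstone and
// re-gossip it; notify local watchers only if we actually held the entry.
func handleDelete(store map[string]*tombstone, key string) (rebroadcast, notify bool) {
	_, existed := store[key]
	store[key] = &tombstone{deleting: true, reapTime: 30 * time.Minute} // illustrative window
	return true, existed
}

func main() {
	store := map[string]*tombstone{"ep1": {}}

	rb, notify := handleDelete(store, "ep1")
	fmt.Println(rb, notify) // true true: forward the event and tell the application

	rb, notify = handleDelete(store, "unknown")
	fmt.Println(rb, notify) // true false: forward the tombstone; nothing to tell the application
}
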
8 changes: 5 additions & 3 deletions networkdb/event_delegate.go
@@ -45,9 +45,12 @@ func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
var failed bool
logrus.Infof("Node %s/%s, left gossip cluster", mn.Name, mn.Addr)
e.broadcastNodeEvent(mn.Addr, opDelete)
e.nDB.deleteNodeTableEntries(mn.Name)
e.nDB.deleteNetworkEntriesForNode(mn.Name)
// The node left or failed; delete all the entries it created.
// If the node was only temporarily down, deleting its entries guarantees that its CREATE events will be accepted when it rejoins.
// If the node instead left because it was shutting down, it makes sense to delete all its state anyway.
e.nDB.Lock()
e.nDB.deleteNetworkEntriesForNode(mn.Name)
e.nDB.deleteNodeTableEntries(mn.Name)
if n, ok := e.nDB.nodes[mn.Name]; ok {
delete(e.nDB.nodes, mn.Name)

@@ -61,7 +64,6 @@ func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
if failed {
logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr)
}

}

func (e *eventDelegate) NotifyUpdate(n *memberlist.Node) {
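
For context, NotifyLeave is one of the three callbacks of memberlist's EventDelegate interface, which is how networkdb observes cluster membership changes in the first place. A minimal, self-contained sketch of that hook point; the logDelegate type is illustrative:

package main

import (
	"log"

	"github.com/hashicorp/memberlist"
)

// logDelegate is a minimal stand-in for networkdb's eventDelegate:
// memberlist invokes these callbacks as cluster membership changes.
type logDelegate struct{}

func (d *logDelegate) NotifyJoin(n *memberlist.Node)   { log.Printf("join: %s/%s", n.Name, n.Addr) }
func (d *logDelegate) NotifyLeave(n *memberlist.Node)  { log.Printf("leave: %s/%s", n.Name, n.Addr) }
func (d *logDelegate) NotifyUpdate(n *memberlist.Node) { log.Printf("update: %s/%s", n.Name, n.Addr) }

func main() {
	cfg := memberlist.DefaultLANConfig()
	cfg.Events = &logDelegate{} // the same hook point networkdb uses

	ml, err := memberlist.Create(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer ml.Shutdown()
}
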
120 changes: 48 additions & 72 deletions networkdb/networkdb.go
@@ -418,7 +418,6 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
}

func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
nDB.Lock()
for nid, nodes := range nDB.networkNodes {
updatedNodes := make([]string, 0, len(nodes))
for _, node := range nodes {
@@ -433,11 +432,25 @@ func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
}

delete(nDB.networks, deletedNode)
nDB.Unlock()
}

// deleteNodeNetworkEntries is called in two conditions, with two different outcomes:
// 1) when a notification arrives that a remote node is leaving the network
//    - Walk all the network entries and mark the leaving node's entries for deletion.
//      These will be garbage collected when the reap timer expires.
// 2) when the local node is leaving the network
//    - Walk all the network entries:
//      A) if the entry is owned by the local node, mark it for deletion. This
//         ensures that a node that has not yet received the notification that
//         the local node is leaving still learns which entries are to be deleted.
//      B) if the entry is owned by a remote node, delete it outright. This
//         ensures that if we rejoin the network, we will accept the CREATE
//         events for entries owned by remote nodes and notify the application.
func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
nDB.Lock()
// Indicates if the delete is triggered for the local node
isNodeLocal := node == nDB.config.NodeName

nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid),
func(path string, v interface{}) bool {
oldEntry := v.(*entry)
@@ -446,7 +459,15 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
tname := params[1]
key := params[2]

if oldEntry.node != node {
// The entry is owned by a node other than the one the event refers to, and the local node is not leaving the network
if oldEntry.node != node && !isNodeLocal {
// Don't do anything because the event is triggered for a node that does not own this entry
return false
}

// If this entry is already marked for deletion and this node is not leaving the network
if oldEntry.deleting && !isNodeLocal {
// Don't do anything; this entry will be garbage collected using its existing reapTime
return false
}

@@ -458,17 +479,29 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
reapTime: reapInterval,
}

nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
// We arrive at this point in two cases:
// 1) this entry is owned by the node that is leaving the network
// 2) the local node is leaving the network
if oldEntry.node == node {
if isNodeLocal {
// TODO fcrisciani: this can be removed if there is no way to leave the network
// without doing a delete of all the objects
entry.ltime++
}
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
} else {
// The local node is leaving the network; all the entries owned by remote nodes can be safely removed
nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
}

nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
return false
})
nDB.Unlock()
}
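
deleteNodeNetworkEntries can scope its walk to one network because every entry is stored under two radix-tree keys: /tname/nid/key in the byTable index and /nid/tname/key in byNetwork. A small sketch of that dual-key layout with armon/go-radix, the radix-tree package networkdb builds on; the table name and values here are just examples:

package main

import (
	"fmt"

	radix "github.com/armon/go-radix"
)

func main() {
	byTable := radix.New()
	byNetwork := radix.New()

	// Every entry lives under two keys: /table/network/key in byTable
	// and /network/table/key in byNetwork.
	for _, e := range []struct{ tname, nid, key, val string }{
		{"endpoint_table", "net1", "ep1", "10.0.0.2"},
		{"endpoint_table", "net2", "ep2", "10.0.1.2"},
	} {
		byTable.Insert(fmt.Sprintf("/%s/%s/%s", e.tname, e.nid, e.key), e.val)
		byNetwork.Insert(fmt.Sprintf("/%s/%s/%s", e.nid, e.tname, e.key), e.val)
	}

	// WalkPrefix visits only net1's entries; this is how
	// deleteNodeNetworkEntries scopes its work to one network.
	byNetwork.WalkPrefix("/net1", func(path string, v interface{}) bool {
		fmt.Println(path, v) // /net1/endpoint_table/ep1 10.0.0.2
		return false         // false = keep walking
	})
}
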

func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
nDB.Lock()
nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
oldEntry := v.(*entry)
if oldEntry.node != node {
@@ -480,21 +513,12 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
nid := params[1]
key := params[2]

entry := &entry{
ltime: oldEntry.ltime,
node: node,
value: oldEntry.value,
deleting: true,
reapTime: reapInterval,
}

nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))

nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, oldEntry.value))
return false
})
nDB.Unlock()
}

// WalkTable walks a single table in NetworkDB and invokes the passed
@@ -573,37 +597,12 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {

nDB.Lock()
defer nDB.Unlock()
var (
paths []string
entries []*entry
)

// Remove the local node from the list of nodes participating in the network
nDB.deleteNetworkNode(nid, nDB.config.NodeName)

nwWalker := func(path string, v interface{}) bool {
entry, ok := v.(*entry)
if !ok {
return false
}
paths = append(paths, path)
entries = append(entries, entry)
return false
}

nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), nwWalker)
for _, path := range paths {
params := strings.Split(path[1:], "/")
tname := params[1]
key := params[2]

if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok {
logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
}

if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
}
}
// Mark all the local entries for deletion and delete all the remote entries outright
nDB.deleteNodeNetworkEntries(nid, nDB.config.NodeName)

nodeNetworks, ok := nDB.networks[nDB.config.NodeName]
if !ok {
@@ -616,6 +615,7 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {
}

n.ltime = ltime
n.reapTime = reapInterval
n.leaving = true
return nil
}
@@ -679,27 +679,3 @@ func (nDB *NetworkDB) updateLocalNetworkTime() {
n.ltime = ltime
}
}

func (nDB *NetworkDB) updateLocalTableTime() {
nDB.Lock()
defer nDB.Unlock()

ltime := nDB.tableClock.Increment()
nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
entry := v.(*entry)
if entry.node != nDB.config.NodeName {
return false
}

params := strings.Split(path[1:], "/")
tname := params[0]
nid := params[1]
key := params[2]
entry.ltime = ltime

nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)

return false
})
}
