diff --git a/conn/node.go b/conn/node.go
index e00b9c2e7db..5ec79005f7d 100644
--- a/conn/node.go
+++ b/conn/node.go
@@ -233,7 +233,7 @@ func (n *Node) SetPeer(pid uint64, addr string) {
 }
 
 // Send sends the given RAFT message from this node.
-func (n *Node) Send(msg raftpb.Message) {
+func (n *Node) Send(msg *raftpb.Message) {
 	x.AssertTruef(n.Id != msg.To, "Sending message to itself")
 	data, err := msg.Marshal()
 	x.Check(err)
@@ -535,15 +535,15 @@ func (n *Node) DeletePeer(pid uint64) {
 
 var errInternalRetry = errors.New("Retry proposal again")
 
-func (n *Node) proposeConfChange(ctx context.Context, pb raftpb.ConfChange) error {
+func (n *Node) proposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
 	cctx, cancel := context.WithTimeout(ctx, 3*time.Second)
 	defer cancel()
 
 	ch := make(chan error, 1)
 	id := n.storeConfChange(ch)
 	// TODO: Delete id from the map.
-	pb.ID = id
-	if err := n.Raft().ProposeConfChange(cctx, pb); err != nil {
+	conf.ID = id
+	if err := n.Raft().ProposeConfChange(cctx, conf); err != nil {
 		if cctx.Err() != nil {
 			return errInternalRetry
 		}
@@ -661,7 +661,7 @@ func (n *Node) RunReadIndexLoop(closer *y.Closer, readStateCh <-chan raft.ReadSt
 			return 0, errors.New("Closer has been called")
 		case rs := <-readStateCh:
 			if !bytes.Equal(activeRctx, rs.RequestCtx) {
-				glog.V(3).Infof("Read state: %x != requested %x", rs.RequestCtx, activeRctx[:])
+				glog.V(3).Infof("Read state: %x != requested %x", rs.RequestCtx, activeRctx)
 				goto again
 			}
 			return rs.Index, nil
diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go
index be3db53eb17..53dedaf35a2 100644
--- a/dgraph/cmd/zero/raft.go
+++ b/dgraph/cmd/zero/raft.go
@@ -374,11 +374,12 @@ func (n *node) applyProposal(e raftpb.Entry) (string, error) {
 			state.License.Enabled = time.Now().UTC().Before(expiry)
 		}
 
-		if p.MaxLeaseId > state.MaxLeaseId {
+		switch {
+		case p.MaxLeaseId > state.MaxLeaseId:
 			state.MaxLeaseId = p.MaxLeaseId
-		} else if p.MaxTxnTs > state.MaxTxnTs {
+		case p.MaxTxnTs > state.MaxTxnTs:
 			state.MaxTxnTs = p.MaxTxnTs
-		} else if p.MaxLeaseId != 0 || p.MaxTxnTs != 0 {
+		case p.MaxLeaseId != 0 || p.MaxTxnTs != 0:
 			// Could happen after restart when some entries were there in WAL and did not get
 			// snapshotted.
 			glog.Infof("Could not apply proposal, ignoring: p.MaxLeaseId=%v, p.MaxTxnTs=%v maxLeaseId=%d"+
@@ -446,7 +447,8 @@ func (n *node) initAndStartNode() error {
 	_, restart, err := n.PastLife()
 	x.Check(err)
 
-	if restart {
+	switch {
+	case restart:
 		glog.Infoln("Restarting node for dgraphzero")
 		sp, err := n.Store.Snapshot()
 		x.Checkf(err, "Unable to get existing snapshot")
@@ -464,7 +466,7 @@ func (n *node) initAndStartNode() error {
 
 		n.SetRaft(raft.RestartNode(n.Cfg))
 
-	} else if len(opts.peer) > 0 {
+	case len(opts.peer) > 0:
 		p := conn.GetPools().Connect(opts.peer)
 		if p == nil {
 			return errors.Errorf("Unhealthy connection to %v", opts.peer)
@@ -475,14 +477,15 @@ func (n *node) initAndStartNode() error {
 		timeout := 8 * time.Second
 		for {
 			ctx, cancel := context.WithTimeout(n.ctx, timeout)
-			defer cancel()
 			// JoinCluster can block indefinitely, raft ignores conf change proposal
 			// if it has pending configuration.
 			_, err := c.JoinCluster(ctx, n.RaftContext)
 			if err == nil {
+				cancel()
 				break
 			}
 			if x.ShouldCrash(err) {
+				cancel()
 				log.Fatalf("Error while joining cluster: %v", err)
 			}
 			glog.Errorf("Error while joining cluster: %v\n", err)
@@ -491,11 +494,12 @@ func (n *node) initAndStartNode() error {
 				timeout = 32 * time.Second
 			}
 			time.Sleep(timeout) // This is useful because JoinCluster can exit immediately.
+			cancel()
 		}
 
 		glog.Infof("[%#x] Starting node\n", n.Id)
 		n.SetRaft(raft.StartNode(n.Cfg, nil))
-	} else {
+	default:
 		data, err := n.RaftContext.Marshal()
 		x.Check(err)
 		peers := []raft.Peer{{ID: n.Id, Context: data}}
@@ -671,8 +675,8 @@ func (n *node) Run() {
 			}
 			if leader {
 				// Leader can send messages in parallel with writing to disk.
-				for _, msg := range rd.Messages {
-					n.Send(msg)
+				for i := range rd.Messages {
+					n.Send(&rd.Messages[i])
 				}
 			}
 			n.SaveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
@@ -693,18 +697,19 @@ func (n *node) Run() {
 
 			for _, entry := range rd.CommittedEntries {
 				n.Applied.Begin(entry.Index)
-				if entry.Type == raftpb.EntryConfChange {
+				switch {
+				case entry.Type == raftpb.EntryConfChange:
 					n.applyConfChange(entry)
 					glog.Infof("Done applying conf change at %#x", n.Id)
 
-				} else if entry.Type == raftpb.EntryNormal {
+				case entry.Type == raftpb.EntryNormal:
 					key, err := n.applyProposal(entry)
 					if err != nil {
 						glog.Errorf("While applying proposal: %v\n", err)
 					}
 					n.Proposals.Done(key, err)
 
-				} else {
+				default:
 					glog.Infof("Unhandled entry: %+v\n", entry)
 				}
 				n.Applied.Done(entry.Index)
@@ -713,8 +718,8 @@ func (n *node) Run() {
 
 			if !leader {
 				// Followers should send messages later.
-				for _, msg := range rd.Messages {
-					n.Send(msg)
+				for i := range rd.Messages {
+					n.Send(&rd.Messages[i])
 				}
 			}
 			span.Annotate(nil, "Sent messages")
diff --git a/systest/group-delete/group_delete_test.go b/systest/group-delete/group_delete_test.go
index 621e19f9000..d78816b3300 100644
--- a/systest/group-delete/group_delete_test.go
+++ b/systest/group-delete/group_delete_test.go
@@ -134,9 +134,9 @@ func TestNodes(t *testing.T) {
 	require.NoError(t, err)
 
 	for pred := range state1.Groups["3"].Tablets {
-		url := fmt.Sprintf("http://"+testutil.SockAddrZeroHttp+"/moveTablet?tablet=%s&group=2",
+		moveUrl := fmt.Sprintf("http://"+testutil.SockAddrZeroHttp+"/moveTablet?tablet=%s&group=2",
 			url.QueryEscape(pred))
-		resp, err := http.Get(url)
+		resp, err := http.Get(moveUrl)
 		require.NoError(t, err)
 		require.NoError(t, getError(resp.Body))
 		time.Sleep(time.Second)
@@ -166,9 +166,9 @@ func TestNodes(t *testing.T) {
 	require.NoError(t, err)
 
 	for pred := range state1.Groups["2"].Tablets {
-		url := fmt.Sprintf("http://"+testutil.SockAddrZeroHttp+"/moveTablet?tablet=%s&group=1",
+		moveUrl := fmt.Sprintf("http://"+testutil.SockAddrZeroHttp+"/moveTablet?tablet=%s&group=1",
 			url.QueryEscape(pred))
-		resp, err := http.Get(url)
+		resp, err := http.Get(moveUrl)
 		require.NoError(t, err)
 		require.NoError(t, getError(resp.Body))
 		time.Sleep(time.Second)
diff --git a/worker/draft.go b/worker/draft.go
index c66ddaef302..8f25b255cd3 100644
--- a/worker/draft.go
+++ b/worker/draft.go
@@ -803,9 +803,9 @@ func (n *node) Run() {
 			}
 			if leader {
 				// Leader can send messages in parallel with writing to disk.
-				for _, msg := range rd.Messages {
+				for i := range rd.Messages {
 					// NOTE: We can do some optimizations here to drop messages.
-					n.Send(msg)
+					n.Send(&rd.Messages[i])
 				}
 			}
 			if span != nil {
@@ -957,9 +957,9 @@ func (n *node) Run() {
 
 			if !leader {
 				// Followers should send messages later.
-				for _, msg := range rd.Messages {
+				for i := range rd.Messages {
 					// NOTE: We can do some optimizations here to drop messages.
-					n.Send(msg)
+					n.Send(&rd.Messages[i])
 				}
 			}
 			if span != nil {
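
Note on the message-sending hunks: Send now takes *raftpb.Message, and the loops index the slice (n.Send(&rd.Messages[i])) instead of ranging by value. Besides avoiding a struct copy per message, indexing sidesteps a classic Go pitfall: before Go 1.22, for _, msg := range reuses a single loop variable, so &msg would alias the same address on every iteration. A minimal standalone sketch of the difference (the message type here is hypothetical, standing in for raftpb.Message):

package main

import "fmt"

type message struct{ To uint64 }

func main() {
	msgs := []message{{To: 1}, {To: 2}, {To: 3}}

	// Taking the address of the range variable: before Go 1.22 every
	// appended pointer aliases the same reused variable, so all three
	// end up observing the value of the final iteration.
	var byVar []*message
	for _, m := range msgs {
		byVar = append(byVar, &m)
	}

	// Indexing the slice yields a distinct, stable pointer per element;
	// this is the pattern the diff adopts.
	var byIdx []*message
	for i := range msgs {
		byIdx = append(byIdx, &msgs[i])
	}

	fmt.Println(byVar[0].To, byVar[1].To, byVar[2].To) // pre-Go 1.22: 3 3 3
	fmt.Println(byIdx[0].To, byIdx[1].To, byIdx[2].To) // always:     1 2 3
}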
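
Similarly, the initAndStartNode hunk replaces a defer cancel() inside the retry loop with explicit cancel() calls on every exit path. Deferred calls run only when the enclosing function returns, so a defer inside a loop keeps one timed-out context (and its timer) alive per iteration. A rough sketch of the corrected shape, assuming a stand-in join function rather than the real JoinCluster RPC:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var attempts int

// join stands in for an RPC like JoinCluster; it fails twice, then succeeds.
func join(ctx context.Context) error {
	attempts++
	if attempts < 3 {
		return errors.New("not ready")
	}
	return nil
}

func joinWithRetry(parent context.Context) {
	timeout := 100 * time.Millisecond // shortened for the sketch
	for {
		ctx, cancel := context.WithTimeout(parent, timeout)
		// No defer cancel() here: it would not run until joinWithRetry
		// returns, leaking a context and timer for every iteration.
		err := join(ctx)
		if err == nil {
			cancel()
			break
		}
		fmt.Printf("join failed: %v; retrying\n", err)
		time.Sleep(timeout) // join may return immediately; pace the retries.
		cancel()            // release this iteration's resources before looping.
	}
}

func main() {
	joinWithRetry(context.Background())
	fmt.Println("joined after", attempts, "attempts")
}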