Skip to content

Commit

Permalink
Fix more deepsource warnings in raft.go (#4293)
Browse files Browse the repository at this point in the history
  • Loading branch information
martinmr authored Nov 19, 2019
1 parent 0bdccbb commit a38c92e
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 27 deletions.
10 changes: 5 additions & 5 deletions conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (n *Node) SetPeer(pid uint64, addr string) {
}

// Send sends the given RAFT message from this node.
func (n *Node) Send(msg raftpb.Message) {
func (n *Node) Send(msg *raftpb.Message) {
x.AssertTruef(n.Id != msg.To, "Sending message to itself")
data, err := msg.Marshal()
x.Check(err)
Expand Down Expand Up @@ -535,15 +535,15 @@ func (n *Node) DeletePeer(pid uint64) {

var errInternalRetry = errors.New("Retry proposal again")

func (n *Node) proposeConfChange(ctx context.Context, pb raftpb.ConfChange) error {
func (n *Node) proposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
cctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()

ch := make(chan error, 1)
id := n.storeConfChange(ch)
// TODO: Delete id from the map.
pb.ID = id
if err := n.Raft().ProposeConfChange(cctx, pb); err != nil {
conf.ID = id
if err := n.Raft().ProposeConfChange(cctx, conf); err != nil {
if cctx.Err() != nil {
return errInternalRetry
}
Expand Down Expand Up @@ -661,7 +661,7 @@ func (n *Node) RunReadIndexLoop(closer *y.Closer, readStateCh <-chan raft.ReadSt
return 0, errors.New("Closer has been called")
case rs := <-readStateCh:
if !bytes.Equal(activeRctx, rs.RequestCtx) {
glog.V(3).Infof("Read state: %x != requested %x", rs.RequestCtx, activeRctx[:])
glog.V(3).Infof("Read state: %x != requested %x", rs.RequestCtx, activeRctx)
goto again
}
return rs.Index, nil
Expand Down
33 changes: 19 additions & 14 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,11 +374,12 @@ func (n *node) applyProposal(e raftpb.Entry) (string, error) {
state.License.Enabled = time.Now().UTC().Before(expiry)
}

if p.MaxLeaseId > state.MaxLeaseId {
switch {
case p.MaxLeaseId > state.MaxLeaseId:
state.MaxLeaseId = p.MaxLeaseId
} else if p.MaxTxnTs > state.MaxTxnTs {
case p.MaxTxnTs > state.MaxTxnTs:
state.MaxTxnTs = p.MaxTxnTs
} else if p.MaxLeaseId != 0 || p.MaxTxnTs != 0 {
case p.MaxLeaseId != 0 || p.MaxTxnTs != 0:
// Could happen after restart when some entries were there in WAL and did not get
// snapshotted.
glog.Infof("Could not apply proposal, ignoring: p.MaxLeaseId=%v, p.MaxTxnTs=%v maxLeaseId=%d"+
Expand Down Expand Up @@ -446,7 +447,8 @@ func (n *node) initAndStartNode() error {
_, restart, err := n.PastLife()
x.Check(err)

if restart {
switch {
case restart:
glog.Infoln("Restarting node for dgraphzero")
sp, err := n.Store.Snapshot()
x.Checkf(err, "Unable to get existing snapshot")
Expand All @@ -464,7 +466,7 @@ func (n *node) initAndStartNode() error {

n.SetRaft(raft.RestartNode(n.Cfg))

} else if len(opts.peer) > 0 {
case len(opts.peer) > 0:
p := conn.GetPools().Connect(opts.peer)
if p == nil {
return errors.Errorf("Unhealthy connection to %v", opts.peer)
Expand All @@ -475,14 +477,15 @@ func (n *node) initAndStartNode() error {
timeout := 8 * time.Second
for {
ctx, cancel := context.WithTimeout(n.ctx, timeout)
defer cancel()
// JoinCluster can block indefinitely, raft ignores conf change proposal
// if it has pending configuration.
_, err := c.JoinCluster(ctx, n.RaftContext)
if err == nil {
cancel()
break
}
if x.ShouldCrash(err) {
cancel()
log.Fatalf("Error while joining cluster: %v", err)
}
glog.Errorf("Error while joining cluster: %v\n", err)
Expand All @@ -491,11 +494,12 @@ func (n *node) initAndStartNode() error {
timeout = 32 * time.Second
}
time.Sleep(timeout) // This is useful because JoinCluster can exit immediately.
cancel()
}
glog.Infof("[%#x] Starting node\n", n.Id)
n.SetRaft(raft.StartNode(n.Cfg, nil))

} else {
default:
data, err := n.RaftContext.Marshal()
x.Check(err)
peers := []raft.Peer{{ID: n.Id, Context: data}}
Expand Down Expand Up @@ -671,8 +675,8 @@ func (n *node) Run() {
}
if leader {
// Leader can send messages in parallel with writing to disk.
for _, msg := range rd.Messages {
n.Send(msg)
for i := range rd.Messages {
n.Send(&rd.Messages[i])
}
}
n.SaveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
Expand All @@ -693,18 +697,19 @@ func (n *node) Run() {

for _, entry := range rd.CommittedEntries {
n.Applied.Begin(entry.Index)
if entry.Type == raftpb.EntryConfChange {
switch {
case entry.Type == raftpb.EntryConfChange:
n.applyConfChange(entry)
glog.Infof("Done applying conf change at %#x", n.Id)

} else if entry.Type == raftpb.EntryNormal {
case entry.Type == raftpb.EntryNormal:
key, err := n.applyProposal(entry)
if err != nil {
glog.Errorf("While applying proposal: %v\n", err)
}
n.Proposals.Done(key, err)

} else {
default:
glog.Infof("Unhandled entry: %+v\n", entry)
}
n.Applied.Done(entry.Index)
Expand All @@ -713,8 +718,8 @@ func (n *node) Run() {

if !leader {
// Followers should send messages later.
for _, msg := range rd.Messages {
n.Send(msg)
for i := range rd.Messages {
n.Send(&rd.Messages[i])
}
}
span.Annotate(nil, "Sent messages")
Expand Down
8 changes: 4 additions & 4 deletions systest/group-delete/group_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ func TestNodes(t *testing.T) {
require.NoError(t, err)

for pred := range state1.Groups["3"].Tablets {
url := fmt.Sprintf("http://"+testutil.SockAddrZeroHttp+"/moveTablet?tablet=%s&group=2",
moveUrl := fmt.Sprintf("http://"+testutil.SockAddrZeroHttp+"/moveTablet?tablet=%s&group=2",
url.QueryEscape(pred))
resp, err := http.Get(url)
resp, err := http.Get(moveUrl)
require.NoError(t, err)
require.NoError(t, getError(resp.Body))
time.Sleep(time.Second)
Expand Down Expand Up @@ -166,9 +166,9 @@ func TestNodes(t *testing.T) {
require.NoError(t, err)

for pred := range state1.Groups["2"].Tablets {
url := fmt.Sprintf("http://"+testutil.SockAddrZeroHttp+"/moveTablet?tablet=%s&group=1",
moveUrl := fmt.Sprintf("http://"+testutil.SockAddrZeroHttp+"/moveTablet?tablet=%s&group=1",
url.QueryEscape(pred))
resp, err := http.Get(url)
resp, err := http.Get(moveUrl)
require.NoError(t, err)
require.NoError(t, getError(resp.Body))
time.Sleep(time.Second)
Expand Down
8 changes: 4 additions & 4 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,9 +803,9 @@ func (n *node) Run() {
}
if leader {
// Leader can send messages in parallel with writing to disk.
for _, msg := range rd.Messages {
for i := range rd.Messages {
// NOTE: We can do some optimizations here to drop messages.
n.Send(msg)
n.Send(&rd.Messages[i])
}
}
if span != nil {
Expand Down Expand Up @@ -957,9 +957,9 @@ func (n *node) Run() {

if !leader {
// Followers should send messages later.
for _, msg := range rd.Messages {
for i := range rd.Messages {
// NOTE: We can do some optimizations here to drop messages.
n.Send(msg)
n.Send(&rd.Messages[i])
}
}
if span != nil {
Expand Down

0 comments on commit a38c92e

Please sign in to comment.