Skip to content

Commit

Permalink
Fix more golint warnings in the zero package. (#3562)
Browse files Browse the repository at this point in the history
  • Loading branch information
martinmr authored Jun 18, 2019
1 parent c730e91 commit bed4013
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 7 deletions.
15 changes: 14 additions & 1 deletion dgraph/cmd/zero/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type syncMark struct {
ts uint64
}

// Oracle stores and manages the transaction state and conflict detection.
type Oracle struct {
x.SafeMutex
commits map[uint64]uint64 // startTs -> commitTs
Expand All @@ -54,6 +55,7 @@ type Oracle struct {
syncMarks []syncMark
}

// Init initializes the oracle.
func (o *Oracle) Init() {
o.commits = make(map[uint64]uint64)
o.keyCommit = make(map[string]uint64)
Expand Down Expand Up @@ -274,6 +276,7 @@ func (o *Oracle) storePending(ids *pb.AssignedIds) {
o.maxAssigned = x.Max(o.maxAssigned, max)
}

// MaxPending returns the maximum assigned timestamp.
func (o *Oracle) MaxPending() uint64 {
o.RLock()
defer o.RUnlock()
Expand Down Expand Up @@ -397,6 +400,10 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
return s.proposeTxn(ctx, src)
}

// CommitOrAbort either commits a transaction or aborts it.
// The abortion can happen under the following conditions
// 1) the api.TxnContext.Aborted flag is set in the src argument
// 2) if there's an error (e.g server is not the leader or there's a conflicting transaction)
func (s *Server) CommitOrAbort(ctx context.Context, src *api.TxnContext) (*api.TxnContext, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
Expand All @@ -417,7 +424,11 @@ func (s *Server) CommitOrAbort(ctx context.Context, src *api.TxnContext) (*api.T
var errClosed = errors.New("Streaming closed by oracle")
var errNotLeader = errors.New("Node is no longer leader")

func (s *Server) Oracle(unused *api.Payload, server pb.Zero_OracleServer) error {
// Oracle streams the oracle state to the alphas.
// The first entry sent by Zero contains the entire state of transactions. Zero periodically
// confirms receipt from the group, and truncates its state. This 2-way acknowledgement is a
// safe way to get the status of all the transactions.
func (s *Server) Oracle(_ *api.Payload, server pb.Zero_OracleServer) error {
if !s.Node.AmLeader() {
return errNotLeader
}
Expand Down Expand Up @@ -448,6 +459,7 @@ func (s *Server) Oracle(unused *api.Payload, server pb.Zero_OracleServer) error
}
}

// SyncedUntil returns the timestamp up to which all the nodes have synced.
func (s *Server) SyncedUntil() uint64 {
s.orc.Lock()
defer s.orc.Unlock()
Expand All @@ -467,6 +479,7 @@ func (s *Server) SyncedUntil() uint64 {
return syncUntil
}

// TryAbort attempts to abort the given transactions which are not already committed..
func (s *Server) TryAbort(ctx context.Context,
txns *pb.TxnTimestamps) (*pb.OracleDelta, error) {
delta := &pb.OracleDelta{}
Expand Down
1 change: 1 addition & 0 deletions dgraph/cmd/zero/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type options struct {

var opts options

// Zero is the sub-command used to start Zero servers.
var Zero x.SubCommand

func init() {
Expand Down
1 change: 1 addition & 0 deletions dgraph/cmd/zero/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/golang/glog"
)

// Telemetry holds information about the state of the zero server.
type Telemetry struct {
Arch string
Cid string
Expand Down
13 changes: 12 additions & 1 deletion dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var (
errServerShutDown = errors.New("Server is being shut down")
)

// Server implements the zero server.
type Server struct {
x.SafeMutex
Node *node
Expand All @@ -62,6 +63,7 @@ type Server struct {
blockCommitsOn *sync.Map
}

// Init initializes the zero server.
func (s *Server) Init() {
s.Lock()
defer s.Unlock()
Expand Down Expand Up @@ -143,6 +145,7 @@ func (s *Server) member(addr string) *pb.Member {
return nil
}

// Leader returns a connection pool to the zero leader.
func (s *Server) Leader(gid uint32) *conn.Pool {
s.RLock()
defer s.RUnlock()
Expand Down Expand Up @@ -171,6 +174,7 @@ func (s *Server) Leader(gid uint32) *conn.Pool {
return healthyPool
}

// KnownGroups returns a list of the known groups.
func (s *Server) KnownGroups() []uint32 {
var groups []uint32
s.RLock()
Expand Down Expand Up @@ -198,6 +202,7 @@ func (s *Server) hasLeader(gid uint32) bool {
return false
}

// SetMembershipState updates the membership state to the given one.
func (s *Server) SetMembershipState(state *pb.MembershipState) {
s.Lock()
defer s.Unlock()
Expand All @@ -220,6 +225,7 @@ func (s *Server) SetMembershipState(state *pb.MembershipState) {
s.nextGroup = uint32(len(state.Groups) + 1)
}

// MarshalMembershipState returns the marshaled membership state.
func (s *Server) MarshalMembershipState() ([]byte, error) {
s.Lock()
defer s.Unlock()
Expand Down Expand Up @@ -369,7 +375,9 @@ func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) {
return res, nil
}

// Its users responsibility to ensure that node doesn't come back again before calling the api.
// removeNode removes the given node from the given group.
// It's the user's responsibility to ensure that node doesn't come back again
// before calling the api.
func (s *Server) removeNode(ctx context.Context, nodeId uint64, groupId uint32) error {
if groupId == 0 {
return s.Node.ProposePeerRemoval(ctx, nodeId)
Expand Down Expand Up @@ -525,6 +533,7 @@ func (s *Server) Connect(ctx context.Context,
return resp, nil
}

// ShouldServe returns the tablet serving the predicate passed in the request.
func (s *Server) ShouldServe(
ctx context.Context, tablet *pb.Tablet) (resp *pb.Tablet, err error) {
ctx, span := otrace.StartSpan(ctx, "Zero.ShouldServe")
Expand Down Expand Up @@ -577,6 +586,7 @@ func (s *Server) ShouldServe(
return tab, nil
}

// UpdateMembership updates the membership of the given group.
func (s *Server) UpdateMembership(ctx context.Context, group *pb.Group) (*api.Payload, error) {
proposals, err := s.createProposals(group)
if err != nil {
Expand Down Expand Up @@ -670,6 +680,7 @@ func (s *Server) deletePredicates(ctx context.Context, group *pb.Group) error {
return nil
}

// StreamMembership periodically streams the membership state to the given stream.
func (s *Server) StreamMembership(_ *api.Payload, stream pb.Zero_StreamMembershipServer) error {
// Send MembershipState right away. So, the connection is correctly established.
ctx := stream.Context()
Expand Down
3 changes: 1 addition & 2 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,7 @@ func (n *node) leaderBlocking() (*conn.Pool, error) {
if pool == nil {
// Functions like retrieveSnapshot and joinPeers are blocking at initial start and
// leader election for a group might not have happened when it is called. If we can't
// find a leader, get latest state from
// Zero.
// find a leader, get latest state from Zero.
if err := UpdateMembershipState(context.Background()); err != nil {
return nil, fmt.Errorf("Error while trying to update membership state: %+v", err)
}
Expand Down
3 changes: 0 additions & 3 deletions worker/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,9 +808,6 @@ func (g *groupi) processOracleDeltaStream() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := pb.NewZeroClient(pl.Get())
// The first entry send by Zero contains the entire state of transactions. Zero periodically
// confirms receipt from the group, and truncates its state. This 2-way acknowledgement is a
// safe way to get the status of all the transactions.
stream, err := c.Oracle(ctx, &api.Payload{})
if err != nil {
glog.Errorf("Error while calling Oracle %v\n", err)
Expand Down

0 comments on commit bed4013

Please sign in to comment.