Skip to content

Commit

Permalink
Implements compute node heartbeats to requester node(s)
Browse files Browse the repository at this point in the history
Adds pkg/node/heartbeat which contains a client and a server for sending
heartbeat messages over NATS PubSub.
  • Loading branch information
rossjones committed Mar 26, 2024
1 parent 5428f7f commit f5d9000
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 12 deletions.
29 changes: 26 additions & 3 deletions pkg/compute/management_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,23 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/compute/capacity"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/models/requests"
"github.com/bacalhau-project/bacalhau/pkg/node/heartbeat"
)

const (
infoUpdateFrequencyMinutes = 5
resourceUpdateFrequencySeconds = 30
heartbeatFrequencySeconds = 30
)

type ManagementClientParams struct {
NodeID string
LabelsProvider models.LabelsProvider
ManagementProxy ManagementEndpoint
NodeInfoDecorator models.NodeInfoDecorator
RegistrationFilePath string
ResourceTracker capacity.Tracker
RegistrationFilePath string
HeartbeatClient *heartbeat.HeartbeatClient
}

// ManagementClient is used to call management functions with
Expand All @@ -37,11 +40,12 @@ type ManagementClient struct {
managementProxy ManagementEndpoint
nodeID string
nodeInfoDecorator models.NodeInfoDecorator
registrationFile *RegistrationFile
resourceTracker capacity.Tracker
registrationFile *RegistrationFile
heartbeatClient *heartbeat.HeartbeatClient
}

func NewManagementClient(params ManagementClientParams) *ManagementClient {
func NewManagementClient(params *ManagementClientParams) *ManagementClient {
return &ManagementClient{
closeChannel: make(chan struct{}, 1),
labelsProvider: params.LabelsProvider,
Expand All @@ -50,6 +54,7 @@ func NewManagementClient(params ManagementClientParams) *ManagementClient {
nodeInfoDecorator: params.NodeInfoDecorator,
registrationFile: NewRegistrationFile(params.RegistrationFilePath),
resourceTracker: params.ResourceTracker,
heartbeatClient: params.HeartbeatClient,
}
}

Expand Down Expand Up @@ -128,10 +133,23 @@ func (m *ManagementClient) updateResources(ctx context.Context) {
}
}

func (m *ManagementClient) heartbeat(ctx context.Context, seq uint64) {
if err := m.heartbeatClient.SendHeartbeat(ctx, seq); err != nil {
log.Ctx(ctx).Error().Err(err).Msgf("heartbeat failed sending sequence %d", seq)
}
}

func (m *ManagementClient) Start(ctx context.Context) {
infoTicker := time.NewTicker(infoUpdateFrequencyMinutes * time.Minute)
resourceTicker := time.NewTicker(resourceUpdateFrequencySeconds * time.Second)

// The heartbeat ticker will fire twice as often as the configured, to ensure that
// we don't slip outside the window. If we only ever sent on the configured
// frequency we are at risk of the node's liveness flapping between good and bad.
heartbeatTicker := time.NewTicker((heartbeatFrequencySeconds / 2) * time.Second)

var heartbeatSequence uint64 = 0

loop := true
for loop {
select {
Expand All @@ -145,9 +163,14 @@ func (m *ManagementClient) Start(ctx context.Context) {
case <-resourceTicker.C:
// Send the latest resource info
m.updateResources(ctx)
case <-heartbeatTicker.C:
// Send a heartbeat to the requester node
heartbeatSequence += 1
m.heartbeat(ctx, heartbeatSequence)
}
}

heartbeatTicker.Stop()
resourceTicker.Stop()
infoTicker.Stop()
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/nats/transport/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (c *NATSTransportConfig) Validate() error {
}

type NATSTransport struct {
Config NATSTransportConfig
Config *NATSTransportConfig
nodeID string
natsServer *nats_helper.ServerManager
natsClient *nats_helper.ClientManager
Expand All @@ -93,7 +93,7 @@ type NATSTransport struct {

//nolint:funlen
func NewNATSTransport(ctx context.Context,
config NATSTransportConfig) (*NATSTransport, error) {
config *NATSTransportConfig) (*NATSTransport, error) {
log.Debug().Msgf("Creating NATS transport with config: %+v", config)
if err := config.Validate(); err != nil {
return nil, fmt.Errorf("error validating nats transport config. %w", err)
Expand Down Expand Up @@ -190,7 +190,7 @@ func NewNATSTransport(ctx context.Context,
}, nil
}

func CreateClient(ctx context.Context, config NATSTransportConfig) (*nats_helper.ClientManager, error) {
func CreateClient(ctx context.Context, config *NATSTransportConfig) (*nats_helper.ClientManager, error) {
// create nats client
log.Debug().Msgf("Creating NATS client with servers: %s", strings.Join(config.Orchestrators, ","))
clientOptions := []nats.Option{
Expand Down
7 changes: 5 additions & 2 deletions pkg/node/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
executor_util "github.com/bacalhau-project/bacalhau/pkg/executor/util"
"github.com/bacalhau-project/bacalhau/pkg/model"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/node/heartbeat"
"github.com/bacalhau-project/bacalhau/pkg/publicapi"
compute_endpoint "github.com/bacalhau-project/bacalhau/pkg/publicapi/endpoint/compute"
"github.com/bacalhau-project/bacalhau/pkg/publisher"
Expand Down Expand Up @@ -58,6 +59,7 @@ func NewComputeNode(
computeCallback compute.Callback,
managementProxy compute.ManagementEndpoint,
configuredLabels map[string]string,
heartbeatClient *heartbeat.HeartbeatClient,
) (*Compute, error) {
executionStore := config.ExecutionStore

Expand Down Expand Up @@ -236,7 +238,7 @@ func NewComputeNode(

var managementClient *compute.ManagementClient
// TODO: When we no longer use libP2P for management, we should remove this
// as the managementProxy will always be set.
// as the managementProxy will always be set for NATS
if managementProxy != nil {
// TODO: Make the registration lock folder a config option so that we have it
// available and don't have to depend on getting the repo folder.
Expand All @@ -247,13 +249,14 @@ func NewComputeNode(
// Set up the management client which will attempt to register this node
// with the requester node, and then if successful will send regular node
// info updates.
managementClient = compute.NewManagementClient(compute.ManagementClientParams{
managementClient = compute.NewManagementClient(&compute.ManagementClientParams{
NodeID: nodeID,
LabelsProvider: labelsProvider,
ManagementProxy: managementProxy,
NodeInfoDecorator: nodeInfoDecorator,
RegistrationFilePath: regFilename,
ResourceTracker: runningCapacityTracker,
HeartbeatClient: heartbeatClient,
})
if err := managementClient.RegisterNode(ctx); err != nil {
return nil, fmt.Errorf("failed to register node with requester: %s", err)
Expand Down
47 changes: 47 additions & 0 deletions pkg/node/heartbeat/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package heartbeat

import (
"context"

natsPubSub "github.com/bacalhau-project/bacalhau/pkg/nats/pubsub"
"github.com/bacalhau-project/bacalhau/pkg/pubsub"
"github.com/rs/zerolog/log"

"github.com/nats-io/nats.go"
)

type HeartbeatClient struct {
publisher *natsPubSub.PubSub[Heartbeat]
nodeID string
}

func NewClient(conn *nats.Conn, nodeID string) (*HeartbeatClient, error) {
subParams := natsPubSub.PubSubParams{
Subject: heartbeatTopic,
Conn: conn,
}

publisher, err := natsPubSub.NewPubSub[Heartbeat](subParams)
if err != nil {
return nil, err
}

return &HeartbeatClient{publisher: publisher, nodeID: nodeID}, nil
}

func (h *HeartbeatClient) Start(ctx context.Context) error {
// Waits until we are cancelled and then closes the publisher
<-ctx.Done()
return h.publisher.Close(ctx)
}

func (h *HeartbeatClient) SendHeartbeat(ctx context.Context, sequence uint64) error {
log.Ctx(ctx).Trace().Msgf("sending heartbeat seq: %d", sequence)
return h.Publish(ctx, Heartbeat{NodeID: h.nodeID, Sequence: sequence})
}

func (h *HeartbeatClient) Publish(ctx context.Context, message Heartbeat) error {
return h.publisher.Publish(ctx, message)
}

var _ pubsub.Publisher[Heartbeat] = (*HeartbeatClient)(nil)
54 changes: 54 additions & 0 deletions pkg/node/heartbeat/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package heartbeat

import (
"context"

natsPubSub "github.com/bacalhau-project/bacalhau/pkg/nats/pubsub"
"github.com/bacalhau-project/bacalhau/pkg/pubsub"

"github.com/nats-io/nats.go"
"github.com/rs/zerolog/log"
)

type HeartbeatServer struct {
subscription *natsPubSub.PubSub[Heartbeat]
}

func NewServer(conn *nats.Conn) (*HeartbeatServer, error) {
subParams := natsPubSub.PubSubParams{
Subject: heartbeatTopic,
Conn: conn,
}

subscription, err := natsPubSub.NewPubSub[Heartbeat](subParams)
if err != nil {
return nil, err
}

return &HeartbeatServer{subscription: subscription}, nil
}

func (h *HeartbeatServer) Start(ctx context.Context) error {
if err := h.subscription.Subscribe(ctx, h); err != nil {
return err
}

go func(ctx context.Context) {
log.Ctx(ctx).Info().Msg("Heartbeat server started")
<-ctx.Done()
_ = h.subscription.Close(ctx)
log.Ctx(ctx).Info().Msg("Heartbeat server shutdown")
}(ctx)

return nil
}

func (h *HeartbeatServer) Handle(ctx context.Context, message Heartbeat) error {
log.Ctx(ctx).Trace().Msgf("heartbeat received from %s", message.NodeID)

// TODO: Process the heartbeat (e.g. update the node's last seen time)

return nil
}

var _ pubsub.Subscriber[Heartbeat] = (*HeartbeatServer)(nil)
13 changes: 13 additions & 0 deletions pkg/node/heartbeat/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package heartbeat

const heartbeatTopic = "heartbeat"

// Heartbeat represents a heartbeat message from a specific node.
// It contains the node ID and the sequence number of the heartbeat
// which is monotonically increasing (reboots aside). We do not
// use timestamps on the client, we rely solely on the server-side
// time to avoid clock drift issues.
type Heartbeat struct {
NodeID string
Sequence uint64
}
20 changes: 19 additions & 1 deletion pkg/node/manager/node_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/lib/concurrency"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/models/requests"
"github.com/bacalhau-project/bacalhau/pkg/node/heartbeat"
"github.com/bacalhau-project/bacalhau/pkg/routing"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
Expand All @@ -25,10 +26,12 @@ const (
type NodeManager struct {
nodeInfo routing.NodeInfoStore
resourceMap *concurrency.StripedMap[models.Resources]
heartbeats *heartbeat.HeartbeatServer
}

type NodeManagerParams struct {
NodeInfo routing.NodeInfoStore
NodeInfo routing.NodeInfoStore
Heartbeats *heartbeat.HeartbeatServer
}

// NewNodeManager constructs a new node manager and returns a pointer
Expand All @@ -37,9 +40,24 @@ func NewNodeManager(params NodeManagerParams) *NodeManager {
return &NodeManager{
resourceMap: concurrency.NewStripedMap[models.Resources](resourceMapLockCount),
nodeInfo: params.NodeInfo,
heartbeats: params.Heartbeats,
}
}

func (n *NodeManager) Start(ctx context.Context) error {
log.Ctx(ctx).Info().Msg("Node manager started")

if n.heartbeats != nil {
err := n.heartbeats.Start(ctx)
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to start heartbeat server")
return err
}
}

return nil
}

//
// ---- Implementation of compute.ManagementEndpoint ----
//
Expand Down
Loading

0 comments on commit f5d9000

Please sign in to comment.