From f5d90002b13a5eabcb839383e631a94893e95baa Mon Sep 17 00:00:00 2001 From: Ross Jones Date: Tue, 26 Mar 2024 16:29:44 +0000 Subject: [PATCH] Implements compute node heartbeats to requester node(s) Adds pkg/node/heartbeat which contains a client and a server for sending heartbeat messages over NATS PubSub. --- pkg/compute/management_client.go | 29 +++++++++++++++-- pkg/nats/transport/nats.go | 6 ++-- pkg/node/compute.go | 7 +++-- pkg/node/heartbeat/client.go | 47 +++++++++++++++++++++++++++ pkg/node/heartbeat/server.go | 54 ++++++++++++++++++++++++++++++++ pkg/node/heartbeat/types.go | 13 ++++++++ pkg/node/manager/node_manager.go | 20 +++++++++++- pkg/node/node.go | 43 +++++++++++++++++++++++-- pkg/test/compute/setup_test.go | 1 + 9 files changed, 208 insertions(+), 12 deletions(-) create mode 100644 pkg/node/heartbeat/client.go create mode 100644 pkg/node/heartbeat/server.go create mode 100644 pkg/node/heartbeat/types.go diff --git a/pkg/compute/management_client.go b/pkg/compute/management_client.go index 31b0726ed9..274c47376b 100644 --- a/pkg/compute/management_client.go +++ b/pkg/compute/management_client.go @@ -11,11 +11,13 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/compute/capacity" "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/models/requests" + "github.com/bacalhau-project/bacalhau/pkg/node/heartbeat" ) const ( infoUpdateFrequencyMinutes = 5 resourceUpdateFrequencySeconds = 30 + heartbeatFrequencySeconds = 30 ) type ManagementClientParams struct { @@ -23,8 +25,9 @@ type ManagementClientParams struct { LabelsProvider models.LabelsProvider ManagementProxy ManagementEndpoint NodeInfoDecorator models.NodeInfoDecorator - RegistrationFilePath string ResourceTracker capacity.Tracker + RegistrationFilePath string + HeartbeatClient *heartbeat.HeartbeatClient } // ManagementClient is used to call management functions with @@ -37,11 +40,12 @@ type ManagementClient struct { managementProxy ManagementEndpoint 
nodeID string nodeInfoDecorator models.NodeInfoDecorator - registrationFile *RegistrationFile resourceTracker capacity.Tracker + registrationFile *RegistrationFile + heartbeatClient *heartbeat.HeartbeatClient } -func NewManagementClient(params ManagementClientParams) *ManagementClient { +func NewManagementClient(params *ManagementClientParams) *ManagementClient { return &ManagementClient{ closeChannel: make(chan struct{}, 1), labelsProvider: params.LabelsProvider, @@ -50,6 +54,7 @@ func NewManagementClient(params ManagementClientParams) *ManagementClient { nodeInfoDecorator: params.NodeInfoDecorator, registrationFile: NewRegistrationFile(params.RegistrationFilePath), resourceTracker: params.ResourceTracker, + heartbeatClient: params.HeartbeatClient, } } @@ -128,10 +133,23 @@ func (m *ManagementClient) updateResources(ctx context.Context) { } } +func (m *ManagementClient) heartbeat(ctx context.Context, seq uint64) { + if err := m.heartbeatClient.SendHeartbeat(ctx, seq); err != nil { + log.Ctx(ctx).Error().Err(err).Msgf("heartbeat failed sending sequence %d", seq) + } +} + func (m *ManagementClient) Start(ctx context.Context) { infoTicker := time.NewTicker(infoUpdateFrequencyMinutes * time.Minute) resourceTicker := time.NewTicker(resourceUpdateFrequencySeconds * time.Second) + // The heartbeat ticker will fire twice as often as the configured frequency, to ensure that + // we don't slip outside the window. If we only ever sent on the configured + // frequency we are at risk of the node's liveness flapping between good and bad.
+ heartbeatTicker := time.NewTicker((heartbeatFrequencySeconds / 2) * time.Second) + + var heartbeatSequence uint64 = 0 + loop := true for loop { select { @@ -145,9 +163,14 @@ func (m *ManagementClient) Start(ctx context.Context) { case <-resourceTicker.C: // Send the latest resource info m.updateResources(ctx) + case <-heartbeatTicker.C: + // Send a heartbeat to the requester node + heartbeatSequence += 1 + m.heartbeat(ctx, heartbeatSequence) } } + heartbeatTicker.Stop() resourceTicker.Stop() infoTicker.Stop() } diff --git a/pkg/nats/transport/nats.go b/pkg/nats/transport/nats.go index d72312b13c..8a03df6bfa 100644 --- a/pkg/nats/transport/nats.go +++ b/pkg/nats/transport/nats.go @@ -80,7 +80,7 @@ func (c *NATSTransportConfig) Validate() error { } type NATSTransport struct { - Config NATSTransportConfig + Config *NATSTransportConfig nodeID string natsServer *nats_helper.ServerManager natsClient *nats_helper.ClientManager @@ -93,7 +93,7 @@ type NATSTransport struct { //nolint:funlen func NewNATSTransport(ctx context.Context, - config NATSTransportConfig) (*NATSTransport, error) { + config *NATSTransportConfig) (*NATSTransport, error) { log.Debug().Msgf("Creating NATS transport with config: %+v", config) if err := config.Validate(); err != nil { return nil, fmt.Errorf("error validating nats transport config. 
%w", err) @@ -190,7 +190,7 @@ func NewNATSTransport(ctx context.Context, }, nil } -func CreateClient(ctx context.Context, config NATSTransportConfig) (*nats_helper.ClientManager, error) { +func CreateClient(ctx context.Context, config *NATSTransportConfig) (*nats_helper.ClientManager, error) { // create nats client log.Debug().Msgf("Creating NATS client with servers: %s", strings.Join(config.Orchestrators, ",")) clientOptions := []nats.Option{ diff --git a/pkg/node/compute.go b/pkg/node/compute.go index 55fefdc7e8..cb01ee81e9 100644 --- a/pkg/node/compute.go +++ b/pkg/node/compute.go @@ -20,6 +20,7 @@ import ( executor_util "github.com/bacalhau-project/bacalhau/pkg/executor/util" "github.com/bacalhau-project/bacalhau/pkg/model" "github.com/bacalhau-project/bacalhau/pkg/models" + "github.com/bacalhau-project/bacalhau/pkg/node/heartbeat" "github.com/bacalhau-project/bacalhau/pkg/publicapi" compute_endpoint "github.com/bacalhau-project/bacalhau/pkg/publicapi/endpoint/compute" "github.com/bacalhau-project/bacalhau/pkg/publisher" @@ -58,6 +59,7 @@ func NewComputeNode( computeCallback compute.Callback, managementProxy compute.ManagementEndpoint, configuredLabels map[string]string, + heartbeatClient *heartbeat.HeartbeatClient, ) (*Compute, error) { executionStore := config.ExecutionStore @@ -236,7 +238,7 @@ func NewComputeNode( var managementClient *compute.ManagementClient // TODO: When we no longer use libP2P for management, we should remove this - // as the managementProxy will always be set. + // as the managementProxy will always be set for NATS if managementProxy != nil { // TODO: Make the registration lock folder a config option so that we have it // available and don't have to depend on getting the repo folder. @@ -247,13 +249,14 @@ func NewComputeNode( // Set up the management client which will attempt to register this node // with the requester node, and then if successful will send regular node // info updates. 
- managementClient = compute.NewManagementClient(compute.ManagementClientParams{ + managementClient = compute.NewManagementClient(&compute.ManagementClientParams{ NodeID: nodeID, LabelsProvider: labelsProvider, ManagementProxy: managementProxy, NodeInfoDecorator: nodeInfoDecorator, RegistrationFilePath: regFilename, ResourceTracker: runningCapacityTracker, + HeartbeatClient: heartbeatClient, }) if err := managementClient.RegisterNode(ctx); err != nil { return nil, fmt.Errorf("failed to register node with requester: %s", err) diff --git a/pkg/node/heartbeat/client.go b/pkg/node/heartbeat/client.go new file mode 100644 index 0000000000..b16a51a5ab --- /dev/null +++ b/pkg/node/heartbeat/client.go @@ -0,0 +1,47 @@ +package heartbeat + +import ( + "context" + + natsPubSub "github.com/bacalhau-project/bacalhau/pkg/nats/pubsub" + "github.com/bacalhau-project/bacalhau/pkg/pubsub" + "github.com/rs/zerolog/log" + + "github.com/nats-io/nats.go" +) + +type HeartbeatClient struct { + publisher *natsPubSub.PubSub[Heartbeat] + nodeID string +} + +func NewClient(conn *nats.Conn, nodeID string) (*HeartbeatClient, error) { + subParams := natsPubSub.PubSubParams{ + Subject: heartbeatTopic, + Conn: conn, + } + + publisher, err := natsPubSub.NewPubSub[Heartbeat](subParams) + if err != nil { + return nil, err + } + + return &HeartbeatClient{publisher: publisher, nodeID: nodeID}, nil +} + +func (h *HeartbeatClient) Start(ctx context.Context) error { + // Waits until we are cancelled and then closes the publisher + <-ctx.Done() + return h.publisher.Close(ctx) +} + +func (h *HeartbeatClient) SendHeartbeat(ctx context.Context, sequence uint64) error { + log.Ctx(ctx).Trace().Msgf("sending heartbeat seq: %d", sequence) + return h.Publish(ctx, Heartbeat{NodeID: h.nodeID, Sequence: sequence}) +} + +func (h *HeartbeatClient) Publish(ctx context.Context, message Heartbeat) error { + return h.publisher.Publish(ctx, message) +} + +var _ pubsub.Publisher[Heartbeat] = (*HeartbeatClient)(nil) diff 
--git a/pkg/node/heartbeat/server.go b/pkg/node/heartbeat/server.go new file mode 100644 index 0000000000..d10e9eb392 --- /dev/null +++ b/pkg/node/heartbeat/server.go @@ -0,0 +1,54 @@ +package heartbeat + +import ( + "context" + + natsPubSub "github.com/bacalhau-project/bacalhau/pkg/nats/pubsub" + "github.com/bacalhau-project/bacalhau/pkg/pubsub" + + "github.com/nats-io/nats.go" + "github.com/rs/zerolog/log" +) + +type HeartbeatServer struct { + subscription *natsPubSub.PubSub[Heartbeat] +} + +func NewServer(conn *nats.Conn) (*HeartbeatServer, error) { + subParams := natsPubSub.PubSubParams{ + Subject: heartbeatTopic, + Conn: conn, + } + + subscription, err := natsPubSub.NewPubSub[Heartbeat](subParams) + if err != nil { + return nil, err + } + + return &HeartbeatServer{subscription: subscription}, nil +} + +func (h *HeartbeatServer) Start(ctx context.Context) error { + if err := h.subscription.Subscribe(ctx, h); err != nil { + return err + } + + go func(ctx context.Context) { + log.Ctx(ctx).Info().Msg("Heartbeat server started") + <-ctx.Done() + _ = h.subscription.Close(ctx) + log.Ctx(ctx).Info().Msg("Heartbeat server shutdown") + }(ctx) + + return nil +} + +func (h *HeartbeatServer) Handle(ctx context.Context, message Heartbeat) error { + log.Ctx(ctx).Trace().Msgf("heartbeat received from %s", message.NodeID) + + // TODO: Process the heartbeat (e.g. update the node's last seen time) + + return nil +} + +var _ pubsub.Subscriber[Heartbeat] = (*HeartbeatServer)(nil) diff --git a/pkg/node/heartbeat/types.go b/pkg/node/heartbeat/types.go new file mode 100644 index 0000000000..6ded7a8234 --- /dev/null +++ b/pkg/node/heartbeat/types.go @@ -0,0 +1,13 @@ +package heartbeat + +const heartbeatTopic = "heartbeat" + +// Heartbeat represents a heartbeat message from a specific node. +// It contains the node ID and the sequence number of the heartbeat +// which is monotonically increasing (reboots aside). 
We do not +// use timestamps on the client, we rely solely on the server-side +// time to avoid clock drift issues. +type Heartbeat struct { + NodeID string + Sequence uint64 +} diff --git a/pkg/node/manager/node_manager.go b/pkg/node/manager/node_manager.go index 3d87f17a12..75c7389f72 100644 --- a/pkg/node/manager/node_manager.go +++ b/pkg/node/manager/node_manager.go @@ -8,6 +8,7 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/lib/concurrency" "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/models/requests" + "github.com/bacalhau-project/bacalhau/pkg/node/heartbeat" "github.com/bacalhau-project/bacalhau/pkg/routing" "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" @@ -25,10 +26,12 @@ const ( type NodeManager struct { nodeInfo routing.NodeInfoStore resourceMap *concurrency.StripedMap[models.Resources] + heartbeats *heartbeat.HeartbeatServer } type NodeManagerParams struct { - NodeInfo routing.NodeInfoStore + NodeInfo routing.NodeInfoStore + Heartbeats *heartbeat.HeartbeatServer } // NewNodeManager constructs a new node manager and returns a pointer @@ -37,9 +40,24 @@ func NewNodeManager(params NodeManagerParams) *NodeManager { return &NodeManager{ resourceMap: concurrency.NewStripedMap[models.Resources](resourceMapLockCount), nodeInfo: params.NodeInfo, + heartbeats: params.Heartbeats, } } +func (n *NodeManager) Start(ctx context.Context) error { + log.Ctx(ctx).Info().Msg("Node manager started") + + if n.heartbeats != nil { + err := n.heartbeats.Start(ctx) + if err != nil { + log.Ctx(ctx).Error().Err(err).Msg("failed to start heartbeat server") + return err + } + } + + return nil +} + // // ---- Implementation of compute.ManagementEndpoint ---- // diff --git a/pkg/node/node.go b/pkg/node/node.go index de157e28d2..0756edd6e8 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -22,6 +22,7 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/model" "github.com/bacalhau-project/bacalhau/pkg/models" 
nats_transport "github.com/bacalhau-project/bacalhau/pkg/nats/transport" + "github.com/bacalhau-project/bacalhau/pkg/node/heartbeat" "github.com/bacalhau-project/bacalhau/pkg/node/manager" "github.com/bacalhau-project/bacalhau/pkg/node/metrics" "github.com/bacalhau-project/bacalhau/pkg/publicapi" @@ -195,10 +196,13 @@ func NewNode( // node info store that is used for both discovering compute nodes, as to find addresses of other nodes for routing requests. + var natsConfig *nats_transport.NATSTransportConfig var transportLayer transport.TransportLayer var tracingInfoStore routing.NodeInfoStore + var heartbeatSvr *heartbeat.HeartbeatServer + if config.NetworkConfig.Type == models.NetworkTypeNATS { - natsConfig := nats_transport.NATSTransportConfig{ + natsConfig = &nats_transport.NATSTransportConfig{ NodeID: config.NodeID, Port: config.NetworkConfig.Port, AdvertisedAddress: config.NetworkConfig.AdvertisedAddress, @@ -234,6 +238,11 @@ func NewNode( } tracingInfoStore = tracing.NewNodeStore(nodeInfoStore) + heartbeatSvr, err = heartbeat.NewServer(natsClient.Client) + if err != nil { + return nil, errors.Wrap(err, "failed to create heartbeat server using NATS transport connection info") + } + // Once the KV store has been created, it can be offered to the transport layer to be used as a consumer // of node info. if err := transportLayer.RegisterNodeInfoConsumer(ctx, tracingInfoStore); err != nil { @@ -283,11 +292,20 @@ func NewNode( ) // Create a new node manager to keep track of compute nodes connecting - // to the network. + // to the network. Provide it with a mechanism to lookup (and enhance) + // node info, and a reference to the heartbeat server if running NATS. nodeManager := manager.NewNodeManager(manager.NodeManagerParams{ - NodeInfo: tracingInfoStore, + NodeInfo: tracingInfoStore, + Heartbeats: heartbeatSvr, }) + // Start the nodemanager, ensuring it doesn't block the main thread and + // that any errors are logged. 
If we are unable to start the manager + // then we should not start the node. + if err := nodeManager.Start(ctx); err != nil { + return nil, errors.Wrap(err, "failed to start node manager") + } + // NodeManager node wraps the node manager and implements the routing.NodeInfoStore // interface so that it can return nodes and add the most recent resource information // to the node info returned. When the libp2p transport is no longer necessary, we @@ -348,6 +366,24 @@ func NewNode( attribute.StringSlice("node_engines", executors.Keys(ctx)), ) + var hbClient *heartbeat.HeartbeatClient + + // We want to provide a heartbeat client to the compute node if we are using NATS. + // We can only create a heartbeat client if we have a NATS client, and we can + // only do that if the configuration is available. Whilst we support libp2p this + // is not always the case. + if natsConfig != nil { + natsClient, err := nats_transport.CreateClient(ctx, natsConfig) + if err != nil { + return nil, errors.Wrap(err, "failed to create NATS client for node info store") + } + + hbClient, err = heartbeat.NewClient(natsClient.Client, config.NodeID) + if err != nil { + return nil, errors.Wrap(err, "failed to create heartbeat client") + } + } + // setup compute node computeNode, err = NewComputeNode( ctx, @@ -362,6 +398,7 @@ func NewNode( transportLayer.CallbackProxy(), transportLayer.ManagementProxy(), config.Labels, + hbClient, ) if err != nil { return nil, err diff --git a/pkg/test/compute/setup_test.go b/pkg/test/compute/setup_test.go index f5ecceae33..de3bf7ccfa 100644 --- a/pkg/test/compute/setup_test.go +++ b/pkg/test/compute/setup_test.go @@ -118,6 +118,7 @@ func (s *ComputeSuite) setupNode() { callback, nil, // until we switch to testing with NATS map[string]string{}, // empty configured labels + nil, // no heartbeat client ) s.NoError(err) s.stateResolver = *resolver.NewStateResolver(resolver.StateResolverParams{