Node heartbeats #3709
Conversation
A few drive-by comments.
Good stuff!
Left a few comments I'd like addressed/discussed before merging.
0ec0bfc to 6bf5028 (compare)
Adds pkg/node/heartbeat, which contains a client and a server for sending heartbeat messages over NATS PubSub.
Enqueues, or re-enqueues, heartbeats from nodes as they arrive into a priority queue.
Also enables ordering and filtering on the status field (nodeinfo.State). This means that the approval field is no longer labelled Status, but Approval.
LGTM but i think we should go with just a binary classification
cmd/cli/node/list.go
Outdated
@@ -16,15 +16,17 @@ import (

var defaultColumnGroups = []string{"labels", "capacity"}
var orderByFields = []string{"id", "type", "available_cpu", "available_memory", "available_disk", "available_gpu", "status"}
var filterStatusValues = []string{"approved", "pending", "rejected"}
var filterApprovalValues = []string{"approved", "pending", "rejected"}
var filterStatusValues = []string{"healthy", "unhealthy", "unknown"}
i think it should be binary - "connected" or "unconnected" (we could add something more sophisticated like "unhealthy" when we have more health checks).
Just for the filters, or for the node list as well?
i think the node list as well - "healthy" and "unhealthy" imply something i don't think we know. We only know if it's connected or disconnected.
Went with Connected/Disconnected for now, discussed with Walid and will add another duration in future to mark the point beyond which we don't think the node is coming back.
Looks good, once comments are addressed.
nodeCmd.Flags().StringVar(&o.FilterByStatus, "filter-status", o.FilterByStatus,
	fmt.Sprintf("Filter nodes by status. One of: %q", filterStatusValues))

	return nodeCmd
}

// Run executes node command
func (o *ListOptions) run(cmd *cobra.Command, _ []string) {
func (o *ListOptions) run(cmd *cobra.Command, _ []string) error {
Non-blocking: if we are going to return an error from this method, can we replace all the util.Fatal calls while we are at it? (be wary this might break some tests that (annoyingly) override the util.Fatal field 😞)
Can definitely add a ticket to address this, yeah. There's a mix of approaches atm.
Added #3764
cmd/cli/node/list.go
Outdated
		return fmt.Errorf("cannot use '%s' as filter approval value, should be one of: %q", o.FilterByApproval, filterApprovalValues)
	}
}

if o.FilterByStatus != "" {
	if !slices.Contains(filterStatusValues, o.FilterByStatus) {
		util.Fatal(cmd, fmt.Errorf("cannot use '%s' as filter status value, should be one of: %q", o.FilterByStatus, filterStatusValues), 1)
		return fmt.Errorf("cannot use '%s' as filter status value, should be one of: %q", o.FilterByStatus, filterStatusValues)
consider printing the specific flag name in the error message, e.g. filter-status and filter-approval
# Command: `node approve`

The `bacalhau node approve` command offers administrations the ability to approve the cluster membership for a node using its unique identifier.
"command offers administrations" - administrators?
"unique identifier" being synonymous with name, right?
- `-m message`:

- A message to be attached to the approval action.
makes me wonder if we should also include the ClientID of the user issuing the approval, but that could always be included later.
Yes, I think at the moment something being audited just means it was logged somewhere, and so the client id should be in the request.
sidebar_label: delete
---

# Command: `node approve`
node delete
pkg/models/node_state.go
Outdated
// To add a new state,
// * add it to the end of the list in the const below
// * add it to strLivenessArray and typeLivenessMap
// * add it to the livenessContainer and corresponding NodeStates var.
// * add it to the All() method in the livenessContainer
<3
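The maintenance steps in that comment describe a common Go enum pattern: a const list, a string array, and a lookup map that must be kept in sync. A minimal sketch of the pattern (the names echo the comment, but this is not the real pkg/models/node_state.go code):

```go
package main

import "fmt"

// NodeConnectionState is an illustrative liveness enum.
type NodeConnectionState int

const (
	connected NodeConnectionState = iota
	disconnected
	// New states go at the end, and must also be added to
	// strLivenessArray and typeLivenessMap below.
)

// strLivenessArray maps state -> name; order must match the const list.
var strLivenessArray = []string{"CONNECTED", "DISCONNECTED"}

// typeLivenessMap maps name -> state for parsing.
var typeLivenessMap = map[string]NodeConnectionState{
	"CONNECTED":    connected,
	"DISCONNECTED": disconnected,
}

func (t NodeConnectionState) String() string {
	if t >= 0 && int(t) < len(strLivenessArray) {
		return strLivenessArray[t]
	}
	return "UNDEFINED"
}

func parse(s string) (NodeConnectionState, bool) {
	v, ok := typeLivenessMap[s]
	return v, ok
}

func main() {
	fmt.Println(connected) // CONNECTED
	if v, ok := parse("DISCONNECTED"); ok {
		fmt.Println(v) // DISCONNECTED
	}
}
```

The explicit checklist in the comment exists because Go gives no compile-time guarantee that the array and map cover every const; forgetting one produces "UNDEFINED" strings or failed parses at runtime.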
@@ -124,36 +127,62 @@ func (m *ManagementClient) updateResources(ctx context.Context) {
		Resources: resources,
	})
	if err != nil {
		log.Ctx(ctx).Error().Err(err).Msg("failed to send resource update to requester node")
While I am thinking about this: above, on line 103 where we call getNodeInfo, we do:
nodeInfo := m.getNodeInfo(ctx)
response, err := m.managementProxy.UpdateInfo(ctx, requests.UpdateInfoRequest{
Info: nodeInfo,
})
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to send update info to requester node")
}
// TODO response will be nil when err != nil and this code will panic (has panicked for me)
if response.Accepted {
log.Ctx(ctx).Debug().Msg("update info accepted")
} else {
log.Ctx(ctx).Error().Msgf("update info rejected: %s", response.Reason)
}
Can we address the panic here?
I think I intended this to return after logging the error. Fixed in a96c1b0
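The fix is to return after logging, so the nil response is never dereferenced. A self-contained sketch of the before/after shape (callUpdateInfo and updateInfo are stand-ins, not the real managementProxy API):

```go
package main

import (
	"errors"
	"fmt"
)

type updateInfoResponse struct {
	Accepted bool
	Reason   string
}

// callUpdateInfo stands in for managementProxy.UpdateInfo: on failure
// it returns a nil response alongside the error.
func callUpdateInfo(fail bool) (*updateInfoResponse, error) {
	if fail {
		return nil, errors.New("requester unreachable")
	}
	return &updateInfoResponse{Accepted: true}, nil
}

// updateInfo returns immediately after handling the error. The original
// bug was falling through to response.Accepted when err != nil, which
// dereferences a nil pointer and panics.
func updateInfo(fail bool) string {
	response, err := callUpdateInfo(fail)
	if err != nil {
		// log and return; response is nil here
		return fmt.Sprintf("failed to send update info: %v", err)
	}
	if response.Accepted {
		return "update info accepted"
	}
	return fmt.Sprintf("update info rejected: %s", response.Reason)
}

func main() {
	fmt.Println(updateInfo(true))
	fmt.Println(updateInfo(false))
}
```

This follows the usual Go convention: when a function returns (value, error), the value must be treated as invalid whenever the error is non-nil.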
pkg/node/heartbeat/server.go
Outdated
go func(ctx context.Context) {
	defer func() {
		_ = h.subscription.Close(ctx) // We're closing down, not much we can do with an error
we can log it on the line below
pkg/node/manager/node_manager.go
Outdated
	}
}

func (n *NodeManager) Start(ctx context.Context) error {
	log.Ctx(ctx).Info().Msg("Node manager started")
log this after the thing starts; it technically hasn't started yet, and it may not start if n.heartbeats is nil (why do we have this conditional?)
func (n *NodeManager) Start(ctx context.Context) error {
	log.Ctx(ctx).Info().Msg("Node manager started")

	if n.heartbeats != nil {
when can this condition evaluate to false?
When we're running some tests.
Implements heartbeats for compute nodes, sending heartbeat messages to the requester node over NATS PubSub. The server, upon receiving a heartbeat, updates its map of nodes with the current server-side timestamp.

Compute nodes using the heartbeat client will continuously send heartbeat messages every n seconds.

The heartbeat server receiving these heartbeats maintains a priority queue, which dequeues the oldest items (lowest timestamp) first. Every 5 seconds, any item older than a specific timestamp is dequeued and its state set to unhealthy (if it is the first missed heartbeat) or unknown (if it is the second). The defaults are:

- 30s since heartbeat: unhealthy
- 60s since heartbeat: unknown (node may be live but disconnected)

The next heartbeat sent by an unhealthy or unknown node will make it healthy again and ready to receive work.

The current state of the node is added to the nodeinfo during a Get/GetByPrefix/List call to the node info store. This means that the liveness is dynamic and not persisted to the kvstore for node info.