Add metrics and audit logging for async operations #10033
base: master
Conversation
Introduce TaskObserver interface to enable monitoring of async task lifecycle events (submit, start, complete, expire). This provides a hook point for Enterprise to collect metrics without modifying core async operation logic. OSS uses a no-op implementation; Enterprise can override via BuildTaskObserver factory.
itaigilo
left a comment
Thanks @zubron for handling this -
I assume that any change that also involves the Enterprise repo can be tricky,
And this one is handled pretty nicely.
Some comments, mainly about the interface.
modules/catalog/factory/build.go
Outdated
// BuildTaskObserver returns the task observer for async operation lifecycle events.
// OSS returns a no-op observer. Enterprise overrides this to provide metrics.
Suggested change:
- // BuildTaskObserver returns the task observer for async operation lifecycle events.
- // OSS returns a no-op observer. Enterprise overrides this to provide metrics.
+ // BuildTaskObserver returns a no-op task observer for Tasks lifecycle events.
Not specific to async ops, but every Task created by the catalog.
Plus, I'd avoid mentioning Enterprise, since it's pretty much implicit here.
After a discussion with @guy-har earlier, I ended up moving all the metrics into this change so there is no need for the factory function here. Removed.
pkg/catalog/catalog.go
Outdated
}

// Notify observer that execution has begun
notifyObserver(taskID, func() {
Why not notify the observer as part of UpdateTaskStatus(),
And let the observer decide about the logic?
pkg/catalog/catalog.go
Outdated
func (c *Catalog) executeTaskSteps(ctx context.Context, log logging.Logger, repository *graveler.RepositoryRecord, taskID string, task *Task, taskStatus protoreflect.ProtoMessage, steps []TaskStep) {
	// Mark task as started
	task.UpdatedAt = timestamppb.Now()
	if err := UpdateTaskStatus(ctx, c.KVStore, repository, taskID, taskStatus); err != nil {
This is risky, because it's not only about adding metrics, but also adds a task update.
Also - why does this mark a task as "started"? Isn't it the same call as in line 2329?
pkg/catalog/catalog.go
Outdated
if err := UpdateTaskStatus(ctx, c.KVStore, repository, taskID, taskStatus); err != nil {
	log.WithError(err).Error("Catalog failed to update task status")
}
// Notify observer of completion (success with no steps)
I know that such comments are both useful for agents and agents like to add them,
But we should decide -
Either add these comments to all the code in a file, or not to add them at all.
Having these comments sporadically makes me think that there's something worth noting there, in these cases of self-explanatory code.
Yep, I had gone through and removed these self-explanatory comments on the other change but not this one. Thanks for pointing it out. Definitely something to think about moving forward, and should be part of the agent context discussion 👍
pkg/catalog/catalog.go
Outdated
// and the provided expiry duration. If expired, marks the task as done with timeout error
// and notifies the observer. The observer is notified exactly once when the task
// transitions to expired state (already-done tasks are skipped).
func checkAndMarkTaskExpired(statusMsg protoreflect.ProtoMessage, expiryDuration time.Duration, observer TaskObserver) {
Any reason not to make this function a "member" of Catalog and use c.observer instead?
Thanks for pointing this out - turns out I misunderstood this function and realised that it doesn't actually persist the expiry to the KV. This is called every time the commit/merge status is queried so we can't make any guarantee about only counting an expired task once. I ended up moving the expiry notification elsewhere. The change to this function has been reverted.
pkg/catalog/catalog.go
Outdated
// and the provided expiry duration. If expired, marks the task as done with timeout error
// and notifies the observer. The observer is notified exactly once when the task
// transitions to expired state (already-done tasks are skipped).
func checkAndMarkTaskExpired(statusMsg protoreflect.ProtoMessage, expiryDuration time.Duration, observer TaskObserver) {
Also, notifying the observer is detailed in the comment but not in the func name.
Should it be reflected in the func name?
pkg/catalog/task_observer.go
Outdated
// TaskObserver receives notifications about task lifecycle events.
// Implementations can use these for metrics, logging, or audit trails.
// All callbacks are synchronous and should be fast.
type TaskObserver interface {
Again, I believe that a single OnTaskUpdated() method would be cleaner and more agnostic, for future use.
Keep all task metric collection in this repository. Remove the use of the factory function to create the observer and instead use the Async metrics observer. Move the expiry notification from the heartbeat check to cleanup of expired tasks. Expiry status isn't persisted in the KV so it would be counted during every task status check.
Thanks for the review, @itaigilo! Really helpful comments 👍 Re: your suggestions about using a single method - UpdateTaskStatus is a simple write function that persists task state to KV without lifecycle knowledge. Adding observer notifications there would muddy its responsibility (it becomes both persistence layer and event dispatcher) and require lifecycle inference - it doesn't know if this write is "submitted", "started", "completed", or just a heartbeat. The caller has that context. With a single OnTaskUpdated(taskID, task) method, every observer implementation would need to track previous state internally, diff new state against old, and infer the lifecycle phase (start? completion? failure?). This pushes complexity into every observer rather than keeping it at the call site where the lifecycle phase is already known. The current interface is more explicit and I believe less error-prone. Open to discussing more though if you have other concerns!
Well, my main point was not setting the task's lifecycle in the notifier, but having the observer decide about the lifecycle based on the task's content. This is more agnostic and more reusable for future use-cases. I guess the goal is to keep the OSS as lean as possible in such cases, and expose very little in these interfaces. You are right about actually not overloading
pkg/catalog/async_metrics.go
Outdated
}

var (
	asyncOperationsPending = promauto.NewGaugeVec(
Why these appear now both here and on Enterprise?
After discussing with @guy-har, he recommended moving the metric definitions into this repo. The Enterprise PR has been closed.
Ah - I see what you mean. Sorry, I think I misunderstood your previous comment! I'll add that change 👍
Sounds good! I think there are some broader changes to make here after your comments and comments from @guy-har so I'm going to put this PR into draft for now. I'll re-request review when it's ready.
guy-har
left a comment
Commenting here on what we talked about F2F.
I think the observer here is implemented nicely, but I believe that in our case which is adding metrics, the observer is much more than required and adds a bit of complexity here.
IMO if we decide to have metrics for the AsyncOperator with labels for the specific operations we will:
- Reduce the need for the observer
- Have metrics for all our async operations (e.g. refs dump)
It should look something like:
var taskDurationHistograms = promauto.NewHistogramVec(
	prometheus.HistogramOpts{
		Name: "...",
		Help: "...",
	},
	[]string{"task_type", "success", "status_code"})

// executeTaskSteps runs each step sequentially, updating task status after each step.
func (c *Catalog) executeTaskSteps(ctx context.Context, log logging.Logger, repository *graveler.RepositoryRecord, taskID string, task *Task, taskStatus protoreflect.ProtoMessage, steps []TaskStep) {
	// Mark task as started
	start := time.Now()
	taskName := "willknowthis"
	// Wrap in a closure so the labels and duration are evaluated when the
	// function returns, not when the defer statement executes.
	defer func() {
		taskDurationHistograms.
			WithLabelValues(taskName, strconv.FormatBool(task.ErrorMsg == ""), strconv.Itoa(int(task.StatusCode))).
			Observe(time.Since(start).Seconds())
	}()
	// ...
}

Add a metric (in this example taskDurationHistograms). At the beginning of executeTaskSteps, defer a call to the metric with Observe.
Introduces TaskMonitor to track async task lifecycle (commit, merge, dump_refs, restore_refs, gc_prepare_commits) with Prometheus metrics and structured logging.
Metrics:
- lakefs_async_operations_running (gauge): currently executing operations
- lakefs_async_operations_total (counter): completed operations by status
- lakefs_async_operation_duration_seconds (histogram): operation duration
Status labels distinguish outcomes:
- success: completed successfully
- failure: completed with error
- expired: completed but exceeded client-facing deadline
- orphaned: stopped heartbeating, cleaned up by background job
itaigilo
left a comment
Thanks @zubron, this implementation looks much cleaner.
Overall looks good,
Adding some small comments,
And leaving the approval for others with a bit more context.
pkg/catalog/task_monitor.go
Outdated
	return
}

asyncOperationsRunning.WithLabelValues(ts.operation).Dec()
If End() is accidentally called twice, the gauge decrements twice but was only incremented once, causing negative values. Consider:
type TaskSpan struct {
// ...
ended bool // or use sync/atomic
}
And:
func (ts *TaskSpan) End() {
if ts.operation == "" || ts.task == nil || ts.ended {
return
}
ts.ended = true
// ...
}
pkg/catalog/task_monitor.go
Outdated
status := statusSuccess
if ts.task.StatusCode == http.StatusRequestTimeout {
	status = statusExpired
} else if ts.task.ErrorMsg != "" {
Suggesting to prioritize error over expired -
Not realistic in the current flow, but this aligns with the way we write go code.
pkg/catalog/task_monitor.go
Outdated
commitAsyncTaskIDPrefix = "CA"
mergeAsyncTaskIDPrefix = "MA"
Tasks are async anyway. I suggest "CMT" and "MRG" as prefixes.
Not blocking, letting others with more context approve.
guy-har
left a comment
Great change!
Thank you!
Requesting changes mainly due to the taskIDToOperation, I prefer we don't infer it from the prefix.
pkg/catalog/task_monitor.go
Outdated
func (m *TaskMonitor) taskIDToOperation(taskID string) string {
	switch {
	case strings.HasPrefix(taskID, commitAsyncTaskIDPrefix):
		return opCommit
	case strings.HasPrefix(taskID, mergeAsyncTaskIDPrefix):
		return opMerge
	case strings.HasPrefix(taskID, DumpRefsTaskIDPrefix):
		return opDumpRefs
	case strings.HasPrefix(taskID, RestoreRefsTaskIDPrefix):
		return opRestoreRefs
	case strings.HasPrefix(taskID, GarbageCollectionPrepareCommitsPrefix):
		return opGCPrepareCommits
	default:
		m.logger.WithField(taskIDFieldKey, taskID).Debug("Unknown task ID prefix, skipping metrics")
		return ""
	}
}
Inferring the operation from the prefix requires a maintenance overhead of:
- Updating this function for each new task type.
- Inserting external information, such as the Async commit prefix
I suggest adding the operation type to the Protobuf of the task; each task will be submitted with its type, and that will be used as the metric operation.
pkg/catalog/catalog.go
Outdated
span := c.taskMonitor.StartSpan(ctx, task, string(repository.RepositoryID))
defer span.End() // reads status from task.StatusCode/ErrorMsg
Why did we choose the task monitor to be a member of the catalog?
I think it would be cleaner to call the metric...observer directly from the catalog. We can have a helper function if we don't want to do it directly. But I'm not sure I understand why this is a catalog member
You're right, it does not need to be a member of the catalog. I've updated it so it's just calling the start and end functions directly with the available logger.
pkg/catalog/task_monitor.go
Outdated
if ctx != nil {
	if user, err := auth.GetUser(ctx); err == nil && user != nil {
		fields[userIDFieldKey] = user.Username
	}
	if reqID := httputil.RequestIDFromContext(ctx); reqID != nil {
		fields[logging.RequestIDFieldKey] = *reqID
	}
How can this happen?
pkg/catalog/task_monitor.go
Outdated
// NewTaskMonitor creates a new TaskMonitor with the given logger and audit log level.
func NewTaskMonitor(logger logging.Logger, auditLogLevel string, isAdvancedAuth bool) *TaskMonitor {
This is in charge of both metrics and logs; if we choose to keep it that way, this should be documented somehow.
@zubron, can we change the title to something more generic - Add metrics for Async operations
- Add 'operation' field to Task proto to explicitly identify operation type instead of inferring from task ID prefix
- Replace TaskMonitor struct with stateless functions (StartTaskSpan, RecordOrphanedTask) that take logger as parameter
- Remove TaskMonitor from Catalog config - use logger from call site
- Add double-call protection to TaskSpan.End()
- Reorder RunBackgroundTaskSteps params: (ctx, repo, operation, taskID, ...)
- Rename task_monitor.go to task_observability.go
Change Description
Adds Prometheus metrics and structured logging for async task lifecycle (commit, merge, dump_refs, restore_refs, gc_prepare_commits). Async operations return immediately so existing middleware only captures initial request/response metrics - this change tracks the complete async lifecycle.
Implementation
- operation field added to the Task proto to explicitly identify the operation type
- StartTaskSpan(log, task) returns a span; defer span.End() records completion

Metrics
- lakefs_async_operations_running (gauge): currently executing operations
- lakefs_async_operations_total (counter): completed operations by status
- lakefs_async_operation_duration_seconds (histogram): operation duration

Status labels distinguish outcomes:
- success: completed successfully
- failure: completed with error
- expired: completed but exceeded client-facing deadline
- orphaned: stopped heartbeating, cleaned up by background job

Testing Details
Unit tests and manual testing locally
Closes #10047 / treeverse/lakeFS-Enterprise#1298