Skip to content

Conversation

@zubron
Copy link
Contributor

@zubron zubron commented Jan 22, 2026

Change Description

Adds Prometheus metrics and structured logging for async task lifecycle (commit, merge, dump_refs, restore_refs, gc_prepare_commits). Async operations return immediately so existing middleware only captures initial request/response metrics - this change tracks the complete async lifecycle.

Implementation

  • Adds operation field to Task proto to explicitly identify the operation type
  • Uses simple span-based tracking: StartTaskSpan(log, task) returns a span, defer span.End() records completion
  • Leverages existing logger from call site (already has task_id, repository, audit context)
  • Falls back to "unknown" operation for legacy tasks without the field

Metrics

  • lakefs_async_operations_running (gauge): currently executing operations
  • lakefs_async_operations_total (counter): completed operations by status
  • lakefs_async_operation_duration_seconds (histogram): operation duration

Status labels distinguish outcomes:

  • success: completed successfully
  • failure: completed with error
  • expired: completed but exceeded client-facing deadline
  • orphaned: stopped heartbeat, cleaned up by background job

Testing Details

Unit tests and manual testing locally

Closes #10047 / treeverse/lakeFS-Enterprise#1298

@zubron zubron added the exclude-changelog PR description should not be included in next release changelog label Jan 22, 2026
@zubron zubron force-pushed the task/add-metrics-for-async-commit-merge branch from 0768bea to 30ece9b Compare January 22, 2026 14:12
@github-actions github-actions bot added area/cataloger Improvements or additions to the cataloger area/testing Improvements or additions to tests labels Jan 22, 2026
@zubron zubron marked this pull request as draft January 22, 2026 14:13
@zubron zubron force-pushed the task/add-metrics-for-async-commit-merge branch 2 times, most recently from 8a90f6e to 2b314b3 Compare January 24, 2026 19:56
Introduce TaskObserver interface to enable monitoring of async task
lifecycle events (submit, start, complete, expire). This provides a
hook point for Enterprise to collect metrics without modifying core
async operation logic. OSS uses a no-op implementation; Enterprise
can override via BuildTaskObserver factory.
@zubron zubron force-pushed the task/add-metrics-for-async-commit-merge branch from 2b314b3 to e00483f Compare January 24, 2026 20:07
@zubron zubron requested a review from a team January 24, 2026 20:15
@zubron zubron marked this pull request as ready for review January 24, 2026 20:15
@Annaseli Annaseli self-requested a review January 26, 2026 09:52
Copy link
Contributor

@itaigilo itaigilo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @zubron for handling this -
I assume that every change that also involves the Enterprise repo might be tricky,
And this one is pretty nice.

Some comments, mainly about the interface.

Comment on lines 18 to 19
// BuildTaskObserver returns the task observer for async operation lifecycle events.
// OSS returns a no-op observer. Enterprise overrides this to provide metrics.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// BuildTaskObserver returns the task observer for async operation lifecycle events.
// OSS returns a no-op observer. Enterprise overrides this to provide metrics.
// BuildTaskObserver returns a no-op task observer for Tasks lifecycle events.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not specific to async ops, but every Task created by the catalog.
Plus, I'd avoid mentioning Enterprise, since it's pretty much implicit here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After a discussion with @guy-har earlier, I ended up moving all the metrics into this change so there is no need for the factory function here. Removed.

}

// Notify observer that execution has begun
notifyObserver(taskID, func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not to notify the observer as part of UpdateTaskStatus(),
And let the observer decide about the logic?

func (c *Catalog) executeTaskSteps(ctx context.Context, log logging.Logger, repository *graveler.RepositoryRecord, taskID string, task *Task, taskStatus protoreflect.ProtoMessage, steps []TaskStep) {
// Mark task as started
task.UpdatedAt = timestamppb.Now()
if err := UpdateTaskStatus(ctx, c.KVStore, repository, taskID, taskStatus); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is risky, because it's not only about adding metrics, but also adds a task update.

Also - why this marks a task as "started"? Isn't it the same call as in line 2329?

if err := UpdateTaskStatus(ctx, c.KVStore, repository, taskID, taskStatus); err != nil {
log.WithError(err).Error("Catalog failed to update task status")
}
// Notify observer of completion (success with no steps)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that such comment are both useful for agents and agents like to add them,
But we should decide -
Either add these comments to all the code in a file, or not to add them at all.
Having these comments sporadically makes me think that there's something worth noting there, in these cases of self-explanatory code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I had gone through and removed these self-explanatory comments on the other change but not this one. Thanks for pointing it out. Definitely something to think about moving forward, and should part of the agent context discussion 👍

// and the provided expiry duration. If expired, marks the task as done with timeout error
// and notifies the observer. The observer is notified exactly once when the task
// transitions to expired state (already-done tasks are skipped).
func checkAndMarkTaskExpired(statusMsg protoreflect.ProtoMessage, expiryDuration time.Duration, observer TaskObserver) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to make this function a "member" of Catalog and use c.observer instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out - turns out I misunderstood this function and realised that it doesn't actually persist the expiry to the KV. This is called every time the commit/merge status is queried so we can't make any guarantee about only counting an expired task once. I ended up moving the expiry notification elsewhere. The change to this function has been reverted.

// and the provided expiry duration. If expired, marks the task as done with timeout error
// and notifies the observer. The observer is notified exactly once when the task
// transitions to expired state (already-done tasks are skipped).
func checkAndMarkTaskExpired(statusMsg protoreflect.ProtoMessage, expiryDuration time.Duration, observer TaskObserver) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, notifying the observer is detailed in the comment but not in the func name.
Should it be reflected in the func name?

// TaskObserver receives notifications about task lifecycle events.
// Implementations can use these for metrics, logging, or audit trails.
// All callbacks are synchronous and should be fast.
type TaskObserver interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, I believe that a single OnTaskUpdated() method would be cleaner and more agnostic, for future use.

Keep all task metric collection in this repository. Remove the use of
the factory function to create the observer and instead use the Async
metrics observer.

Move the expiry notification from the heartbeat check to cleanup of
expired tasks. Expiry status isn't persisted in the KV so it would be
counted during every task status check.
@zubron zubron changed the title Add TaskObserver interface for async events Add metrics for async commit and merge operations Jan 26, 2026
@zubron
Copy link
Contributor Author

zubron commented Jan 26, 2026

Thanks for the review, @itaigilo! Really helpful comments 👍

Re: your suggestions about using a single OnTaskUpdated function and notifying from UpdateTaskStatus - I see where you're coming from! Let me explain why I chose the above approach:

UpdateTaskStatus is a simple write function that persists task state to KV without lifecycle knowledge. Adding observer notifications there would muddy its responsibility (it becomes both persistence layer and event dispatcher) and require lifecycle inference - it doesn't know if this write is "submitted", "started", "completed", or just a heartbeat. The caller has that context.

With a single OnTaskUpdated(taskID, task) method, every observer implementation would need to track previous state internally, diff new state against old, and infer the lifecycle phase (start? completion? failure?). This pushes complexity into every observer rather than keeping it at the call site where the lifecycle phase is already known. The current interface is more explicit and I believe less error-prone.

Open to discussing more though if you have other concerns!

@zubron zubron requested a review from itaigilo January 26, 2026 23:13
@itaigilo
Copy link
Contributor

Thanks for the review, @itaigilo! Really helpful comments 👍

Re: your suggestions about using a single OnTaskUpdated function and notifying from UpdateTaskStatus - I see where you're coming from! Let me explain why I chose the above approach:

UpdateTaskStatus is a simple write function that persists task state to KV without lifecycle knowledge. Adding observer notifications there would muddy its responsibility (it becomes both persistence layer and event dispatcher) and require lifecycle inference - it doesn't know if this write is "submitted", "started", "completed", or just a heartbeat. The caller has that context.

With a single OnTaskUpdated(taskID, task) method, every observer implementation would need to track previous state internally, diff new state against old, and infer the lifecycle phase (start? completion? failure?). This pushes complexity into every observer rather than keeping it at the call site where the lifecycle phase is already known. The current interface is more explicit and I believe less error-prone.

Open to discussing more though if you have other concerns!

Well, my main point was not setting the task's lifecycle in the notifier, but have the observer decide about the lifecycle based on the task's content. This is more agnostic and more reusable for future use-cases. I guess is that the goal is to keep the OSS as lean as possible in such cases, and expose very little in these interfaces.

You are right about actually not overloading UpdateTaskStatus() with extra responsibilities, but how about wrapping it with UpdateTaskStatusAndNotify(), to prevent repetition?

}

var (
asyncOperationsPending = promauto.NewGaugeVec(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why these appear now both here and on Enterprise?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After discussing with @guy-har, he recommended moving the metric definitions into this repo. The Enterprise PR has been closed.

@zubron
Copy link
Contributor Author

zubron commented Jan 27, 2026

Well, my main point was not setting the task's lifecycle in the notifier, but have the observer decide about the lifecycle based on the task's content. This is more agnostic and more reusable for future use-cases. I guess is that the goal is to keep the OSS as lean as possible in such cases, and expose very little in these interfaces.

Ah - I see what you mean. Sorry, I think I misunderstood your previous comment! I'll add that change 👍

You are right about actually not overloading UpdateTaskStatus() with extra responsibilities, but how about wrapping it with UpdateTaskStatusAndNotify(), to prevent repetition?

Sounds good!

I think there are some broader changes to make here after your comments and comments from @guy-har so I'm going to put this PR into draft for now. I'll re-request review when it's ready.

@zubron zubron marked this pull request as draft January 27, 2026 19:36
Copy link
Contributor

@guy-har guy-har left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commenting here what we talked F2F.

I think the observer here is implemented nicely, but I believe that in our case which is adding metrics, the observer is much more than required and adds a bit of complexity here.
IMO if we decide to have metrics for the AsyncOperator with labels for the specific operations we will:

  1. Reduce the need for the observer
  2. Have metrics for all our async operations (e.g refs dump)

should look something like

var taskDurationHistograms = promauto.NewHistogramVec(
	prometheus.HistogramOpts{
		Name: "...",
		Help: "...",
	},
	[]string{"task_type", "success", "status_code"})

// executeTaskSteps runs each step sequentially, updating task status after each step.
func (c *Catalog) executeTaskSteps(ctx context.Context, log logging.Logger, repository *graveler.RepositoryRecord, taskID string, task *Task, taskStatus protoreflect.ProtoMessage, steps []TaskStep) {
	// Mark task as started
	start := time.Now()
	taskName := "willknowthis"
	defer taskDurationHistograms.
		WithLabelValues(taskName, strconv.FormatBool(task.ErrorMsg == ""), strconv.Itoa(int(task.StatusCode))).
		Observe(time.Since(start).Seconds())[7:12 PM]Add a metric (in this example taskDurationHistogramsAt the beginning of executeTaskSteps defer a call to the metric with Observe

@zubron zubron force-pushed the task/add-metrics-for-async-commit-merge branch 2 times, most recently from fbf8cbf to 23a2b0a Compare January 29, 2026 21:26
Introduces TaskMonitor to track async task lifecycle (commit, merge,
dump_refs, restore_refs, gc_prepare_commits) with Prometheus metrics
and structured logging.

Metrics:
- lakefs_async_operations_running (gauge): currently executing operations
- lakefs_async_operations_total (counter): completed operations by status
- lakefs_async_operation_duration_seconds (histogram): operation duration

Status labels distinguish outcomes:
- success: completed successfully
- failure: completed with error
- expired: completed but exceeded client-facing deadline
- orphaned: stopped heartbeating, cleaned up by background job
@zubron zubron force-pushed the task/add-metrics-for-async-commit-merge branch from 23a2b0a to 136fe73 Compare January 29, 2026 22:13
@zubron zubron marked this pull request as ready for review January 29, 2026 22:13
@zubron zubron requested review from guy-har and itaigilo January 29, 2026 22:13
Copy link
Contributor

@itaigilo itaigilo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @zubron , this implementation looks much cleaner.

Overall looks good,
Adding some small comments,
And leaving the approval for others with a bit more context.

return
}

asyncOperationsRunning.WithLabelValues(ts.operation).Dec()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If End() is accidentally called twice, the gauge decrements twice but was only incremented once, causing negative values. Consider:

type TaskSpan struct {
    // ...
    ended bool  // or use sync/atomic
}

And:

func (ts *TaskSpan) End() {
    if ts.operation == "" || ts.task == nil || ts.ended {
        return
    }
    ts.ended = true
    // ...
}

status := statusSuccess
if ts.task.StatusCode == http.StatusRequestTimeout {
status = statusExpired
} else if ts.task.ErrorMsg != "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggesting to prioritize error over expired -
Not realistic in the current flow, but this aligns with the way we write go code.

Comment on lines 19 to 20
commitAsyncTaskIDPrefix = "CA"
mergeAsyncTaskIDPrefix = "MA"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tasks are async anyway. I suggest "CMT" and "MRG" as prefixes.

@itaigilo itaigilo dismissed their stale review January 30, 2026 20:05

Not blocking, letting other with more context to approve.

Copy link
Contributor

@guy-har guy-har left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great change!
Thank you!

Requesting changes mainly due to the taskIDToOperation, I prefer we don't conclude it from the prefix.

Comment on lines 191 to 207
func (m *TaskMonitor) taskIDToOperation(taskID string) string {
switch {
case strings.HasPrefix(taskID, commitAsyncTaskIDPrefix):
return opCommit
case strings.HasPrefix(taskID, mergeAsyncTaskIDPrefix):
return opMerge
case strings.HasPrefix(taskID, DumpRefsTaskIDPrefix):
return opDumpRefs
case strings.HasPrefix(taskID, RestoreRefsTaskIDPrefix):
return opRestoreRefs
case strings.HasPrefix(taskID, GarbageCollectionPrepareCommitsPrefix):
return opGCPrepareCommits
default:
m.logger.WithField(taskIDFieldKey, taskID).Debug("Unknown task ID prefix, skipping metrics")
return ""
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concluding the operation from the prefix requires a maintenance overhead of:

  • Updating this function for each new task type.
  • Inserting external information, such as the Async commit prefix

I suggest adding the operation type to the Protobuff of the task and each task will be submitted with his type and that will be used as the metric operation

Comment on lines 2311 to 2312
span := c.taskMonitor.StartSpan(ctx, task, string(repository.RepositoryID))
defer span.End() // reads status from task.StatusCode/ErrorMsg
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we choose the task monitor to be a member of the catalog?
I think it would be cleaner to call the metric...observer directly from the catalog. We can have a helper function if we don't want to do it directly. But I'm not sure I understand why this is a catalog member

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, it does not need to be a member of the catalog. I've updated it so it's just calling the start and end functions directly with the available logger.

Comment on lines 180 to 186
if ctx != nil {
if user, err := auth.GetUser(ctx); err == nil && user != nil {
fields[userIDFieldKey] = user.Username
}
if reqID := httputil.RequestIDFromContext(ctx); reqID != nil {
fields[logging.RequestIDFieldKey] = *reqID
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can this happen?

Comment on lines 74 to 75
// NewTaskMonitor creates a new TaskMonitor with the given logger and audit log level.
func NewTaskMonitor(logger logging.Logger, auditLogLevel string, isAdvancedAuth bool) *TaskMonitor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is in charge of both, metrics and logs, if we choose to leave it, this should be documented somehow

@guy-har
Copy link
Contributor

guy-har commented Feb 1, 2026

@zubron, can we replace the title to be more generic, Add metrics for Async operations
If we stay with the audit logs maybe we should mention that as well

@zubron zubron changed the title Add metrics for async commit and merge operations Add metrics for async operations Feb 2, 2026
@zubron zubron changed the title Add metrics for async operations Add metrics and audit logging for async operations Feb 2, 2026
- Add 'operation' field to Task proto to explicitly identify operation
  type instead of inferring from task ID prefix
- Replace TaskMonitor struct with stateless functions (StartTaskSpan,
  RecordOrphanedTask) that take logger as parameter
- Remove TaskMonitor from Catalog config - use logger from call site
- Add double-call protection to TaskSpan.End()
- Reorder RunBackgroundTaskSteps params: (ctx, repo, operation, taskID, ...)
- Rename task_monitor.go to task_observability.go
@zubron zubron requested a review from guy-har February 2, 2026 21:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/cataloger Improvements or additions to the cataloger area/testing Improvements or additions to tests exclude-changelog PR description should not be included in next release changelog

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add metrics for async commit and merge

4 participants