Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spark] Add RowTrackingBackfillCommand #3449

Merged

Conversation

longvu-db
Copy link
Contributor

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

Adding the RowTrackingBackfillCommand, the ability to assign row IDs to table rows after the table creation.

How was this patch tested?

Added UTs.

Does this PR introduce any user-facing changes?

No.

Comment on lines +55 to +69
def recordBackfillBatchStats(txnId: String, wasSuccessful: Boolean): Unit = {
if (wasSuccessful) {
numSuccessfulBatch.incrementAndGet()
} else {
numFailedBatch.incrementAndGet()
}
val totalExecutionTimeInMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)
val batchStats = BackfillBatchStats(
origTxn.txnId, txnId, batchId, filesInBatch.size, totalExecutionTimeInMs, wasSuccessful)
recordDeltaEvent(
origTxn.deltaLog,
opType = backfillBatchStatsOpType,
data = batchStats
)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: define as a private method in BackfillBatch and add a comment for better readability

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johanl-db I'm not sure if it is more readable since

  1. It is separate from the function, so I will have to look back and forth.
  2. I will have to pass origTxn, startTimeMs, batchId, numSuccessfulBatch, numFailedBatch as arguments.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right, nevermind

Comment on lines +92 to +93
val mayInterruptIfRunning = false
futures.foreach(_.cancel(mayInterruptIfRunning))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
val mayInterruptIfRunning = false
futures.foreach(_.cancel(mayInterruptIfRunning))
futures.foreach(_.cancel(mayInterruptIfRunning = false))

Copy link
Contributor Author

@longvu-db longvu-db Jul 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johanl-db This actually doesn't work because cancel is from Java Future so we don't have the Scala's syntactic sugar.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see

Copy link
Collaborator

@johanl-db johanl-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, some minor comments

@longvu-db longvu-db requested a review from johanl-db July 31, 2024 14:35
@tdas tdas merged commit 8eb7a4f into delta-io:master Aug 2, 2024
9 of 10 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants