[Spark] Add RowTrackingBackfillCommand #3449
Conversation
Review thread on ...c/test/scala/org/apache/spark/sql/delta/rowtracking/RowTrackingConflictResolutionSuite.scala (outdated, resolved).
def recordBackfillBatchStats(txnId: String, wasSuccessful: Boolean): Unit = {
  if (wasSuccessful) {
    numSuccessfulBatch.incrementAndGet()
  } else {
    numFailedBatch.incrementAndGet()
  }
  val totalExecutionTimeInMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)
  val batchStats = BackfillBatchStats(
    origTxn.txnId, txnId, batchId, filesInBatch.size, totalExecutionTimeInMs, wasSuccessful)
  recordDeltaEvent(
    origTxn.deltaLog,
    opType = backfillBatchStatsOpType,
    data = batchStats
  )
}
nit: define as a private method in BackfillBatch and add a comment for better readability
@johanl-db I'm not sure it is more readable, since:
- It is separate from the function, so I would have to look back and forth.
- I would have to pass origTxn, startTimeNs, batchId, numSuccessfulBatch, and numFailedBatch as arguments (see the sketch after this list).
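For context, a minimal standalone sketch of the trade-off being discussed (the class and method names below are illustrative, not the PR's actual code): a local helper closes over the surrounding state, while an extracted private method needs that state passed in explicitly.

import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Illustrative only: a local helper captures the counters and start time directly,
// whereas an extracted method takes them all as parameters.
class BatchRunnerSketch {
  private val numSuccessfulBatch = new AtomicInteger(0)
  private val numFailedBatch = new AtomicInteger(0)

  def runBatches(batchCount: Int): Unit = {
    val startTimeNs = System.nanoTime()

    // Variant 1: local function, no extra arguments needed.
    def recordStats(wasSuccessful: Boolean): Unit = {
      if (wasSuccessful) numSuccessfulBatch.incrementAndGet()
      else numFailedBatch.incrementAndGet()
      val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)
      println(s"batch done: success=$wasSuccessful, elapsedMs=$elapsedMs")
    }

    (0 until batchCount).foreach(i => recordStats(wasSuccessful = i % 2 == 0))

    // Calling the extracted variant requires spelling out everything it needs.
    recordStatsExtracted(startTimeNs, numSuccessfulBatch, numFailedBatch, wasSuccessful = true)
  }

  // Variant 2: extracted method, same logic, but every captured value becomes a parameter.
  private def recordStatsExtracted(
      startTimeNs: Long,
      numSuccessfulBatch: AtomicInteger,
      numFailedBatch: AtomicInteger,
      wasSuccessful: Boolean): Unit = {
    if (wasSuccessful) numSuccessfulBatch.incrementAndGet()
    else numFailedBatch.incrementAndGet()
    val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)
    println(s"batch done: success=$wasSuccessful, elapsedMs=$elapsedMs")
  }
}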
Ah right, nevermind
Review thread on spark/src/main/scala/org/apache/spark/sql/delta/commands/backfill/BackfillExecutor.scala (outdated, resolved).
val mayInterruptIfRunning = false
futures.foreach(_.cancel(mayInterruptIfRunning))
Suggested change:
futures.foreach(_.cancel(mayInterruptIfRunning = false))
@johanl-db This actually doesn't work because cancel comes from the Java Future interface, so we don't get Scala's named-argument syntactic sugar.
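A small standalone sketch of the distinction (the object name and wrapper are just for illustration): named arguments resolve against Scala-declared parameter names, which the Scala 2 compiler does not accept for Java-defined methods such as java.util.concurrent.Future#cancel(boolean), so only the positional form compiles against the Java interface.

import java.util.concurrent.{Executors, Future, TimeUnit}

object CancelSketch {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(1)
    val futures: Seq[Future[_]] = Seq(pool.submit(new Runnable {
      override def run(): Unit = Thread.sleep(1000)
    }))

    // Java-defined Future#cancel(boolean): only the positional argument form works here.
    futures.foreach(_.cancel(false))
    // futures.foreach(_.cancel(mayInterruptIfRunning = false))  // does not compile in Scala 2

    // A Scala-defined wrapper declares the parameter name, so the named form is available.
    def cancelAll(fs: Seq[Future[_]], mayInterruptIfRunning: Boolean): Unit =
      fs.foreach(_.cancel(mayInterruptIfRunning))
    cancelAll(futures, mayInterruptIfRunning = false)

    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}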
Oh I see
Looks good, some minor comments
Which Delta project/connector is this regarding?
Spark
Description
Adds RowTrackingBackfillCommand, which makes it possible to assign row IDs to a table's existing rows after the table has been created.
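For reviewers unfamiliar with the feature, here is a minimal sketch of how the backfill could be exercised end to end. It assumes the delta.enableRowTracking table property is what triggers the backfill on an existing table; the property name, the trigger path, and the session setup are assumptions, not something stated in this PR description.

import org.apache.spark.sql.SparkSession

object RowTrackingBackfillSketch {
  def main(args: Array[String]): Unit = {
    // Standard Delta-on-Spark session setup.
    val spark = SparkSession.builder()
      .appName("row-tracking-backfill-sketch")
      .master("local[*]")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    // A table created without row tracking: its rows have no row IDs yet.
    spark.sql("CREATE TABLE demo (id INT) USING delta")
    spark.sql("INSERT INTO demo VALUES (1), (2), (3)")

    // Assumption: enabling the property on an existing table kicks off the
    // backfill that assigns row IDs to rows written before enablement.
    spark.sql("ALTER TABLE demo SET TBLPROPERTIES ('delta.enableRowTracking' = 'true')")

    spark.stop()
  }
}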
How was this patch tested?
Added unit tests.
Does this PR introduce any user-facing changes?
No.