[HUDI-6416] Completion markers for handling execution engine (spark) … #9035
Conversation
Thanks for the contribution @nbalajee. In general I'm confused about why we need two marker files for each base file. Before this patch, we have the in-progress marker files and the real paths in the write status; we can diff out the corrupt/retry files by comparing the in-progress marker file handles against the paths recorded in the write status.
We also have an instant-completion check in HoodieFileSystemView to ignore files/file blocks that are still pending, so how could the reader view see data sets that are not intended to be exposed?
Are there any changes done around partial file writes? If yes, is there a chance we can simplify for cloud stores like S3? Our storageScheme has a property called isWriteTransactional and we can leverage that.
I am yet to fully review it; sending out the comments I have for now.
BTW, I could not locate the place where we create the FINALIZE_WRITE marker file. Can you point me to it?
Also, let's guard all changes under the newly added configs, including the creation of completion markers.
Thanks for the review, @dannyhchen and @nsivabalan.
The following diagram summarizes the issue.
With respect to the partial files: when the feature flag is enabled, it reduces the possibility of a partial data file getting created. Since second and subsequent attempts, in the presence of a completion marker, would return success (with the previously completed writeStatus), we minimize multiple tasks attempting to create files. Deleting the .hoodie/.temp/ marker folder after finalizeWrite ensures that any new task attempting to create a data file (which would show up as a partially written file when the Job/JVM exits) will fail at write-handle creation time if the marker folder is absent, and will not create a partial file.
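A minimal sketch of that fail-fast guard, under stated assumptions: the class and method names below are illustrative, not the PR's actual API; only FileSystem, Path, and HoodieIOException are real Hudi/Hadoop types.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.exception.HoodieIOException;

import java.io.IOException;

public final class MarkerDirFence {
  // Once finalizeWrite() deletes the .hoodie/.temp/<instant> marker directory,
  // any straggler task that tries to open a new write handle should abort here
  // instead of leaving a partial data file behind.
  public static void ensureMarkerDirExists(FileSystem fs, String basePath, String instantTime) {
    Path markerDir = new Path(basePath, ".hoodie/.temp/" + instantTime);
    try {
      if (!fs.exists(markerDir)) {
        throw new HoodieIOException("Marker directory " + markerDir + " is absent; writes for instant "
            + instantTime + " are finalized. Refusing to create a data file from a stray task attempt.");
      }
    } catch (IOException e) {
      throw new HoodieIOException("Failed to check marker directory " + markerDir, e);
    }
  }
}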
@nbalajee can this resolve the orphan files produced by Spark speculative execution, where a marker file is created after the commit has been submitted?
+ " as the job is trying to re-write the data files, after writes have been finalized."); | ||
} | ||
if (config.optimizeTaskRetriesWithMarkers() | ||
&& writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath(partitionPath, fileId, markerInstantTime, getIOType()))) { |
Have a question: if one partition split produces two parquet files in stage 1, the file names are:
- p1/f1-0_0-1-0_001.parquet
- p1/f1-1_0-1-0_001.parquet

Execution:
- If p1/f1-0_0-1-0_001.parquet is written successfully and p1/f1-1_0-1-0_001.parquet fails, we will have a p1/f1-0_001.success.CREATE marker file.
- When the task or stage retries, the check on p1/f1-0_001.success.CREATE will return true and the handle will be null; won't that cause a NullPointerException? And won't p1/f1-1_0-x-x_001.parquet then never be produced?

If I am wrong, please help correct me.
IIUC it should skip and move forward with p1/f1-1_.... @nbalajee, is this right?
Overall, I understand the idea of needing completion markers. IIUC:
- This fences re-attempts from redoing the same "completed" task again. But I wonder how we resolve the TOCTOU issue with direct markers: two tasks (one original, one re-attempt/stage retry) can both create in-progress markers (no checks prevent this on cloud storage) and both proceed to completion markers (again, ignoreExisting may not work atomically on cloud storage).
- This also helps avoid re-attempt tasks wasting cluster/compute resources.

I wonder, if we assume fewer things (timeline-based markers, timeline server), whether we can achieve this by:
- Having all marker creation go through a lock on the timeline server/driver -> no TOCTOU issues (see the sketch after this list).
- Letting the timeline server cut down the number of files in marker storage -> scales on cloud storage.
- Seeing if write finalization of the data table can happen (i.e. reconciliation of markers) after the MT table is written -> reduces this situation by a lot?
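A minimal sketch of that lock-on-the-driver idea, under stated assumptions: the registry class and its API are hypothetical, not part of Hudi or this PR.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public final class DriverSideMarkerRegistry {
  private final Set<String> markers = ConcurrentHashMap.newKeySet();

  // Decide create-if-absent inside one JVM rather than on cloud storage:
  // Set#add is atomic, so exactly one of two racing attempts (original vs.
  // stage retry) wins, closing the TOCTOU window between check and create.
  public boolean tryCreateMarker(String partitionPath, String fileId, String instantTime) {
    return markers.add(partitionPath + "/" + fileId + "_" + instantTime);
  }
}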
// If this is a second or subsequent attempt to create the data file, try to recover existing version.
if (recoverWriteStatusIfAvailable(partitionPath, FSUtils.makeBaseFileName(this.instantTime, this.writeToken,
    this.fileId, hoodieTable.getBaseFileExtension()), this.instantTime)) {
  this.writer = null;
Do we need to close the writer as well?
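A small sketch of what that suggestion could look like; the helper and its name are hypothetical, and the real handle would close its own writer type.

import java.io.Closeable;
import java.io.IOException;

final class WriterCleanup {
  // Close the superseded attempt's writer before dropping the reference, so the
  // open file handle is flushed and released instead of leaked. Returns null so
  // callers can write: this.writer = WriterCleanup.closeQuietlyAndDrop(this.writer);
  static <W extends Closeable> W closeQuietlyAndDrop(W writer) {
    if (writer != null) {
      try {
        writer.close();
      } catch (IOException e) {
        // best-effort cleanup; the recovered write status is what matters here
      }
    }
    return null;
  }
}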
if (config.isFailRetriesAfterFinalizeWrite()
    && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath(StringUtils.EMPTY_STRING,
        FINALIZE_WRITE_COMPLETED, markerInstantTime, IOType.CREATE))) {
  throw new HoodieCorruptedDataException(" Failing retry attempt for instant " + instantTime
Isn't this actually preventing a re-attempt of the write by failing the stray task, i.e. thus preventing corruption? Throwing a "CorruptedData" exception makes sense if the table is deemed corrupted at this point; if not, then let's please rename the exception.
protected void createMarkerFile(String partitionPath, String dataFileName) {
  WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
      .create(partitionPath, dataFileName, getIOType(), config, fileId, hoodieTable.getMetaClient().getActiveTimeline());

private boolean recoverWriteStatuses(WriteMarkers writeMarkers, String dataFileName) throws IOException {
nts: need to review deeply to ensure the DFS accesses are optimal.
+ " as the job is trying to re-write the data files, after writes have been finalized."); | ||
} | ||
if (config.optimizeTaskRetriesWithMarkers() | ||
&& writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath(partitionPath, fileId, markerInstantTime, getIOType()))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC it should skip and move forward with p1/f1-1_...
. @nbalajee is this right?
@@ -766,6 +767,10 @@ protected void reconcileAgainstMarkers(HoodieEngineContext context,
      waitForAllFiles(context, invalidPathsByPartition, FileVisibility.DISAPPEAR);
    }
  }
  if (config.isFailRetriesAfterFinalizeWrite()) {
In general, there is no good reason for us to finalize the write of the data table and then write to the MT table, I feel? Wouldn't that cut down the number of times you hit these issues, with the MT files partition filtering out files written by any stray executors?
  return path.substring(0, path.indexOf(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN));
}

public static String stripOldStyleMarkerSuffix(String path) {
If we change the marker file naming, we should see if we can introduce versioning for those (like we do in other places where bits are written to storage)?
  return Option.of(markerPath);
}

public static long generateChecksum(byte[] data) {
Nothing to reuse from IOUtils? Can we move this to some other utils class, more generic across the code base?
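For reference, a CRC32-based sketch of a generic helper with the shape declared in the hunk; the PR's actual checksum algorithm isn't visible in this thread, so treat the choice of CRC32 as an assumption.

import java.util.zip.CRC32;

public final class ChecksumUtils {
  // A generic, code-base-wide checksum helper of the kind the reviewer asks
  // for; java.util.zip.CRC32 avoids hand-rolling the arithmetic.
  public static long generateChecksum(byte[] data) {
    CRC32 crc = new CRC32();
    crc.update(data, 0, data.length);
    return crc.getValue();
  }
}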
@@ -132,6 +148,25 @@ public Set<String> allMarkerFilePaths() {
    }
  }

  @Override
  public void createMarkerDir() throws HoodieIOException {
We should make timeline-based markers work really well, IMO. On cloud storage, for example, direct markers can have the same access-scaling issues that MT solves.
Change Logs
During Spark stage retries, the Spark driver may have all the information needed to reconcile the commit and proceed with the next steps, while a stray executor may still be writing to a data file and complete later (beyond the reconcile step, before the JVM exits).
Extra files left on the dataset, excluded from the reconcile-commit step, could show up as a data quality issue for query engines, with duplicate records (when the metadata table is not used).
This change introduces completion markers, which try to prevent the dataset from experiencing data quality issues in such corner-case scenarios.
This change prevents second/subsequent tasks/executors from creating additional files (with a different write token) and reuses the write status of successfully completed data files, by storing the serialized write status in the completion marker file (available only with direct markers, but this can be extended to timeline-server-based markers as well). During second and subsequent attempts, the write status stored in the completion marker is returned, so that the same data file created by the first attempt can be reused; a sketch of this round trip follows.
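A minimal sketch of that round trip, assuming the write status is Java-serializable and small enough to embed in the completion marker file; the names and the serialization format here are illustrative, not the PR's actual code.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class CompletionMarkerPayload {
  // First attempt: embed the finished write status in the completion marker.
  static byte[] serialize(Serializable writeStatus) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(writeStatus);
    }
    return bytes.toByteArray();
  }

  // Retry: read the marker contents back and reuse the first attempt's write
  // status instead of re-writing the data file under a new write token.
  static Object deserialize(byte[] markerContents) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(markerContents))) {
      return in.readObject();
    }
  }
}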
Impact
Improved reliability and data quality when infrastructure-related failures result in stage/task retries.
Risk level (write none, low medium or high below)
Low/Medium: This change has been in production for about a year now at Uber.
Documentation Update
ENFORCE_COMPLETION_MARKER_CHECKS - Allows configuring whether to fail the job or continue with retries when an already completed file is being retried. With a planned change, this would allow the second/subsequent attempt to create a file to succeed using the previously created copy of the data.
ENFORCE_FINALIZE_WRITE_CHECK - Allows configuring whether to fail the job if the commit reconciliation step has completed and the write stage is retried (say, a block of the writeStatus RDD is found to be lost when iterating over write statuses for the record index update). A single executor writing to multiple files (data spilling over to more than one file) plus a stage failure after reconciliation results in data quality issues; this flag helps fail the job instead of creating an incorrect commit.
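A hedged sketch of enabling both checks on a writer: the actual ConfigProperty key strings live in this PR's HoodieWriteConfig changes and are not shown in this thread, so the keys below are assumptions, not the real option names.

import java.util.HashMap;
import java.util.Map;

public final class CompletionMarkerOptions {
  public static Map<String, String> writeOptions() {
    Map<String, String> opts = new HashMap<>();
    // ENFORCE_COMPLETION_MARKER_CHECKS: key name assumed for illustration
    opts.put("hoodie.markers.enforce.completion.checks", "true");
    // ENFORCE_FINALIZE_WRITE_CHECK: key name assumed for illustration
    opts.put("hoodie.markers.enforce.finalize.write.check", "true");
    return opts;
  }
}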
Contributor's checklist