
[HUDI-6416] Completion markers for handling execution engine (spark) retries #9035

Open
nbalajee wants to merge 1 commit into master from completion_marker_HUDI_6416

Conversation

@nbalajee (Contributor) commented Jun 22, 2023


Change Logs

During Spark stage retries, the Spark driver may have all the information needed to reconcile the commit and proceed with the next steps, while a stray executor may still be writing to a data file and complete later (beyond the reconcile step, before the JVM exits).

Extra files left on the dataset, excluded from the reconcile-commit step, could surface as a data quality issue (duplicate records) for query engines that do not use the metadata table.

This change introduces completion markers, which prevent the dataset from experiencing data quality issues in such corner-case scenarios.

This change prevents second/subsequent tasks or executors from creating additional files (with a different write token) by storing the serialized write status of a successfully completed data file in its completion marker file (currently available only with direct markers, but extensible to timeline-server-based markers as well). During second and subsequent attempts, the write status stored in the completion marker is returned, so the data file created by the first attempt is reused.
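To make the retry flow concrete, here is a minimal sketch, not the actual patch: getCompletionMarkerPath and markerExists appear in this PR's diff, while the other helpers are hypothetical stand-ins.

  // Sketch: second/subsequent attempts short-circuit on the completion marker
  // and reuse the WriteStatus stored by the first successful attempt.
  protected WriteStatus writeOrRecover(String partitionPath, String fileId, String instantTime) throws IOException {
    String marker = writeMarkers.getCompletionMarkerPath(partitionPath, fileId, instantTime, getIOType());
    if (writeMarkers.markerExists(marker)) {
      // A previous attempt already finished this file: return its WriteStatus
      // instead of creating a new data file under a different write token.
      return deserializeWriteStatus(readMarkerBytes(marker));               // hypothetical helpers
    }
    // First attempt: write the data file, then persist the serialized
    // WriteStatus into the completion marker so later attempts can recover it.
    WriteStatus status = writeDataFile(partitionPath, fileId, instantTime); // hypothetical helper
    createCompletionMarker(marker, serialize(status));                      // hypothetical helper
    return status;
  }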

Impact

Improved reliability and data quality in the face of infrastructure-related failures that result in stage/task retries.

Risk level (write none, low, medium or high below)

Low/Medium: This change has been in production for about a year now at Uber.

Documentation Update

ENFORCE_COMPLETION_MARKER_CHECKS - Configures whether to fail the job or continue with retries when an already completed file is retried. With a planned follow-up change, this will allow a second/subsequent attempt to create a file to succeed by reusing the previously created copy of the data.

ENFORCE_FINALIZE_WRITE_CHECK - Configures whether to fail the job if the commit reconciliation step has completed and the write stage is then retried (say, a block of the writeStatus RDD is found to be lost while iterating over write statuses for a record index update). A single executor writing to multiple files (data spilling over to more than one file), combined with a stage failure after reconciliation, results in data quality issues; this flag helps fail the job instead of creating an incorrect commit.
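As a rough illustration of how such flags are typically declared in Hudi (the exact keys and defaults in this PR may differ; the key names below are assumed):

  // Hedged sketch following Hudi's ConfigProperty pattern; key names are illustrative.
  public static final ConfigProperty<Boolean> ENFORCE_COMPLETION_MARKER_CHECKS = ConfigProperty
      .key("hoodie.markers.enforce.completion.checks")          // assumed key name
      .defaultValue(false)
      .withDocumentation("When enabled, a retried task that finds a completion marker for an "
          + "already-written file reuses the stored write status instead of writing a new file.");

  public static final ConfigProperty<Boolean> ENFORCE_FINALIZE_WRITE_CHECK = ConfigProperty
      .key("hoodie.markers.enforce.finalize.write.check")       // assumed key name
      .defaultValue(false)
      .withDocumentation("When enabled, fail the job if a write stage is retried after commit "
          + "reconciliation has completed, instead of producing an incorrect commit.");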

Contributor's checklist

  • [x] Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@danny0405 (Contributor) left a comment:

Thanks for the contribution @nbalajee. In general I'm confused about why we need two marker files for each base file. Before the patch, we have the in-progress marker file and the real paths in the write status; we can diff out the corrupt/retry files by comparing the in-progress marker file handles against the paths recorded in the writeStatus.

And we also have an instant completion check in HoodieFileSystemView to ignore files/file blocks that are still pending, so why could the reader view read data sets that are not intended to be exposed?
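In other words, the diff-based reconciliation described above amounts to a set difference; a rough sketch (types simplified, IOException handling elided, cleanup helper hypothetical):

  // Files that have an in-progress marker but do not appear among the committed
  // WriteStatus paths were never reconciled and can be treated as stray/corrupt.
  Set<String> markerPaths = new HashSet<>(writeMarkers.allMarkerFilePaths());  // method appears in this PR's diff
  Set<String> committedPaths = writeStatuses.stream()
      .map(WriteStatus::getStat)
      .map(HoodieWriteStat::getPath)
      .collect(Collectors.toSet());
  markerPaths.removeAll(committedPaths);       // marker-name vs data-path mapping elided for brevity
  markerPaths.forEach(this::deleteStrayFile);  // hypothetical cleanup helper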

@danny0405 danny0405 added the priority:minor label and removed the priority:blocker label on Jun 22, 2023
@danny0405 danny0405 self-assigned this Jun 22, 2023
@nsivabalan (Contributor) left a comment:

Are there any changes around partial file writes? If yes, is there a chance we can simplify for cloud stores like S3?
Our storageScheme has a property called isWriteTransactional, and we can leverage that.

I am yet to fully review it; sending out the comments I have for now.
Btw, I could not locate the place where we create the FINALIZE_WRITE marker file. Can you point me to it?
Also, let's guard all changes, even the creation of completion markers, under the newly added configs.

@nbalajee (Contributor, Author) replied:

Thanks @dannyhchen and @nsivabalan for the review.

Re: why we need two marker files per base file, and why the reader view could see data that is not intended to be exposed:

The following diagram summarizes the issue:
(a) A batch of records given to an executor for writing spills over into multiple data files (split into multiple parts due to file size limits: f1-0_w1_c1.parquet, f1-1_w1_c1.parquet, etc.).
(b) A Spark stage is retried, and as a result all tasks are retried (some tasks from previous attempts could still be ongoing). This mainly happens with the Spark FetchFailed exception.

[Screenshot: diagram of the stage-retry scenario, 2023-06-25]

@nbalajee nbalajee force-pushed the completion_marker_HUDI_6416 branch 2 times, most recently from f105db7 to f073527 on June 30, 2023
@nsivabalan nsivabalan added the priority:blocker label and removed the priority:minor label on Jul 5, 2023
@nbalajee nbalajee force-pushed the completion_marker_HUDI_6416 branch from f073527 to e132cdb on July 6, 2023
@nbalajee nbalajee force-pushed the completion_marker_HUDI_6416 branch 2 times, most recently from 9085094 to b29e7ca on July 10, 2023
@bvaradar bvaradar self-assigned this on Jul 11, 2023
(Commit message: Support for recovering writeStatus from a previously successful writer attempt.)
@nbalajee nbalajee force-pushed the completion_marker_HUDI_6416 branch from b29e7ca to 93088bc on July 11, 2023
@nbalajee (Contributor, Author) commented:

With respect to partial files: when the feature flag is enabled, it reduces the possibility of partial data files getting created. Since second and subsequent attempts, in the presence of a completion marker, return success (with the previously completed writeStatus), we minimize the number of tasks attempting to create files.

Deleting the .hoodie/.temp/ marker folder after finalizeWrite ensures that any new task attempting to create a data file (which would show up as a partially written file when the job/JVM exits) fails at writeHandle creation if the marker folder is absent, rather than creating a partial file.
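A minimal sketch of that guard, assuming a hypothetical check at write-handle creation (fs is a Hadoop FileSystem and basePath the table base path):

  // Sketch: fail fast if finalizeWrite already removed the marker directory,
  // so a stray task cannot start writing and leave a partial file behind.
  Path markerDirPath = new Path(basePath, ".hoodie/.temp/" + instantTime);
  if (!fs.exists(markerDirPath)) {
    throw new HoodieIOException("Marker directory " + markerDirPath
        + " is absent; writes for instant " + instantTime + " were already finalized.");
  }
  // Only now is it safe to create the write handle and open the data file.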

@KnightChess (Contributor) commented:

@nbalajee can this resolve the orphan files produced by Spark speculative execution, which creates marker files after the commit is submitted?

+ " as the job is trying to re-write the data files, after writes have been finalized.");
}
if (config.optimizeTaskRetriesWithMarkers()
    && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath(partitionPath, fileId, markerInstantTime, getIOType()))) {
A Contributor commented on this diff:

I have a question: if one partition split produces two parquet files in stage 1, the file names are:

  • p1/f1-0_0-1-0_001.parquet
  • p1/f1-1_0-1-0_001.parquet

Execution:

  1. p1/f1-0_0-1-0_001.parquet is written successfully, and p1/f1-1_0-1-0_001.parquet fails.
  2. There will be a p1/f1-0_001.success.CREATE file.
  3. When the task or stage retries, this check finds p1/f1-0_001.success.CREATE and returns true, so the handle will be null; will that cause a NullPointerException? And will p1/f1-1_0-x-x_001.parquet then never be produced?

If I am wrong, please correct me.

A Member replied:
IIUC it should skip and move forward with p1/f1-1_... . @nbalajee is this right?

@github-actions github-actions bot added the size:XL PR with lines of changes > 1000 label Feb 26, 2024
@vinothchandar (Member) left a comment:

Overall, I understand the idea of needing completion markers.
IIUC:

  1. This fences re-attempts from redoing the same "completed" task again. But I wonder how we resolve the TOCTOU issue with direct markers, i.e. two tasks (one original, one re-attempt/stage retry) can both create in-progress markers (no checks prevent this on cloud storage) and both proceed to completion markers (again, ignoreExisting may not work atomically on cloud storage).
  2. This also helps avoid re-attempt tasks wasting cluster/compute resources.

I wonder, if we assume fewer things (timeline-based markers, timeline server), whether we can achieve this by:

  • Having all marker creation go through a lock on the timeline server/driver -> no TOCTOU issues (see the sketch after this list).
  • The timeline server can cut down the number of files in marker storage -> scales on cloud storage.
  • See if write finalization of the data table can happen (i.e. reconciliation of markers) after the MT table is written -> reduces this situation by a lot?
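A sketch of the first bullet above, serializing marker creation through the timeline server/driver; the names and structure here are hypothetical, not vinothchandar's design:

  // Sketch: all marker creation funnels through one lock in a single JVM
  // (the timeline server on the driver), closing the TOCTOU window.
  private final Object markerLock = new Object();

  public Option<Path> createMarkerIfAbsent(String markerPath) {
    synchronized (markerLock) {
      if (markerExists(markerPath)) {  // loser of the original-vs-retry race backs off
        return Option.empty();
      }
      return Option.of(createMarker(markerPath));  // hypothetical creation helper
    }
  }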

// If this is a second or subsequent attempt to create the data file, try to recover existing version.
if (recoverWriteStatusIfAvailable(partitionPath, FSUtils.makeBaseFileName(this.instantTime, this.writeToken,
    this.fileId, hoodieTable.getBaseFileExtension()), this.instantTime)) {
  this.writer = null;
A Member commented on this diff:
do we need to close the writer as well?

if (config.isFailRetriesAfterFinalizeWrite()
    && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath(StringUtils.EMPTY_STRING,
        FINALIZE_WRITE_COMPLETED, markerInstantTime, IOType.CREATE))) {
  throw new HoodieCorruptedDataException(" Failing retry attempt for instant " + instantTime
A Member commented on this diff:

Isn't this actually preventing a re-attempt of the write, failing the stray task, and thus preventing corruption? Throwing a "CorruptedData" exception makes sense if the table is deemed corrupted at this point; if not, then let's please rename the exception.

protected void createMarkerFile(String partitionPath, String dataFileName) {
  WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
      .create(partitionPath, dataFileName, getIOType(), config, fileId, hoodieTable.getMetaClient().getActiveTimeline());

private boolean recoverWriteStatuses(WriteMarkers writeMarkers, String dataFileName) throws IOException {
A Member commented on this diff:
Note to self: need to review deeply to ensure the DFS accesses are optimal.

+ " as the job is trying to re-write the data files, after writes have been finalized.");
}
if (config.optimizeTaskRetriesWithMarkers()
&& writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath(partitionPath, fileId, markerInstantTime, getIOType()))) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC it should skip and move forward with p1/f1-1_... . @nbalajee is this right?

@@ -766,6 +767,10 @@ protected void reconcileAgainstMarkers(HoodieEngineContext context,
waitForAllFiles(context, invalidPathsByPartition, FileVisibility.DISAPPEAR);
}
}
if (config.isFailRetriesAfterFinalizeWrite()) {
A Member commented on this diff:
In general, there is no good reason for us to finalize the write of the data table and then write to the MT table, I feel? Wouldn't that cut down the number of times you hit these issues, with the MT files partition filtering out files written by any stray executors?

return path.substring(0, path.indexOf(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN));
}

public static String stripOldStyleMarkerSuffix(String path) {
A Member commented on this diff:
If we change the marker file naming, we should see if we can introduce versioning for those (like we do in other places where bits are written to storage)?

return Option.of(markerPath);
}

public static long generateChecksum(byte[] data) {
A Member commented on this diff:
Nothing to reuse from IOUtils? Can we move this to some other utils class, more generic across the code base?
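For reference, a generic version of such a helper (as the comment suggests) could live in a shared utils class; a minimal sketch using java.util.zip.CRC32, with the class name and home assumed:

  import java.util.zip.CRC32;

  public final class ChecksumUtils {  // hypothetical shared home for the helper
    private ChecksumUtils() {}

    // Returns the CRC32 checksum of the given bytes.
    public static long generateChecksum(byte[] data) {
      CRC32 crc = new CRC32();
      crc.update(data, 0, data.length);
      return crc.getValue();
    }
  }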

@@ -132,6 +148,25 @@ public Set<String> allMarkerFilePaths() {
}
}

@Override
public void createMarkerDir() throws HoodieIOException {
A Member commented on this diff:
We should make timeline-based markers work really well, IMO. On cloud storage, direct markers can have the same access-scaling issues that the MT solves for.

Labels: priority:blocker, size:XL (PR with lines of changes > 1000)
Project status: 🏗 Under discussion
8 participants