Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongqishang committed Jul 19, 2024
1 parent 512e5da commit 948a562
Showing 1 changed file with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void snapshotState(StateSnapshotContext context) throws Exception {

// Update the checkpoint state.
long startNano = System.nanoTime();
writeToManifestSinceLastSnapshot(checkpointId);
writeToManifestUptoLatestCheckpoint(checkpointId);

// Reset the snapshot state to the latest state.
checkpointsState.clear();
Expand Down Expand Up @@ -437,19 +437,20 @@ public void processElement(StreamRecord<FlinkWriteResult> element) {
public void endInput() throws IOException {
// Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
long currentCheckpointId = IcebergStreamWriter.END_INPUT_CHECKPOINT_ID;
writeToManifestSinceLastSnapshot(currentCheckpointId);
writeToManifestUptoLatestCheckpoint(currentCheckpointId);
commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, currentCheckpointId);
}

private void writeToManifestSinceLastSnapshot(long checkpointId) throws IOException {
private void writeToManifestUptoLatestCheckpoint(long checkpointId) throws IOException {
if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
}

for (Map.Entry<Long, List<WriteResult>> writeResultsOfCkpt :
for (Map.Entry<Long, List<WriteResult>> writeResultsOfCheckpoint :
writeResultsSinceLastSnapshot.entrySet()) {
dataFilesPerCheckpoint.put(
writeResultsOfCkpt.getKey(), writeToManifest(writeResultsOfCkpt.getKey()));
writeResultsOfCheckpoint.getKey(),
writeToManifest(writeResultsOfCheckpoint.getKey(), writeResultsOfCheckpoint.getValue()));
}

// Clear the local buffer for current checkpoint.
Expand All @@ -460,8 +461,8 @@ private void writeToManifestSinceLastSnapshot(long checkpointId) throws IOExcept
* Write all the complete data files to a newly created manifest file and return the manifest's
* avro serialized bytes.
*/
private byte[] writeToManifest(long checkpointId) throws IOException {
List<WriteResult> writeResults = writeResultsSinceLastSnapshot.get(checkpointId);
private byte[] writeToManifest(long checkpointId, List<WriteResult> writeResults)
throws IOException {
WriteResult result = WriteResult.builder().addAll(writeResults).build();
DeltaManifests deltaManifests =
FlinkManifestUtil.writeCompletedFiles(
Expand Down

0 comments on commit 948a562

Please sign in to comment.