Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,29 @@ public void open() {
public void processElement(StreamRecord<TableBucketWriteResult<WriteResult>> streamRecord)
throws Exception {
TableBucketWriteResult<WriteResult> tableBucketWriteResult = streamRecord.getValue();

// Check if this is a failure marker from upstream Reader
if (tableBucketWriteResult.isFailedMarker()) {
long failedTableId = tableBucketWriteResult.tableBucket().getTableId();
String failReason = tableBucketWriteResult.failReason();
LOG.info(
"Received failure marker for table {}. Reason: {}. "
+ "Cleaning up collected write results for this table.",
failedTableId,
failReason);

// Clean up any partially collected write results for the failed table
List<TableBucketWriteResult<WriteResult>> removedResults =
collectedTableBucketWriteResults.remove(failedTableId);
if (removedResults != null) {
LOG.info(
"Cleaned up {} collected write results for failed table {}.",
removedResults.size(),
failedTableId);
}
return;
}

TableBucket tableBucket = tableBucketWriteResult.tableBucket();
long tableId = tableBucket.getTableId();
registerTableBucketWriteResult(tableId, tableBucketWriteResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,15 @@ public long getTableId() {
/** Returns the human-readable reason why tiering failed for this table. */
public String failReason() {
    return failReason;
}

@Override
public String toString() {
    // Renders as: FailedTieringEvent{tableId=<id>, failReason='<reason>'}
    // (StringBuilder.append(String) prints "null" for a null failReason,
    // exactly like string concatenation would.)
    StringBuilder sb = new StringBuilder("FailedTieringEvent{");
    sb.append("tableId=").append(tableId);
    sb.append(", failReason='").append(failReason).append('\'');
    sb.append('}');
    return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class TableBucketWriteResult<WriteResult> implements Serializable {
@Nullable private final String partitionName;

    // will be null when no data was written, such as when tiering an empty log split,
    // or when this is a failure marker
@Nullable private final WriteResult writeResult;

// the end offset of tiering, should be the last tiered record's offset + 1
Expand All @@ -57,6 +58,12 @@ public class TableBucketWriteResult<WriteResult> implements Serializable {
// for the round of tiering is finished
private final int numberOfWriteResults;

// indicates whether this is a failure marker rather than an actual write result
private final boolean failedMarker;

// the reason for failure when this is a failure marker
@Nullable private final String failReason;

public TableBucketWriteResult(
TablePath tablePath,
TableBucket tableBucket,
Expand All @@ -65,13 +72,53 @@ public TableBucketWriteResult(
long logEndOffset,
long maxTimestamp,
int numberOfWriteResults) {
this(
tablePath,
tableBucket,
partitionName,
writeResult,
logEndOffset,
maxTimestamp,
numberOfWriteResults,
false,
null);
}

/**
 * Full constructor backing both regular write results and failure markers; only reachable
 * through the public constructor (regular results) or {@link #failedMarker(long, String)}.
 *
 * @param tablePath the table path; null for failure markers
 * @param tableBucket the bucket this result belongs to (a placeholder bucket for markers)
 * @param partitionName the partition name; null for non-partitioned tables or markers
 * @param writeResult the write result; null when nothing was written or for markers
 * @param logEndOffset the end offset of tiering, last tiered record's offset + 1 (-1 for markers)
 * @param maxTimestamp the max record timestamp (-1 for markers)
 * @param numberOfWriteResults the expected number of write results for a finished tiering
 *     round (-1 for markers)
 * @param failedMarker whether this instance is a failure marker rather than a real result
 * @param failReason the failure reason when this is a failure marker; may be null
 */
private TableBucketWriteResult(
        TablePath tablePath,
        TableBucket tableBucket,
        @Nullable String partitionName,
        @Nullable WriteResult writeResult,
        long logEndOffset,
        long maxTimestamp,
        int numberOfWriteResults,
        boolean failedMarker,
        @Nullable String failReason) {
    this.tablePath = tablePath;
    this.tableBucket = tableBucket;
    this.partitionName = partitionName;
    this.writeResult = writeResult;
    this.logEndOffset = logEndOffset;
    this.maxTimestamp = maxTimestamp;
    this.numberOfWriteResults = numberOfWriteResults;
    this.failedMarker = failedMarker;
    this.failReason = failReason;
}

/**
 * Creates a failure marker result that indicates tiering for a table has failed.
 *
 * <p>The marker carries no real payload: table path and write result are null, offsets and
 * the write-result count are -1, and only the table id (wrapped in a placeholder bucket with
 * bucket id -1) plus the failure reason are meaningful.
 *
 * @param tableId the ID of the failed table
 * @param failReason the reason for the failure
 * @param <WriteResult> the type of write result
 * @return a failure marker TableBucketWriteResult
 */
public static <WriteResult> TableBucketWriteResult<WriteResult> failedMarker(
        long tableId, String failReason) {
    return new TableBucketWriteResult<>(
            null, new TableBucket(tableId, -1), null, null, -1L, -1L, -1, true, failReason);
}

public TablePath tablePath() {
Expand Down Expand Up @@ -103,4 +150,13 @@ public long logEndOffset() {
public long maxTimestamp() {
return maxTimestamp;
}

/** Returns true when this instance is a failure marker rather than an actual write result. */
public boolean isFailedMarker() {
    return failedMarker;
}

/** Returns the failure reason; non-null only when {@link #isFailedMarker()} is true. */
@Nullable
public String failReason() {
    return failReason;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class TableBucketWriteResultSerializer<WriteResult>
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));

private static final int CURRENT_VERSION = 1;
private static final int CURRENT_VERSION = 2;

private final org.apache.fluss.lake.serializer.SimpleVersionedSerializer<WriteResult>
writeResultSerializer;
Expand All @@ -53,44 +53,58 @@ public int getVersion() {
public byte[] serialize(TableBucketWriteResult<WriteResult> tableBucketWriteResult)
throws IOException {
final DataOutputSerializer out = SERIALIZER_CACHE.get();
// serialize table path
TablePath tablePath = tableBucketWriteResult.tablePath();
out.writeUTF(tablePath.getDatabaseName());
out.writeUTF(tablePath.getTableName());

// serialize bucket
TableBucket tableBucket = tableBucketWriteResult.tableBucket();
out.writeLong(tableBucket.getTableId());
// write partition
if (tableBucket.getPartitionId() != null) {
out.writeBoolean(true);
out.writeLong(tableBucket.getPartitionId());
out.writeUTF(tableBucketWriteResult.partitionName());
} else {
out.writeBoolean(false);
}
out.writeInt(tableBucket.getBucket());

// serialize write result
WriteResult writeResult = tableBucketWriteResult.writeResult();
if (writeResult == null) {
// write -1 to mark write result as null
out.writeInt(-1);
// serialize failed marker flag first
out.writeBoolean(tableBucketWriteResult.isFailedMarker());

if (tableBucketWriteResult.isFailedMarker()) {
// for failed marker, only serialize tableId and failReason
out.writeLong(tableBucketWriteResult.tableBucket().getTableId());
String failReason = tableBucketWriteResult.failReason();
out.writeBoolean(failReason != null);
if (failReason != null) {
out.writeUTF(failReason);
}
} else {
byte[] serializeBytes = writeResultSerializer.serialize(writeResult);
out.writeInt(serializeBytes.length);
out.write(serializeBytes);
// serialize table path
TablePath tablePath = tableBucketWriteResult.tablePath();
out.writeUTF(tablePath.getDatabaseName());
out.writeUTF(tablePath.getTableName());

// serialize bucket
TableBucket tableBucket = tableBucketWriteResult.tableBucket();
out.writeLong(tableBucket.getTableId());
// write partition
if (tableBucket.getPartitionId() != null) {
out.writeBoolean(true);
out.writeLong(tableBucket.getPartitionId());
out.writeUTF(tableBucketWriteResult.partitionName());
} else {
out.writeBoolean(false);
}
out.writeInt(tableBucket.getBucket());

// serialize write result
WriteResult writeResult = tableBucketWriteResult.writeResult();
if (writeResult == null) {
// write -1 to mark write result as null
out.writeInt(-1);
} else {
byte[] serializeBytes = writeResultSerializer.serialize(writeResult);
out.writeInt(serializeBytes.length);
out.write(serializeBytes);
}

// serialize log end offset
out.writeLong(tableBucketWriteResult.logEndOffset());

// serialize max timestamp
out.writeLong(tableBucketWriteResult.maxTimestamp());

// serialize number of write results
out.writeInt(tableBucketWriteResult.numberOfWriteResults());
}

// serialize log end offset
out.writeLong(tableBucketWriteResult.logEndOffset());

// serialize max timestamp
out.writeLong(tableBucketWriteResult.maxTimestamp());

// serialize number of write results
out.writeInt(tableBucketWriteResult.numberOfWriteResults());

final byte[] result = out.getCopyOfBuffer();
out.clear();
return result;
Expand All @@ -99,9 +113,17 @@ public byte[] serialize(TableBucketWriteResult<WriteResult> tableBucketWriteResu
@Override
public TableBucketWriteResult<WriteResult> deserialize(int version, byte[] serialized)
throws IOException {
if (version != CURRENT_VERSION) {
if (version == 1) {
return deserializeV1(serialized);
} else if (version == CURRENT_VERSION) {
return deserializeV2(serialized);
} else {
throw new IOException("Unknown version " + version);
}
}

private TableBucketWriteResult<WriteResult> deserializeV1(byte[] serialized)
throws IOException {
final DataInputDeserializer in = new DataInputDeserializer(serialized);
// deserialize table path
String databaseName = in.readUTF();
Expand All @@ -125,7 +147,7 @@ public TableBucketWriteResult<WriteResult> deserialize(int version, byte[] seria
if (writeResultLength >= 0) {
byte[] writeResultBytes = new byte[writeResultLength];
in.readFully(writeResultBytes);
writeResult = writeResultSerializer.deserialize(version, writeResultBytes);
writeResult = writeResultSerializer.deserialize(1, writeResultBytes);
} else {
writeResult = null;
}
Expand All @@ -145,4 +167,66 @@ public TableBucketWriteResult<WriteResult> deserialize(int version, byte[] seria
maxTimestamp,
numberOfWriteResults);
}

/**
 * Deserializes the V2 wire format. A leading boolean distinguishes failure markers (which
 * carry only the table id and an optional failure reason) from regular results (which carry
 * the full V1-style payload). Field read order must match {@code serialize} exactly.
 */
private TableBucketWriteResult<WriteResult> deserializeV2(byte[] serialized)
        throws IOException {
    final DataInputDeserializer in = new DataInputDeserializer(serialized);

    // read failed marker flag (written first in the V2 format)
    boolean isFailedMarker = in.readBoolean();

    if (isFailedMarker) {
        // deserialize failed marker: table id, then a presence flag for the reason
        long tableId = in.readLong();
        String failReason = null;
        if (in.readBoolean()) {
            failReason = in.readUTF();
        }
        return TableBucketWriteResult.failedMarker(tableId, failReason);
    } else {
        // deserialize table path
        String databaseName = in.readUTF();
        String tableName = in.readUTF();
        TablePath tablePath = new TablePath(databaseName, tableName);

        // deserialize bucket; a boolean flag precedes the optional partition id/name pair
        long tableId = in.readLong();
        Long partitionId = null;
        String partitionName = null;
        if (in.readBoolean()) {
            partitionId = in.readLong();
            partitionName = in.readUTF();
        }
        int bucketId = in.readInt();
        TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);

        // deserialize write result; length -1 marks a null write result
        int writeResultLength = in.readInt();
        WriteResult writeResult;
        if (writeResultLength >= 0) {
            byte[] writeResultBytes = new byte[writeResultLength];
            in.readFully(writeResultBytes);
            // NOTE(review): the inner write-result serializer version is not recorded in
            // the V2 wire format, so getVersion() here assumes the bytes were produced by
            // the same serializer version now reading them. If the inner serializer ever
            // bumps its version, old V2 payloads would be misread — consider writing the
            // inner version during serialization. Confirm against the serializer's
            // compatibility policy.
            writeResult =
                    writeResultSerializer.deserialize(
                            writeResultSerializer.getVersion(), writeResultBytes);
        } else {
            writeResult = null;
        }

        // deserialize log end offset
        long logEndOffset = in.readLong();
        // deserialize max timestamp
        long maxTimestamp = in.readLong();
        // deserialize number of write results
        int numberOfWriteResults = in.readInt();
        return new TableBucketWriteResult<>(
                tablePath,
                tableBucket,
                partitionName,
                writeResult,
                logEndOffset,
                maxTimestamp,
                numberOfWriteResults);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,61 @@ public void markTableReachTieringMaxDuration(long tableId) {
}
}

/** Notify the SplitReader that a table tiering has failed and should be cleaned up. */
public void notifyTableTieringFailed(long tableId) {
    if (fetchers.isEmpty()) {
        // No fetcher is running yet: create one just to deliver the notification,
        // then start it.
        SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> newFetcher =
                createSplitFetcher();
        LOG.info(
                "fetchers is empty, enqueue notify table tiering failed for table {}", tableId);
        enqueueNotifyTableTieringFailedTask(newFetcher, tableId);
        startFetcher(newFetcher);
        return;
    }
    // Fan the failure notification out to every active fetcher.
    LOG.info("Notifying SplitReader that table {} tiering has failed", tableId);
    for (SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> fetcher :
            fetchers.values()) {
        enqueueNotifyTableTieringFailedTask(fetcher, tableId);
    }
}

/**
 * Drains all pending failed table infos from the SplitReaders and hands each one to the
 * given callback.
 *
 * @param consumer the consumer to process each failed table info
 */
public void pollFailedTableInfos(
        java.util.function.Consumer<TieringSplitReader.FailedTableInfo> consumer) {
    fetchers.values()
            .forEach(
                    fetcher -> {
                        TieringSplitReader<WriteResult> reader =
                                (TieringSplitReader<WriteResult>) fetcher.getSplitReader();
                        // Keep polling until the reader reports no more failures.
                        for (TieringSplitReader.FailedTableInfo info =
                                        reader.pollFailedTableInfo();
                                info != null;
                                info = reader.pollFailedTableInfo()) {
                            consumer.accept(info);
                        }
                    });
}

/** Enqueues a one-shot task on the given fetcher that forwards the table failure to its
 * {@link TieringSplitReader}. */
private void enqueueNotifyTableTieringFailedTask(
        SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher,
        long failedTableId) {
    SplitFetcherTask notifyTask =
            new SplitFetcherTask() {
                @Override
                public boolean run() {
                    // Look the reader up at execution time, not enqueue time.
                    TieringSplitReader<WriteResult> reader =
                            (TieringSplitReader<WriteResult>) splitFetcher.getSplitReader();
                    reader.notifyTableTieringFailed(failedTableId);
                    return true; // finished in a single run
                }

                @Override
                public void wakeUp() {
                    // nothing to interrupt
                }
            };
    splitFetcher.enqueueTask(notifyTask);
}

private void enqueueMarkTableReachTieringMaxDurationTask(
SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher,
long reachTieringDeadlineTable) {
Expand Down
Loading