Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
import org.apache.fluss.server.kv.snapshot.KvSnapshotHandle;
import org.apache.fluss.server.log.LogTablet;
import org.apache.fluss.server.replica.Replica;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.data.ServerTags;
import org.apache.fluss.types.DataTypeChecks;
Expand Down Expand Up @@ -1594,4 +1596,76 @@ public void testAddAndRemoveServerTags() throws Exception {
"Server tag PERMANENT_OFFLINE not exists for server 2, the current "
+ "server tag of this server is TEMPORARY_OFFLINE.");
}

@Test
void testAlterTableTieredLogLocalSegments() throws Exception {
    // Create a log table; tiered-log local segments should start at the default (2).
    TablePath tablePath = TablePath.of("test_db", "test_alter_tiered_segments");
    admin.createTable(
                    tablePath,
                    TableDescriptor.builder()
                            .schema(DEFAULT_SCHEMA)
                            .comment("test table for tiered log segments")
                            .distributedBy(3)
                            .build(),
                    false)
            .get();

    // Metadata level: the table config reports the default value.
    TableInfo tableInfo = admin.getTableInfo(tablePath).get();
    assertThat(tableInfo.getTableConfig().getTieredLogLocalSegments()).isEqualTo(2);

    // Tablet-server level: the leader LogTablet of bucket 0 also starts with the default.
    TableBucket bucket = new TableBucket(tableInfo.getTableId(), 0);
    Replica leaderReplica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(bucket);
    LogTablet logTablet = leaderReplica.getLogTablet();
    assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(2);

    // Raise the option to 5 through the admin API.
    List<TableChange> changes = new ArrayList<>();
    changes.add(TableChange.set(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(), "5"));
    admin.alterTable(tablePath, changes, false).get();

    // Metadata reflects the new value immediately.
    tableInfo = admin.getTableInfo(tablePath).get();
    assertThat(tableInfo.getTableConfig().getTieredLogLocalSegments()).isEqualTo(5);

    // The tablet server learns of the change asynchronously (via UpdateMetadataRequest),
    // so poll until the LogTablet observes it.
    waitUntil(
            () -> logTablet.getTieredLogLocalSegments() == 5,
            Duration.ofSeconds(30),
            "Waiting for config to propagate to TabletServer");
    assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(5);

    // Reset the option back to its default value.
    changes.clear();
    changes.add(TableChange.reset(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key()));
    admin.alterTable(tablePath, changes, false).get();

    // After the reset the key must be gone from the table properties
    // (the table then falls back to the default of 2).
    tableInfo = admin.getTableInfo(tablePath).get();
    assertThat(
                    tableInfo
                            .toTableDescriptor()
                            .getProperties()
                            .containsKey(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key()))
            .isFalse();

    // The reset propagates asynchronously as well; wait for the LogTablet to see the default.
    waitUntil(
            () -> logTablet.getTieredLogLocalSegments() == 2,
            Duration.ofSeconds(30),
            "Waiting for config reset to propagate to TabletServer");
    assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(2);

    // Cleanup.
    admin.dropTable(tablePath, false).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public class FlussConfigUtils {
ALTERABLE_TABLE_OPTIONS =
Arrays.asList(
ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
ConfigOptions.TABLE_DATALAKE_FRESHNESS.key());
ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(),
ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key());
}

public static boolean isTableStorageConfig(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public final class LogTablet {
private final Scheduler scheduler;
private final ScheduledFuture<?> writerExpireCheck;
private final LogFormat logFormat;
private final int tieredLogLocalSegments;
private volatile int tieredLogLocalSegments;
private final Clock clock;
private final boolean isChangeLog;

Expand Down Expand Up @@ -526,6 +526,14 @@ public void updateIsDataLakeEnabled(boolean isDataLakeEnabled) {
this.isDataLakeEnabled = isDataLakeEnabled;
}

/**
 * Updates the number of log segments to retain in local storage. Invoked when an altered table
 * configuration is propagated to this tablet; the field is volatile so the new value is visible
 * to concurrent readers.
 */
public void updateTieredLogLocalSegments(int tieredLogLocalSegments) {
    this.tieredLogLocalSegments = tieredLogLocalSegments;
}

/** Returns the current number of log segments to retain in local storage (volatile read). */
public int getTieredLogLocalSegments() {
    return tieredLogLocalSegments;
}

public void updateLakeTableSnapshotId(long snapshotId) {
if (snapshotId > this.lakeTableSnapshotId) {
this.lakeTableSnapshotId = snapshotId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,27 @@ public void updateIsDataLakeEnabled(boolean isDataLakeEnabled) {
isDataLakeEnabled);
}

/**
 * Applies a new tiered-log local-segment count to the underlying log tablet. Called when the
 * table configuration is altered; a no-op when the value is unchanged.
 *
 * @param tieredLogLocalSegments the new number of segments to retain locally
 */
public void updateTieredLogLocalSegments(int tieredLogLocalSegments) {
    final int previous = logTablet.getTieredLogLocalSegments();
    if (previous != tieredLogLocalSegments) {
        logTablet.updateTieredLogLocalSegments(tieredLogLocalSegments);
        LOG.info(
                "Replica for {} tieredLogLocalSegments changed from {} to {}",
                tableBucket,
                previous,
                tieredLogLocalSegments);
    }
}

private void createKv() {
try {
// create a closeable registry for the closable related to kv
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,16 +466,24 @@ public void maybeUpdateMetadataCache(int coordinatorEpoch, ClusterMetadata clust

private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) {
Map<Long, Boolean> tableIdToLakeFlag = new HashMap<>();
Map<Long, Integer> tableIdToTieredLogLocalSegments = new HashMap<>();

for (TableMetadata tableMetadata : clusterMetadata.getTableMetadataList()) {
TableInfo tableInfo = tableMetadata.getTableInfo();
long tableId = tableInfo.getTableId();

// Collect datalake enabled configuration
if (tableInfo.getTableConfig().getDataLakeFormat().isPresent()) {
long tableId = tableInfo.getTableId();
boolean dataLakeEnabled = tableInfo.getTableConfig().isDataLakeEnabled();
tableIdToLakeFlag.put(tableId, dataLakeEnabled);
}

// Collect tiered log local segments configuration
int tieredLogLocalSegments = tableInfo.getTableConfig().getTieredLogLocalSegments();
tableIdToTieredLogLocalSegments.put(tableId, tieredLogLocalSegments);
}

if (tableIdToLakeFlag.isEmpty()) {
if (tableIdToLakeFlag.isEmpty() && tableIdToTieredLogLocalSegments.isEmpty()) {
return;
}

Expand All @@ -484,9 +492,17 @@ private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) {
if (hostedReplica instanceof OnlineReplica) {
Replica replica = ((OnlineReplica) hostedReplica).getReplica();
long tableId = replica.getTableBucket().getTableId();

// Update datalake enabled configuration
if (tableIdToLakeFlag.containsKey(tableId)) {
replica.updateIsDataLakeEnabled(tableIdToLakeFlag.get(tableId));
}

// Update tiered log local segments configuration
if (tableIdToTieredLogLocalSegments.containsKey(tableId)) {
replica.updateTieredLogLocalSegments(
tableIdToTieredLogLocalSegments.get(tableId));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,65 @@ void testLookupOffsetForTimestamp(boolean partitionTable) throws Exception {
.isEqualTo(-1L);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testAlterTableTieredLogLocalSegments(boolean partitionedTable) throws Exception {
    // Register a table whose tiered-log local-segment count starts explicitly at 2.
    long tableId =
            registerTableInZkClient(
                    DATA1_TABLE_PATH,
                    DATA1_SCHEMA,
                    200L,
                    Collections.emptyList(),
                    Collections.singletonMap(
                            ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(), "2"));
    TableBucket bucket = makeTableBucket(tableId, partitionedTable);
    makeLogTableAsLeader(bucket, partitionedTable);

    Replica replica = replicaManager.getReplicaOrException(bucket);
    LogTablet logTablet = replica.getLogTablet();

    // The LogTablet picks up the initial config.
    assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(2);

    // Roll 10 segments and run the tiering task: all but the active segment are copied to
    // remote storage, and only 2 segments survive locally.
    addMultiSegmentsToLogTablet(logTablet, 10);
    remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
    assertThat(remoteLogManager.relevantRemoteLogSegments(bucket, 0L)).hasSize(9);
    assertThat(logTablet.getSegments()).hasSize(2);

    // Push the config to 5 directly on the replica (simulating metadata propagation).
    replica.updateTieredLogLocalSegments(5);
    assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(5);

    // New segments plus another tiering pass: the retention now keeps 5 segments locally.
    addMultiSegmentsToLogTablet(logTablet, 10);
    remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
    assertThat(logTablet.getSegments()).hasSize(5);

    // A second change (down to 3) must also take effect.
    replica.updateTieredLogLocalSegments(3);
    assertThat(logTablet.getTieredLogLocalSegments()).isEqualTo(3);

    addMultiSegmentsToLogTablet(logTablet, 5);
    remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
    assertThat(logTablet.getSegments()).hasSize(3);
}

/** Shorthand for {@link #makeTableBucket(long, boolean)} using the shared DATA1 table id. */
private TableBucket makeTableBucket(boolean partitionTable) {
    return makeTableBucket(DATA1_TABLE_ID, partitionTable);
}
Expand Down