Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,18 @@ public static String constructPropertyStorePathForSegmentLineage(String tableNam
return StringUtil.join("/", PROPERTYSTORE_SEGMENT_LINEAGE, tableNameWithType);
}

public static String getPropertyStorePathForMinionTaskMetadataPrefix() {
return PROPERTYSTORE_MINION_TASK_METADATA_PREFIX;
}

public static String constructPropertyStorePathForMinionTaskMetadata(String tableNameWithType, String taskType) {
return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX, tableNameWithType, taskType);
}

public static String constructPropertyStorePathForMinionTaskMetadata(String tableNameWithType) {
return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX, tableNameWithType);
}

@Deprecated
public static String constructPropertyStorePathForMinionTaskMetadataDeprecated(String taskType,
String tableNameWithType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.common.minion;

import java.util.List;
import javax.annotation.Nullable;
import org.apache.helix.AccessOption;
import org.apache.helix.store.HelixPropertyStore;
Expand All @@ -36,7 +37,7 @@ private MinionTaskMetadataUtils() {
}

/**
* Fetches the ZNRecord for the given minion task and tableName. Fetch from the new path
* Fetches the minion task metadata ZNRecord for the given minion task and tableName. Fetch from the new path
* MINION_TASK_METADATA/${tableNameWthType}/{taskType} if it exists; otherwise, fetch from the old path
* MINION_TASK_METADATA/${taskType}/${tableNameWthType}.
*/
Expand All @@ -63,7 +64,7 @@ private static ZNRecord fetchTaskMetadata(HelixPropertyStore<ZNRecord> propertyS
}

/**
* Deletes the ZNRecord for the given minion task and tableName, from both the new path
* Deletes the minion task metadata ZNRecord for the given minion task and tableName, from both the new path
* MINION_TASK_METADATA/${tableNameWthType}/${taskType} and the old path
* MINION_TASK_METADATA/${taskType}/${tableNameWthType}.
*/
Expand All @@ -79,10 +80,44 @@ public static void deleteTaskMetadata(HelixPropertyStore<ZNRecord> propertyStore
}
}

/**
* Deletes the minion task metadata ZNRecord for the given tableName, from both the new path
* MINION_TASK_METADATA/${tableNameWthType} and the old path
* MINION_TASK_METADATA/<any task type>/${tableNameWthType}
*/
public static void deleteTaskMetadata(HelixPropertyStore<ZNRecord> propertyStore, String tableNameWithType) {
// delete the minion task metadata ZNRecord MINION_TASK_METADATA/${tableNameWthType}
String path = ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(tableNameWithType);
if (!propertyStore.remove(path, AccessOption.PERSISTENT)) {
throw new ZkException("Failed to delete task metadata for table: " + tableNameWithType);
}
// delete the minion task metadata ZNRecord MINION_TASK_METADATA/<any task type>/${tableNameWthType}
// TODO: another way of finding old minion task metadata path is: (1) use reflection to find all task types,
// similar to what TaskGeneratorRegistry.java does (2) construct possible old minion task metadata path
// using those types.
// The tradeoff is: (1) the current approach uses ZK as the source of truth, so we will not miss any ZNode
// (2) the other approach will reduce ZK load if there are thousands of tables, because we need to talk to
// the ZK to find all its direct children in the current approach.
List<String> childNames =
propertyStore.getChildNames(ZKMetadataProvider.getPropertyStorePathForMinionTaskMetadataPrefix(),
AccessOption.PERSISTENT);
if (childNames != null && !childNames.isEmpty()) {
for (String child : childNames) {
// Even though some child names are not task types (e.g., in the new metadata path, the child name
// is a table name), it does not harm to try to delete the non-existent constructed path.
String oldPath =
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadataDeprecated(child, tableNameWithType);
if (!propertyStore.remove(oldPath, AccessOption.PERSISTENT)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the path doesn't exist, propertyStore.remove() will return false. In this case, an exception will be thrown and the for loop will stop, which contradicts to the comment several lines above though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, I tested locally using integration test, if we are trying to remove a non-existent path, it returns true.

Could you please let me know your test setup so I can try it myself? Thanks!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh nvm, saw the source code in helix-core jar. I feel the better approach would be to traverse all the existing task types instead of going over all the children nodes (there could be thousands of tables in a cluster), but for now I'm fine with this. You can add a TODO here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have another version implemented locally 😄 , which scans jars and find all minion task types, similar to what https://github.com/apache/pinot/blob/master/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java does.

I chose the current one (which reads from ZK) because I want to use ZK as the source of truth, so we will not miss any ZNode.

But you point out a very good point that there might be thousands of tables. I can add a TODO and we can decide later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know your thought, we can switch to the other approach or we can merge the current approach and get back to it later.

(BTW, could you please merge it if we want to go with the current approach as for now because I cannot merge the code 😄 )

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can get back to it later. I'll merge this one. Thanks for making the change!

throw new ZkException("Failed to delete task metadata: " + child + ", " + tableNameWithType);
}
}
}
}

/**
* Generic method for persisting {@link BaseTaskMetadata} to MINION_TASK_METADATA. The metadata will
* be saved in the ZNode under the new path /MINION_TASK_METADATA/${tableNameWithType}/${taskType} if
* the old path already exists; otherwise, it will be saved in the ZNode under the old path
* it exists or the old path does not exist; otherwise, it will be saved in the ZNode under the old path
* /MINION_TASK_METADATA/${taskType}/${tableNameWithType}.
*
* Will fail if expectedVersion does not match.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package org.apache.pinot.common.utils.helix;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
Expand All @@ -40,6 +42,15 @@ public ZNRecord get(String path, Stat stat, int options) {
return _contents.get(path);
}

@Override
public List<String> getChildNames(String parentPath, int options) {
return _contents.keySet().stream()
.filter(e -> e.startsWith(parentPath))
.map(e -> e.replaceFirst(parentPath + "/", "").split("/")[0])
.distinct()
.collect(Collectors.toList());
}

@Override
public boolean exists(String path, int options) {
return _contents.containsKey(path);
Expand Down Expand Up @@ -72,7 +83,8 @@ public boolean set(String path, ZNRecord stat, int options) {

@Override
public boolean remove(String path, int options) {
_contents.remove(path);
List<String> descendants = _contents.keySet().stream().filter(e -> e.startsWith(path)).collect(Collectors.toList());
descendants.forEach(e -> _contents.remove(e));
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,32 @@ public void testDeleteTaskMetadata() {
MinionTaskMetadataUtils.deleteTaskMetadata(propertyStore, TASK_TYPE, TABLE_NAME_WITH_TYPE);
assertFalse(propertyStore.exists(OLD_MINION_METADATA_PATH, ACCESS_OPTION));
assertFalse(propertyStore.exists(NEW_MINION_METADATA_PATH, ACCESS_OPTION));

// 1. ZNode MINION_TASK_METADATA/TestTable_OFFLINE and its descendants will be removed
// 2. ZNode MINION_TASK_METADATA/<any task type>/TestTable_OFFLINE will also be removed
String anotherTable = "anotherTable_OFFLINE";
String anotherOldMinionMetadataPath =
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadataDeprecated(TASK_TYPE, anotherTable);
DummyTaskMetadata anotherOldTaskMetadata = new DummyTaskMetadata(anotherTable, 20);
String anotherNewMinionMetadataPath =
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(anotherTable, TASK_TYPE);
DummyTaskMetadata anotherNewTaskMetadata = new DummyTaskMetadata(anotherTable, 200);
propertyStore = new FakePropertyStore();
propertyStore.set(OLD_MINION_METADATA_PATH, OLD_TASK_METADATA.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
propertyStore.set(NEW_MINION_METADATA_PATH, NEW_TASK_METADATA.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
propertyStore.set(anotherOldMinionMetadataPath, anotherOldTaskMetadata.toZNRecord(),
EXPECTED_VERSION, ACCESS_OPTION);
propertyStore.set(anotherNewMinionMetadataPath, anotherNewTaskMetadata.toZNRecord(),
EXPECTED_VERSION, ACCESS_OPTION);
assertTrue(propertyStore.exists(OLD_MINION_METADATA_PATH, ACCESS_OPTION));
assertTrue(propertyStore.exists(NEW_MINION_METADATA_PATH, ACCESS_OPTION));
assertTrue(propertyStore.exists(anotherOldMinionMetadataPath, ACCESS_OPTION));
assertTrue(propertyStore.exists(anotherNewMinionMetadataPath, ACCESS_OPTION));
MinionTaskMetadataUtils.deleteTaskMetadata(propertyStore, TABLE_NAME_WITH_TYPE);
assertFalse(propertyStore.exists(OLD_MINION_METADATA_PATH, ACCESS_OPTION));
assertFalse(propertyStore.exists(NEW_MINION_METADATA_PATH, ACCESS_OPTION));
assertTrue(propertyStore.exists(anotherOldMinionMetadataPath, ACCESS_OPTION));
assertTrue(propertyStore.exists(anotherNewMinionMetadataPath, ACCESS_OPTION));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
import org.apache.pinot.controller.helix.starter.HelixConfig;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.segment.local.utils.ReplicationUtils;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.ConfigUtils;
Expand Down Expand Up @@ -1859,9 +1858,8 @@ public void deleteOfflineTable(String tableName, @Nullable String retentionPerio
LOGGER.info("Deleting table {}: Removed segment lineage", offlineTableName);

// Remove task related metadata
MinionTaskMetadataUtils.deleteTaskMetadata(_propertyStore, MinionConstants.MergeRollupTask.TASK_TYPE,
offlineTableName);
LOGGER.info("Deleting table {}: Removed merge rollup task metadata", offlineTableName);
MinionTaskMetadataUtils.deleteTaskMetadata(_propertyStore, offlineTableName);
LOGGER.info("Deleting table {}: Removed all minion task metadata", offlineTableName);

// Remove table config
// this should always be the last step for deletion to avoid race condition in table re-create.
Expand Down Expand Up @@ -1922,13 +1920,8 @@ public void deleteRealtimeTable(String tableName, @Nullable String retentionPeri
LOGGER.info("Deleting table {}: Removed segment lineage", realtimeTableName);

// Remove task related metadata
MinionTaskMetadataUtils.deleteTaskMetadata(_propertyStore, MinionConstants.MergeRollupTask.TASK_TYPE,
realtimeTableName);
LOGGER.info("Deleting table {}: Removed merge rollup task metadata", realtimeTableName);

MinionTaskMetadataUtils.deleteTaskMetadata(_propertyStore, MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
realtimeTableName);
LOGGER.info("Deleting table {}: Removed merge realtime to offline metadata", realtimeTableName);
MinionTaskMetadataUtils.deleteTaskMetadata(_propertyStore, realtimeTableName);
LOGGER.info("Deleting table {}: Removed all minion task metadata", realtimeTableName);

// Remove groupId/partitionId mapping for HLC table
if (instancesForTable != null) {
Expand Down