Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@

import org.apache.helix.HelixManager;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.minion.BaseTaskMetadata;
import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;


/**
* An abstraction on top of {@link HelixManager}, created for the {@link PinotTaskExecutor}, restricted to only
Expand All @@ -37,24 +35,24 @@ public MinionTaskZkMetadataManager(HelixManager helixManager) {
}

/**
* Fetch the ZNRecord under MINION_TASK_METADATA/${tableNameWithType}/RealtimeToOfflineSegmentsTask for
* Fetch the ZNRecord under MINION_TASK_METADATA/${tableNameWithType}/${taskType} for
* the given tableNameWithType
*/
public ZNRecord getRealtimeToOfflineSegmentsTaskZNRecord(String tableNameWithType) {
return MinionTaskMetadataUtils
.fetchTaskMetadata(_helixManager.getHelixPropertyStore(), RealtimeToOfflineSegmentsTask.TASK_TYPE,
tableNameWithType);
public ZNRecord getTaskMetadataZNRecord(String tableNameWithType, String taskType) {
return MinionTaskMetadataUtils.fetchTaskMetadata(_helixManager.getHelixPropertyStore(), taskType,
tableNameWithType);
}

/**
* Sets the {@link RealtimeToOfflineSegmentsTaskMetadata} into the ZNode at
* MINION_TASK_METADATA/${tableNameWithType}/RealtimeToOfflineSegmentsTask
* for the corresponding tableNameWithType
* Sets the {@link BaseTaskMetadata} ito the ZNode at
* MINION_TASK_METADATA/${tableNameWithType}/${taskType}
* for the corresponding tableNameWitType
* @param taskMetadata Task metadata which is to be written
* @param taskType taskType for which metadata is to be updated
* @param expectedVersion Version expected to be updating, failing the call if there's a mismatch
*/
public void setRealtimeToOfflineSegmentsTaskMetadata(
RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata, int expectedVersion) {
MinionTaskMetadataUtils.persistTaskMetadata(_helixManager.getHelixPropertyStore(),
RealtimeToOfflineSegmentsTask.TASK_TYPE, realtimeToOfflineSegmentsTaskMetadata, expectedVersion);
public void setTaskMetadataZNRecord(BaseTaskMetadata taskMetadata, String taskType, int expectedVersion) {
MinionTaskMetadataUtils.persistTaskMetadata(_helixManager.getHelixPropertyStore(), taskType, taskMetadata,
expectedVersion);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ public void preProcess(PinotTaskConfig pinotTaskConfig) {
String realtimeTableName = configs.get(MinionConstants.TABLE_NAME_KEY);

ZNRecord realtimeToOfflineSegmentsTaskZNRecord =
_minionTaskZkMetadataManager.getRealtimeToOfflineSegmentsTaskZNRecord(realtimeTableName);
_minionTaskZkMetadataManager.getTaskMetadataZNRecord(realtimeTableName,
RealtimeToOfflineSegmentsTask.TASK_TYPE);
Preconditions.checkState(realtimeToOfflineSegmentsTaskZNRecord != null,
"RealtimeToOfflineSegmentsTaskMetadata ZNRecord for table: %s should not be null. Exiting task.",
realtimeTableName);
Expand Down Expand Up @@ -191,7 +192,8 @@ public void postProcess(PinotTaskConfig pinotTaskConfig) {
long waterMarkMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY));
RealtimeToOfflineSegmentsTaskMetadata newMinionMetadata =
new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, waterMarkMs);
_minionTaskZkMetadataManager.setRealtimeToOfflineSegmentsTaskMetadata(newMinionMetadata, _expectedVersion);
_minionTaskZkMetadataManager.setTaskMetadataZNRecord(newMinionMetadata, RealtimeToOfflineSegmentsTask.TASK_TYPE,
_expectedVersion);
}

@Override
Expand Down