Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.apache.pinot.common.messages;

import java.util.UUID;
import org.apache.helix.model.Message;
import org.apache.helix.zookeeper.datamodel.ZNRecord;


// FIXME add a description
public class PauselessConsumptionMessage extends Message {
public static final String PAUSELESS_CONSUMPTION_MSG_SUB_TYPE = "PAUSELESS_CONSUMPTION";
public static final String TABLE_NAME_WITH_TYPE = "tableNameWithType";
public static final String COMMITTING_SEGMENT_NAME = "committingSegmentName";
public static final String NEW_CONSUMING_SEGMENT_NAME = "newConsumingSegmentName";

public PauselessConsumptionMessage(String tableNameWithType, String committingSegmentName,
String newConsumingSegmentName) {
super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
setMsgSubType(PAUSELESS_CONSUMPTION_MSG_SUB_TYPE);
setExecutionTimeout(-1); // no timeout
ZNRecord znRecord = getRecord();
znRecord.setSimpleField(TABLE_NAME_WITH_TYPE, tableNameWithType);
znRecord.setSimpleField(COMMITTING_SEGMENT_NAME, committingSegmentName);
znRecord.setSimpleField(NEW_CONSUMING_SEGMENT_NAME, newConsumingSegmentName);
}

public PauselessConsumptionMessage(Message message) {
super(message.getRecord());
String msgSubType = message.getMsgSubType();
if (!msgSubType.equals(PAUSELESS_CONSUMPTION_MSG_SUB_TYPE)) {
throw new IllegalArgumentException(
"Invalid message sub type: " + msgSubType + " for PauselessConsumptionMessage");
}
}

public String getTableNameWithType() {
return getRecord().getSimpleField(TABLE_NAME_WITH_TYPE);
}

public String getCommittingSegmentName() {
return getRecord().getSimpleField(COMMITTING_SEGMENT_NAME);
}

public String getNewConsumingSegmentName() {
return getRecord().getSimpleField(NEW_CONSUMING_SEGMENT_NAME);
}

@Override
public String toString() {
return "PauselessConsumptionMessage{" + "tableNameWithType='" + getTableNameWithType() + '\''
+ ", committingSegmentName='" + getCommittingSegmentName() + '\'' + ", newConsumingSegmentName='"
+ getNewConsumingSegmentName() + '\'' + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.messages.ForceCommitMessage;
import org.apache.pinot.common.messages.IngestionMetricsRemoveMessage;
import org.apache.pinot.common.messages.PauselessConsumptionMessage;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
Expand Down Expand Up @@ -529,7 +531,7 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
* Update zookeeper in 3 steps.
*
* Step 1: Update PROPERTYSTORE to change the old segment metadata status to DONE
* Step 2: Update PROPERTYSTORE to create the new segment metadata with status IN_PROGRESS
* Step 2: Update PROPERTYSTORE to create the new segment metadata with status IN_PROGRESS FIXME
* Step 3: Update IDEALSTATES to include new segment in CONSUMING state, and change old segment to ONLINE state.
*/

Expand All @@ -544,32 +546,25 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
long startTimeNs2 = System.nanoTime();
String newConsumingSegmentName = null;
if (!isTablePaused(idealState)) {
StreamConfig streamConfig =
new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
Set<Integer> partitionIds;
try {
partitionIds = getPartitionIds(streamConfig);
} catch (Exception e) {
LOGGER.info("Failed to fetch partition ids from stream metadata provider for table: {}, exception: {}. "
+ "Reading all partition group metadata to determine partition ids.", realtimeTableName, e.toString());
// TODO: Find a better way to determine partition count and if the committing partition group is fully consumed.
// We don't need to read partition group metadata for other partition groups.
List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
getPartitionGroupConsumptionStatusList(idealState, streamConfig);
List<PartitionGroupMetadata> newPartitionGroupMetadataList =
getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
partitionIds = newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId)
.collect(Collectors.toSet());
List<String> llcSegments = getLLCSegments(realtimeTableName);
LLCSegmentName latestSegment = null;
for (String segment : llcSegments) {
LLCSegmentName llcSegmentName = new LLCSegmentName(segment);
if (llcSegmentName.getPartitionGroupId() == committingSegmentPartitionGroupId) {
if (latestSegment == null || llcSegmentName.getSequenceNumber() > latestSegment.getSequenceNumber()) {
latestSegment = llcSegmentName;
}
}
}
if (partitionIds.contains(committingSegmentPartitionGroupId)) {
String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
long newSegmentCreationTimeMs = getCurrentTimeMs();
LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(),
numReplicas);
newConsumingSegmentName = newLLCSegment.getSegmentName();
if (latestSegment != null) {
SegmentZKMetadata metadata = getSegmentZKMetadata(realtimeTableName, latestSegment.getSegmentName());
if (metadata.getStatus() == Status.IN_PROGRESS) {
newConsumingSegmentName = latestSegment.getSegmentName();
} else {
// TODO create new one if cannot find one
// For existing realtime tables, this scenario only happens once and that's after the changes for pauseless
// consumption is deployed
}
}
}

Expand Down Expand Up @@ -1826,4 +1821,101 @@ String moveSegmentFile(String rawTableName, String segmentName, String segmentLo
URI createSegmentPath(String rawTableName, String segmentName) {
return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName));
}

public LLCSegmentName createNextSegmentZkMetadata(IdealState idealState, TableConfig tableConfig,
LLCSegmentName committingSegment, StreamPartitionMsgOffset nextSegmentOffset, int numRowsConsumed) {
String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableConfig.getTableName());
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
LOGGER.warn("ideal state for committing segment {} is {}", committingSegment.getSegmentName(),
idealState.getInstanceStateMap(committingSegment.getSegmentName()));
Preconditions.checkState(idealState.getInstanceStateMap(committingSegment.getSegmentName())
.containsValue(SegmentStateModel.CONSUMING),
"Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegment);
LLCSegmentName newLLCSegment = null;
if (!isTablePaused(idealState)) {
StreamConfig streamConfig =
new StreamConfig(tableNameWithType, IngestionConfigUtils.getStreamConfigMap(tableConfig));
Set<Integer> allPartitionIds;
try {
allPartitionIds = getPartitionIds(streamConfig);
} catch (Exception e) {
LOGGER.info("Failed to fetch partition ids from stream metadata provider for table: {}, exception: {}. "
+ "Reading all partition group metadata to determine partition ids.", tableNameWithType, e.toString());
// TODO: Find a better way to determine partition count and if the committing partition group is fully consumed.
// We don't need to read partition group metadata for other partition groups.
List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
getPartitionGroupConsumptionStatusList(idealState, streamConfig);
List<PartitionGroupMetadata> newPartitionGroupMetadataList =
getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
allPartitionIds = newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId)
.collect(Collectors.toSet());
}
int pId = committingSegment.getPartitionGroupId();
if (allPartitionIds.contains(pId)) {
long newSegmentCreationTimeMs = getCurrentTimeMs();
newLLCSegment = new LLCSegmentName(rawTableName, pId,
committingSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
CommittingSegmentDescriptor descriptor = // TODO refactor createNewSegmentZKMetadata
new CommittingSegmentDescriptor(committingSegment.getSegmentName(), nextSegmentOffset.toString(), -1);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
descriptor, null/*committingSegmentZKMetadata*/, instancePartitions,
allPartitionIds.size(), numReplicas);
}
}
return newLLCSegment;
}

public void startPauselessConsumptionIfEnabled(IdealState idealState, TableConfig tableConfig,
String committingSegment, String newSegment) {

// get stream config
String rawTableName = tableConfig.getTableName();
String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
StreamConfig streamConfig = new StreamConfig(rawTableName, IngestionConfigUtils.getStreamConfigMap(tableConfig));

if (streamConfig.isPauselessConsumptionEnabled()) {

// find instances that will host the new consuming segment
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
Collections.singletonMap(InstancePartitionsType.CONSUMING, getConsumingInstancePartitions(tableConfig));
Map<String, Map<String, String>> existingSegmentToInstanceStateMap = idealState.getRecord().getMapFields();
SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig, _controllerMetrics);
List<String> newInstances =
segmentAssignment.assignSegment(newSegment, existingSegmentToInstanceStateMap, instancePartitionsMap);

// find target instances that:
// 1) currently host a consuming segment for this partition, and
// 2) will host the new consuming segment
Set<String> targetInstances = existingSegmentToInstanceStateMap.get(committingSegment).keySet();
targetInstances.retainAll(newInstances);

// send consumption message to the target instances to start consumption
ClusterMessagingService messagingService = _helixManager.getMessagingService();
List<String> instancesReceivedMsg = new ArrayList<>();
for (String instance : targetInstances) {
Criteria recipientCriteria = new Criteria();
recipientCriteria.setInstanceName(instance);
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setResource(tableNameWithType);
recipientCriteria.setSessionSpecific(true);
PauselessConsumptionMessage message =
new PauselessConsumptionMessage(tableNameWithType, committingSegment, newSegment);
if (messagingService.send(recipientCriteria, message, null, -1) > 0) {
instancesReceivedMsg.add(instance);
} else {
LOGGER.warn("Failed to send pauseless consumption message to instance: {}, message: {}", instance, message);
}
}
LOGGER.info("Sent pauseless consumption message for committing segment: {}"
+ " and new consuming segment: {} to instances: {}", committingSegment, newSegment, instancesReceivedMsg);
}
}

public boolean isSegmentInExternalView(String tableNameWithType, LLCSegmentName currentCommittingSegment) {
ExternalView externalView = _helixResourceManager.getTableExternalView(tableNameWithType);
return externalView.getPartitionSet().contains(currentCommittingSegment.getSegmentName());
}
}
Loading