Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
Expand Down Expand Up @@ -61,8 +62,7 @@ public class PinotRealtimeTableResource {
@POST
@Path("/tables/{tableName}/pauseConsumption")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Pause consumption of a realtime table",
notes = "Pause the consumption of a realtime table")
@ApiOperation(value = "Pause consumption of a realtime table", notes = "Pause the consumption of a realtime table")
public Response pauseConsumption(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
Expand All @@ -77,14 +77,22 @@ public Response pauseConsumption(
@POST
@Path("/tables/{tableName}/resumeConsumption")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Resume consumption of a realtime table",
notes = "Resume the consumption for a realtime table")
@ApiOperation(value = "Resume consumption of a realtime table", notes =
"Resume the consumption for a realtime table. ConsumeFrom parameter indicates from which offsets "
+ "consumption should resume. If consumeFrom parameter is not provided, consumption continues based on the "
+ "offsets in segment ZK metadata, and in case the offsets are already gone, the first available offsets are "
+ "picked to minimize the data loss.")
public Response resumeConsumption(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "smallest | largest") @QueryParam("consumeFrom") String consumeFrom) {
String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
validate(tableNameWithType);
if (consumeFrom != null && !consumeFrom.equalsIgnoreCase("smallest") && !consumeFrom.equalsIgnoreCase("largest")) {
throw new ControllerApplicationException(LOGGER,
String.format("consumeFrom param '%s' is not valid.", consumeFrom), Response.Status.BAD_REQUEST);
}
try {
return Response.ok(_pinotLLCRealtimeSegmentManager.resumeConsumption(tableNameWithType)).build();
return Response.ok(_pinotLLCRealtimeSegmentManager.resumeConsumption(tableNameWithType, consumeFrom)).build();
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -873,25 +873,28 @@ private Map<Integer, SegmentZKMetadata> getLatestSegmentZKMetadataMap(String rea
* which means it's manually triggered by admin not by automatic periodic task)
*/
public void ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
boolean recreateDeletedConsumingSegment) {
boolean recreateDeletedConsumingSegment, OffsetCriteria offsetCriteria) {
Preconditions.checkState(!_isStopping, "Segment manager is stopping");

String realtimeTableName = tableConfig.getTableName();
HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
assert idealState != null;
boolean isTableEnabled = idealState.isEnabled();
boolean isTablePaused = isTablePaused(idealState);
boolean offsetsHaveToChange = offsetCriteria != null;
if (isTableEnabled && !isTablePaused) {
List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
getPartitionGroupConsumptionStatusList(idealState, streamConfig);
// Read the smallest offset when a new partition is detected
offsetsHaveToChange
? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions
: getPartitionGroupConsumptionStatusList(idealState, streamConfig);
OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
// Read the smallest offset when a new partition is detected
streamConfig.setOffsetCriteria(offsetsHaveToChange ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
List<PartitionGroupMetadata> newPartitionGroupMetadataList =
getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
streamConfig.setOffsetCriteria(originalOffsetCriteria);
return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList,
recreateDeletedConsumingSegment);
recreateDeletedConsumingSegment, offsetCriteria);
} else {
LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}",
realtimeTableName, isTableEnabled, isTablePaused);
Expand Down Expand Up @@ -1052,7 +1055,7 @@ private boolean isAllInstancesInState(Map<String, String> instanceStateMap, Stri
@VisibleForTesting
IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
IdealState idealState, List<PartitionGroupMetadata> newPartitionGroupMetadataList,
boolean recreateDeletedConsumingSegment) {
boolean recreateDeletedConsumingSegment, OffsetCriteria offsetCriteria) {
String realtimeTableName = tableConfig.getTableName();

InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
Expand All @@ -1074,6 +1077,16 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS
// Get the latest segment ZK metadata for each partition
Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap = getLatestSegmentZKMetadataMap(realtimeTableName);

// Create a map of <partition id, start offset> using data already fetched in newPartitionGroupMetadataList
Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToStartOffset = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can u add a comment before this line what this map is supposed to contain?

The logic in this class is getting quite hard to read; can we even base-class some methods and sub-class the partitionGroup vs partitionId handling for the two different types of streams we support?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add the comments.

For the refactoring, I agree with you that this class is hard to read. It already has more than 1500 lines and it's worth refactoring, but the refactoring is outside the scope of this PR.

for (PartitionGroupMetadata metadata : newPartitionGroupMetadataList) {
partitionGroupIdToStartOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
}
Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestStreamOffset = null;
if (offsetCriteria == OffsetCriteria.SMALLEST_OFFSET_CRITERIA) {
partitionGroupIdToSmallestStreamOffset = partitionGroupIdToStartOffset;
}

// Walk over all partitions that we have metadata for, and repair any partitions necessary.
// Possible things to repair:
// 1. The latest metadata is in DONE state, but the idealstate says segment is CONSUMING:
Expand Down Expand Up @@ -1144,21 +1157,37 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS
// 3. we should never end up with some replicas ONLINE and some OFFLINE.
if (isAllInstancesInState(instanceStateMap, SegmentStateModel.OFFLINE)) {
LOGGER.info("Repairing segment: {} which is OFFLINE for all instances in IdealState", latestSegmentName);
StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getStartOffset());
if (partitionGroupIdToSmallestStreamOffset == null) {
// Smallest offset is fetched from stream once and stored in partitionGroupIdToSmallestStreamOffset.
// This is to prevent fetching the same info for each and every partition group.
partitionGroupIdToSmallestStreamOffset = fetchPartitionGroupIdToSmallestOffset(streamConfig);
}
StreamPartitionMsgOffset startOffset =
selectStartOffset(offsetCriteria, partitionGroupId, partitionGroupIdToStartOffset,
partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory,
latestSegmentZKMetadata.getStartOffset()); // segments are OFFLINE; start from beginning
createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs,
partitionGroupId, newPartitionGroupMetadataList, instancePartitions, instanceStatesMap,
segmentAssignment, instancePartitionsMap, startOffset);
newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
instancePartitionsMap, startOffset);
} else {
if (newPartitionGroupSet.contains(partitionGroupId)) {
if (recreateDeletedConsumingSegment && latestSegmentZKMetadata.getStatus().isCompleted()
&& isAllInstancesInState(instanceStateMap, SegmentStateModel.ONLINE)) {
// If we get here, that means in IdealState, the latest segment has all replicas ONLINE.
// Create a new IN_PROGRESS segment in PROPERTYSTORE,
// add it as CONSUMING segment to IDEALSTATE.
StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getEndOffset());
if (partitionGroupIdToSmallestStreamOffset == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may make things more readable if we can get the smallest offset all the time? Does it involve multiple calls to the stream, and is that what we are optimizing here? If so, good to add a comment. Otherwise, getting it once unconditionally make make things a bit more readable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is for optimization. Some topics have hundreds of partitions and we shouldn't call the stream to get the same metadata hundreds of times. I'll add the comments.

// Smallest offset is fetched from stream once and stored in partitionGroupIdToSmallestStreamOffset.
// This is to prevent fetching the same info for each and every partition group.
partitionGroupIdToSmallestStreamOffset = fetchPartitionGroupIdToSmallestOffset(streamConfig);
}
StreamPartitionMsgOffset startOffset =
selectStartOffset(offsetCriteria, partitionGroupId, partitionGroupIdToStartOffset,
partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory,
latestSegmentZKMetadata.getEndOffset());
createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs,
partitionGroupId, newPartitionGroupMetadataList, instancePartitions, instanceStatesMap,
segmentAssignment, instancePartitionsMap, startOffset);
newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
instancePartitionsMap, startOffset);
} else {
LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap,
latestSegmentName);
Expand Down Expand Up @@ -1220,31 +1249,14 @@ && new LLCSegmentName(segmentEntry.getKey()).getPartitionGroupId() == partitionG
}

private void createNewConsumingSegment(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs, int partitionGroupId,
SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs,
List<PartitionGroupMetadata> newPartitionGroupMetadataList, InstancePartitions instancePartitions,
Map<String, Map<String, String>> instanceStatesMap, SegmentAssignment segmentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, StreamPartitionMsgOffset startOffset) {
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
int numPartitions = newPartitionGroupMetadataList.size();
LLCSegmentName latestLLCSegmentName = new LLCSegmentName(latestSegmentZKMetadata.getSegmentName());
LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
StreamPartitionMsgOffset partitionGroupSmallestOffset =
getPartitionGroupSmallestOffset(streamConfig, partitionGroupId);

if (partitionGroupSmallestOffset != null) {
// Start offset must be higher than the start offset of the stream
if (partitionGroupSmallestOffset.compareTo(startOffset) > 0) {
LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffset,
partitionGroupSmallestOffset, partitionGroupId, tableConfig.getTableName());
_controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
startOffset = partitionGroupSmallestOffset;
}
} else {
LOGGER.error("Smallest offset for partition: {} of table: {} not found. Using startOffset: {}", partitionGroupId,
tableConfig.getTableName(), startOffset);
_controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
}

CommittingSegmentDescriptor committingSegmentDescriptor =
new CommittingSegmentDescriptor(latestSegmentZKMetadata.getSegmentName(), startOffset.toString(), 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor,
Expand All @@ -1254,21 +1266,39 @@ private void createNewConsumingSegment(TableConfig tableConfig, PartitionLevelSt
instancePartitionsMap);
}

@Nullable
private StreamPartitionMsgOffset getPartitionGroupSmallestOffset(StreamConfig streamConfig, int partitionGroupId) {
private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset(StreamConfig streamConfig) {
OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
List<PartitionGroupMetadata> smallestOffsetCriteriaPartitionGroupMetadata =
List<PartitionGroupMetadata> partitionGroupMetadataList =
getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
streamConfig.setOffsetCriteria(originalOffsetCriteria);
StreamPartitionMsgOffset partitionStartOffset = null;
for (PartitionGroupMetadata info : smallestOffsetCriteriaPartitionGroupMetadata) {
if (info.getPartitionGroupId() == partitionGroupId) {
partitionStartOffset = info.getStartOffset();
break;
Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = new HashMap<>();
for (PartitionGroupMetadata metadata : partitionGroupMetadataList) {
partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
}
return partitionGroupIdToSmallestOffset;
}

private StreamPartitionMsgOffset selectStartOffset(OffsetCriteria offsetCriteria, int partitionGroupId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you consider removing the offsetCriteria from the argument here, and incorporating the logic to deal with a non-null value of offsetCriteria outside this method? Not sure if it will make the logic more readable, but worth a try, I think

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

selectStartOffset method is used in two places. If we move the non-null check out of this method, we need to repeat the non-null check two times. That's why it was added to the method.
Also, this method select the start offset. If the offset criteria is provided, it gets the start offset from one map and if not, it gets it from the other map. So that's another reason I think having the non-null check belongs to this method.

Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToStartOffset,
Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestStreamOffset, String tableName,
StreamPartitionMsgOffsetFactory offsetFactory, String startOffsetInSegmentZkMetadataStr) {
if (offsetCriteria != null) {
// use the fetched offset according to offset criteria
return partitionGroupIdToStartOffset.get(partitionGroupId);
} else {
// use offset from segment ZK metadata
StreamPartitionMsgOffset startOffsetInSegmentZkMetadata = offsetFactory.create(startOffsetInSegmentZkMetadataStr);
StreamPartitionMsgOffset streamSmallestOffset = partitionGroupIdToSmallestStreamOffset.get(partitionGroupId);
// Start offset in ZK must be higher than the start offset of the stream
if (streamSmallestOffset.compareTo(startOffsetInSegmentZkMetadata) > 0) {
LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffsetInSegmentZkMetadata,
streamSmallestOffset, partitionGroupId, tableName);
_controllerMetrics.addMeteredTableValue(tableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
return streamSmallestOffset;
}
return startOffsetInSegmentZkMetadata;
}
return partitionStartOffset;
}

private LLCSegmentName getNextLLCSegmentName(LLCSegmentName lastLLCSegmentName, long creationTimeMs) {
Expand Down Expand Up @@ -1448,12 +1478,15 @@ public PauseStatus pauseConsumption(String tableNameWithType) {
* 1) setting "isTablePaused" in ideal states to false and
* 2) triggering segment validation job to create new consuming segments in ideal states
*/
public PauseStatus resumeConsumption(String tableNameWithType) {
public PauseStatus resumeConsumption(String tableNameWithType, @Nullable String offsetCriteria) {
IdealState updatedIdealState = updatePauseStatusInIdealState(tableNameWithType, false);

// trigger realtime segment validation job to resume consumption
Map<String, String> taskProperties = new HashMap<>();
taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY, "true");
if (offsetCriteria != null) {
taskProperties.put(RealtimeSegmentValidationManager.OFFSET_CRITERIA, offsetCriteria);
}
_helixResourceManager
.invokeControllerPeriodicTask(tableNameWithType, Constants.REALTIME_SEGMENT_VALIDATION_MANAGER, taskProperties);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
Expand All @@ -55,6 +56,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
private long _lastSegmentLevelValidationRunTimeMs = 0L;

public static final String RECREATE_DELETED_CONSUMING_SEGMENT_KEY = "recreateDeletedConsumingSegment";
public static final String OFFSET_CRITERIA = "offsetCriteria";

public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
Expand Down Expand Up @@ -82,7 +84,10 @@ protected Context preprocess(Properties periodicTaskProperties) {
}
context._recreateDeletedConsumingSegment =
Boolean.parseBoolean(periodicTaskProperties.getProperty(RECREATE_DELETED_CONSUMING_SEGMENT_KEY));

String offsetCriteriaStr = periodicTaskProperties.getProperty(OFFSET_CRITERIA);
if (offsetCriteriaStr != null) {
context._offsetCriteria = new OffsetCriteria.OffsetCriteriaBuilder().withOffsetString(offsetCriteriaStr);
}
return context;
}

Expand All @@ -106,7 +111,7 @@ protected void processTable(String tableNameWithType, Context context) {

if (streamConfig.hasLowLevelConsumerType()) {
_llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig,
context._recreateDeletedConsumingSegment);
context._recreateDeletedConsumingSegment, context._offsetCriteria);
}
}

Expand Down Expand Up @@ -178,6 +183,7 @@ public void cleanUpTask() {
// Per-run state computed in preprocess() and consumed in processTable().
public static final class Context {
// Whether this run should also perform segment-level validation (rate-limited by the last run time).
private boolean _runSegmentLevelValidation;
// Whether deleted CONSUMING segments should be recreated during this run (set from task properties).
private boolean _recreateDeletedConsumingSegment;
// Offset criteria parsed from the OFFSET_CRITERIA task property; null when the property was not provided.
private OffsetCriteria _offsetCriteria;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,8 @@ public void testStopSegmentManager()
// Expected
}
try {
segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig, false);
segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig, false,
null);
fail();
} catch (IllegalStateException e) {
// Expected
Expand Down Expand Up @@ -1115,7 +1116,7 @@ public void setUpNewTable() {

public void ensureAllPartitionsConsuming() {
ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState,
getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()), false);
getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()), false, null);
}

@Override
Expand Down