Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.pinot.core.routing.RoutingTable;
import org.apache.pinot.core.routing.ServerRouteInfo;
import org.apache.pinot.core.routing.TablePartitionInfo;
import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
Expand Down Expand Up @@ -833,6 +834,17 @@ public TablePartitionInfo getTablePartitionInfo(String tableNameWithType) {
return partitionMetadataManager != null ? partitionMetadataManager.getTablePartitionInfo() : null;
}

/**
 * Looks up the {@link TablePartitionReplicatedServersInfo} for the given table.
 *
 * @param tableNameWithType key used to look up the table's routing entry
 * @return the info exposed by the routing entry's partition metadata manager, or {@code null} when there is no
 *         routing entry for the table or the entry has no partition metadata manager
 */
@Nullable
@Override
public TablePartitionReplicatedServersInfo getTablePartitionReplicatedServersInfo(String tableNameWithType) {
  RoutingEntry entry = _routingEntryMap.get(tableNameWithType);
  if (entry == null) {
    return null;
  }
  SegmentPartitionMetadataManager metadataManager = entry.getPartitionMetadataManager();
  if (metadataManager == null) {
    return null;
  }
  return metadataManager.getTablePartitionReplicatedServersInfo();
}

@Nullable
@Override
public Set<String> getServingInstances(String tableNameWithType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.core.routing.TablePartitionInfo;
import org.apache.pinot.core.routing.TablePartitionInfo.PartitionInfo;
import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo;
import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo.PartitionInfo;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.slf4j.Logger;
Expand Down Expand Up @@ -68,6 +69,7 @@ public class SegmentPartitionMetadataManager implements SegmentZkMetadataFetchLi

// computed value based on status change.
private transient TablePartitionInfo _tablePartitionInfo;
private transient TablePartitionReplicatedServersInfo _tablePartitionReplicatedServersInfo;

public SegmentPartitionMetadataManager(String tableNameWithType, String partitionColumn, String partitionFunctionName,
int numPartitions) {
Expand All @@ -88,7 +90,7 @@ public void init(IdealState idealState, ExternalView externalView, List<String>
getOnlineServers(externalView, segment));
_segmentInfoMap.put(segment, segmentInfo);
}
computeTablePartitionInfo();
computeAllTablePartitionInfo();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding a config to turn each of them on/off to reduce overhead. This can be done as a follow-up.

}

private int getPartitionId(String segment, @Nullable ZNRecord znRecord) {
Expand Down Expand Up @@ -136,7 +138,68 @@ private static List<String> getOnlineServers(ExternalView externalView, String s
return onlineServers;
}

private void computeTablePartitionInfo() {
@Override
public synchronized void onAssignmentChange(IdealState idealState, ExternalView externalView,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: GitHub is not great at showing this, but I have simply moved the compute-table-partition-info methods towards the end of the class. Reviewers are welcome to double-check this claim.

Set<String> onlineSegments, List<String> pulledSegments, List<ZNRecord> znRecords) {
// Update segment partition id for the pulled segments
int numSegments = pulledSegments.size();
for (int i = 0; i < numSegments; i++) {
String segment = pulledSegments.get(i);
ZNRecord znRecord = znRecords.get(i);
SegmentInfo segmentInfo = new SegmentInfo(getPartitionId(segment, znRecord), getCreationTimeMs(znRecord),
getOnlineServers(externalView, segment));
_segmentInfoMap.put(segment, segmentInfo);
}
// Update online servers for all online segments
for (String segment : onlineSegments) {
SegmentInfo segmentInfo = _segmentInfoMap.get(segment);
if (segmentInfo == null) {
// NOTE: This should not happen, but we still handle it gracefully by adding an invalid SegmentInfo
LOGGER.error("Failed to find segment info for segment: {} in table: {} while handling assignment change",
segment, _tableNameWithType);
segmentInfo =
new SegmentInfo(INVALID_PARTITION_ID, INVALID_CREATION_TIME_MS, getOnlineServers(externalView, segment));
_segmentInfoMap.put(segment, segmentInfo);
} else {
segmentInfo._onlineServers = getOnlineServers(externalView, segment);
}
}
_segmentInfoMap.keySet().retainAll(onlineSegments);
computeAllTablePartitionInfo();
}

/**
 * Re-reads the partition id and creation time of a single segment from its ZK record and then recomputes all
 * table partition info.
 *
 * <p>If the segment is already tracked, its partition id and creation time are overwritten in place and its
 * online servers are left untouched. Otherwise an error is logged and a new entry with an empty online-server
 * list is registered.
 *
 * @param segment name of the segment being refreshed
 * @param znRecord ZK record of the segment, may be {@code null}
 */
@Override
public synchronized void refreshSegment(String segment, @Nullable ZNRecord znRecord) {
  // Resolve both values up front, before touching the map, exactly once per refresh.
  int refreshedPartitionId = getPartitionId(segment, znRecord);
  long refreshedCreationTimeMs = getCreationTimeMs(znRecord);
  SegmentInfo existingInfo = _segmentInfoMap.get(segment);
  if (existingInfo != null) {
    existingInfo._partitionId = refreshedPartitionId;
    existingInfo._creationTimeMs = refreshedCreationTimeMs;
  } else {
    // NOTE: This should not happen, but we still handle it gracefully by adding an invalid SegmentInfo
    LOGGER.error("Failed to find segment info for segment: {} in table: {} while handling segment refresh", segment,
        _tableNameWithType);
    _segmentInfoMap.put(segment,
        new SegmentInfo(refreshedPartitionId, refreshedCreationTimeMs, Collections.emptyList()));
  }
  computeAllTablePartitionInfo();
}

/**
 * Returns the most recently computed {@link TablePartitionInfo}.
 *
 * <p>NOTE(review): this read is not synchronized, while the methods that trigger recomputation are; confirm that
 * the field's visibility guarantees are sufficient for callers on other threads.
 *
 * @return the current value of the cached table partition info field
 */
public TablePartitionInfo getTablePartitionInfo() {
return _tablePartitionInfo;
}

/**
 * Returns the most recently computed {@link TablePartitionReplicatedServersInfo}.
 *
 * <p>NOTE(review): this read is not synchronized, while the methods that trigger recomputation are; confirm that
 * the field's visibility guarantees are sufficient for callers on other threads.
 *
 * @return the current value of the cached replicated-servers info field
 */
public TablePartitionReplicatedServersInfo getTablePartitionReplicatedServersInfo() {
return _tablePartitionReplicatedServersInfo;
}

/**
 * Recomputes both cached partition views: first the replicated-servers info, then the table partition info.
 * Invoked after the segment info map has been updated.
 */
private void computeAllTablePartitionInfo() {
computeTablePartitionReplicatedServersInfo();
computeTablePartitionInfo();
}

private void computeTablePartitionReplicatedServersInfo() {
PartitionInfo[] partitionInfoMap = new PartitionInfo[_numPartitions];
List<String> segmentsWithInvalidPartition = new ArrayList<>();
List<Pair<String, Integer>> unavailableSegments = new ArrayList<>();
Expand Down Expand Up @@ -187,7 +250,7 @@ private void computeTablePartitionInfo() {
}
if (!segmentsWithInvalidPartition.isEmpty()) {
int numSegments = segmentsWithInvalidPartition.size();
LOGGER.warn("Found {} segments: {} with invalid partition in table: {}", numSegments,
LOGGER.warn("(table-partition-rs-info) Found {} segments: {} with invalid partition in table: {}", numSegments,
numSegments <= 10 ? segmentsWithInvalidPartition : segmentsWithInvalidPartition.subList(0, 10) + "...",
_tableNameWithType);
}
Expand Down Expand Up @@ -242,61 +305,40 @@ private void computeTablePartitionInfo() {
numSegments <= 10 ? excludedNewSegments : excludedNewSegments.subList(0, 10) + "...", _tableNameWithType);
}
}
_tablePartitionInfo =
new TablePartitionInfo(_tableNameWithType, _partitionColumn, _partitionFunctionName, _numPartitions,
partitionInfoMap, segmentsWithInvalidPartition);
_tablePartitionReplicatedServersInfo =
new TablePartitionReplicatedServersInfo(_tableNameWithType, _partitionColumn, _partitionFunctionName,
_numPartitions, partitionInfoMap, segmentsWithInvalidPartition);
}

@Override
public synchronized void onAssignmentChange(IdealState idealState, ExternalView externalView,
Set<String> onlineSegments, List<String> pulledSegments, List<ZNRecord> znRecords) {
// Update segment partition id for the pulled segments
int numSegments = pulledSegments.size();
for (int i = 0; i < numSegments; i++) {
String segment = pulledSegments.get(i);
ZNRecord znRecord = znRecords.get(i);
SegmentInfo segmentInfo = new SegmentInfo(getPartitionId(segment, znRecord), getCreationTimeMs(znRecord),
getOnlineServers(externalView, segment));
_segmentInfoMap.put(segment, segmentInfo);
private void computeTablePartitionInfo() {
List<List<String>> segmentsByPartition = new ArrayList<>();
for (int i = 0; i < _numPartitions; i++) {
segmentsByPartition.add(new ArrayList<>());
}
// Update online servers for all online segments
for (String segment : onlineSegments) {
SegmentInfo segmentInfo = _segmentInfoMap.get(segment);
if (segmentInfo == null) {
// NOTE: This should not happen, but we still handle it gracefully by adding an invalid SegmentInfo
LOGGER.error("Failed to find segment info for segment: {} in table: {} while handling assignment change",
segment, _tableNameWithType);
segmentInfo =
new SegmentInfo(INVALID_PARTITION_ID, INVALID_CREATION_TIME_MS, getOnlineServers(externalView, segment));
_segmentInfoMap.put(segment, segmentInfo);
List<String> segmentsWithInvalidPartition = new ArrayList<>();
for (Map.Entry<String, SegmentInfo> entry : _segmentInfoMap.entrySet()) {
String segment = entry.getKey();
SegmentInfo segmentInfo = entry.getValue();
int partitionId = segmentInfo._partitionId;
if (partitionId == INVALID_PARTITION_ID) {
segmentsWithInvalidPartition.add(segment);
} else if (partitionId < segmentsByPartition.size()) {
segmentsByPartition.get(partitionId).add(segment);
} else {
segmentInfo._onlineServers = getOnlineServers(externalView, segment);
LOGGER.warn("Found segment: {} with partitionId: {} larger than numPartitions: {} in table: {}",
segment, partitionId, _numPartitions, _tableNameWithType);
segmentsWithInvalidPartition.add(segment);
}
}
_segmentInfoMap.keySet().retainAll(onlineSegments);
computeTablePartitionInfo();
}

@Override
public synchronized void refreshSegment(String segment, @Nullable ZNRecord znRecord) {
int partitionId = getPartitionId(segment, znRecord);
long pushTimeMs = getCreationTimeMs(znRecord);
SegmentInfo segmentInfo = _segmentInfoMap.get(segment);
if (segmentInfo == null) {
// NOTE: This should not happen, but we still handle it gracefully by adding an invalid SegmentInfo
LOGGER.error("Failed to find segment info for segment: {} in table: {} while handling segment refresh", segment,
if (!segmentsWithInvalidPartition.isEmpty()) {
int numSegments = segmentsWithInvalidPartition.size();
LOGGER.warn("(table-partition-info) Found {} segments: {} with invalid partition in table: {}", numSegments,
numSegments <= 10 ? segmentsWithInvalidPartition : segmentsWithInvalidPartition.subList(0, 10) + "...",
_tableNameWithType);
segmentInfo = new SegmentInfo(partitionId, pushTimeMs, Collections.emptyList());
_segmentInfoMap.put(segment, segmentInfo);
} else {
segmentInfo._partitionId = partitionId;
segmentInfo._creationTimeMs = pushTimeMs;
}
computeTablePartitionInfo();
}

public TablePartitionInfo getTablePartitionInfo() {
return _tablePartitionInfo;
_tablePartitionInfo =
new TablePartitionInfo(_tableNameWithType, _partitionColumn, _partitionFunctionName, _numPartitions,
segmentsByPartition, segmentsWithInvalidPartition);
}

private static class SegmentInfo {
Expand Down
Loading
Loading