Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -1392,7 +1392,7 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
_resourceTmpDir.mkdirs();
}
_state = State.INITIAL_CONSUMING;
fetchLatestStreamOffset();
_latestStreamOffsetAtStartupTime = fetchLatestStreamOffset(5000);
_consumeStartTime = now();
setConsumeEndTime(segmentZKMetadata, _consumeStartTime);
_segmentCommitterFactory =
Expand Down Expand Up @@ -1429,15 +1429,16 @@ private void setConsumeEndTime(SegmentZKMetadata segmentZKMetadata, long now) {
}
}

private void fetchLatestStreamOffset() {
try (StreamMetadataProvider metadataProvider = _streamConsumerFactory
.createPartitionMetadataProvider(_clientId, _partitionGroupId)) {
_latestStreamOffsetAtStartupTime =
metadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, /*maxWaitTimeMs*/5000);
public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs) {
try (StreamMetadataProvider metadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId,
_partitionGroupId)) {
return metadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, maxWaitTimeMs);
} catch (Exception e) {
_segmentLogger.warn("Cannot fetch latest stream offset for clientId {} and partitionGroupId {}", _clientId,
_partitionGroupId);
_segmentLogger.warn(
"Cannot fetch latest stream offset for clientId {} and partitionGroupId {} with maxWaitTime {}", _clientId,
_partitionGroupId, maxWaitTimeMs);
}
return null;
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,22 @@ private void registerServiceStatusHandler() {
boolean isOffsetBasedConsumptionStatusCheckerEnabled =
_serverConf.getProperty(Server.CONFIG_OF_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER,
Server.DEFAULT_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER);
boolean isFreshnessStatusCheckerEnabled =
_serverConf.getProperty(Server.CONFIG_OF_ENABLE_REALTIME_FRESHNESS_BASED_CONSUMPTION_STATUS_CHECKER,
Server.DEFAULT_ENABLE_REALTIME_FRESHNESS_BASED_CONSUMPTION_STATUS_CHECKER);
int realtimeMinFreshnessMs = _serverConf.getProperty(Server.CONFIG_OF_STARTUP_REALTIME_MIN_FRESHNESS_MS,
Server.DEFAULT_STARTUP_REALTIME_MIN_FRESHNESS_MS);

// collect all resources which have this instance in the ideal state
List<String> resourcesToMonitor = new ArrayList<>();

Set<String> consumingSegments = new HashSet<>();
boolean checkRealtime = realtimeConsumptionCatchupWaitMs > 0;
if (isFreshnessStatusCheckerEnabled && realtimeMinFreshnessMs <= 0) {
LOGGER.warn("Realtime min freshness {} must be > 0. Setting realtime min freshness to default {}.",
realtimeMinFreshnessMs, Server.DEFAULT_STARTUP_REALTIME_MIN_FRESHNESS_MS);
realtimeMinFreshnessMs = Server.DEFAULT_STARTUP_REALTIME_MIN_FRESHNESS_MS;
}

for (String resourceName : _helixAdmin.getResourcesInCluster(_helixClusterName)) {
// Only monitor table resources
Expand Down Expand Up @@ -280,16 +290,34 @@ private void registerServiceStatusHandler() {
_instanceId, resourcesToMonitor, minResourcePercentForStartup));
boolean foundConsuming = !consumingSegments.isEmpty();
if (checkRealtime && foundConsuming) {
Supplier<Integer> getNumConsumingSegmentsNotReachedTheirLatestOffset = null;
if (isOffsetBasedConsumptionStatusCheckerEnabled) {
// We specifically put the freshness based checker first to ensure it's the only one setup if both checkers
// are accidentally enabled together. The freshness based checker is a stricter version of the offset based
// checker. But in the end, both checkers are bounded in time by realtimeConsumptionCatchupWaitMs.
if (isFreshnessStatusCheckerEnabled) {
LOGGER.info("Setting up freshness based status checker");
FreshnessBasedConsumptionStatusChecker freshnessStatusChecker =
new FreshnessBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), consumingSegments,
realtimeMinFreshnessMs);
Supplier<Integer> getNumConsumingSegmentsNotReachedMinFreshness =
freshnessStatusChecker::getNumConsumingSegmentsNotReachedIngestionCriteria;
serviceStatusCallbackListBuilder.add(
new ServiceStatus.RealtimeConsumptionCatchupServiceStatusCallback(_helixManager, _helixClusterName,
_instanceId, realtimeConsumptionCatchupWaitMs, getNumConsumingSegmentsNotReachedMinFreshness));
} else if (isOffsetBasedConsumptionStatusCheckerEnabled) {
LOGGER.info("Setting up offset based status checker");
OffsetBasedConsumptionStatusChecker consumptionStatusChecker =
new OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), consumingSegments);
getNumConsumingSegmentsNotReachedTheirLatestOffset =
consumptionStatusChecker::getNumConsumingSegmentsNotReachedTheirLatestOffset;
Supplier<Integer> getNumConsumingSegmentsNotReachedTheirLatestOffset =
consumptionStatusChecker::getNumConsumingSegmentsNotReachedIngestionCriteria;
serviceStatusCallbackListBuilder.add(
new ServiceStatus.RealtimeConsumptionCatchupServiceStatusCallback(_helixManager, _helixClusterName,
_instanceId, realtimeConsumptionCatchupWaitMs, getNumConsumingSegmentsNotReachedTheirLatestOffset));
} else {
LOGGER.info("Setting up static time based status checker");
serviceStatusCallbackListBuilder.add(
new ServiceStatus.RealtimeConsumptionCatchupServiceStatusCallback(_helixManager, _helixClusterName,
_instanceId, realtimeConsumptionCatchupWaitMs, null));
}
serviceStatusCallbackListBuilder.add(
new ServiceStatus.RealtimeConsumptionCatchupServiceStatusCallback(_helixManager, _helixClusterName,
_instanceId, realtimeConsumptionCatchupWaitMs, getNumConsumingSegmentsNotReachedTheirLatestOffset));
}
LOGGER.info("Registering service status handler");
ServiceStatus.setServiceStatusCallback(_instanceId,
Expand Down Expand Up @@ -340,10 +368,10 @@ private void updateInstanceConfigIfNeeded(ServerConf serverConf) {

// Update multi-stage query engine ports
if (serverConf.isMultiStageServerEnabled()) {
updated |= updatePortIfNeeded(simpleFields,
Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY, serverConf.getMultiStageServicePort());
updated |= updatePortIfNeeded(simpleFields,
Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY, serverConf.getMultiStageMailboxPort());
updated |= updatePortIfNeeded(simpleFields, Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY,
serverConf.getMultiStageServicePort());
updated |= updatePortIfNeeded(simpleFields, Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY,
serverConf.getMultiStageMailboxPort());
}

// Update environment properties
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pinot.server.starter.helix;

import java.util.Set;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;


/**
 * This class is used at startup time to have a more accurate estimate of the catchup period in which no query
 * execution happens and consumers try to catch up to the latest messages available in streams.
 * To achieve this, every time status check is called - {@link #getNumConsumingSegmentsNotReachedIngestionCriteria} -
 * for each consuming segment, we check if either:
 * - the last ingested message is within {@link #_minFreshnessMs} of the current system time, or
 * - the segment's latest ingested offset has reached the latest offset currently available in the stream
 */
public class FreshnessBasedConsumptionStatusChecker extends IngestionBasedConsumptionStatusChecker {
  // Maximum allowed age (in ms) of the last ingested message for a segment to count as caught up.
  private final long _minFreshnessMs;

  /**
   * @param instanceDataManager data manager used to look up per-segment managers
   * @param consumingSegments   names of the consuming segments to track
   * @param minFreshnessMs      freshness threshold in milliseconds; a segment whose last ingested
   *                            message is at most this old is considered caught up
   */
  public FreshnessBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, Set<String> consumingSegments,
      long minFreshnessMs) {
    super(instanceDataManager, consumingSegments);
    _minFreshnessMs = minFreshnessMs;
  }

  /**
   * Returns true when the ingested offset has reached (or passed) the latest stream offset.
   * Returns false when either offset is unknown.
   */
  private boolean isOffsetCaughtUp(StreamPartitionMsgOffset currentOffset, StreamPartitionMsgOffset latestOffset) {
    if (currentOffset == null || latestOffset == null) {
      return false;
    }
    // Kafka's "latest" offset is actually the next available offset. Therefore it will be 1 ahead of the
    // current offset in the case we are caught up.
    // TODO: implement a way to have this work correctly for kafka consumers
    return currentOffset.compareTo(latestOffset) >= 0;
  }

  // Overridable clock hook so tests can control the notion of "now".
  protected long now() {
    return System.currentTimeMillis();
  }

  @Override
  protected boolean isSegmentCaughtUp(String segmentName, LLRealtimeSegmentDataManager rtSegmentDataManager) {
    long lastIngestionTimeMs = rtSegmentDataManager.getSegment().getSegmentMetadata().getLatestIngestionTimestamp();
    long freshnessMs = now() - lastIngestionTimeMs;

    // We check lastIngestionTimeMs >= 0 because the default ingestion timestamp when unknown is Long.MIN_VALUE
    boolean freshEnough = lastIngestionTimeMs >= 0 && freshnessMs <= _minFreshnessMs;
    if (freshEnough) {
      _logger.info("Segment {} with freshness {}ms has caught up within min freshness {}", segmentName, freshnessMs,
          _minFreshnessMs);
      return true;
    }

    // For stream partitions that see very low volume, it's possible we're already caught up but the oldest
    // message is too old to pass the freshness check. We check this condition separately to avoid hitting
    // the stream consumer to check partition count if we're already caught up.
    StreamPartitionMsgOffset ingestedOffset = rtSegmentDataManager.getCurrentOffset();
    StreamPartitionMsgOffset latestStreamOffset = rtSegmentDataManager.fetchLatestStreamOffset(5000);
    if (isOffsetCaughtUp(ingestedOffset, latestStreamOffset)) {
      _logger.info("Segment {} with freshness {}ms has not caught up within min freshness {}."
          + "But the current ingested offset is equal to the latest available offset {}.", segmentName, freshnessMs,
          _minFreshnessMs, ingestedOffset);
      return true;
    }

    _logger.info("Segment {} with freshness {}ms has not caught up within "
        + "min freshness {}. At offset {}. Latest offset {}.",
        segmentName, freshnessMs, _minFreshnessMs, ingestedOffset, latestStreamOffset);
    return false;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pinot.server.starter.helix;

import java.util.HashSet;
import java.util.Set;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Base class for startup-time consumption status checkers. Tracks a fixed set of consuming
 * segments and reports how many of them have not yet satisfied the subclass-defined
 * catch-up criterion ({@link #isSegmentCaughtUp}). Once a segment is observed as caught up
 * it is remembered and never re-checked.
 */
public abstract class IngestionBasedConsumptionStatusChecker {
  protected final Logger _logger = LoggerFactory.getLogger(getClass());

  // constructor parameters
  protected final InstanceDataManager _instanceDataManager;
  protected final Set<String> _consumingSegments;

  // Segments already confirmed caught up; skipped on subsequent checks.
  private final Set<String> _caughtUpSegments = new HashSet<>();

  public IngestionBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager,
      Set<String> consumingSegments) {
    _instanceDataManager = instanceDataManager;
    _consumingSegments = consumingSegments;
  }

  /**
   * Re-evaluates all not-yet-caught-up segments and returns the number of consuming
   * segments that still have not met the ingestion criterion.
   */
  public int getNumConsumingSegmentsNotReachedIngestionCriteria() {
    for (String segmentName : _consumingSegments) {
      if (!_caughtUpSegments.contains(segmentName) && checkSegmentCaughtUp(segmentName)) {
        _caughtUpSegments.add(segmentName);
      }
    }
    return _consumingSegments.size() - _caughtUpSegments.size();
  }

  /**
   * Checks a single segment: acquires its data manager, delegates to the subclass criterion,
   * and always releases the segment afterwards. Returns false when managers are not yet set up
   * (the segment will be re-checked on a later call).
   */
  private boolean checkSegmentCaughtUp(String segmentName) {
    TableDataManager tableDataManager = getTableDataManager(segmentName);
    if (tableDataManager == null) {
      _logger.info("TableDataManager is not yet setup for segment {}. Will check consumption status later",
          segmentName);
      return false;
    }
    SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
    if (segmentDataManager == null) {
      _logger.info("SegmentDataManager is not yet setup for segment {}. Will check consumption status later",
          segmentName);
      return false;
    }
    try {
      if (!(segmentDataManager instanceof LLRealtimeSegmentDataManager)) {
        // There's a possibility that a consuming segment has converted to a committed segment. If that's the case,
        // segment data manager will not be of type LLRealtime.
        _logger.info("Segment {} is already committed and is considered caught up.", segmentName);
        return true;
      }
      return isSegmentCaughtUp(segmentName, (LLRealtimeSegmentDataManager) segmentDataManager);
    } finally {
      tableDataManager.releaseSegment(segmentDataManager);
    }
  }

  /** Subclass-defined catch-up criterion for a single consuming segment. */
  protected abstract boolean isSegmentCaughtUp(String segmentName, LLRealtimeSegmentDataManager rtSegmentDataManager);

  /** Resolves the REALTIME table data manager that owns the given LLC segment. */
  private TableDataManager getTableDataManager(String segmentName) {
    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
    String rawTableName = llcSegmentName.getTableName();
    String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName);
    return _instanceDataManager.getTableDataManager(tableNameWithType);
  }
}
Loading