Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,15 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
UNTRACKED_SEGMENTS_COUNT("untrackedSegmentsCount", false),

// Metric used to track errors during the periodic table retention management
RETENTION_MANAGER_ERROR("retentionManagerError", false);
RETENTION_MANAGER_ERROR("retentionManagerError", false),

// Metric used to track when segments in error state are detected for pauseless table
PAUSELESS_SEGMENTS_IN_ERROR_COUNT("pauselessSegmentsInErrorCount", false),

// Metric used to track when segments in error state are detected for pauseless table for which needs
// manual intervention for repair
PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT("pauselessSegmentsInUnrecoverableErrorCount", false);


private final String _gaugeName;
private final String _unit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
Expand Down Expand Up @@ -2359,9 +2360,10 @@ URI createSegmentPath(String rawTableName, String segmentName) {
* Request body (JSON):
*
* If segment is in ERROR state in only few replicas but has download URL, we instead trigger a segment reset
* @param realtimeTableName The table name with type, e.g. "myTable_REALTIME"
* @param tableConfig The table config
*/
public void repairSegmentsInErrorStateForPauselessConsumption(String realtimeTableName) {
public void repairSegmentsInErrorStateForPauselessConsumption(TableConfig tableConfig) {
String realtimeTableName = tableConfig.getTableName();
// Fetch ideal state and external view
IdealState idealState = getIdealState(realtimeTableName);
ExternalView externalView = _helixResourceManager.getTableExternalView(realtimeTableName);
Expand Down Expand Up @@ -2425,7 +2427,12 @@ public void repairSegmentsInErrorStateForPauselessConsumption(String realtimeTab
}
}


if (segmentsInErrorStateInAtLeastOneReplica.isEmpty()) {
_controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, 0);
_controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, 0);
return;
}

Expand All @@ -2434,6 +2441,23 @@ public void repairSegmentsInErrorStateForPauselessConsumption(String realtimeTab
segmentsInErrorStateInAtLeastOneReplica.size(), segmentsInErrorStateInAtLeastOneReplica,
segmentsInErrorStateInAllReplicas.size(), segmentsInErrorStateInAllReplicas, realtimeTableName);

boolean isPartialUpsertEnabled =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to move this after the warning log below. We should also log something because we are not able to fix the segments. Probably an error log

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably also emit a metric for error segments not able to fix. We need to get immediate alert on it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed

tableConfig.getUpsertConfig() != null && tableConfig.getUpsertConfig().getMode() == UpsertConfig.Mode.PARTIAL;
boolean isDedupEnabled = tableConfig.getDedupConfig() != null && tableConfig.getDedupConfig().isDedupEnabled();
if ((isPartialUpsertEnabled || isDedupEnabled)) {
// We do not run reingestion for dedup and partial upsert tables in pauseless as it can
// lead to data inconsistencies
_controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, segmentsInErrorStateInAllReplicas.size());
LOGGER.error("Skipping repair for errored segments in table: {} because dedup or partial upsert is enabled.",
realtimeTableName);
return;
} else {
_controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, segmentsInErrorStateInAllReplicas.size());
}


for (String segmentName : segmentsInErrorStateInAtLeastOneReplica) {
SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(realtimeTableName, segmentName);
if (segmentZKMetadata == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ protected void processTable(String tableNameWithType, Context context) {

boolean isPauselessConsumptionEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
if (isPauselessConsumptionEnabled) {
_llcRealtimeSegmentManager.repairSegmentsInErrorStateForPauselessConsumption(tableConfig.getTableName());
// For pauseless tables without dedup or partial upsert, repair segments in error state
_llcRealtimeSegmentManager.repairSegmentsInErrorStateForPauselessConsumption(tableConfig);
} else if (_segmentAutoResetOnErrorAtValidation) {
// Reset for pauseless tables is already handled in repairSegmentsInErrorStateForPauselessConsumption method with
// additional checks for pauseless consumption
Expand Down
Loading