Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,21 @@ rules:
cache: true
labels:
table: "$1"
# Table size gauges published by the controller. The bean name has the form
# pinot.controller.<gauge>.<table>_<tableType>: the first (\w+) is greedy, so
# underscores inside the table name stay in $1 and only the token after the
# last underscore lands in $2.
# cache: true is set to match the neighbouring rules; these patterns match on
# bean/attribute names only (never on values), so cached matches stay valid.
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.tableTotalSizeOnServer.(\\w+)_(\\w+)\"><>(\\w+)"
  name: "pinot_controller_tableTotalSizeOnServer_$3"
  cache: true
  labels:
    table: "$1"
    tableType: "$2"
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.tableSizePerReplicaOnServer.(\\w+)_(\\w+)\"><>(\\w+)"
  name: "pinot_controller_tableSizePerReplicaOnServer_$3"
  cache: true
  labels:
    table: "$1"
    tableType: "$2"
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.tableCompressedSize.(\\w+)_(\\w+)\"><>(\\w+)"
  name: "pinot_controller_tableCompressedSize_$3"
  cache: true
  labels:
    table: "$1"
    tableType: "$2"
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.tableQuota.(\\w+)_(\\w+)\"><>(\\w+)"
name: "pinot_controller_tableQuota_$3"
cache: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public void testTimeBoundaryUpdate() {
_helixResourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, segmentToRefresh);
_helixResourceManager.refreshSegment(OFFLINE_TABLE_NAME,
SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME, segmentToRefresh, newEndTime),
segmentZKMetadata, EXPECTED_VERSION, "downloadUrl", null);
segmentZKMetadata, EXPECTED_VERSION, "downloadUrl", null, -1);

TestUtils.waitForCondition(aVoid -> routingManager.getTimeBoundaryInfo(OFFLINE_TABLE_NAME).getTimeValue()
.equals(Integer.toString(newEndTime - 1)), 30_000L, "Failed to update the time boundary for refreshed segment");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ public void setTotalDocs(long totalDocs) {
setNonNegativeValue(Segment.TOTAL_DOCS, totalDocs);
}

/**
 * Stores the segment size under the {@code Segment.SIZE_IN_BYTES} field.
 *
 * <p>The write goes through {@code setNonNegativeValue}, the same helper the
 * neighbouring {@code setTotalDocs} setter uses.
 *
 * @param sizeInBytes segment size in bytes
 */
public void setSizeInBytes(long sizeInBytes) {
  this.setNonNegativeValue(Segment.SIZE_IN_BYTES, sizeInBytes);
}

/**
 * Reads the segment size from the {@code Segment.SIZE_IN_BYTES} field of the backing record.
 *
 * @return the stored size in bytes; {@code -1} is supplied to the record lookup as the
 *         fallback value
 */
public long getSizeInBytes() {
  final long fallbackSize = -1;
  return _znRecord.getLongField(Segment.SIZE_IN_BYTES, fallbackSize);
}

/**
 * Reads the segment CRC from the {@code Segment.CRC} field of the backing record.
 *
 * @return the stored CRC; {@code -1} is supplied to the record lookup as the fallback value
 */
public long getCrc() {
  final long fallbackCrc = -1;
  return _znRecord.getLongField(Segment.CRC, fallbackCrc);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,18 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
CONTROLLER_LEADER_PARTITION_COUNT("ControllerLeaderPartitionCount", true),

// Estimated size of offline table
@Deprecated // Instead use TABLE_TOTAL_SIZE_ON_SERVER
OFFLINE_TABLE_ESTIMATED_SIZE("OfflineTableEstimatedSize", false),

// Total size of table across replicas on servers
TABLE_TOTAL_SIZE_ON_SERVER("TableTotalSizeOnServer", false),

// Size of table per replica on servers
TABLE_SIZE_PER_REPLICA_ON_SERVER("TableSizePerReplicaOnServer", false),

// Total size of compressed segments per table
TABLE_COMPRESSED_SIZE("TableCompressedSize", false),

// Table quota based on setting in table config
TABLE_QUOTA("TableQuotaBasedOnTableConfig", false),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,8 @@ protected List<PeriodicTask> setupControllerPeriodicTasks() {
new BrokerResourceValidationManager(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics);
periodicTasks.add(_brokerResourceValidationManager);
_segmentStatusChecker =
new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
_executorService);
periodicTasks.add(_segmentStatusChecker);
_segmentRelocator = new SegmentRelocator(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
_executorService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ private static long getRandomInitialDelayInSeconds() {
private static final String DEFAULT_DIM_TABLE_MAX_SIZE = "200M";

private static final String DEFAULT_PINOT_FS_FACTORY_CLASS_LOCAL = LocalPinotFS.class.getName();
public static final String DISABLE_GROOVY = "controller.disable.ingestion.groovy";
private static final String DISABLE_GROOVY = "controller.disable.ingestion.groovy";

public ControllerConf() {
super(new HashMap<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,17 +214,28 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl
boolean uploadedSegmentIsEncrypted = !Strings.isNullOrEmpty(crypterClassNameInHeader);
FileUploadDownloadClient.FileUploadType uploadType = getUploadType(uploadTypeStr);
File dstFile = uploadedSegmentIsEncrypted ? tempEncryptedFile : tempDecryptedFile;
long segmentSizeInBytes;
switch (uploadType) {
case URI:
downloadSegmentFileFromURI(downloadUri, dstFile, tableName);
segmentSizeInBytes = dstFile.length();
break;
case SEGMENT:
createSegmentFileFromMultipart(multiPart, dstFile);
segmentSizeInBytes = dstFile.length();
break;
case METADATA:
moveSegmentToFinalLocation = false;
Preconditions.checkState(downloadUri != null, "Download URI is required in segment metadata upload mode");
createSegmentFileFromMultipart(multiPart, dstFile);
try {
URI segmentURI = new URI(downloadUri);
PinotFS pinotFS = PinotFSFactory.create(segmentURI.getScheme());
segmentSizeInBytes = pinotFS.length(segmentURI);
} catch (Exception e) {
segmentSizeInBytes = -1;
LOGGER.warn("Could not fetch segment size for metadata push", e);
}
break;
default:
throw new UnsupportedOperationException("Unsupported upload type: " + uploadType);
Expand Down Expand Up @@ -302,7 +313,7 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl

// Zk operations
completeZkOperations(enableParallelPushProtection, headers, finalSegmentFile, tableNameWithType, segmentMetadata,
segmentName, zkDownloadUri, moveSegmentToFinalLocation, crypterClassName, allowRefresh);
segmentName, zkDownloadUri, moveSegmentToFinalLocation, crypterClassName, allowRefresh, segmentSizeInBytes);

return new SuccessResponse("Successfully uploaded segment: " + segmentName + " of table: " + tableNameWithType);
} catch (WebApplicationException e) {
Expand Down Expand Up @@ -397,15 +408,15 @@ private SegmentMetadata getSegmentMetadata(File tempDecryptedFile, File tempSegm

private void completeZkOperations(boolean enableParallelPushProtection, HttpHeaders headers, File uploadedSegmentFile,
String tableNameWithType, SegmentMetadata segmentMetadata, String segmentName, String zkDownloadURI,
boolean moveSegmentToFinalLocation, String crypter, boolean allowRefresh)
boolean moveSegmentToFinalLocation, String crypter, boolean allowRefresh, long segmentSizeInBytes)
throws Exception {
String basePath = ControllerFilePathProvider.getInstance().getDataDirURI().toString();
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
URI finalSegmentLocationURI = URIUtils.getUri(basePath, rawTableName, URIUtils.encode(segmentName));
ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics);
zkOperator
.completeSegmentOperations(tableNameWithType, segmentMetadata, finalSegmentLocationURI, uploadedSegmentFile,
enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation, crypter, allowRefresh);
zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, finalSegmentLocationURI,
uploadedSegmentFile, enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation, crypter,
allowRefresh, segmentSizeInBytes);
}

private void decryptFile(String crypterClassName, File tempEncryptedFile, File tempDecryptedFile) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public ZKOperator(PinotHelixResourceManager pinotHelixResourceManager, Controlle
public void completeSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata,
URI finalSegmentLocationURI, File currentSegmentLocation, boolean enableParallelPushProtection,
HttpHeaders headers, String zkDownloadURI, boolean moveSegmentToFinalLocation, String crypter,
boolean allowRefresh)
boolean allowRefresh, long segmentSizeInBytes)
throws Exception {
String segmentName = segmentMetadata.getName();
ZNRecord segmentMetadataZNRecord =
Expand All @@ -76,7 +76,8 @@ public void completeSegmentOperations(String tableNameWithType, SegmentMetadata
}
LOGGER.info("Adding new segment {} from table {}", segmentName, tableNameWithType);
processNewSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, zkDownloadURI, headers,
crypter, tableNameWithType, segmentName, moveSegmentToFinalLocation, enableParallelPushProtection);
crypter, tableNameWithType, segmentName, moveSegmentToFinalLocation, enableParallelPushProtection,
segmentSizeInBytes);
return;
}

Expand All @@ -101,13 +102,13 @@ public void completeSegmentOperations(String tableNameWithType, SegmentMetadata
LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, tableNameWithType);
processExistingSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation,
enableParallelPushProtection, headers, zkDownloadURI, crypter, tableNameWithType, segmentName,
segmentMetadataZNRecord, moveSegmentToFinalLocation);
segmentMetadataZNRecord, moveSegmentToFinalLocation, segmentSizeInBytes);
}

private void processExistingSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI,
String crypter, String tableNameWithType, String segmentName, ZNRecord znRecord,
boolean moveSegmentToFinalLocation)
boolean moveSegmentToFinalLocation, long segmentSizeInBytes)
throws Exception {

SegmentZKMetadata existingSegmentZKMetadata = new SegmentZKMetadata(znRecord);
Expand Down Expand Up @@ -202,7 +203,7 @@ private void processExistingSegment(SegmentMetadata segmentMetadata, URI finalSe

_pinotHelixResourceManager
.refreshSegment(tableNameWithType, segmentMetadata, existingSegmentZKMetadata, expectedVersion,
zkDownloadURI, crypter);
zkDownloadURI, crypter, segmentSizeInBytes);
}
} catch (Exception e) {
if (!_pinotHelixResourceManager
Expand Down Expand Up @@ -234,10 +235,12 @@ private void checkCRC(HttpHeaders headers, String offlineTableName, String segme

private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
File currentSegmentLocation, String zkDownloadURI, HttpHeaders headers, String crypter, String tableNameWithType,
String segmentName, boolean moveSegmentToFinalLocation, boolean enableParallelPushProtection)
String segmentName, boolean moveSegmentToFinalLocation, boolean enableParallelPushProtection,
long segmentSizeInBytes)
throws Exception {
SegmentZKMetadata newSegmentZKMetadata = _pinotHelixResourceManager
.constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, zkDownloadURI, crypter);
SegmentZKMetadata newSegmentZKMetadata =
_pinotHelixResourceManager.constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, zkDownloadURI,
crypter, segmentSizeInBytes);

// Lock if enableParallelPushProtection is true.
if (enableParallelPushProtection) {
Expand All @@ -253,7 +256,6 @@ private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegment
newSegmentZKMetadata
.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(newSegmentZKMetadata.getCustomMap()));
}

if (!_pinotHelixResourceManager.createSegmentZkMetadata(tableNameWithType, newSegmentZKMetadata)) {
throw new RuntimeException(
"Failed to create ZK metadata for segment: " + segmentName + " of table: " + tableNameWithType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
*/
package org.apache.pinot.controller.helix;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.httpclient.SimpleHttpConnectionManager;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
import org.apache.pinot.common.lineage.SegmentLineageUtils;
Expand All @@ -36,6 +40,7 @@
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
Expand All @@ -57,21 +62,28 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
private static final long DISABLED_TABLE_LOG_INTERVAL_MS = TimeUnit.DAYS.toMillis(1);
private static final ZNRecordSerializer RECORD_SERIALIZER = new ZNRecordSerializer();

private static final int TABLE_CHECKER_TIMEOUT_MS = 30_000;

private final int _waitForPushTimeSeconds;
private long _lastDisabledTableLogTimestamp = 0;

private TableSizeReader _tableSizeReader;

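/**
 * Constructs the segment status checker.
 * @param pinotHelixResourceManager The resource manager used to interact with Helix
 * @param leadControllerManager The lead controller manager, passed on to the periodic task base class
 * @param config The controller configuration object
 * @param controllerMetrics The controller metrics, passed on to the periodic task base class
 * @param executorService The executor service handed to the table size reader
 */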
public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager,
LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics,
ExecutorService executorService) {
super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(),
config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
controllerMetrics);

_waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds();
_tableSizeReader = new TableSizeReader(executorService, new SimpleHttpConnectionManager(true), _controllerMetrics,
_pinotHelixResourceManager);
}

@Override
Expand All @@ -96,6 +108,7 @@ protected Context preprocess() {
protected void processTable(String tableNameWithType, Context context) {
try {
updateSegmentMetrics(tableNameWithType, context);
updateTableSizeMetrics(tableNameWithType);
} catch (Exception e) {
LOGGER.error("Caught exception while updating segment status for table {}", tableNameWithType, e);
// Remove the metric for this table
Expand All @@ -110,6 +123,11 @@ protected void postprocess(Context context) {
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT, context._disabledTableCount);
}

/**
 * Asks the configured {@link TableSizeReader} for the size details of one table,
 * bounded by {@code TABLE_CHECKER_TIMEOUT_MS}.
 *
 * <p>The returned details are discarded here; presumably the reader publishes the
 * size gauges itself as a side effect (TODO confirm against TableSizeReader).
 *
 * @param tableNameWithType the table name as handed to {@code processTable}
 * @throws InvalidConfigException propagated from {@code getTableSizeDetails}
 */
private void updateTableSizeMetrics(String tableNameWithType)
    throws InvalidConfigException {
  final int timeoutMs = TABLE_CHECKER_TIMEOUT_MS;
  _tableSizeReader.getTableSizeDetails(tableNameWithType, timeoutMs);
}

/**
* Runs a segment status pass over the given table.
* TODO: revisit the logic and reduce the ZK access
Expand Down Expand Up @@ -173,6 +191,7 @@ private void updateSegmentMetrics(String tableNameWithType, Context context) {
int nErrors = 0; // Keeps track of number of segments in error state
int nOffline = 0; // Keeps track of number segments with no online replicas
int nSegments = 0; // Counts number of segments
long tableCompressedSize = 0; // Tracks the total compressed segment size in deep store per table
for (String partitionName : segmentsExcludeReplaced) {
int nReplicas = 0;
int nIdeal = 0;
Expand All @@ -198,6 +217,12 @@ private void updateSegmentMetrics(String tableNameWithType, Context context) {
// Push is not finished yet, skip the segment
continue;
}
if (segmentZKMetadata != null) {
long sizeInBytes = segmentZKMetadata.getSizeInBytes();
if (sizeInBytes > 0) {
tableCompressedSize += sizeInBytes;
}
}
nReplicasIdealMax = (idealState.getInstanceStateMap(partitionName).size() > nReplicasIdealMax) ? idealState
.getInstanceStateMap(partitionName).size() : nReplicasIdealMax;
if ((externalView == null) || (externalView.getStateMap(partitionName) == null)) {
Expand Down Expand Up @@ -240,6 +265,9 @@ private void updateSegmentMetrics(String tableNameWithType, Context context) {
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
(nSegments > 0) ? (100 - (nOffline * 100 / nSegments)) : 100);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_COMPRESSED_SIZE,
tableCompressedSize);

if (nOffline > 0) {
LOGGER.warn("Table {} has {} segments with no online replicas", tableNameWithType, nOffline);
}
Expand Down Expand Up @@ -287,6 +315,11 @@ public void cleanUpTask() {
setStatusToDefault();
}

/**
 * Swaps in a different {@link TableSizeReader}; exposed for tests only.
 *
 * @param tableSizeReader the reader this checker should use from now on
 */
@VisibleForTesting
void setTableSizeReader(TableSizeReader tableSizeReader) {
  this._tableSizeReader = tableSizeReader;
}

public static final class Context {
private boolean _logDisabledTables;
private int _realTimeTableCount;
Expand Down
Loading