Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,7 @@ private static void validateValidationConfig(TableConfig tableConfig, Schema sch

String peerSegmentDownloadScheme = validationConfig.getPeerSegmentDownloadScheme();
if (peerSegmentDownloadScheme != null) {
if (!CommonConstants.HTTP_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme)
&& !CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme)) {
if (!isValidPeerDownloadScheme(peerSegmentDownloadScheme)) {
throw new IllegalStateException("Invalid value '" + peerSegmentDownloadScheme
+ "' for peerSegmentDownloadScheme. Must be one of http or https");
}
Expand All @@ -317,6 +316,11 @@ private static void validateValidationConfig(TableConfig tableConfig, Schema sch
validateRetentionConfig(tableConfig);
}

private static boolean isValidPeerDownloadScheme(String peerSegmentDownloadScheme) {
return CommonConstants.HTTP_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme)
|| CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme);
}

/**
* Validates the following:
* 1. validity of filter function
Expand Down Expand Up @@ -364,6 +368,21 @@ public static void validateIngestionConfig(TableConfig tableConfig, Schema schem
List<Map<String, String>> streamConfigMaps = ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps();
Preconditions.checkState(!streamConfigMaps.isEmpty(), "Must have at least 1 stream in REALTIME table");
// TODO: for multiple stream configs, validate them

boolean isPauselessEnabled = ingestionConfig.getStreamIngestionConfig().isPauselessConsumptionEnabled();
if (isPauselessEnabled) {
int replication = tableConfig.getReplication();
// We are checking for this only when replication is greater than 1 because in test environments
// users still prefer to create pauseless tables with replication 1
if (replication > 1) {
String peerSegmentDownloadScheme = tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
Preconditions.checkState(StringUtils.isNotEmpty(peerSegmentDownloadScheme) && isValidPeerDownloadScheme(
peerSegmentDownloadScheme),
"Must have a valid peerSegmentDownloadScheme set in validation config for pauseless consumption");
} else {
LOGGER.warn("It's not recommended to create pauseless tables with replication 1 for stability reasons.");
}
}
}

// Filter config
Expand Down
Loading