Add a server level config for segment server upload to deep store. #14093

Merged (6 commits) on Sep 30, 2024
Changes from 1 commit
Add a server level config for segment server upload to deep store.
raghavyadav01 committed Sep 26, 2024
commit 55e071c9fe4d9c3fa117ee033111cda427a64854

SegmentCommitterFactory.java

@@ -23,8 +23,11 @@
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.slf4j.Logger;

@@ -54,7 +57,13 @@ public SegmentCommitterFactory(Logger segmentLogger, ServerSegmentCompletionProt
   public SegmentCommitter createSegmentCommitter(SegmentCompletionProtocol.Request.Params params,
       String controllerVipUrl)
       throws URISyntaxException {
-    boolean uploadToFs = _streamConfig.isServerUploadToDeepStore();
+    InstanceDataManagerConfig instanceDataManagerConfig = _indexLoadingConfig.getInstanceDataManagerConfig();
+    PinotConfiguration config = instanceDataManagerConfig.getConfig();
+    boolean defaultSegmentUploadToDeepStore = config.getProperty(
+        CommonConstants.Segment.CONFIG_SEGMENT_SERVER_UPLOAD_TO_DEEP_STORE,
+        CommonConstants.Segment.DEFAULT_SEGMENT_SERVER_UPLOAD_TO_DEEP_STORE);
+
+    boolean uploadToFs = _streamConfig.isServerUploadToDeepStore(defaultSegmentUploadToDeepStore);
     String peerSegmentDownloadScheme = _tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
     String segmentStoreUri = _indexLoadingConfig.getSegmentStoreURI();
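
The new lines resolve a server-wide default before consulting the table's StreamConfig. Below is a minimal sketch of that lookup, not code from the PR: it assumes PinotConfiguration's Map-based constructor, and the class name and property value are illustrative.

import java.util.Map;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;

public class ServerUploadDefaultSketch {
  public static void main(String[] args) {
    // Simulate the instance data manager's config carrying the new key
    // (illustrative value; a real server reads this from its config file).
    PinotConfiguration config = new PinotConfiguration(Map.<String, Object>of(
        CommonConstants.Segment.CONFIG_SEGMENT_SERVER_UPLOAD_TO_DEEP_STORE, "true"));

    // Same lookup as createSegmentCommitter(): fall back to the compile-time
    // default (false) when the server config does not set the key.
    boolean defaultUpload = config.getProperty(
        CommonConstants.Segment.CONFIG_SEGMENT_SERVER_UPLOAD_TO_DEEP_STORE,
        CommonConstants.Segment.DEFAULT_SEGMENT_SERVER_UPLOAD_TO_DEEP_STORE);
    System.out.println(defaultUpload); // prints: true
  }
}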

StreamConfig.java

@@ -41,7 +41,6 @@ public class StreamConfig {
   public static final long DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS = TimeUnit.MILLISECONDS.convert(6, TimeUnit.HOURS);
   public static final long DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES = 200 * 1024 * 1024; // 200M
   public static final int DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS = 100_000;
-  public static final String DEFAULT_SERVER_UPLOAD_TO_DEEPSTORE = "false";

   public static final String DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING =
       "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory";

@@ -79,10 +78,11 @@ public class StreamConfig {
   // Allow overriding it to use different offset criteria
   private OffsetCriteria _offsetCriteria;

-  // Indicates if the segment should be uploaded to the deep store's file system or to the controller during the
-  // segment commit protocol. By default, segment is uploaded to the controller during commit.
-  // If this flag is set to true, the segment is uploaded to deep store.
-  private final boolean _serverUploadToDeepStore;
+  // Table-level StreamConfig flag indicating whether the segment should be uploaded to the deep store's file system
+  // or to the controller during the segment commit protocol. If the config is not present in the table's
+  // StreamConfig, _serverUploadToDeepStore is null and isServerUploadToDeepStore() falls back to the server-level
+  // default supplied by the caller.
+  private final String _serverUploadToDeepStore;

   /**
    * Initializes a StreamConfig using the map of stream configs from the table config

@@ -175,9 +175,7 @@ public StreamConfig(String tableNameWithType, Map<String, String> streamConfigMa
     _flushThresholdSegmentRows = extractFlushThresholdSegmentRows(streamConfigMap);
     _flushThresholdTimeMillis = extractFlushThresholdTimeMillis(streamConfigMap);
     _flushThresholdSegmentSizeBytes = extractFlushThresholdSegmentSize(streamConfigMap);
-    _serverUploadToDeepStore = Boolean.parseBoolean(
-        streamConfigMap.getOrDefault(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE,
-            DEFAULT_SERVER_UPLOAD_TO_DEEPSTORE));
+    _serverUploadToDeepStore = streamConfigMap.get(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE);

     int autotuneInitialRows = 0;
     String initialRowsValue = streamConfigMap.get(StreamConfigProperties.SEGMENT_FLUSH_AUTOTUNE_INITIAL_ROWS);

@@ -214,8 +212,9 @@ public static void validateConsumerType(String streamType, Map<String, String> s
     }
   }

-  public boolean isServerUploadToDeepStore() {
-    return _serverUploadToDeepStore;
+  public boolean isServerUploadToDeepStore(boolean defaultServerUploadToDeepStore) {
+    return _serverUploadToDeepStore == null ? defaultServerUploadToDeepStore
+        : Boolean.parseBoolean(_serverUploadToDeepStore);
   }

   private long extractFlushThresholdSegmentSize(Map<String, String> streamConfigMap) {
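
The field type changes from boolean to String so that "not configured" (null) can be distinguished from an explicit "false": only a missing table-level value defers to the server-level default, while an explicit table-level setting always wins. A self-contained sketch of that resolution (the helper name and sample values are illustrative, not from the PR):

public class UploadFlagResolutionSketch {
  // Mirrors the new isServerUploadToDeepStore(boolean): a present table-level
  // value always wins; a missing (null) value defers to the server default.
  static boolean resolve(String tableLevelValue, boolean serverLevelDefault) {
    return tableLevelValue == null ? serverLevelDefault : Boolean.parseBoolean(tableLevelValue);
  }

  public static void main(String[] args) {
    System.out.println(resolve(null, true));    // true  - table silent, server default on
    System.out.println(resolve("false", true)); // false - table explicitly opts out
    System.out.println(resolve("true", false)); // true  - table explicitly opts in
    System.out.println(resolve(null, false));   // false - neither set
  }
}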

CommonConstants.java

@@ -1071,6 +1071,9 @@ public static class Offline {
    */
   public static final String SEGMENT_UPLOAD_START_TIME = "segment.upload.start.time";

+  public static final String CONFIG_SEGMENT_SERVER_UPLOAD_TO_DEEP_STORE = "segment.server.upload.to.deep.store";
+  public static final boolean DEFAULT_SEGMENT_SERVER_UPLOAD_TO_DEEP_STORE = false;
+
   public static final String SEGMENT_BACKUP_DIR_SUFFIX = ".segment.bak";
   public static final String SEGMENT_TEMP_DIR_SUFFIX = ".segment.tmp";
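
These constants define the new server-level key and its compile-time default. The fully qualified property an operator sets depends on the prefix under which the instance data manager configuration is derived from the server config; the sketch below assumes a "pinot.server.instance" prefix, which is an assumption on our part and not stated in this diff.

import java.util.Map;
import org.apache.pinot.spi.env.PinotConfiguration;

public class ServerConfigKeySketch {
  public static void main(String[] args) {
    // Hypothetical server properties; the "pinot.server.instance" prefix is assumed.
    PinotConfiguration serverConf = new PinotConfiguration(Map.<String, Object>of(
        "pinot.server.instance.segment.server.upload.to.deep.store", "true"));

    // subset() strips the prefix, leaving the relative key added to CommonConstants above.
    PinotConfiguration instanceDataManagerConf = serverConf.subset("pinot.server.instance");
    System.out.println(instanceDataManagerConf.getProperty(
        "segment.server.upload.to.deep.store", false)); // prints: true
  }
}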