
Add a server level config for segment server upload to deep store. #14093

Merged · 6 commits · Sep 30, 2024
SegmentCommitterFactory.java
@@ -23,6 +23,7 @@
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -54,7 +55,14 @@ public SegmentCommitterFactory(Logger segmentLogger, ServerSegmentCompletionProt
   public SegmentCommitter createSegmentCommitter(SegmentCompletionProtocol.Request.Params params,
       String controllerVipUrl)
       throws URISyntaxException {
-    boolean uploadToFs = _streamConfig.isServerUploadToDeepStore();
+    InstanceDataManagerConfig instanceDataManagerConfig = _indexLoadingConfig.getInstanceDataManagerConfig();
+
+    boolean uploadToFs = instanceDataManagerConfig.isUploadSegmentToDeepStore();
+    Boolean streamConfigServerUploadToDeepStore = _streamConfig.isServerUploadToDeepStore();
+    if (streamConfigServerUploadToDeepStore != null) {
+      uploadToFs = streamConfigServerUploadToDeepStore;
+    }
+
     String peerSegmentDownloadScheme = _tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
     String segmentStoreUri = _indexLoadingConfig.getSegmentStoreURI();
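The net effect of this hunk: the server-level instance config now supplies the default for `uploadToFs`, and the table-level stream config, when present, overrides it. A minimal sketch of that precedence, using illustrative names rather than the actual Pinot classes:

```java
// Sketch of the override semantics above; class and method names are illustrative, not Pinot APIs.
public final class DeepStoreUploadResolver {
  private DeepStoreUploadResolver() {
  }

  // serverLevelDefault comes from the server instance config;
  // tableLevelOverride is the table's stream config value, or null if unset.
  static boolean resolve(boolean serverLevelDefault, Boolean tableLevelOverride) {
    return tableLevelOverride != null ? tableLevelOverride : serverLevelDefault;
  }

  public static void main(String[] args) {
    System.out.println(resolve(true, null));   // true  -> server default applies
    System.out.println(resolve(true, false));  // false -> explicit table config wins
    System.out.println(resolve(false, true));  // true  -> explicit table config wins
  }
}
```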
SegmentCommitterFactoryTest.java
@@ -27,8 +27,10 @@
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.mockito.Mockito;
@@ -62,9 +64,10 @@ public void testSplitSegmentCommitterIsDefault()
     ServerSegmentCompletionProtocolHandler protocolHandler =
         new ServerSegmentCompletionProtocolHandler(Mockito.mock(ServerMetrics.class), "test_REALTIME");
     String controllerVipUrl = "http://localhost:1234";
+    IndexLoadingConfig indexLoadingConfig = mockIndexLoadConfig();
     SegmentCompletionProtocol.Request.Params requestParams = new SegmentCompletionProtocol.Request.Params();
     SegmentCommitterFactory factory = new SegmentCommitterFactory(Mockito.mock(Logger.class), protocolHandler, config,
-        Mockito.mock(IndexLoadingConfig.class), Mockito.mock(ServerMetrics.class));
+        indexLoadingConfig, Mockito.mock(ServerMetrics.class));
     SegmentCommitter committer = factory.createSegmentCommitter(requestParams, controllerVipUrl);
     Assert.assertNotNull(committer);
     Assert.assertTrue(committer instanceof SplitSegmentCommitter);
@@ -83,9 +86,8 @@ public void testUploadToDeepStoreConfig()
     Map<String, String> streamConfigMap = new HashMap<>(getMinimumStreamConfigMap());
     streamConfigMap.put(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE, "true");
     TableConfig config = createRealtimeTableConfig("testDeepStoreConfig", streamConfigMap).build();
-    IndexLoadingConfig indexLoadingConfig = Mockito.mock(IndexLoadingConfig.class);
-    Mockito.when(indexLoadingConfig.getSegmentStoreURI()).thenReturn("file:///path/to/segment/store.txt");
-
+    // Create and set up the mocked IndexLoadingConfig and InstanceDataManager
+    IndexLoadingConfig indexLoadingConfig = mockIndexLoadConfig();
     SegmentCommitterFactory factory = new SegmentCommitterFactory(Mockito.mock(Logger.class), protocolHandler, config,
         indexLoadingConfig, Mockito.mock(ServerMetrics.class));
     SegmentCommitter committer = factory.createSegmentCommitter(requestParams, controllerVipUrl);
@@ -107,4 +109,14 @@ public void testUploadToDeepStoreConfig()
     Assert.assertTrue(committer instanceof SplitSegmentCommitter);
     Assert.assertTrue(((SplitSegmentCommitter) committer).getSegmentUploader() instanceof PinotFSSegmentUploader);
   }
+
+  private IndexLoadingConfig mockIndexLoadConfig() {
+    IndexLoadingConfig indexLoadingConfig = Mockito.mock(IndexLoadingConfig.class);
+    InstanceDataManagerConfig instanceDataManagerConfig = Mockito.mock(InstanceDataManagerConfig.class);
+    Mockito.when(indexLoadingConfig.getInstanceDataManagerConfig()).thenReturn(instanceDataManagerConfig);
+    PinotConfiguration pinotConfiguration = Mockito.mock(PinotConfiguration.class);
+    Mockito.when(instanceDataManagerConfig.getConfig()).thenReturn(pinotConfiguration);
+
+    return indexLoadingConfig;
+  }
 }
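The helper keeps the test mocks aligned with the factory's new `InstanceDataManagerConfig` dependency. A test that wanted to exercise the server-level default path could additionally stub the new accessor; the line below is a hypothetical extension for illustration, not part of this change:

```java
// Hypothetical: make the mocked server config report deep-store upload enabled,
// so createSegmentCommitter() takes the upload-to-FS branch without a table-level flag.
Mockito.when(instanceDataManagerConfig.isUploadSegmentToDeepStore()).thenReturn(true);
```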
HelixInstanceDataManagerConfig.java
@@ -135,6 +135,9 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig
   private static final String EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS = "external.view.dropped.max.wait.ms";
   private static final String EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS = "external.view.dropped.check.interval.ms";

+  public static final String UPLOAD_SEGMENT_TO_DEEP_STORE = "segment.upload.to.deep.store";
+  public static final boolean DEFAULT_UPLOAD_SEGMENT_TO_DEEP_STORE = false;
+
   private final static String[] REQUIRED_KEYS = {INSTANCE_ID};
   private static final long DEFAULT_ERROR_CACHE_SIZE = 100L;
   private static final int DEFAULT_DELETED_SEGMENTS_CACHE_SIZE = 10_000;
@@ -330,4 +333,9 @@ public PinotConfiguration getAuthConfig() {
   public Map<String, Map<String, String>> getTierConfigs() {
     return _tierConfigs;
   }
+
+  @Override
+  public boolean isUploadSegmentToDeepStore() {
+    return _serverConfig.getProperty(UPLOAD_SEGMENT_TO_DEEP_STORE, DEFAULT_UPLOAD_SEGMENT_TO_DEEP_STORE);
+  }
 }
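The getter reads the new key straight off the instance's server config with a `false` default. A standalone sketch of that read, assuming `PinotConfiguration`'s map-based constructor (the server property prefix that would normally precede this key in a live deployment is outside this diff and not asserted here):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.spi.env.PinotConfiguration;

public class DeepStoreFlagReadDemo {
  public static void main(String[] args) {
    // Same getProperty(key, default) call as isUploadSegmentToDeepStore() above.
    Map<String, Object> props = new HashMap<>();
    props.put("segment.upload.to.deep.store", "true");
    PinotConfiguration serverConfig = new PinotConfiguration(props);
    System.out.println(serverConfig.getProperty("segment.upload.to.deep.store", false)); // true
  }
}
```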
InstanceDataManagerConfig.java
@@ -78,4 +78,6 @@ public interface InstanceDataManagerConfig {
   PinotConfiguration getAuthConfig();

   Map<String, Map<String, String>> getTierConfigs();
+
+  boolean isUploadSegmentToDeepStore();
 }
StreamConfig.java
@@ -24,6 +24,7 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.utils.DataSizeUtils;
 import org.apache.pinot.spi.utils.TimeUtils;
@@ -41,7 +42,6 @@ public class StreamConfig {
   public static final long DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS = TimeUnit.MILLISECONDS.convert(6, TimeUnit.HOURS);
   public static final long DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES = 200 * 1024 * 1024; // 200M
   public static final int DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS = 100_000;
-  public static final String DEFAULT_SERVER_UPLOAD_TO_DEEPSTORE = "false";

   public static final String DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING =
       "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory";
@@ -79,10 +79,11 @@ public class StreamConfig {
   // Allow overriding it to use different offset criteria
   private OffsetCriteria _offsetCriteria;

-  // Indicates if the segment should be uploaded to the deep store's file system or to the controller during the
-  // segment commit protocol. By default, segment is uploaded to the controller during commit.
-  // If this flag is set to true, the segment is uploaded to deep store.
-  private final boolean _serverUploadToDeepStore;
+  // Table-level StreamConfig flag indicating whether segments should be uploaded to the deep store's file system
+  // or to the controller during the segment commit protocol. If the config is not present in the table's
+  // StreamConfig, _serverUploadToDeepStore is null, and callers of isServerUploadToDeepStore() fall back to the
+  // server-level config instead.
+  private final Boolean _serverUploadToDeepStore;

   /**
    * Initializes a StreamConfig using the map of stream configs from the table config
@@ -175,9 +176,9 @@ public StreamConfig(String tableNameWithType, Map<String, String> streamConfigMa
     _flushThresholdSegmentRows = extractFlushThresholdSegmentRows(streamConfigMap);
     _flushThresholdTimeMillis = extractFlushThresholdTimeMillis(streamConfigMap);
     _flushThresholdSegmentSizeBytes = extractFlushThresholdSegmentSize(streamConfigMap);
-    _serverUploadToDeepStore = Boolean.parseBoolean(
-        streamConfigMap.getOrDefault(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE,
-            DEFAULT_SERVER_UPLOAD_TO_DEEPSTORE));
+    _serverUploadToDeepStore = streamConfigMap.containsKey(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE)
+        ? Boolean.valueOf(streamConfigMap.get(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE))
+        : null;

     int autotuneInitialRows = 0;
     String initialRowsValue = streamConfigMap.get(StreamConfigProperties.SEGMENT_FLUSH_AUTOTUNE_INITIAL_ROWS);
@@ -214,7 +215,8 @@ public static void validateConsumerType(String streamType, Map<String, String> s
     }
   }

-  public boolean isServerUploadToDeepStore() {
+  @Nullable
+  public Boolean isServerUploadToDeepStore() {
     return _serverUploadToDeepStore;
   }

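The nullable return type is the crux of the change: an absent table-level key now means "no opinion" rather than false, which lets the server-level config take effect. A self-contained sketch of the tri-state read used in the constructor above (the literal key string here is an assumption; the real code uses the StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE constant):

```java
import java.util.HashMap;
import java.util.Map;

public class TriStateFlagDemo {
  // Assumed literal for StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE.
  static final String KEY = "realtime.segment.serverUploadToDeepStore";

  static Boolean readFlag(Map<String, String> streamConfigMap) {
    // containsKey + valueOf keeps "absent" (null) distinct from an explicit "false".
    return streamConfigMap.containsKey(KEY) ? Boolean.valueOf(streamConfigMap.get(KEY)) : null;
  }

  public static void main(String[] args) {
    Map<String, String> absent = new HashMap<>();
    Map<String, String> explicitFalse = new HashMap<>();
    explicitFalse.put(KEY, "false");
    System.out.println(readFlag(absent));        // null  -> defer to server-level config
    System.out.println(readFlag(explicitFalse)); // false -> explicit table-level override
  }
}
```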
@@ -416,7 +418,7 @@ public boolean equals(Object o) {
         && _flushThresholdSegmentSizeBytes == that._flushThresholdSegmentSizeBytes
         && _flushAutotuneInitialRows == that._flushAutotuneInitialRows
         && Double.compare(_topicConsumptionRateLimit, that._topicConsumptionRateLimit) == 0
-        && _serverUploadToDeepStore == that._serverUploadToDeepStore && Objects.equals(_type, that._type)
+        && Objects.equals(_serverUploadToDeepStore, that._serverUploadToDeepStore) && Objects.equals(_type, that._type)
         && Objects.equals(_topicName, that._topicName) && Objects.equals(_tableNameWithType, that._tableNameWithType)
         && Objects.equals(_consumerFactoryClassName, that._consumerFactoryClassName) && Objects.equals(_decoderClass,
             that._decoderClass) && Objects.equals(_decoderProperties, that._decoderProperties) && Objects.equals(_groupId,
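Switching to Objects.equals is needed once the field is a boxed Boolean: `==` compares object identity, which is only reliable for the canonical Boolean.TRUE/FALSE instances, and a plain `.equals()` call would throw when the field is null. A quick illustration:

```java
import java.util.Objects;

public class BoxedBooleanEqualsDemo {
  public static void main(String[] args) {
    Boolean a = null;
    Boolean b = Boolean.FALSE;
    // a.equals(b) would throw a NullPointerException; Objects.equals is null-safe.
    System.out.println(Objects.equals(a, b));       // false
    System.out.println(Objects.equals(null, null)); // true
  }
}
```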