Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.broker.broker;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamLevelConsumer;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;


/**
 * A no-op {@link StreamConsumerFactory} used in tests to satisfy low-level stream config
 * validation without connecting to a real stream. Partition-level consumption never yields
 * messages, metadata lookups report a single partition starting at offset 0, and
 * stream-level (high level) consumption is not provided.
 */
public class FakeStreamConsumerFactory extends StreamConsumerFactory {
  @Override
  public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {
    return new FakePartitionLevelConsumer();
  }

  @Override
  public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Set<String> fieldsToRead,
      String groupId) {
    // High level (stream-level) consumption is intentionally unsupported by this fake.
    return null;
  }

  @Override
  public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
    return new FakesStreamMetadataProvider();
  }

  @Override
  public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
    return new FakesStreamMetadataProvider();
  }

  /**
   * Fake partition-level consumer that never returns any messages.
   * Declared static: it uses no state from the enclosing factory, so it should not
   * retain a hidden reference to it.
   */
  public static class FakePartitionLevelConsumer implements PartitionLevelConsumer {

    @Override
    public MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis)
        throws TimeoutException {
      // This fake never produces messages.
      return null;
    }

    @Override
    public void close()
        throws IOException {
      // Nothing to release.
    }
  }

  /**
   * Fake metadata provider that reports exactly one partition group starting at offset 0.
   * Declared static for the same reason as {@link FakePartitionLevelConsumer}.
   */
  // NOTE(review): class name carries a typo ("Fakes" vs "Fake"); kept as-is since tests may
  // reference it by name.
  public static class FakesStreamMetadataProvider implements StreamMetadataProvider {

    @Override
    public List<PartitionGroupMetadata> computePartitionGroupMetadata(String clientId, StreamConfig streamConfig,
        List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatuses, int timeoutMillis)
        throws IOException, TimeoutException {
      // Single partition group (id 0) positioned at the earliest offset.
      return Collections.singletonList(new PartitionGroupMetadata(0, new LongMsgOffset(0)));
    }

    @Override
    public int fetchPartitionCount(long timeoutMillis) {
      // Fixed topology: exactly one partition.
      return 1;
    }

    @Override
    public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis)
        throws TimeoutException {
      // Every criteria resolves to offset 0 in this fake.
      return new LongMsgOffset(0);
    }

    @Override
    public void close()
        throws IOException {
      // Nothing to release.
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ public void setUp()
_helixResourceManager.addTable(offlineTableConfig);
TableConfig realtimeTimeConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
.setTimeType(TimeUnit.DAYS.name()).setStreamConfigs(getStreamConfigs()).build();
.setTimeType(TimeUnit.DAYS.name()).setStreamConfigs(getStreamConfigs()).setNumReplicas(1)
.build();
_helixResourceManager.addTable(realtimeTimeConfig);

for (int i = 0; i < NUM_OFFLINE_SEGMENTS; i++) {
Expand All @@ -118,10 +119,12 @@ public void setUp()
private Map<String, String> getStreamConfigs() {
Map<String, String> streamConfigs = new HashMap<>();
streamConfigs.put("streamType", "kafka");
streamConfigs.put("stream.kafka.consumer.type", "highLevel");
streamConfigs.put("stream.kafka.consumer.type", "lowlevel");
streamConfigs.put("stream.kafka.topic.name", "kafkaTopic");
streamConfigs.put("stream.kafka.decoder.class.name",
"org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder");
streamConfigs.put("stream.kafka.consumer.factory.class.name",
"org.apache.pinot.broker.broker.FakeStreamConsumerFactory");
return streamConfigs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,6 @@ public void testGetReplication() {
offlineTableConfig.getValidationConfig().setReplicasPerPartition("3");
assertEquals(4, offlineTableConfig.getReplication());

TableConfig realtimeHLCTableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName(TEST_REALTIME_HLC_TABLE_NAME)
.setStreamConfigs(getStreamConfigMap("highlevel")).setNumReplicas(2).build();
assertEquals(2, realtimeHLCTableConfig.getReplication());

realtimeHLCTableConfig.getValidationConfig().setReplication("4");
assertEquals(4, realtimeHLCTableConfig.getReplication());

realtimeHLCTableConfig.getValidationConfig().setReplicasPerPartition("3");
assertEquals(4, realtimeHLCTableConfig.getReplication());

TableConfig realtimeLLCTableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName(TEST_REALTIME_LLC_TABLE_NAME)
.setStreamConfigs(getStreamConfigMap("lowlevel")).setLLC(true).setNumReplicas(2).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.realtime.PinotRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
import org.apache.pinot.controller.helix.core.relocation.SegmentRelocator;
import org.apache.pinot.controller.helix.core.retention.RetentionManager;
Expand All @@ -101,14 +100,18 @@
import org.apache.pinot.core.segment.processing.lifecycle.PinotSegmentLifecycleEventListenerManager;
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.crypt.PinotCrypterFactory;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.services.ServiceStartable;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
Expand Down Expand Up @@ -163,7 +166,6 @@ public abstract class BaseControllerStarter implements ServiceStartable {
protected TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> _taskManagerStatusCache;
protected PeriodicTaskScheduler _periodicTaskScheduler;
protected PinotHelixTaskResourceManager _helixTaskResourceManager;
protected PinotRealtimeSegmentManager _realtimeSegmentsManager;
protected PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
protected SegmentCompletionManager _segmentCompletionManager;
protected LeadControllerManager _leadControllerManager;
Expand Down Expand Up @@ -430,15 +432,6 @@ private void setUpPinotController() {
new SegmentCompletionManager(_helixParticipantManager, _pinotLLCRealtimeSegmentManager, _controllerMetrics,
_leadControllerManager, _config.getSegmentCommitTimeoutSeconds());

if (_config.getHLCTablesAllowed()) {
LOGGER.info("Realtime tables with High Level consumers will be supported");
_realtimeSegmentsManager =
new PinotRealtimeSegmentManager(_helixResourceManager, _leadControllerManager, _config);
_realtimeSegmentsManager.start(_controllerMetrics);
} else {
LOGGER.info("Realtime tables with High Level consumers will NOT be supported");
_realtimeSegmentsManager = null;
}
_sqlQueryExecutor = new SqlQueryExecutor(_config.generateVipUrl());

_connectionManager = new MultiThreadedHttpConnectionManager();
Expand Down Expand Up @@ -502,6 +495,24 @@ protected void configure() {

LOGGER.info("Starting controller admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
_adminApp.start(_listenerConfigs);
List<String> existingHlcTables = new ArrayList<>();
_helixResourceManager.getAllRealtimeTables().forEach(rt -> {
TableConfig tableConfig = _helixResourceManager.getTableConfig(rt);
if (tableConfig != null) {
Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
try {
StreamConfig.validateConsumerType(
streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"), streamConfigMap);
} catch (Exception e) {
existingHlcTables.add(rt);
}
}
});
if (existingHlcTables.size() > 0) {
LOGGER.error("High Level Consumer (HLC) based realtime tables are no longer supported. Please delete the "
+ "following HLC tables before proceeding: {}\n", existingHlcTables);
throw new RuntimeException("Unable to start controller due to existing HLC tables!");
}

_controllerMetrics.addCallbackGauge("dataDir.exists", () -> new File(_config.getDataDir()).exists() ? 1L : 0L);
_controllerMetrics.addCallbackGauge("dataDir.fileOpLatencyMs", () -> {
Expand Down Expand Up @@ -758,11 +769,6 @@ private void stopPinotController() {
LOGGER.info("Stopping Jersey admin API");
_adminApp.stop();

if (_realtimeSegmentsManager != null) {
LOGGER.info("Stopping realtime segment manager");
_realtimeSegmentsManager.stop();
}

LOGGER.info("Stopping resource manager");
_helixResourceManager.stop();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,8 @@ private static long getRandomInitialDelayInSeconds() {
private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 64;
private static final boolean DEFAULT_ENABLE_STORAGE_QUOTA_CHECK = true;
private static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = false;
private static final boolean DEFAULT_ALLOW_HLC_TABLES = true;
// Disallow any high level consumer (HLC) table
private static final boolean DEFAULT_ALLOW_HLC_TABLES = false;
private static final String DEFAULT_CONTROLLER_MODE = ControllerMode.DUAL.name();
private static final String DEFAULT_LEAD_CONTROLLER_RESOURCE_REBALANCE_STRATEGY =
AutoRebalanceStrategy.class.getName();
Expand Down Expand Up @@ -993,11 +994,7 @@ public String getLeadControllerResourceRebalanceStrategy() {
}

public boolean getHLCTablesAllowed() {
return getProperty(ALLOW_HLC_TABLES, DEFAULT_ALLOW_HLC_TABLES);
}

public void setHLCTablesAllowed(boolean allowHLCTables) {
setProperty(ALLOW_HLC_TABLES, allowHLCTables);
return DEFAULT_ALLOW_HLC_TABLES;
}

public String getMetricsPrefix() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ private enum LineageUpdateType {
private final String _dataDir;
private final boolean _isSingleTenantCluster;
private final boolean _enableBatchMessageMode;
private final boolean _allowHLCTables;
private final int _deletedSegmentsRetentionInDays;
private final boolean _enableTieredSegmentAssignment;

Expand All @@ -235,15 +234,14 @@ private enum LineageUpdateType {
private final LineageManager _lineageManager;

public PinotHelixResourceManager(String zkURL, String helixClusterName, @Nullable String dataDir,
boolean isSingleTenantCluster, boolean enableBatchMessageMode, boolean allowHLCTables,
boolean isSingleTenantCluster, boolean enableBatchMessageMode,
int deletedSegmentsRetentionInDays, boolean enableTieredSegmentAssignment, LineageManager lineageManager) {
_helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
_helixClusterName = helixClusterName;
_dataDir = dataDir;
_isSingleTenantCluster = isSingleTenantCluster;
_enableBatchMessageMode = enableBatchMessageMode;
_deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
_allowHLCTables = allowHLCTables;
_enableTieredSegmentAssignment = enableTieredSegmentAssignment;
_instanceAdminEndpointCache =
CacheBuilder.newBuilder().expireAfterWrite(CACHE_ENTRY_EXPIRE_TIME_HOURS, TimeUnit.HOURS)
Expand All @@ -265,8 +263,8 @@ public String load(String instanceId) {
public PinotHelixResourceManager(ControllerConf controllerConf) {
this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), controllerConf.getDataDir(),
controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(),
controllerConf.getHLCTablesAllowed(), controllerConf.getDeletedSegmentsRetentionInDays(),
controllerConf.tieredSegmentAssignmentEnabled(), LineageManagerFactory.create(controllerConf));
controllerConf.getDeletedSegmentsRetentionInDays(), controllerConf.tieredSegmentAssignmentEnabled(),
LineageManagerFactory.create(controllerConf));
}

/**
Expand Down Expand Up @@ -1753,7 +1751,7 @@ private void verifyStreamConfig(String tableNameWithType, TableConfig tableConfi
// Check if HLC table is allowed.
StreamConfig streamConfig =
new StreamConfig(tableNameWithType, IngestionConfigUtils.getStreamConfigMap(tableConfig));
if (streamConfig.hasHighLevelConsumerType() && !_allowHLCTables) {
if (streamConfig.hasHighLevelConsumerType()) {
throw new InvalidTableConfigException(
"Creating HLC realtime table is not allowed for Table: " + tableNameWithType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public class PinotTableRestletResourceTest extends ControllerTest {
private static final String OFFLINE_TABLE_NAME = "testOfflineTable";
private static final String REALTIME_TABLE_NAME = "testRealtimeTable";
private final TableConfigBuilder _offlineBuilder = new TableConfigBuilder(TableType.OFFLINE);
private final TableConfigBuilder _realtimeBuilder = new TableConfigBuilder(TableType.REALTIME);
private final TableConfigBuilder _realtimeBuilder = new TableConfigBuilder(TableType.REALTIME)
.setStreamConfigs(Map.of("stream.type", "foo", "consumer.type", "lowlevel"));
private String _createTableUrl;

@BeforeClass
Expand All @@ -72,7 +73,7 @@ public void setUp()

// add schema for realtime table
DEFAULT_INSTANCE.addDummySchema(REALTIME_TABLE_NAME);
StreamConfig streamConfig = FakeStreamConfigUtils.getDefaultHighLevelStreamConfigs();
StreamConfig streamConfig = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs();
_realtimeBuilder.setTableName(REALTIME_TABLE_NAME).setTimeColumnName("timeColumn").setTimeType("DAYS")
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("5").setSchemaName(REALTIME_TABLE_NAME)
.setStreamConfigs(streamConfig.getStreamConfigsMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void setUp()

// add schema for realtime table
DEFAULT_INSTANCE.addDummySchema(HYBRID_TABLE_NAME);
StreamConfig streamConfig = FakeStreamConfigUtils.getDefaultHighLevelStreamConfigs();
StreamConfig streamConfig = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(4);
tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(HYBRID_TABLE_NAME)
.setNumReplicas(DEFAULT_MIN_NUM_REPLICAS).setStreamConfigs(streamConfig.getStreamConfigsMap()).build();
DEFAULT_INSTANCE.getHelixResourceManager().addTable(tableConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ public void testGetLiveBrokers()
tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME)
.setServerTenant(SERVER_TENANT_NAME)
.setStreamConfigs(FakeStreamConfigUtils.getDefaultHighLevelStreamConfigs().getStreamConfigsMap()).build();
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
waitForEVToDisappear(tableConfig.getTableName());
_helixResourceManager.addTable(tableConfig);
waitForTableOnlineInBrokerResourceEV(REALTIME_TABLE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,18 +156,6 @@ public static StreamConfig getDefaultLowLevelStreamConfigs() {
return getDefaultLowLevelStreamConfigs(DEFAULT_NUM_PARTITIONS);
}

/**
* Generate fake stream configs for high level stream
*/
public static StreamConfig getDefaultHighLevelStreamConfigs() {
Map<String, String> streamConfigMap = getDefaultStreamConfigs();
streamConfigMap
.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_CONSUMER_TYPES),
StreamConfig.ConsumerType.HIGHLEVEL.toString());

return new StreamConfig(TABLE_NAME_WITH_TYPE, streamConfigMap);
}

private static Map<String, String> getDefaultStreamConfigs() {
Map<String, String> streamConfigMap = new HashMap<>();
streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, STREAM_TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int
@Override
public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Set<String> fieldsToRead,
String groupId) {
return new FakeStreamLevelConsumer();
throw new UnsupportedOperationException("Pinot no longer support stream level consumers!");
}

@Override
Expand Down
Loading