Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pinot_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ jobs:
matrix:
test_suite: [ "compatibility-verifier/sample-test-suite" ]
old_commit: [
"release-1.0.0", "release-1.3.0", "master"
"release-1.2.0", "release-1.3.0", "master"
]
name: Pinot Compatibility Regression Testing against ${{ matrix.old_commit }} on ${{ matrix.test_suite }}
steps:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{
"streamType": "kafka",
"stream.kafka.consumer.type": "simple",
"topicName": "PinotRealtimeFeatureTest2Event",
"partitionColumn": "longDimSV1",
"numPartitions": "1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
"realtime.segment.flush.threshold.time": "1h",
"streamType": "kafka",
"stream.kafka.topic.name": "PinotRealtimeFeatureTest2Event",
"stream.kafka.consumer.type": "simple",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.broker.list": "localhost:19092",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{
"streamType": "kafka",
"stream.kafka.consumer.type": "simple",
"topicName": "PinotRealtimeFeatureTest2Event",
"partitionColumn": "longDimSV1",
"numPartitions": "1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
"realtime.segment.flush.threshold.time": "1h",
"streamType": "kafka",
"stream.kafka.topic.name": "PinotRealtimeFeatureTest2Event",
"stream.kafka.consumer.type": "simple",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.broker.list": "localhost:19092",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{
"streamType": "kafka",
"stream.kafka.consumer.type": "simple",
"topicName": "PinotRealtimeFeatureTest3Event",
"partitionColumn": "longDimSV1",
"numPartitions": "3",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
"realtime.segment.flush.threshold.time": "1h",
"streamType": "kafka",
"stream.kafka.topic.name": "PinotRealtimeFeatureTest3Event",
"stream.kafka.consumer.type": "simple",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.broker.list": "localhost:19092",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "simple",
"stream.kafka.topic.name": "flights-realtime",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowLevel",
"stream.kafka.topic.name": "meetupRSVPEvents",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
Expand Down
1 change: 0 additions & 1 deletion helm/pinot/pinot-github-events-setup.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ data:
],
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "simple",
"stream.kafka.topic.name": "pullRequestMergedEvents",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
Expand Down
2 changes: 0 additions & 2 deletions helm/pinot/pinot-realtime-quickstart.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ data:
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "simple",
"stream.kafka.topic.name": "flights-realtime",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
Expand Down Expand Up @@ -80,7 +79,6 @@ data:
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "simple",
"stream.kafka.topic.name": "flights-realtime-avro",
"stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.stream.SimpleAvroMessageDecoder",
"stream.kafka.decoder.prop.schema": "{\"type\":\"record\",\"name\":\"Flight\",\"namespace\":\"pinot\",\"fields\":[{\"name\":\"DaysSinceEpoch\",\"type\":[\"int\"]},{\"name\":\"Year\",\"type\":[\"int\"]},{\"name\":\"Quarter\",\"type\":[\"int\"]},{\"name\":\"Month\",\"type\":[\"int\"]},{\"name\":\"DayofMonth\",\"type\":[\"int\"]},{\"name\":\"DayOfWeek\",\"type\":[\"int\"]},{\"name\":\"FlightDate\",\"type\":[\"string\"]},{\"name\":\"UniqueCarrier\",\"type\":[\"string\"]},{\"name\":\"AirlineID\",\"type\":[\"int\"]},{\"name\":\"Carrier\",\"type\":[\"string\"]},{\"name\":\"TailNum\",\"type\":[\"string\",\"null\"]},{\"name\":\"FlightNum\",\"type\":[\"int\"]},{\"name\":\"OriginAirportID\",\"type\":[\"int\"]},{\"name\":\"OriginAirportSeqID\",\"type\":[\"int\"]},{\"name\":\"OriginCityMarketID\",\"type\":[\"int\"]},{\"name\":\"Origin\",\"type\":[\"string\"]},{\"name\":\"OriginCityName\",\"type\":[\"string\"]},{\"name\":\"OriginState\",\"type\":[\"string\"]},{\"name\":\"OriginStateFips\",\"type\":[\"int\"]},{\"name\":\"OriginStateName\",\"type\":[\"string\"]},{\"name\":\"OriginWac\",\"type\":[\"int\"]},{\"name\":\"DestAirportID\",\"type\":[\"int\"]},{\"name\":\"DestAirportSeqID\",\"type\":[\"int\"]},{\"name\":\"DestCityMarketID\",\"type\":[\"int\"]},{\"name\":\"Dest\",\"type\":[\"string\"]},{\"name\":\"DestCityName\",\"type\":[\"string\"]},{\"name\":\"DestState\",\"type\":[\"string\"]},{\"name\":\"DestStateFips\",\"type\":[\"int\"]},{\"name\":\"DestStateName\",\"type\":[\"string\"]},{\"name\":\"DestWac\",\"type\":[\"int\"]},{\"name\":\"CRSDepTime\",\"type\":[\"int\"]},{\"name\":\"DepTime\",\"type\":[\"int\",\"null\"]},{\"name\":\"DepDelay\",\"type\":[\"int\",\"null\"]},{\"name\":\"DepDelayMinutes\",\"type\":[\"int\",\"null\"]},{\"name\":\"DepDel15\",\"type\":[\"int\",\"null\"]},{\"name\":\"DepartureDelayGroups\",\"type\":[\"int\",\"null\"]},{\"name\":\"DepTimeBlk\",\"type\":[\"string\"]},{\"name\":\"TaxiOut\",\"type\":[\"int\",\"null\"]},{\"name\":\"WheelsOff\",\"type\":[\"int\",\"null\"]},{\"name\":\"WheelsOn\",\"type\":[\"int\",\"null\"]},{\"name\":\"TaxiIn\",\"type\":[\"int\",\"null\"]},{\"name\":\"CRSArrTime\",\"type\":[\"int\"]},{\"name\":\"ArrTime\",\"type\":[\"int\",\"null\"]},{\"name\":\"ArrDelay\",\"type\":[\"int\",\"null\"]},{\"name\":\"ArrDelayMinutes\",\"type\":[\"int\",\"null\"]},{\"name\":\"ArrDel15\",\"type\":[\"int\",\"null\"]},{\"name\":\"ArrivalDelayGroups\",\"type\":[\"int\",\"null\"]},{\"name\":\"ArrTimeBlk\",\"type\":[\"string\"]},{\"name\":\"Cancelled\",\"type\":[\"int\"]},{\"name\":\"CancellationCode\",\"type\":[\"string\",\"null\"]},{\"name\":\"Diverted\",\"type\":[\"int\"]},{\"name\":\"CRSElapsedTime\",\"type\":[\"int\",\"null\"]},{\"name\":\"ActualElapsedTime\",\"type\":[\"int\",\"null\"]},{\"name\":\"AirTime\",\"type\":[\"int\",\"null\"]},{\"name\":\"Flights\",\"type\":[\"int\"]},{\"name\":\"Distance\",\"type\":[\"int\"]},{\"name\":\"DistanceGroup\",\"type\":[\"int\"]},{\"name\":\"CarrierDelay\",\"type\":[\"int\",\"null\"]},{\"name\":\"WeatherDelay\",\"type\":[\"int\",\"null\"]},{\"name\":\"NASDelay\",\"type\":[\"int\",\"null\"]},{\"name\":\"SecurityDelay\",\"type\":[\"int\",\"null\"]},{\"name\":\"LateAircraftDelay\",\"type\":[\"int\",\"null\"]},{\"name\":\"FirstDepTime\",\"type\":[\"int\",\"null\"]},{\"name\":\"TotalAddGTime\",\"type\":[\"int\",\"null\"]},{\"name\":\"LongestAddGTime\",\"type\":[\"int\",\"null\"]},{\"name\":\"DivAirportLandings\",\"type\":[\"int\"]},{\"name\":\"DivReachedDest\",\"type\":[\"int\",\"null\"]},{\"name\":\"DivActualElapsedTime\",\"type\":[\"int\",\"null\"]},{\"name\":\"DivArrDelay\",\"type\":[\"int\",\"null\"]},{\"name\":\"DivDistance\",\"type\":[\"int\",\"null\"]},{\"name\":\"DivAirports\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"DivAirportIDs\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"DivAirportSeqIDs\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"DivWheelsOns\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"DivTotalGTimes\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"DivLongestGTimes\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"DivWheelsOffs\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"DivTailNums\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"RandomAirports\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ public void setUp()
private Map<String, String> getStreamConfigs() {
Map<String, String> streamConfigs = new HashMap<>();
streamConfigs.put("streamType", "kafka");
streamConfigs.put("stream.kafka.consumer.type", "lowlevel");
streamConfigs.put("stream.kafka.topic.name", "kafkaTopic");
streamConfigs.put("stream.kafka.decoder.class.name",
"org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
"realtime.segment.flush.threshold.time": "864",
"streamType" : "kafka",
"stream.kafka.broker.list" : "localhost:9092",
"stream.kafka.consumer.type" : "simple",
"stream.kafka.consumer.prop.auto.offset.reset": "largest",
"stream.kafka.topic.name" : "demand_topic"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,7 @@
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.services.ServiceStartable;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
Expand Down Expand Up @@ -591,26 +588,6 @@ protected void configure() {

LOGGER.info("Starting controller admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
_adminApp.start(_listenerConfigs);
List<String> existingHlcTables = new ArrayList<>();
_helixResourceManager.getAllRealtimeTables().forEach(rt -> {
TableConfig tableConfig = _helixResourceManager.getTableConfig(rt);
if (tableConfig != null) {
List<Map<String, String>> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(tableConfig);
try {
for (Map<String, String> streamConfigMap : streamConfigMaps) {
StreamConfig.validateConsumerType(streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"),
streamConfigMap);
}
} catch (Exception e) {
existingHlcTables.add(rt);
}
}
});
if (existingHlcTables.size() > 0) {
LOGGER.error("High Level Consumer (HLC) based realtime tables are no longer supported. Please delete the "
+ "following HLC tables before proceeding: {}\n", existingHlcTables);
throw new RuntimeException("Unable to start controller due to existing HLC tables!");
}

// One time job to fix schema name in all tables
// This method can be removed after the next major release.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,6 @@ private static long getRandomInitialDelayInSeconds() {
private static final long DEFAULT_RESOURCE_UTILIZATION_CHECKER_INITIAL_DELAY = 300L; // 5 minutes
private static final long DEFAULT_RESOURCE_UTILIZATION_CHECKER_FREQUENCY = 300L; // 5 minutes
private static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = false;
// Disallow any high level consumer (HLC) table
private static final boolean DEFAULT_ALLOW_HLC_TABLES = false;
private static final String DEFAULT_CONTROLLER_MODE = ControllerMode.DUAL.name();
private static final String DEFAULT_LEAD_CONTROLLER_RESOURCE_REBALANCE_STRATEGY =
AutoRebalanceStrategy.class.getName();
Expand Down Expand Up @@ -1158,10 +1156,6 @@ public int getLeadControllerResourceRebalanceDelayMs() {
DEFAULT_LEAD_CONTROLLER_RESOURCE_REBALANCE_DELAY_MS);
}

public boolean getHLCTablesAllowed() {
return DEFAULT_ALLOW_HLC_TABLES;
}

public String getMetricsPrefix() {
return getProperty(CONFIG_OF_CONTROLLER_METRICS_PREFIX, DEFAULT_METRICS_PREFIX);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Props = {
streamConfigsObj: Object
};

const compulsoryKeys = ["stream.kafka.broker.list","stream.kafka.topic.name","stream.kafka.consumer.type","stream.kafka.decoder.class.name"];
const compulsoryKeys = ["stream.kafka.broker.list","stream.kafka.topic.name","stream.kafka.decoder.class.name"];

export default function AddDeleteComponent({
changeHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ export default function AddIngestionComponent({
"streamType": "kafka",
"stream.kafka.topic.name": "",
"stream.kafka.broker.list": "",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.factory.class.name":"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name":"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ export default function AddRealTimeIngestionComponent({
"streamType": "kafka",
"stream.kafka.topic.name": "",
"stream.kafka.broker.list": "",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.factory.class.name":"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name":"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ const defaultTableObj = {
"streamType": "kafka",
"stream.kafka.topic.name": "",
"stream.kafka.broker.list": "",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
Expand Down Expand Up @@ -220,7 +219,7 @@ const checkFields = (tableObj,fields) => {
}

const validateTableConfig = async () => {
const fields = [{key:"tableName",label:"Table Name"},{key:"tableType",label:"Table Type"},{key:"stream.kafka.broker.list",label:"stream.kafka.broker.list"},{key:"stream.kafka.topic.name",label:"stream.kafka.topic.name"},{key:"stream.kafka.consumer.type",label:"stream.kafka.consumer.type"},{key:"stream.kafka.decoder.class.name",label:"stream.kafka.decoder.class.name"}];
const fields = [{key:"tableName",label:"Table Name"},{key:"tableType",label:"Table Type"},{key:"stream.kafka.broker.list",label:"stream.kafka.broker.list"},{key:"stream.kafka.topic.name",label:"stream.kafka.topic.name"},{key:"stream.kafka.decoder.class.name",label:"stream.kafka.decoder.class.name"}];
await checkFields(tableObj,fields);
if(isError){
isError = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ private Map<String, String> getStreamConfigs() {
Map<String, String> streamConfigs = new HashMap<>();
streamConfigs.put("streamType", "kafka");
streamConfigs.put("stream.kafka.topic.name", "kafkaTopic");
streamConfigs.put("stream.kafka.consumer.type", "simple");
streamConfigs.put("stream.kafka.decoder.class.name",
"org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder");
streamConfigs.put("stream.kafka.consumer.factory.class.name",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,8 @@ ConsumingSegmentInfoReader.ConsumingSegmentInfo getConsumingSegmentInfoForServer
}

Map<String, String> getStreamConfigMap() {
return Map.of("streamType", "kafka", "stream.kafka.consumer.type", "simple", "stream.kafka.topic.name", "test",
"stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder",
"stream.kafka.consumer.factory.class.name",
return Map.of("streamType", "kafka", "stream.kafka.topic.name", "test", "stream.kafka.decoder.class.name",
"org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder", "stream.kafka.consumer.factory.class.name",
"org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,14 @@
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import static org.testng.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;


@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -467,9 +472,8 @@ public void realtimeImmutableSegmentHasLessReplicaTest() {
}

private Map<String, String> getStreamConfigMap() {
return Map.of("streamType", "kafka", "stream.kafka.consumer.type", "simple", "stream.kafka.topic.name", "test",
"stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder",
"stream.kafka.consumer.factory.class.name",
return Map.of("streamType", "kafka", "stream.kafka.topic.name", "test", "stream.kafka.decoder.class.name",
"org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder", "stream.kafka.consumer.factory.class.name",
"org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
"stream.kafka.clusterGroup": "aggregate-tracking",
"stream.kafka.consumer.factory.class.name": "com.linkedin.pinot.v2.server.LiPinotKafkaConsumerFactory",
"stream.kafka.consumer.prop.auto.offset.reset": "largest",
"stream.kafka.consumer.type": "simple",
"stream.kafka.decoder.class.name": "com.linkedin.pinot.v2.server.LiKafkaDecoder",
"stream.kafka.topic.name": "UserGeneratedContentGestureCountEvent",
"streamType": "kafka"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
"stream.kafka.clusterGroup": "aggregate-tracking",
"stream.kafka.consumer.factory.class.name": "com.linkedin.pinot.v2.server.LiPinotKafkaConsumerFactory",
"stream.kafka.consumer.prop.auto.offset.reset": "largest",
"stream.kafka.consumer.type": "simple",
"stream.kafka.decoder.class.name": "com.linkedin.pinot.v2.server.LiKafkaDecoder",
"stream.kafka.topic.name": "UserGeneratedContentGestureCountEvent",
"streamType": "kafka"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.pinot.core.data.manager.realtime;

import com.google.common.collect.ImmutableMap;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -42,11 +41,8 @@
public class SegmentCommitterFactoryTest {

private Map<String, String> getMinimumStreamConfigMap() {
return ImmutableMap.of(
"streamType", "kafka",
"stream.kafka.consumer.type", "simple",
"stream.kafka.topic.name", "ignore",
"stream.kafka.decoder.class.name", "org.apache.pinot.plugin.inputformat.json.JsonMessageDecoder");
return Map.of("streamType", "kafka", "stream.kafka.topic.name", "ignore", "stream.kafka.decoder.class.name",
"org.apache.pinot.plugin.inputformat.json.JsonMessageDecoder");
}

private TableConfigBuilder createRealtimeTableConfig(String tableName) {
Expand Down
Loading
Loading