@@ -337,28 +337,13 @@ public void testSerDe()
assertEquals(tunerConfigToCompare.getName(), name);
assertEquals(tunerConfigToCompare.getTunerProperties(), props);
}
{
// disable handling null value in time column
TableConfig tableConfig = tableConfigBuilder.setTimeColumnName("timeColumn").build();
checkNullTimeValueHandling(JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class), false);
checkNullTimeValueHandling(TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig)), false);

// enable handling null value in time column
tableConfig = tableConfigBuilder.setAllowNullTimeValue(true).setTimeColumnName("timeColumn").build();
checkNullTimeValueHandling(JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class), true);
checkNullTimeValueHandling(TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig)), true);
}
}

private void checkSegmentsValidationAndRetentionConfig(TableConfig tableConfig) {
// TODO validate other fields of SegmentsValidationAndRetentionConfig.
assertEquals(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), CommonConstants.HTTP_PROTOCOL);
}

private void checkNullTimeValueHandling(TableConfig tableConfig, boolean expected) {
assertEquals(tableConfig.getValidationConfig().isAllowNullTimeValue(), expected);
}

private void checkDefaultTableConfig(TableConfig tableConfig) {
// Check mandatory fields
assertEquals(tableConfig.getTableName(), "testTable_OFFLINE");
@@ -52,7 +52,6 @@
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
import org.apache.pinot.spi.stream.PermanentConsumerException;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -83,53 +82,83 @@ public class LLRealtimeSegmentDataManagerTest {
private static final long START_OFFSET_VALUE = 19885L;
private static final LongMsgOffset START_OFFSET = new LongMsgOffset(START_OFFSET_VALUE);
private static final String TOPIC_NAME = "someTopic";
private static final String CONSUMER_FACTORY_CLASS = FakeStreamConsumerFactory.class.getName();
private static final String MESSAGE_DECODER_CLASS = FakeStreamMessageDecoder.class.getName();
private static final int MAX_ROWS_IN_SEGMENT = 250000;
private static final long MAX_TIME_FOR_SEGMENT_CLOSE_MS = 64368000L;
private final Map<Integer, Semaphore> _partitionGroupIdToSemaphoreMap = new ConcurrentHashMap<>();

private static long _timeNow = System.currentTimeMillis();
//@formatter:off
private static final String TABLE_CONFIG_JSON =
"{"
+ " \"metadata\":{},"
+ " \"segmentsConfig\":{"
+ " \"replicasPerPartition\":\"3\","
+ " \"replication\":\"3\","
+ " \"replicationNumber\":3,"
+ " \"retentionTimeUnit\":\"DAYS\","
+ " \"retentionTimeValue\":\"3\","
+ " \"schemaName\":\"testSchema\","
+ " \"segmentAssignmentStrategy\":\"BalanceNumSegmentAssignmentStrategy\","
+ " \"segmentPushFrequency\":\"daily\","
+ " \"segmentPushType\":\"APPEND\","
+ " \"timeColumnName\":\"minutesSinceEpoch\","
+ " \"timeType\":\"MINUTES\""
+ " },"
+ " \"tableIndexConfig\":{"
+ " \"invertedIndexColumns\":[],"
+ " \"lazyLoad\":\"false\","
+ " \"loadMode\":\"HEAP\","
+ " \"segmentFormatVersion\":null,"
+ " \"sortedColumn\":[],"
+ " \"streamConfigs\":{"
+ " \"realtime.segment.flush.threshold.rows\":\"" + MAX_ROWS_IN_SEGMENT + "\","
+ " \"realtime.segment.flush.threshold.time\":\"" + MAX_TIME_FOR_SEGMENT_CLOSE_MS + "\","
+ " \"stream.fakeStream.broker.list\":\"broker:7777\","
+ " \"stream.fakeStream.consumer.prop.auto.offset.reset\":\"smallest\","
+ " \"stream.fakeStream.consumer.type\":\"simple\","
+ " \"stream.fakeStream.consumer.factory.class.name\":\"" + CONSUMER_FACTORY_CLASS + "\","
+ " \"stream.fakeStream.decoder.class.name\":\"" + MESSAGE_DECODER_CLASS + "\","
+ " \"stream.fakeStream.decoder.prop.schema.registry.rest.url\":\"http://1.2.3.4:1766/schemas\","
+ " \"stream.fakeStream.decoder.prop.schema.registry.schema.name\":\"UnknownSchema\","
+ " \"stream.fakeStream.hlc.zk.connect.string\":\"zoo:2181/kafka-queuing\","
+ " \"stream.fakeStream.topic.name\":\"" + TOPIC_NAME + "\","
+ " \"stream.fakeStream.zk.broker.url\":\"kafka-broker:2181/kafka-queuing\","
+ " \"streamType\":\"fakeStream\""
+ " }"
+ " },"
+ " \"tableName\":\"Coffee_REALTIME\","
+ " \"tableType\":\"realtime\","
+ " \"tenants\":{"
+ " \"broker\":\"shared\","
+ " \"server\":\"server-1\""
+ " },"
+ " \"upsertConfig\":{"
+ " \"mode\":\"FULL\""
+ " }"
+ "}";
private static final String SCHEMA_JSON =
"{"
+ " \"schemaName\":\"testSchema\","
+ " \"metricFieldSpecs\":[{\"name\":\"m\",\"dataType\":\"LONG\"}],"
+ " \"dimensionFieldSpecs\":[{\"name\":\"d\",\"dataType\":\"STRING\",\"singleValueField\":true}],"
+ " \"timeFieldSpec\":{"
+ " \"incomingGranularitySpec\":{"
+ " \"dataType\":\"LONG\","
+ " \"timeType\":\"MINUTES\","
+ " \"name\":\"minutesSinceEpoch\""
+ " },"
+ " \"defaultNullValue\":12345"
+ " },"
+ " \"primaryKeyColumns\": [\"event_id\"]"
+ "}";
//@formatter:on

private final String _tableConfigJson =
"{\n" + " \"metadata\": {}, \n" + " \"segmentsConfig\": {\n" + " \"replicasPerPartition\": \"3\", \n"
+ " \"replication\": \"3\", \n" + " \"replicationNumber\": 3, \n"
+ " \"retentionTimeUnit\": \"DAYS\", \n" + " \"retentionTimeValue\": \"3\", \n"
+ " \"schemaName\": \"UnknownSchema\", \n"
+ " \"segmentAssignmentStrategy\": \"BalanceNumSegmentAssignmentStrategy\", \n"
+ " \"segmentPushFrequency\": \"daily\", \n" + " \"segmentPushType\": \"APPEND\", \n"
+ " \"timeColumnName\": \"minutesSinceEpoch\", \n" + " \"timeType\": \"MINUTES\"\n" + " }, \n"
+ " \"tableIndexConfig\": {\n" + " \"invertedIndexColumns\": [" + " ], \n"
+ " \"lazyLoad\": \"false\", \n" + " \"loadMode\": \"HEAP\", \n"
+ " \"segmentFormatVersion\": null, \n" + " \"sortedColumn\": [], \n" + " \"streamConfigs\": {\n"
+ " \"" + StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS + "\": \"" + MAX_ROWS_IN_SEGMENT + "\", \n"
+ " \"" + StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME + "\": \"" + MAX_TIME_FOR_SEGMENT_CLOSE_MS
+ "\", \n" + " \"stream.fakeStream.broker.list\": \"broker:7777\", \n"
+ " \"stream.fakeStream.consumer.prop.auto.offset.reset\": \"smallest\", \n"
+ " \"stream.fakeStream.consumer.type\": \"simple\", \n"
+ " \"stream.fakeStream.consumer.factory.class.name\": \"" + FakeStreamConsumerFactory.class.getName()
+ "\", \n" + " \"stream.fakeStream.decoder.class.name\": \"" + FakeStreamMessageDecoder.class.getName()
+ "\", \n"
+ " \"stream.fakeStream.decoder.prop.schema.registry.rest.url\": \"http://schema-registry-host.corp"
+ ".ceo:1766/schemas\", \n"
+ " \"stream.fakeStream.decoder.prop.schema.registry.schema.name\": \"UnknownSchema\", \n"
+ " \"stream.fakeStream.hlc.zk.connect.string\": \"zoo:2181/kafka-queuing\", \n"
+ " \"stream.fakeStream.topic.name\": \"" + TOPIC_NAME + "\", \n"
+ " \"stream.fakeStream.zk.broker.url\": \"kafka-broker:2181/kafka-queuing\", \n"
+ " \"streamType\": \"fakeStream\"\n" + " }\n" + " }, \n"
+ " \"tableName\": \"Coffee_REALTIME\", \n" + " \"tableType\": \"realtime\", \n" + " \"tenants\": {\n"
+ " \"broker\": \"shared\", \n" + " \"server\": \"server-1\"\n" + " },\n"
+ " \"upsertConfig\": {\"mode\": \"FULL\" } \n" + "}";

private String makeSchema() {
return "{" + " \"schemaName\":\"SchemaTest\"," + " \"metricFieldSpecs\":[" + " {\"name\":\"m\",\"dataType\":\""
+ "LONG" + "\"}" + " ]," + " \"dimensionFieldSpecs\":[" + " {\"name\":\"d\",\"dataType\":\"" + "STRING"
+ "\",\"singleValueField\":" + "true" + "}" + " ]," + " \"timeFieldSpec\":{"
+ " \"incomingGranularitySpec\":{\"dataType\":\"LONG\",\"timeType\":\"MILLISECONDS\",\"name\":\"time\"},"
+ " \"defaultNullValue\":12345" + " },\n" + "\"primaryKeyColumns\": [\"event_id\"] \n" + "}";
}
private static long _timeNow = System.currentTimeMillis();
private final Map<Integer, Semaphore> _partitionGroupIdToSemaphoreMap = new ConcurrentHashMap<>();

private TableConfig createTableConfig()
throws Exception {
return JsonUtils.stringToObject(_tableConfigJson, TableConfig.class);
return JsonUtils.stringToObject(TABLE_CONFIG_JSON, TableConfig.class);
}

private RealtimeTableDataManager createTableDataManager(TableConfig tableConfig) {
@@ -160,7 +189,7 @@ private FakeLLRealtimeSegmentDataManager createFakeSegmentManager()
RealtimeTableDataManager tableDataManager = createTableDataManager(tableConfig);
LLCSegmentName llcSegmentName = new LLCSegmentName(SEGMENT_NAME_STR);
_partitionGroupIdToSemaphoreMap.putIfAbsent(PARTITION_GROUP_ID, new Semaphore(1));
Schema schema = Schema.fromString(makeSchema());
Schema schema = Schema.fromString(SCHEMA_JSON);
ServerMetrics serverMetrics = new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
return new FakeLLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, tableDataManager, SEGMENT_DIR, schema,
llcSegmentName, _partitionGroupIdToSemaphoreMap, serverMetrics);
@@ -225,8 +254,8 @@ public void testHolding()
// We should consume initially...
segmentDataManager._consumeOffsets.add(endOffset);
final SegmentCompletionProtocol.Response response = new SegmentCompletionProtocol.Response(
new SegmentCompletionProtocol.Response.Params()
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD)
new SegmentCompletionProtocol.Response.Params().withStatus(
SegmentCompletionProtocol.ControllerResponseStatus.HOLD)
.withStreamPartitionMsgOffset(endOffset.toString()));
// And then never consume as long as we get a hold response, 100 times.
for (int i = 0; i < 100; i++) {
@@ -272,8 +301,8 @@ public void testCommitAfterHold()
Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
Assert.assertTrue(segmentDataManager._commitSegmentCalled);
Assert
.assertEquals(segmentDataManager._state.get(segmentDataManager), LLRealtimeSegmentDataManager.State.COMMITTED);
Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
LLRealtimeSegmentDataManager.State.COMMITTED);
segmentDataManager.destroy();
}

@@ -309,12 +338,12 @@ public void testCommitAfterCatchup()
segmentDataManager._consumeOffsets.add(firstOffset);
segmentDataManager._consumeOffsets.add(catchupOffset); // Offset after catchup
final SegmentCompletionProtocol.Response holdResponse1 = new SegmentCompletionProtocol.Response(
new SegmentCompletionProtocol.Response.Params()
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD)
new SegmentCompletionProtocol.Response.Params().withStatus(
SegmentCompletionProtocol.ControllerResponseStatus.HOLD)
.withStreamPartitionMsgOffset(firstOffset.toString()));
final SegmentCompletionProtocol.Response catchupResponse = new SegmentCompletionProtocol.Response(
new SegmentCompletionProtocol.Response.Params()
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP)
new SegmentCompletionProtocol.Response.Params().withStatus(
SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP)
.withStreamPartitionMsgOffset(catchupOffset.toString()));
final SegmentCompletionProtocol.Response holdResponse2 = new SegmentCompletionProtocol.Response(
new SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(catchupOffset.toString())
@@ -336,8 +365,8 @@ public void testCommitAfterCatchup()
Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
Assert.assertTrue(segmentDataManager._commitSegmentCalled);
Assert
.assertEquals(segmentDataManager._state.get(segmentDataManager), LLRealtimeSegmentDataManager.State.COMMITTED);
Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
LLRealtimeSegmentDataManager.State.COMMITTED);
segmentDataManager.destroy();
}

@@ -361,8 +390,8 @@ public void testDiscarded()
Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
Assert.assertFalse(segmentDataManager._commitSegmentCalled);
Assert
.assertEquals(segmentDataManager._state.get(segmentDataManager), LLRealtimeSegmentDataManager.State.DISCARDED);
Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
LLRealtimeSegmentDataManager.State.DISCARDED);
segmentDataManager.destroy();
}

@@ -765,9 +794,9 @@ public void testShutdownTableDataManagerWillNotShutdownLeaseExtenderExecutor()
when(tableDataManagerConfig.getTableName()).thenReturn(tableConfig.getTableName());
when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());

TableDataManager tableDataManager = TableDataManagerProvider
.getTableDataManager(tableDataManagerConfig, "testInstance", propertyStore, mock(ServerMetrics.class),
mock(HelixManager.class), null);
TableDataManager tableDataManager =
TableDataManagerProvider.getTableDataManager(tableDataManagerConfig, "testInstance", propertyStore,
mock(ServerMetrics.class), mock(HelixManager.class), null);
tableDataManager.start();
tableDataManager.shutDown();
Assert.assertFalse(SegmentBuildTimeLeaseExtender.isExecutorShutdown());
@@ -59,9 +59,10 @@ public abstract class GeoFunctionTest {
protected static final String STRING_SV_COLUMN = "stringSV";
protected static final String LONG_SV_COLUMN = "longSV";
protected static final String STRING_SV_COLUMN2 = "stringSV2";

private static final String RAW_TABLE_NAME = "testTable";
private static final String SEGMENT_NAME = "testSegment";
private static final String INDEX_DIR_PATH = FileUtils.getTempDirectoryPath() + File.separator + SEGMENT_NAME;
protected static final String TIME_COLUMN = "time";

private static final double DELTA = 0.00001;

@@ -81,9 +82,9 @@ protected void assertRelation(String functionName, String leftWkt, String rightW
throws Exception {
assertIntFunction(
String.format("%s(ST_GeomFromText(%s),ST_GeomFromText(%s))", functionName, STRING_SV_COLUMN, STRING_SV_COLUMN2),
new int[]{result ? 1 : 0}, Arrays
.asList(new Column(STRING_SV_COLUMN, FieldSpec.DataType.STRING, new String[]{leftWkt}),
new Column(STRING_SV_COLUMN2, FieldSpec.DataType.STRING, new String[]{rightWkt})));
new int[]{result ? 1 : 0},
Arrays.asList(new Column(STRING_SV_COLUMN, FieldSpec.DataType.STRING, new String[]{leftWkt}),
new Column(STRING_SV_COLUMN2, FieldSpec.DataType.STRING, new String[]{rightWkt})));
}

protected void assertLongFunction(String function, long[] expectedValues, List<Column> columns)
@@ -151,8 +152,7 @@ private void assertFunction(String function, int length, List<Column> columns,
rows.add(row);
}

TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTimeColumnName(TIME_COLUMN).build();
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
config.setOutDir(INDEX_DIR_PATH);
config.setSegmentName(SEGMENT_NAME);