Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public static DimensionTableDataManager getInstanceByTableName(String tableNameW
private final AtomicInteger _loadToken = new AtomicInteger();

private boolean _disablePreload = false;
private boolean _errorOnDuplicatePrimaryKey = false;

@Override
protected void doInit() {
Expand All @@ -103,6 +104,7 @@ protected void doInit() {
DimensionTableConfig dimensionTableConfig = tableConfig.getDimensionTableConfig();
if (dimensionTableConfig != null) {
_disablePreload = dimensionTableConfig.isDisablePreload();
_errorOnDuplicatePrimaryKey = dimensionTableConfig.isErrorOnDuplicatePrimaryKey();
}
}

Expand Down Expand Up @@ -211,7 +213,12 @@ private DimensionTable createFastLookupDimensionTable() {
}
GenericRow row = new GenericRow();
recordReader.getRecord(i, row);
lookupTable.put(row.getPrimaryKey(primaryKeyColumns), row);
GenericRow previousRow = lookupTable.put(row.getPrimaryKey(primaryKeyColumns), row);
if (_errorOnDuplicatePrimaryKey && previousRow != null) {
throw new IllegalStateException(
"Caught exception while reading records from segment: " + indexSegment.getSegmentName()
+ "primary key already exist for: " + row.getPrimaryKey(primaryKeyColumns));
}
}
} catch (Exception e) {
throw new RuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.pinot.core.data.manager.offline;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -47,10 +50,13 @@
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand All @@ -60,14 +66,17 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.fail;


@SuppressWarnings("unchecked")
public class DimensionTableDataManagerTest {
private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), LoaderTest.class.getName());
private static final String RAW_TABLE_NAME = "dimBaseballTeams";
private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
private static final String AVRO_DATA_PATH = "data/dimBaseballTeams.avro";
private static final String CSV_DATA_PATH = "data/dimBaseballTeams.csv";
private static final String SCHEMA_PATH = "data/dimBaseballTeams_schema.json";
private static final String TABLE_CONFIG_PATH = "data/dimBaseballTeams_config.json";

private File _indexDir;
private IndexLoadingConfig _indexLoadingConfig;
Expand All @@ -80,14 +89,22 @@ public void setUp()
ServerMetrics.register(mock(ServerMetrics.class));

// prepare segment data
URL resourceUrl = getClass().getClassLoader().getResource(AVRO_DATA_PATH);
assertNotNull(resourceUrl);
File avroFile = new File(resourceUrl.getFile());
URL dataPathUrl = getClass().getClassLoader().getResource(CSV_DATA_PATH);
URL schemaPathUrl = getClass().getClassLoader().getResource(SCHEMA_PATH);
URL configPathUrl = getClass().getClassLoader().getResource(TABLE_CONFIG_PATH);
assertNotNull(dataPathUrl);
assertNotNull(schemaPathUrl);
assertNotNull(configPathUrl);
File csvFile = new File(dataPathUrl.getFile());
Schema schema = createSchema(new File(schemaPathUrl.getFile()));
TableConfig tableConfig = createTableConfig(new File(configPathUrl.getFile()));

// create segment
File tableDataDir = new File(TEMP_DIR, OFFLINE_TABLE_NAME);

SegmentGeneratorConfig segmentGeneratorConfig =
SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(avroFile, tableDataDir, RAW_TABLE_NAME);
SegmentTestUtils.getSegmentGeneratorConfig(csvFile, FileFormat.CSV, tableDataDir, RAW_TABLE_NAME, tableConfig,
schema);
SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
driver.init(segmentGeneratorConfig);
driver.build();
Expand All @@ -111,8 +128,8 @@ private Schema getSchema() {
.setPrimaryKeyColumns(Collections.singletonList("teamID")).build();
}

private TableConfig getTableConfig(boolean disablePreload) {
DimensionTableConfig dimensionTableConfig = new DimensionTableConfig(disablePreload);
private TableConfig getTableConfig(boolean disablePreload, boolean errorOnDuplicatePrimaryKey) {
DimensionTableConfig dimensionTableConfig = new DimensionTableConfig(disablePreload, errorOnDuplicatePrimaryKey);
return new TableConfigBuilder(TableType.OFFLINE).setTableName("dimBaseballTeams")
.setDimensionTableConfig(dimensionTableConfig).build();
}
Expand All @@ -127,7 +144,7 @@ private Schema getSchemaWithExtraColumn() {
private DimensionTableDataManager makeTableDataManager(HelixManager helixManager) {
InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class);
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
TableConfig tableConfig = getTableConfig(false);
TableConfig tableConfig = getTableConfig(false, false);
DimensionTableDataManager tableDataManager =
DimensionTableDataManager.createInstanceByTableName(OFFLINE_TABLE_NAME);
tableDataManager.init(instanceDataManagerConfig, tableConfig, helixManager, null, null);
Expand Down Expand Up @@ -257,8 +274,8 @@ public void testLookupWithoutPreLoad()
ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
when(propertyStore.get("/SCHEMAS/dimBaseballTeams", null, AccessOption.PERSISTENT)).thenReturn(
SchemaUtils.toZNRecord(getSchema()));
when(propertyStore.get("/CONFIGS/TABLE/dimBaseballTeams", null, AccessOption.PERSISTENT)).thenReturn(
TableConfigUtils.toZNRecord(getTableConfig(true)));
when(propertyStore.get("/CONFIGS/TABLE/dimBaseballTeams_OFFLINE", null, AccessOption.PERSISTENT)).thenReturn(
TableConfigUtils.toZNRecord(getTableConfig(true, false)));
when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
DimensionTableDataManager tableDataManager = makeTableDataManager(helixManager);

Expand Down Expand Up @@ -294,4 +311,42 @@ public void testLookupWithoutPreLoad()
resp = tableDataManager.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"}));
assertNull(resp, "Response should be null if no segment is loaded");
}

@Test
public void testLookupErrorOnDuplicatePrimaryKey()
throws Exception {
HelixManager helixManager = mock(HelixManager.class);
ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
when(propertyStore.get("/SCHEMAS/dimBaseballTeams", null, AccessOption.PERSISTENT)).thenReturn(
SchemaUtils.toZNRecord(getSchema()));
when(propertyStore.get("/CONFIGS/TABLE/dimBaseballTeams_OFFLINE", null, AccessOption.PERSISTENT)).thenReturn(
TableConfigUtils.toZNRecord(getTableConfig(false, true)));
when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
DimensionTableDataManager tableDataManager = makeTableDataManager(helixManager);

// try fetching data BEFORE loading segment
GenericRow resp = tableDataManager.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"}));
assertNull(resp, "Response should be null if no segment is loaded");

try {
tableDataManager.addSegment(_indexDir, _indexLoadingConfig);
fail("Should error out when ErrorOnDuplicatePrimaryKey is configured to true");
} catch (Exception e) {
// expected;
}
}

protected static Schema createSchema(File schemaFile)
throws IOException {
InputStream inputStream = new FileInputStream(schemaFile);
Assert.assertNotNull(inputStream);
return JsonUtils.inputStreamToObject(inputStream, Schema.class);
}

protected static TableConfig createTableConfig(File tableConfigFile)
throws IOException {
InputStream inputStream = new FileInputStream(tableConfigFile);
Assert.assertNotNull(inputStream);
return JsonUtils.inputStreamToObject(inputStream, TableConfig.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void testRecordReaderFileConfigInit() throws Exception {
assertEquals(outputSegments.size(), 1);
ImmutableSegment segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
assertEquals(segmentMetadata.getTotalDocs(), 51);
assertEquals(segmentMetadata.getTotalDocs(), 52);
}

@Test
Expand Down
Binary file not shown.
8 changes: 0 additions & 8 deletions pinot-core/src/test/resources/data/dimBaseballTeams.avsc

This file was deleted.

1 change: 1 addition & 0 deletions pinot-core/src/test/resources/data/dimBaseballTeams.csv
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
teamID,teamName
"ANA","Anaheim Angels"
"ANA","Anaheim Angels"
"ARI","Arizona Diamondbacks"
"ATL","Atlanta Braves"
"BAL","Baltimore Orioles (original- 1901–1902 current- since 1954)"
Expand Down
18 changes: 18 additions & 0 deletions pinot-core/src/test/resources/data/dimBaseballTeams_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"tableName": "dimBaseballTeams",
"tableType": "OFFLINE",
"isDimTable": true,
"segmentsConfig": {
"segmentPushType": "REFRESH",
"replication": "1"
},
"tenants": {
},
"tableIndexConfig": {
"loadMode": "MMAP"
},
"metadata": {
"customConfigs": {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,20 @@

public class DimensionTableConfig extends BaseJsonConfig {
private final boolean _disablePreload;
private final boolean _errorOnDuplicatePrimaryKey;

@JsonCreator
public DimensionTableConfig(@JsonProperty(value = "disablePreload", required = true) boolean disablePreload) {
_disablePreload = disablePreload;
public DimensionTableConfig(@JsonProperty(value = "disablePreload") Boolean disablePreload,
@JsonProperty(value = "errorOnDuplicatePrimaryKey") Boolean errorOnDuplicatePrimaryKey) {
_disablePreload = disablePreload != null && disablePreload;
_errorOnDuplicatePrimaryKey = errorOnDuplicatePrimaryKey != null && errorOnDuplicatePrimaryKey;
}

public boolean isDisablePreload() {
return _disablePreload;
}

public boolean isErrorOnDuplicatePrimaryKey() {
return _errorOnDuplicatePrimaryKey;
}
}