Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public static abstract class MergeTask {
public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = "maxNumRecordsPerSegment";
public static final String MAX_NUM_PARALLEL_BUCKETS = "maxNumParallelBuckets";
public static final String SEGMENT_NAME_PREFIX_KEY = "segmentNamePrefix";
public static final String SEGMENT_NAME_POSTFIX_KEY = "segmentNamePostfix";
}

public static class MergeRollupTask extends MergeTask {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ public class SegmentConfig {

private final int _maxNumRecordsPerSegment;
private final String _segmentNamePrefix;
private final String _segmentNamePostfix;

/**
 * Creates a SegmentConfig. The constructor is annotated with {@code @JsonCreator} and its parameters carry the
 * JSON property names they are bound to; it is also the constructor invoked by {@code Builder.build()}.
 *
 * NOTE(review): the "segmentNamePrefix" parameter line appears twice below, once closing the parameter list and
 * once followed by the "segmentNamePostfix" parameter. This looks like the before/after lines of a diff hunk
 * shown together -- confirm which form is the intended one.
 *
 * @param maxNumRecordsPerSegment required JSON property; must be > 0 or the precondition check fails
 * @param segmentNamePrefix optional segment name prefix, may be null
 * @param segmentNamePostfix optional segment name postfix, may be null
 */
@JsonCreator
private SegmentConfig(@JsonProperty(value = "maxNumRecordsPerSegment", required = true) int maxNumRecordsPerSegment,
@JsonProperty("segmentNamePrefix") @Nullable String segmentNamePrefix) {
@JsonProperty("segmentNamePrefix") @Nullable String segmentNamePrefix,
@JsonProperty("segmentNamePostfix") @Nullable String segmentNamePostfix) {
// Reject non-positive record limits before any field is assigned.
Preconditions.checkState(maxNumRecordsPerSegment > 0, "Max num records per segment must be > 0");
_maxNumRecordsPerSegment = maxNumRecordsPerSegment;
_segmentNamePrefix = segmentNamePrefix;
_segmentNamePostfix = segmentNamePostfix;
}

/**
Expand All @@ -55,12 +58,18 @@ public String getSegmentNamePrefix() {
return _segmentNamePrefix;
}

/**
 * Returns the segment name postfix held by this config.
 *
 * @return the postfix passed at construction time; may be {@code null}
 */
@Nullable
public String getSegmentNamePostfix() {
  return this._segmentNamePostfix;
}

/**
* Builder for SegmentConfig
*/
public static class Builder {
private int _maxNumRecordsPerSegment = DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT;
private String _segmentNamePrefix;
private String _segmentNamePostfix;

public Builder setMaxNumRecordsPerSegment(int maxNumRecordsPerSegment) {
_maxNumRecordsPerSegment = maxNumRecordsPerSegment;
Expand All @@ -72,15 +81,20 @@ public Builder setSegmentNamePrefix(String segmentNamePrefix) {
return this;
}

/**
 * Records the segment name postfix to hand to the SegmentConfig being built.
 *
 * @param segmentNamePostfix postfix value, stored as given
 * @return this builder, so calls can be chained
 */
public Builder setSegmentNamePostfix(String segmentNamePostfix) {
  this._segmentNamePostfix = segmentNamePostfix;
  return this;
}

/**
 * Checks the builder state and creates the SegmentConfig from the values currently held by this builder.
 *
 * @return the newly constructed SegmentConfig
 * @throws IllegalStateException if the max number of records per segment is not > 0 (raised by
 *     Preconditions.checkState, assuming this is Guava's Preconditions -- TODO confirm)
 */
public SegmentConfig build() {
Preconditions.checkState(_maxNumRecordsPerSegment > 0, "Max num records per segment must be > 0");
// NOTE(review): two return statements appear back to back. The two-argument call looks like the pre-change
// line of a diff hunk and the three-argument call its replacement -- confirm which one applies.
return new SegmentConfig(_maxNumRecordsPerSegment, _segmentNamePrefix);
return new SegmentConfig(_maxNumRecordsPerSegment, _segmentNamePrefix, _segmentNamePostfix);
}
}

/**
 * Builds a human-readable description of this config in the form
 * {@code SegmentConfig{_maxNumRecordsPerSegment=..., _segmentNamePrefix='...', ...}}.
 *
 * NOTE(review): the continuation line of the concatenation appears twice below, once ending after the prefix
 * and once also appending the postfix. This looks like the before/after lines of a diff hunk shown together --
 * confirm which one is intended.
 */
@Override
public String toString() {
return "SegmentConfig{" + "_maxNumRecordsPerSegment=" + _maxNumRecordsPerSegment + ", _segmentNamePrefix='"
+ _segmentNamePrefix + '\'' + '}';
+ _segmentNamePrefix + '\'' + ", _segmentNamePostfix='" + _segmentNamePostfix + '\'' + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,18 @@ public List<File> process()
TableConfig tableConfig = _segmentProcessorConfig.getTableConfig();
Schema schema = _segmentProcessorConfig.getSchema();
String segmentNamePrefix = _segmentProcessorConfig.getSegmentConfig().getSegmentNamePrefix();
String segmentNamePostfix = _segmentProcessorConfig.getSegmentConfig().getSegmentNamePostfix();
SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(tableConfig, schema);
generatorConfig.setOutDir(_segmentsOutputDir.getPath());

if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null) {
generatorConfig.setSegmentNameGenerator(
SegmentNameGeneratorFactory.createSegmentNameGenerator(tableConfig, schema, segmentNamePrefix, null, false));
SegmentNameGeneratorFactory
.createSegmentNameGenerator(tableConfig, schema, segmentNamePrefix, segmentNamePostfix, false));
} else {
// SimpleSegmentNameGenerator is used by default.
generatorConfig.setSegmentNamePrefix(segmentNamePrefix);
generatorConfig.setSegmentNamePostfix(segmentNamePostfix);
}

int maxNumRecordsPerSegment = _segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,39 +413,40 @@ public void testSingleSegment()

// Segment config
config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setSegmentConfig(
new SegmentConfig.Builder().setMaxNumRecordsPerSegment(4).setSegmentNamePrefix("myPrefix").build()).build();
new SegmentConfig.Builder().setMaxNumRecordsPerSegment(4).setSegmentNamePrefix("myPrefix")
.setSegmentNamePostfix("myPostfix").build()).build();
framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
outputSegments = framework.process();
assertEquals(outputSegments.size(), 3);
outputSegments.sort(null);
segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
assertEquals(segmentMetadata.getTotalDocs(), 4);
assertEquals(segmentMetadata.getName(), "myPrefix_1597719600000_1597795200000_0");
assertEquals(segmentMetadata.getName(), "myPrefix_1597719600000_1597795200000_myPostfix_0");
segmentMetadata = new SegmentMetadataImpl(outputSegments.get(1));
assertEquals(segmentMetadata.getTotalDocs(), 4);
assertEquals(segmentMetadata.getName(), "myPrefix_1597802400000_1597878000000_1");
assertEquals(segmentMetadata.getName(), "myPrefix_1597802400000_1597878000000_myPostfix_1");
segmentMetadata = new SegmentMetadataImpl(outputSegments.get(2));
assertEquals(segmentMetadata.getTotalDocs(), 2);
assertEquals(segmentMetadata.getName(), "myPrefix_1597881600000_1597892400000_2");
assertEquals(segmentMetadata.getName(), "myPrefix_1597881600000_1597892400000_myPostfix_2");
FileUtils.cleanDirectory(workingDir);
rewindRecordReaders(_singleSegment);

config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfigSegmentNameGeneratorEnabled)
.setSchema(_schema).setSegmentConfig(new SegmentConfig.Builder().setMaxNumRecordsPerSegment(4)
.setSegmentNamePrefix("myPrefix").build()).build();
.setSegmentNamePrefix("myPrefix").setSegmentNamePostfix("myPostfix").build()).build();
framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
outputSegments = framework.process();
assertEquals(outputSegments.size(), 3);
outputSegments.sort(null);
segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
assertEquals(segmentMetadata.getTotalDocs(), 4);
assertEquals(segmentMetadata.getName(), "myPrefix_2020-08-18_2020-08-19_0");
assertEquals(segmentMetadata.getName(), "myPrefix_2020-08-18_2020-08-19_myPostfix_0");
segmentMetadata = new SegmentMetadataImpl(outputSegments.get(1));
assertEquals(segmentMetadata.getTotalDocs(), 4);
assertEquals(segmentMetadata.getName(), "myPrefix_2020-08-19_2020-08-19_1");
assertEquals(segmentMetadata.getName(), "myPrefix_2020-08-19_2020-08-19_myPostfix_1");
segmentMetadata = new SegmentMetadataImpl(outputSegments.get(2));
assertEquals(segmentMetadata.getTotalDocs(), 2);
assertEquals(segmentMetadata.getName(), "myPrefix_2020-08-20_2020-08-20_2");
assertEquals(segmentMetadata.getName(), "myPrefix_2020-08-20_2020-08-20_myPostfix_2");
FileUtils.cleanDirectory(workingDir);
rewindRecordReaders(_singleSegment);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public static SegmentConfig getSegmentConfig(Map<String, String> taskConfig) {
segmentConfigBuilder.setMaxNumRecordsPerSegment(Integer.parseInt(maxNumRecordsPerSegment));
}
segmentConfigBuilder.setSegmentNamePrefix(taskConfig.get(MergeTask.SEGMENT_NAME_PREFIX_KEY));
segmentConfigBuilder.setSegmentNamePostfix(taskConfig.get(MergeTask.SEGMENT_NAME_POSTFIX_KEY));
return segmentConfigBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,18 @@ public void testGetSegmentConfig() {
Map<String, String> taskConfig = new HashMap<>();
taskConfig.put(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY, "10000");
taskConfig.put(MergeTask.SEGMENT_NAME_PREFIX_KEY, "myPrefix");
taskConfig.put(MergeTask.SEGMENT_NAME_POSTFIX_KEY, "myPostfix");
SegmentConfig segmentConfig = MergeTaskUtils.getSegmentConfig(taskConfig);
assertEquals(segmentConfig.getMaxNumRecordsPerSegment(), 10000);
assertEquals(segmentConfig.getSegmentNamePrefix(), "myPrefix");
assertEquals(segmentConfig.getSegmentNamePostfix(), "myPostfix");
assertEquals(segmentConfig.toString(),
"SegmentConfig{_maxNumRecordsPerSegment=10000, _segmentNamePrefix='myPrefix', "
+ "_segmentNamePostfix='myPostfix'}");

segmentConfig = MergeTaskUtils.getSegmentConfig(Collections.emptyMap());
assertEquals(segmentConfig.getMaxNumRecordsPerSegment(), SegmentConfig.DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT);
assertNull(segmentConfig.getSegmentNamePrefix());
assertNull(segmentConfig.getSegmentNamePostfix());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ public NormalizedDateSegmentNameGenerator(String tableName, @Nullable String seg
_segmentNamePrefix != null && isValidSegmentName(_segmentNamePrefix));
_excludeSequenceId = excludeSequenceId;
_appendPushType = "APPEND".equalsIgnoreCase(pushType);
_segmentNamePostfix = segmentNamePostfix;
_segmentNamePostfix = segmentNamePostfix != null ? segmentNamePostfix.trim() : null;
Preconditions.checkArgument(
_segmentNamePostfix == null || isValidSegmentName(_segmentNamePostfix));

// Include time info for APPEND push type
if (_appendPushType) {
Expand Down Expand Up @@ -96,13 +98,9 @@ public String generateSegmentName(int sequenceId, @Nullable Object minTimeValue,
// Include time value for APPEND push type
if (_appendPushType) {
return JOINER.join(_segmentNamePrefix, getNormalizedDate(Preconditions.checkNotNull(minTimeValue)),
getNormalizedDate(Preconditions.checkNotNull(maxTimeValue)), sequenceIdInSegmentName);
getNormalizedDate(Preconditions.checkNotNull(maxTimeValue)), _segmentNamePostfix, sequenceIdInSegmentName);
} else {
if (_segmentNamePostfix != null) {
return JOINER.join(_segmentNamePrefix, _segmentNamePostfix, sequenceIdInSegmentName);
} else {
return JOINER.join(_segmentNamePrefix, sequenceIdInSegmentName);
}
return JOINER.join(_segmentNamePrefix, _segmentNamePostfix, sequenceIdInSegmentName);
}
}

Expand All @@ -129,8 +127,11 @@ public String getNormalizedDate(Object timeValue) {
@Override
public String toString() {
StringBuilder stringBuilder =
new StringBuilder("NormalizedDateSegmentNameGenerator: segmentNamePrefix=").append(_segmentNamePrefix)
.append(", appendPushType=").append(_appendPushType);
new StringBuilder("NormalizedDateSegmentNameGenerator: segmentNamePrefix=").append(_segmentNamePrefix);
if (_segmentNamePostfix != null) {
stringBuilder.append(", segmentNamePostfix=").append(_segmentNamePostfix);
}
stringBuilder.append(", appendPushType=").append(_appendPushType);
if (_excludeSequenceId) {
stringBuilder.append(", excludeSequenceId=true");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ public class NormalizedDateSegmentNameGeneratorTest {
private static final String TABLE_NAME = "myTable";
private static final String MALFORMED_TABLE_NAME = "my/Table";
private static final String SEGMENT_NAME_PREFIX = "myTable_daily";
private static final String SEGMENT_NAME_POSTFIX = "myPostfix";
private static final String MALFORMED_SEGMENT_NAME_PREFIX = "myTable\\daily";
private static final String MALFORMED_SEGMENT_NAME_POSTFIX = "my\\postfix";
private static final String APPEND_PUSH_TYPE = "APPEND";
private static final String REFRESH_PUSH_TYPE = "REFRESH";
private static final String EPOCH_TIME_FORMAT = "EPOCH";
Expand Down Expand Up @@ -64,6 +66,18 @@ public void testWithSegmentNamePrefix() {
assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, null, null), "myTable_daily_1");
}

/**
 * REFRESH push type with both a segment name prefix and a postfix: the postfix must be reported by
 * {@code toString()} and must follow the prefix in the generated segment name.
 */
@Test
public void testWithSegmentNamePrefixPostfix() {
  SegmentNameGenerator generator =
      new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_PREFIX, false, REFRESH_PUSH_TYPE, null, null,
          SEGMENT_NAME_POSTFIX);

  // The description lists the postfix between the prefix and the push type flag.
  String description = generator.toString();
  assertEquals(description,
      "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable_daily, segmentNamePostfix=myPostfix, "
          + "appendPushType=false");

  // No sequence suffix is expected for INVALID_SEQUENCE_ID; "_1" is expected for VALID_SEQUENCE_ID.
  String nameForInvalidSequenceId = generator.generateSegmentName(INVALID_SEQUENCE_ID, null, null);
  assertEquals(nameForInvalidSequenceId, "myTable_daily_myPostfix");
  String nameForValidSequenceId = generator.generateSegmentName(VALID_SEQUENCE_ID, null, null);
  assertEquals(nameForValidSequenceId, "myTable_daily_myPostfix_1");
}

@Test
public void testWithUntrimmedSegmentNamePrefix() {
SegmentNameGenerator segmentNameGenerator =
Expand All @@ -75,6 +89,18 @@ public void testWithUntrimmedSegmentNamePrefix() {
assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, null, null), "myTable_daily_1");
}

/**
 * Same expectations as the prefix/postfix case, but the prefix and postfix are passed with trailing whitespace:
 * the description and the generated names must not contain that whitespace.
 */
@Test
public void testWithUntrimmedSegmentNamePrefixPostfix() {
  String untrimmedPrefix = SEGMENT_NAME_PREFIX + " ";
  String untrimmedPostfix = SEGMENT_NAME_POSTFIX + " ";
  SegmentNameGenerator generator =
      new NormalizedDateSegmentNameGenerator(TABLE_NAME, untrimmedPrefix, false, REFRESH_PUSH_TYPE, null, null,
          untrimmedPostfix);

  String description = generator.toString();
  assertEquals(description,
      "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable_daily, segmentNamePostfix=myPostfix, "
          + "appendPushType=false");

  String nameForInvalidSequenceId = generator.generateSegmentName(INVALID_SEQUENCE_ID, null, null);
  assertEquals(nameForInvalidSequenceId, "myTable_daily_myPostfix");
  String nameForValidSequenceId = generator.generateSegmentName(VALID_SEQUENCE_ID, null, null);
  assertEquals(nameForValidSequenceId, "myTable_daily_myPostfix_1");
}

@Test
public void testExcludeSequenceId() {
SegmentNameGenerator segmentNameGenerator =
Expand All @@ -97,6 +123,18 @@ public void testWithPrefixExcludeSequenceId() {
assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, null, null), "myTable_daily");
}

/**
 * Prefix and postfix with the third constructor argument set to {@code true}: the description must end with
 * {@code excludeSequenceId=true} and the generated name must be the same for both sequence id constants.
 */
@Test
public void testWithPrefixPostfixExcludeSequenceId() {
  SegmentNameGenerator generator =
      new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_PREFIX, true, REFRESH_PUSH_TYPE, null, null,
          SEGMENT_NAME_POSTFIX);

  String description = generator.toString();
  assertEquals(description,
      "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable_daily, segmentNamePostfix=myPostfix, "
          + "appendPushType=false, excludeSequenceId=true");

  // Neither call is expected to append a sequence id.
  String nameForInvalidSequenceId = generator.generateSegmentName(INVALID_SEQUENCE_ID, null, null);
  assertEquals(nameForInvalidSequenceId, "myTable_daily_myPostfix");
  String nameForValidSequenceId = generator.generateSegmentName(VALID_SEQUENCE_ID, null, null);
  assertEquals(nameForValidSequenceId, "myTable_daily_myPostfix");
}

@Test
public void testAppend() {
SegmentNameGenerator segmentNameGenerator =
Expand All @@ -111,6 +149,35 @@ public void testAppend() {
"myTable_1970-01-02_1970-01-04_1");
}

/**
 * APPEND push type with a segment name prefix and no postfix: the generated name must contain the prefix
 * followed by the normalized min/max dates, and the description must not mention a postfix.
 */
@Test
public void testAppendWithSegmentNamePrefix() {
  DateTimeFormatSpec dailyEpochSpec = new DateTimeFormatSpec(1, TimeUnit.DAYS.toString(), EPOCH_TIME_FORMAT);
  SegmentNameGenerator generator =
      new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_PREFIX, false, APPEND_PUSH_TYPE,
          DAILY_PUSH_FREQUENCY, dailyEpochSpec, null);

  String description = generator.toString();
  assertEquals(description,
      "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable_daily, appendPushType=true, "
          + "outputSDF=yyyy-MM-dd, inputTimeUnit=DAYS");

  // Time values 1 and 3 are expected to render as 1970-01-02 and 1970-01-04.
  String nameForInvalidSequenceId = generator.generateSegmentName(INVALID_SEQUENCE_ID, 1L, 3L);
  assertEquals(nameForInvalidSequenceId, "myTable_daily_1970-01-02_1970-01-04");
  String nameForValidSequenceId = generator.generateSegmentName(VALID_SEQUENCE_ID, 1L, 3L);
  assertEquals(nameForValidSequenceId, "myTable_daily_1970-01-02_1970-01-04_1");
}

/**
 * APPEND push type with both a prefix and a postfix: the postfix must appear after the normalized min/max dates
 * and before the sequence id (when one is appended), and must be listed in the description.
 */
@Test
public void testAppendWithSegmentNamePrefixPostfix() {
  DateTimeFormatSpec dailyEpochSpec = new DateTimeFormatSpec(1, TimeUnit.DAYS.toString(), EPOCH_TIME_FORMAT);
  SegmentNameGenerator generator =
      new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_PREFIX, false, APPEND_PUSH_TYPE,
          DAILY_PUSH_FREQUENCY, dailyEpochSpec, SEGMENT_NAME_POSTFIX);

  String description = generator.toString();
  assertEquals(description,
      "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable_daily, segmentNamePostfix=myPostfix, "
          + "appendPushType=true, outputSDF=yyyy-MM-dd, inputTimeUnit=DAYS");

  // Time values 1 and 3 are expected to render as 1970-01-02 and 1970-01-04.
  String nameForInvalidSequenceId = generator.generateSegmentName(INVALID_SEQUENCE_ID, 1L, 3L);
  assertEquals(nameForInvalidSequenceId, "myTable_daily_1970-01-02_1970-01-04_myPostfix");
  String nameForValidSequenceId = generator.generateSegmentName(VALID_SEQUENCE_ID, 1L, 3L);
  assertEquals(nameForValidSequenceId, "myTable_daily_1970-01-02_1970-01-04_myPostfix_1");
}

@Test
public void testHoursTimeType() {
SegmentNameGenerator segmentNameGenerator =
Expand Down Expand Up @@ -156,7 +223,7 @@ public void testStringSimpleDateFormat() {
}

@Test
public void testMalFormedTableNameAndSegmentNamePrefix() {
public void testMalFormedTableNameAndSegmentNamePrefixPostfix() {
try {
new NormalizedDateSegmentNameGenerator(MALFORMED_TABLE_NAME, null, false, APPEND_PUSH_TYPE, DAILY_PUSH_FREQUENCY,
new DateTimeFormatSpec(1, TimeUnit.DAYS.toString(), SIMPLE_DATE_TIME_FORMAT, STRING_SLASH_DATE_FORMAT), null);
Expand All @@ -172,6 +239,15 @@ public void testMalFormedTableNameAndSegmentNamePrefix() {
} catch (IllegalArgumentException e) {
// Expected
}
try {
new NormalizedDateSegmentNameGenerator(
TABLE_NAME, SEGMENT_NAME_PREFIX, false, APPEND_PUSH_TYPE, DAILY_PUSH_FREQUENCY,
new DateTimeFormatSpec(1, TimeUnit.DAYS.toString(), SIMPLE_DATE_TIME_FORMAT, STRING_SLASH_DATE_FORMAT),
MALFORMED_SEGMENT_NAME_POSTFIX);
Assert.fail();
} catch (IllegalArgumentException e) {
// Expected
}
}

@Test
Expand Down