Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public static abstract class MergeTask {
public static final String MAX_NUM_PARALLEL_BUCKETS = "maxNumParallelBuckets";
public static final String SEGMENT_NAME_PREFIX_KEY = "segmentNamePrefix";
public static final String SEGMENT_NAME_POSTFIX_KEY = "segmentNamePostfix";
public static final String FIXED_SEGMENT_NAME_KEY = "fixedSegmentName";
}

public static class MergeRollupTask extends MergeTask {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,18 @@ public class SegmentConfig {
private final int _maxNumRecordsPerSegment;
private final String _segmentNamePrefix;
private final String _segmentNamePostfix;
private final String _fixedSegmentName;

/**
 * Creates a SegmentConfig from its JSON properties; this is the constructor annotated with
 * {@code @JsonCreator}, and it is private (the Builder's build() method calls it).
 *
 * NOTE(review): this span appears to come from a diff view, so the earlier line that closes the
 * parameter list right after segmentNamePostfix is shown next to the two lines that replace it.
 *
 * @param maxNumRecordsPerSegment required JSON property "maxNumRecordsPerSegment"; must be > 0,
 *        otherwise the Preconditions.checkState call below fails
 * @param segmentNamePrefix JSON property "segmentNamePrefix"; may be null
 * @param segmentNamePostfix JSON property "segmentNamePostfix"; may be null
 * @param fixedSegmentName JSON property "fixedSegmentName"; may be null, stored without validation
 */
@JsonCreator
private SegmentConfig(@JsonProperty(value = "maxNumRecordsPerSegment", required = true) int maxNumRecordsPerSegment,
@JsonProperty("segmentNamePrefix") @Nullable String segmentNamePrefix,
@JsonProperty("segmentNamePostfix") @Nullable String segmentNamePostfix) {
@JsonProperty("segmentNamePostfix") @Nullable String segmentNamePostfix,
@JsonProperty("fixedSegmentName") @Nullable String fixedSegmentName) {
Preconditions.checkState(maxNumRecordsPerSegment > 0, "Max num records per segment must be > 0");
_maxNumRecordsPerSegment = maxNumRecordsPerSegment;
_segmentNamePrefix = segmentNamePrefix;
_segmentNamePostfix = segmentNamePostfix;
_fixedSegmentName = fixedSegmentName;
}

/**
Expand All @@ -63,13 +66,19 @@ public String getSegmentNamePostfix() {
return _segmentNamePostfix;
}

/**
 * Returns the fixed segment name this config was constructed with.
 *
 * @return the stored fixed segment name, or {@code null} when none was supplied
 */
@Nullable
public String getFixedSegmentName() {
return _fixedSegmentName;
}

/**
* Builder for SegmentConfig
*/
public static class Builder {
private int _maxNumRecordsPerSegment = DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT;
private String _segmentNamePrefix;
private String _segmentNamePostfix;
private String _fixedSegmentName;

public Builder setMaxNumRecordsPerSegment(int maxNumRecordsPerSegment) {
_maxNumRecordsPerSegment = maxNumRecordsPerSegment;
Expand All @@ -86,15 +95,21 @@ public Builder setSegmentNamePostfix(String segmentNamePostfix) {
return this;
}

/**
 * Sets the fixed segment name that build() passes on to the SegmentConfig it creates.
 *
 * @param fixedSegmentName value stored as-is; no validation happens here
 * @return this builder, so calls can be chained
 */
public Builder setFixedSegmentName(String fixedSegmentName) {
_fixedSegmentName = fixedSegmentName;
return this;
}

/**
 * Builds a SegmentConfig from the values collected by this builder.
 *
 * The max-records value is checked to be > 0 here, before the SegmentConfig constructor is called.
 *
 * NOTE(review): two return lines are shown (3-argument and 4-argument constructor calls); this
 * appears to be the before/after pair of a diff, with the 4-argument call being the one that
 * forwards _fixedSegmentName.
 *
 * @return a new SegmentConfig carrying the builder's current values
 */
public SegmentConfig build() {
Preconditions.checkState(_maxNumRecordsPerSegment > 0, "Max num records per segment must be > 0");
return new SegmentConfig(_maxNumRecordsPerSegment, _segmentNamePrefix, _segmentNamePostfix);
return new SegmentConfig(_maxNumRecordsPerSegment, _segmentNamePrefix, _segmentNamePostfix, _fixedSegmentName);
}
}

/**
 * Renders this config as {@code SegmentConfig{...}} with each field printed as name=value;
 * the String fields are wrapped in single quotes.
 *
 * NOTE(review): the line ending the expression after _segmentNamePostfix and the two lines that
 * follow it appear to be the before/after versions from a diff; the later version appends
 * _fixedSegmentName to the output.
 */
@Override
public String toString() {
return "SegmentConfig{" + "_maxNumRecordsPerSegment=" + _maxNumRecordsPerSegment + ", _segmentNamePrefix='"
+ _segmentNamePrefix + '\'' + ", _segmentNamePostfix='" + _segmentNamePostfix + '\'' + '}';
+ _segmentNamePrefix + '\'' + ", _segmentNamePostfix='" + _segmentNamePostfix + '\'' + ", _fixedSegmentName='"
+ _fixedSegmentName + '\'' + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,14 @@ public List<File> process()
Schema schema = _segmentProcessorConfig.getSchema();
String segmentNamePrefix = _segmentProcessorConfig.getSegmentConfig().getSegmentNamePrefix();
String segmentNamePostfix = _segmentProcessorConfig.getSegmentConfig().getSegmentNamePostfix();
String fixedSegmentName = _segmentProcessorConfig.getSegmentConfig().getFixedSegmentName();
SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(tableConfig, schema);
generatorConfig.setOutDir(_segmentsOutputDir.getPath());

if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null) {
generatorConfig.setSegmentNameGenerator(
SegmentNameGeneratorFactory
.createSegmentNameGenerator(tableConfig, schema, segmentNamePrefix, segmentNamePostfix, false));
generatorConfig.setSegmentNameGenerator(SegmentNameGeneratorFactory
.createSegmentNameGenerator(tableConfig, schema, segmentNamePrefix, segmentNamePostfix, fixedSegmentName,
false));
} else {
// SimpleSegmentNameGenerator is used by default.
generatorConfig.setSegmentNamePrefix(segmentNamePrefix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class SegmentProcessorFrameworkTest {
private TableConfig _tableConfig;
private TableConfig _tableConfigNullValueEnabled;
private TableConfig _tableConfigSegmentNameGeneratorEnabled;
private TableConfig _tableConfigWithFixedSegmentName;

private Schema _schema;
private Schema _schemaMV;
Expand Down Expand Up @@ -98,6 +99,9 @@ public void setup()
_tableConfigSegmentNameGeneratorEnabled =
new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("time").build();
_tableConfigSegmentNameGeneratorEnabled.getIndexingConfig().setSegmentNameGeneratorType("normalizedDate");
_tableConfigWithFixedSegmentName =
new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("time").build();
_tableConfigWithFixedSegmentName.getIndexingConfig().setSegmentNameGeneratorType("fixed");

_schema =
new Schema.SchemaBuilder().setSchemaName("mySchema").addSingleValueDimension("campaign", DataType.STRING, "")
Expand Down Expand Up @@ -247,6 +251,19 @@ public void testSingleSegment()
FileUtils.cleanDirectory(workingDir);
rewindRecordReaders(_singleSegment);

// Fixed segment name
config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfigWithFixedSegmentName).setSchema(_schema)
.setSegmentConfig(new SegmentConfig.Builder().setFixedSegmentName("myTable_segment_0001").build()).build();
framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
outputSegments = framework.process();
assertEquals(outputSegments.size(), 1);
segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
segmentMetadata = segment.getSegmentMetadata();
assertEquals(segmentMetadata.getName(), "myTable_segment_0001");
segment.destroy();
FileUtils.cleanDirectory(workingDir);
rewindRecordReaders(_singleSegment);

// Time filter
config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setTimeHandlerConfig(
new TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setTimeRange(1597795200000L, 1597881600000L).build())
Expand Down Expand Up @@ -431,9 +448,10 @@ public void testSingleSegment()
FileUtils.cleanDirectory(workingDir);
rewindRecordReaders(_singleSegment);

config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfigSegmentNameGeneratorEnabled)
.setSchema(_schema).setSegmentConfig(new SegmentConfig.Builder().setMaxNumRecordsPerSegment(4)
.setSegmentNamePrefix("myPrefix").setSegmentNamePostfix("myPostfix").build()).build();
config =
new SegmentProcessorConfig.Builder().setTableConfig(_tableConfigSegmentNameGeneratorEnabled).setSchema(_schema)
.setSegmentConfig(new SegmentConfig.Builder().setMaxNumRecordsPerSegment(4).setSegmentNamePrefix("myPrefix")
.setSegmentNamePostfix("myPostfix").build()).build();
framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
outputSegments = framework.process();
assertEquals(outputSegments.size(), 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public static SegmentConfig getSegmentConfig(Map<String, String> taskConfig) {
}
segmentConfigBuilder.setSegmentNamePrefix(taskConfig.get(MergeTask.SEGMENT_NAME_PREFIX_KEY));
segmentConfigBuilder.setSegmentNamePostfix(taskConfig.get(MergeTask.SEGMENT_NAME_POSTFIX_KEY));
segmentConfigBuilder.setFixedSegmentName(taskConfig.get(MergeTask.FIXED_SEGMENT_NAME_KEY));
return segmentConfigBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ public void testGetTimeHandlerConfig() {
@Test
public void testGetPartitionerConfigs() {
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable")
.setSegmentPartitionConfig(new SegmentPartitionConfig(
Collections.singletonMap("memberId", new ColumnPartitionConfig("murmur", 10)))).build();
.setSegmentPartitionConfig(
new SegmentPartitionConfig(Collections.singletonMap("memberId", new ColumnPartitionConfig("murmur", 10))))
.build();
Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("memberId", DataType.LONG).build();
Map<String, String> taskConfig = Collections.emptyMap();

Expand Down Expand Up @@ -161,17 +162,21 @@ public void testGetSegmentConfig() {
taskConfig.put(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY, "10000");
taskConfig.put(MergeTask.SEGMENT_NAME_PREFIX_KEY, "myPrefix");
taskConfig.put(MergeTask.SEGMENT_NAME_POSTFIX_KEY, "myPostfix");
taskConfig.put(MergeTask.FIXED_SEGMENT_NAME_KEY, "mySegment");
SegmentConfig segmentConfig = MergeTaskUtils.getSegmentConfig(taskConfig);
assertEquals(segmentConfig.getMaxNumRecordsPerSegment(), 10000);
assertEquals(segmentConfig.getSegmentNamePrefix(), "myPrefix");
assertEquals(segmentConfig.getSegmentNamePostfix(), "myPostfix");
assertEquals(segmentConfig.getSegmentNamePostfix(), "myPostfix");
assertEquals(segmentConfig.getFixedSegmentName(), "mySegment");
assertEquals(segmentConfig.toString(),
"SegmentConfig{_maxNumRecordsPerSegment=10000, _segmentNamePrefix='myPrefix', "
+ "_segmentNamePostfix='myPostfix'}");
+ "_segmentNamePostfix='myPostfix', _fixedSegmentName='mySegment'}");

segmentConfig = MergeTaskUtils.getSegmentConfig(Collections.emptyMap());
assertEquals(segmentConfig.getMaxNumRecordsPerSegment(), SegmentConfig.DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT);
assertNull(segmentConfig.getSegmentNamePrefix());
assertNull(segmentConfig.getSegmentNamePostfix());
assertNull(segmentConfig.getFixedSegmentName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@


public class SegmentNameGeneratorFactory {
public static final String FIXED_SEGMENT_NAME_GENERATOR = "fixed";
public static final String SIMPLE_SEGMENT_NAME_GENERATOR = "simple";
public static final String NORMALIZED_DATE_SEGMENT_NAME_GENERATOR = "normalizeddate";

Expand All @@ -39,14 +40,16 @@ private SegmentNameGeneratorFactory() {
* Create the segment name generator given input configurations
*/
public static SegmentNameGenerator createSegmentNameGenerator(TableConfig tableConfig, Schema schema,
@Nullable String prefix, @Nullable String postfix, boolean excludeSequenceId) {
@Nullable String prefix, @Nullable String postfix, @Nullable String fixedSegmentName, boolean excludeSequenceId) {
String segmentNameGeneratorType = tableConfig.getIndexingConfig().getSegmentNameGeneratorType();
if (segmentNameGeneratorType == null || segmentNameGeneratorType.isEmpty()) {
segmentNameGeneratorType = SIMPLE_SEGMENT_NAME_GENERATOR;
}

String tableName = tableConfig.getTableName();
switch (segmentNameGeneratorType.toLowerCase()) {
case FIXED_SEGMENT_NAME_GENERATOR:
return new FixedSegmentNameGenerator(fixedSegmentName);
case SIMPLE_SEGMENT_NAME_GENERATOR:
if (prefix != null) {
return new SimpleSegmentNameGenerator(prefix, postfix);
Expand Down