Segment processing framework #5934

Merged
merged 12 commits on Sep 15, 2020
Javadoc and imports
npawar committed Aug 27, 2020
commit eece981149304287c751d3dd44be40f14bfcc7a1
@@ -111,8 +111,14 @@ public Object evaluate(Object[] arguments) {

private interface ExecutableNode {

/**
* Execute the function by extracting arguments from the row
*/
Object execute(GenericRow row);

/**
* Execute the function on provided arguments
*/
Object execute(Object[] arguments);
}
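For context on the two variants documented above, a minimal sketch of a hypothetical node (the class and column name are illustrative, not part of this change): the row-based execute extracts the node's argument from the GenericRow and delegates to the argument-based execute.

// Illustrative only: a node that reads a single column from the row and returns it unchanged
private static class IdentityColumnNode implements ExecutableNode {
  private final String _column;

  IdentityColumnNode(String column) {
    _column = column;
  }

  @Override
  public Object execute(GenericRow row) {
    // extract the argument from the row, then reuse the argument-based variant
    return execute(new Object[]{row.getValue(_column)});
  }

  @Override
  public Object execute(Object[] arguments) {
    return arguments[0];
  }
}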

@@ -112,6 +112,10 @@ public void reset() {
_collection.clear();
}

/**
 * Representation of a record's key columns.
 * Note that dimensions can contain multi-value columns, hence equals and hashCode must use deep array operations.
 */
private static class Record {
private final Object[] _values;

@@ -20,6 +20,7 @@

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.google.common.base.Preconditions;


/**
@@ -55,6 +56,7 @@ public Builder setMaxNumRecordsPerSegment(int maxNumRecordsPerSegment) {
}

public SegmentConfig build() {
Preconditions.checkState(maxNumRecordsPerSegment > 0, "Max num records per segment must be > 0");
return new SegmentConfig(maxNumRecordsPerSegment);
}
}
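For context, a sketch of how the new check surfaces to callers (builder usage mirrors the example further down in this PR; the values are illustrative):

// Valid: caps each generated segment at 4 records
SegmentConfig segmentConfig = new SegmentConfig.Builder().setMaxNumRecordsPerSegment(4).build();

// Invalid: build() now fails fast with an IllegalStateException
// ("Max num records per segment must be > 0") instead of producing an unusable config
SegmentConfig invalidConfig = new SegmentConfig.Builder().setMaxNumRecordsPerSegment(0).build();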
@@ -63,11 +63,11 @@ public class SegmentMapper {
private final PartitionFilter _partitionFilter;
private final Map<String, DataFileWriter<GenericData.Record>> _partitionToDataFileWriterMap = new HashMap<>();

public SegmentMapper(File inputSegment, SegmentMapperConfig mapperConfig, File mapperOutputDir) {
public SegmentMapper(String mapperId, File inputSegment, SegmentMapperConfig mapperConfig, File mapperOutputDir) {
_inputSegment = inputSegment;
_mapperOutputDir = mapperOutputDir;

_mapperId = mapperConfig.getMapperId();
_mapperId = mapperId;
_avroSchema = SegmentProcessorUtils.convertPinotSchemaToAvroSchema(mapperConfig.getPinotSchema());
_recordTransformer = RecordTransformerFactory.getRecordTransformer(mapperConfig.getRecordTransformerConfig());
_partitioner = PartitionerFactory.getPartitioner(mapperConfig.getPartitioningConfig());
@@ -28,26 +28,17 @@
*/
public class SegmentMapperConfig {

private final String _mapperId;
private final Schema _pinotSchema;
private final RecordTransformerConfig _recordTransformerConfig;
private final PartitioningConfig _partitioningConfig;

public SegmentMapperConfig(String mapperId, Schema pinotSchema, RecordTransformerConfig recordTransformerConfig,
public SegmentMapperConfig(Schema pinotSchema, RecordTransformerConfig recordTransformerConfig,
PartitioningConfig partitioningConfig) {
_mapperId = mapperId;
_pinotSchema = pinotSchema;
_recordTransformerConfig = recordTransformerConfig;
_partitioningConfig = partitioningConfig;
}

/**
* Mapper id. Each mapper should have a unique id in one run of the Segment Processor
*/
public String getMapperId() {
return _mapperId;
}

/**
* The Pinot schema
*/
@@ -20,24 +20,14 @@

import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
import org.apache.pinot.core.segment.processing.partitioner.PartitioningConfig;
import org.apache.pinot.core.segment.processing.transformer.RecordTransformerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -83,8 +73,7 @@ public SegmentProcessorFramework(File inputSegmentsDir, SegmentProcessorConfig s

_inputSegmentsDir = inputSegmentsDir;
Preconditions.checkState(_inputSegmentsDir.exists() && _inputSegmentsDir.isDirectory(),
"Input path: %s, must be a directory with Pinot segments", _inputSegmentsDir.getAbsolutePath());

"Input path: %s must be a directory with Pinot segments", _inputSegmentsDir.getAbsolutePath());
_outputSegmentsDir = outputSegmentsDir;
Preconditions.checkState(
_outputSegmentsDir.exists() && _outputSegmentsDir.isDirectory() && (_outputSegmentsDir.list().length == 0),
@@ -96,21 +85,19 @@ public SegmentProcessorFramework(File inputSegmentsDir, SegmentProcessorConfig s

_baseDir = new File(FileUtils.getTempDirectory(), "segment_processor_" + System.currentTimeMillis());
FileUtils.deleteQuietly(_baseDir);
if (!_baseDir.mkdirs()) {
throw new RuntimeException("Failed to create directory " + _baseDir + " for SegmentProcessor");
}
Preconditions.checkState(_baseDir.mkdirs(), "Failed to create base directory: %s for SegmentProcessor", _baseDir);
_mapperInputDir = new File(_baseDir, "mapper_input");
if (!_mapperInputDir.mkdirs()) {
throw new RuntimeException("Failed to create directory " + _mapperInputDir + " for SegmentProcessor");
}
Preconditions
.checkState(_mapperInputDir.mkdirs(), "Failed to create mapper input directory: %s for SegmentProcessor",
_mapperInputDir);
_mapperOutputDir = new File(_baseDir, "mapper_output");
if (!_mapperOutputDir.mkdirs()) {
throw new RuntimeException("Failed to create directory " + _mapperOutputDir + " for SegmentProcessor");
}
Preconditions
.checkState(_mapperOutputDir.mkdirs(), "Failed to create mapper output directory: %s for SegmentProcessor",
_mapperOutputDir);
_reducerOutputDir = new File(_baseDir, "reducer_output");
if (!_reducerOutputDir.mkdirs()) {
throw new RuntimeException("Failed to create directory " + _reducerOutputDir + " for SegmentProcessor");
}
Preconditions
.checkState(_reducerOutputDir.mkdirs(), "Failed to create reducer output directory: %s for SegmentProcessor",
_reducerOutputDir);
}

/**
@@ -143,9 +130,10 @@ public void processSegments()
}

// Set mapperId as the name of the segment
SegmentMapperConfig mapperConfig = new SegmentMapperConfig(mapperInput.getName(), _pinotSchema,
_segmentProcessorConfig.getRecordTransformerConfig(), _segmentProcessorConfig.getPartitioningConfig());
SegmentMapper mapper = new SegmentMapper(mapperInput, mapperConfig, _mapperOutputDir);
SegmentMapperConfig mapperConfig =
new SegmentMapperConfig(_pinotSchema, _segmentProcessorConfig.getRecordTransformerConfig(),
_segmentProcessorConfig.getPartitioningConfig());
SegmentMapper mapper = new SegmentMapper(mapperInput.getName(), mapperInput, mapperConfig, _mapperOutputDir);
mapper.map();
mapper.cleanup();
}
@@ -164,9 +152,9 @@ public void processSegments()

// Set partition as reducerId
SegmentReducerConfig reducerConfig =
new SegmentReducerConfig(partDir.getName(), _pinotSchema, _segmentProcessorConfig.getCollectorConfig(),
new SegmentReducerConfig(_pinotSchema, _segmentProcessorConfig.getCollectorConfig(),
_segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment());
SegmentReducer reducer = new SegmentReducer(partDir, reducerConfig, _reducerOutputDir);
SegmentReducer reducer = new SegmentReducer(partDir.getName(), partDir, reducerConfig, _reducerOutputDir);
reducer.reduce();
reducer.cleanup();
}
@@ -203,48 +191,4 @@ public void processSegments()
public void cleanup() {
FileUtils.deleteQuietly(_baseDir);
}
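For reference, typical end-to-end usage follows the pattern of the main() method removed below: construct the framework, process the segments, and always clean up in a finally block (a sketch; the config and directory variables are assumed to be prepared by the caller, and exception handling is elided):

SegmentProcessorFramework framework = null;
try {
  framework = new SegmentProcessorFramework(inputSegmentsDir, segmentProcessorConfig, outputSegmentsDir);
  framework.processSegments();
} finally {
  if (framework != null) {
    // removes the temporary base/mapper/reducer directories created by the framework
    framework.cleanup();
  }
}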

public static void main(String[] args)
throws IOException {
TableConfig tableConfig = JsonUtils
.fileToObject(new File("/Users/npawar/quick_start_configs/segment_processing_framework/offline.json"),
TableConfig.class);
Schema schema =
Schema.fromFile(new File("/Users/npawar/quick_start_configs/segment_processing_framework/schema.json"));
File inputSegments = new File("/Users/npawar/quick_start_configs/segment_processing_framework/segments");
File outputSegments = new File("/Users/npawar/quick_start_configs/segment_processing_framework/output");
FileUtils.deleteQuietly(outputSegments);
outputSegments.mkdirs();

Map<String, String> transformFunctionsMap = new HashMap<>();
transformFunctionsMap.put("timeValue", "round(timeValue, 86400000)");
RecordTransformerConfig transformerConfig =
new RecordTransformerConfig.Builder().setTransformFunctionsMap(transformFunctionsMap).build();
PartitioningConfig partitioningConfig =
new PartitioningConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
.setColumnName("timeValue").setFilterFunction("Groovy({arg0 != \"1597795200000\"}, arg0)").build();
SegmentConfig segmentConfig = new SegmentConfig.Builder().setMaxNumRecordsPerSegment(4).build();
Map<String, ValueAggregatorFactory.ValueAggregatorType> valueAggregatorsMap = new HashMap<>();
valueAggregatorsMap.put("clicks", ValueAggregatorFactory.ValueAggregatorType.MAX);
CollectorConfig collectorConfig =
new CollectorConfig.Builder().setCollectorType(CollectorFactory.CollectorType.ROLLUP)
.setAggregatorTypeMap(valueAggregatorsMap).build();
SegmentProcessorConfig segmentProcessorConfig =
new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema)
.setRecordTransformerConfig(transformerConfig).setPartitioningConfig(partitioningConfig)
.setCollectorConfig(collectorConfig).setSegmentConfig(segmentConfig).build();
System.out.println(segmentProcessorConfig);

SegmentProcessorFramework framework = null;
try {
framework = new SegmentProcessorFramework(inputSegments, segmentProcessorConfig, outputSegments);
framework.processSegments();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (framework != null) {
framework.cleanup();
}
}
}
}
@@ -27,7 +27,6 @@
import org.apache.pinot.core.segment.processing.collector.Collector;
import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
@@ -59,11 +58,12 @@ public class SegmentReducer {
private final Collector _collector;
private final int _numRecordsPerPart;

public SegmentReducer(File reducerInputDir, SegmentReducerConfig reducerConfig, File reducerOutputDir) {
public SegmentReducer(String reducerId, File reducerInputDir, SegmentReducerConfig reducerConfig,
File reducerOutputDir) {
_reducerInputDir = reducerInputDir;
_reducerOutputDir = reducerOutputDir;

_reducerId = reducerConfig.getReducerId();
_reducerId = reducerId;
_pinotSchema = reducerConfig.getPinotSchema();
_avroSchema = SegmentProcessorUtils.convertPinotSchemaToAvroSchema(_pinotSchema);
_collector = CollectorFactory.getCollector(reducerConfig.getCollectorConfig(), _pinotSchema);
@@ -27,26 +27,17 @@
*/
public class SegmentReducerConfig {

private final String _reducerId;
private final Schema _pinotSchema;
private final int _numRecordsPerPart;
private final CollectorConfig _collectorConfig;

public SegmentReducerConfig(String reducerId, Schema pinotSchema, CollectorConfig collectorConfig,
public SegmentReducerConfig(Schema pinotSchema, CollectorConfig collectorConfig,
int numRecordsPerPart) {
_reducerId = reducerId;
_pinotSchema = pinotSchema;
_numRecordsPerPart = numRecordsPerPart;
_collectorConfig = collectorConfig;
}

/**
* Reducer id. Each reducer should have a unique id for a run of the Segment Processor Framework
*/
public String getReducerId() {
return _reducerId;
}

/**
* The Pinot schema
*/
@@ -19,7 +19,7 @@
package org.apache.pinot.core.segment.processing.partitioner;

/**
* Filter for partitions
* Used for filtering partitions in the mapper
*/
public interface PartitionFilter {

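Filters are typically wired in through the partitioning config rather than implemented by hand; a sketch based on the configuration used in the removed main() above, where rows whose partition value equals the given timestamp are dropped during the map phase (the column name and timestamp are illustrative):

PartitioningConfig partitioningConfig =
    new PartitioningConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
        .setColumnName("timeValue").setFilterFunction("Groovy({arg0 != \"1597795200000\"}, arg0)").build();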
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.segment.processing.framework;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;