Skip to content

Commit

Permalink
[Improve](tvf)jni-avro support split file (apache#27933)
Browse files Browse the repository at this point in the history
  • Loading branch information
DongLiang-0 authored and stephen committed Dec 28, 2023
1 parent 0b4d1e2 commit e8b3894
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 15 deletions.
4 changes: 4 additions & 0 deletions be/src/vec/exec/format/avro/avro_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ Status AvroJNIReader::init_fetch_table_reader(
if (type == TFileType::FILE_S3) {
required_param.insert(_params.properties.begin(), _params.properties.end());
}
required_param.insert(
std::make_pair("split_start_offset", std::to_string(_range.start_offset)));
required_param.insert(std::make_pair("split_size", std::to_string(_range.size)));
required_param.insert(std::make_pair("split_file_size", std::to_string(_range.file_size)));
required_param.insert(std::make_pair("uri", _range.path));
_jni_connector = std::make_unique<JniConnector>("org/apache/doris/avro/AvroJNIScanner",
required_param, column_names);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,30 +44,37 @@ public static void invalidateFileCache(AvroFileCacheKey key) {
public static class AvroFileMeta {
private final String schema;
private Set<String> requiredFields;
// TODO split file
private String splitInfo;
private Long splitStartOffset;
private Long splitSize;

/**
 * Creates file metadata that holds only the schema string.
 * No other field is assigned by this constructor.
 *
 * @param schema schema string stored in this metadata object
 */
AvroFileMeta(String schema) {
this.schema = schema;
}

/**
 * Creates file metadata that holds the schema string together with a split info string.
 *
 * @param schema schema string stored in this metadata object
 * @param splitInfo split info string stored as given
 */
AvroFileMeta(String schema, String splitInfo) {
this.schema = schema;
this.splitInfo = splitInfo;
}

/**
 * Returns the schema string supplied when this metadata object was constructed.
 *
 * @return the stored schema string
 */
public String getSchema() {
    return this.schema;
}

/**
 * Returns the split info string.
 *
 * @return the stored split info, or {@code null} when it was never assigned
 */
public String getSplitInfo() {
    return this.splitInfo;
}

/**
 * Replaces the set of required field names.
 * The given set is stored by reference; no copy is made.
 *
 * @param requiredFields required field names to store
 */
public void setRequiredFields(Set<String> requiredFields) {
this.requiredFields = requiredFields;
}

/**
 * Sets the start offset of the split that should be read.
 *
 * @param splitStartOffset start offset of the split (presumably a byte offset
 *     into the file -- confirm against the caller)
 */
public void setSplitStartOffset(Long splitStartOffset) {
this.splitStartOffset = splitStartOffset;
}

/**
 * Sets the size of the split that should be read.
 *
 * @param splitSize size of the split (presumably in bytes -- confirm against the caller)
 */
public void setSplitSize(Long splitSize) {
this.splitSize = splitSize;
}

/**
 * Returns the start offset of the split.
 *
 * @return the stored start offset, or {@code null} when it was never assigned
 */
public Long getSplitStartOffset() {
    return splitStartOffset;
}

/**
 * Returns the size of the split.
 *
 * @return the stored split size, or {@code null} when it was never assigned
 */
public Long getSplitSize() {
    return splitSize;
}

/**
 * Returns the set of required field names.
 * The stored set itself is returned, not a copy.
 *
 * @return the stored required fields, or {@code null} when they were never assigned
 */
public Set<String> getRequiredFields() {
    return this.requiredFields;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ public class AvroJNIScanner extends JniScanner {
private AvroFileMeta avroFileMeta;
private AvroWrapper<Pair<Integer, Long>> inputPair;
private NullWritable ignore;
private Long splitStartOffset;
private Long splitSize;
private Long splitFileSize;

/**
* Call by JNI for get table data or get table schema
Expand All @@ -100,6 +103,9 @@ public AvroJNIScanner(int fetchSize, Map<String, String> requiredParams) {
this.fieldInspectors = new ObjectInspector[requiredFields.length];
this.inputPair = new AvroWrapper<>(null);
this.ignore = NullWritable.get();
this.splitStartOffset = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_START_OFFSET));
this.splitSize = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_SIZE));
this.splitFileSize = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_FILE_SIZE));
}
}

Expand Down Expand Up @@ -171,6 +177,8 @@ private void initDataReader() {
avroFileCacheKey = new AvroFileCacheKey(fileType.name(), uri);
avroFileMeta = AvroFileCache.getAvroFileMeta(avroFileCacheKey);
avroFileMeta.setRequiredFields(requiredFieldSet);
avroFileMeta.setSplitStartOffset(splitStartOffset);
avroFileMeta.setSplitSize(splitSize);
initFieldInspector();
initTableInfo(requiredTypes, requiredFields, new ScanPredicate[0], fetchSize);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,8 @@ public class AvroProperties {
protected static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key";
protected static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint";
protected static final String FS_S3A_REGION = "fs.s3a.region";
protected static final String SPLIT_START_OFFSET = "split_start_offset";
protected static final String SPLIT_SIZE = "split_size";
protected static final String SPLIT_FILE_SIZE = "split_file_size";

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.avro.mapred.AvroRecordReader;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
Expand Down Expand Up @@ -72,9 +71,7 @@ protected void openSchemaReader() throws IOException {
// Opens the Avro data reader over the split range carried by avroFileMeta.
//
// The reader is built from the split start offset and split size stored in the
// metadata rather than from offset 0 and the full file length, so each scanner
// only reads the range it was assigned.
//
// avroFileMeta: metadata providing the projection information and the split range.
// Throws IOException when the split range has not been set on avroFileMeta
// (instead of failing with a NullPointerException while unboxing).
protected void openDataReader(AvroFileMeta avroFileMeta) throws IOException {
    Long splitStartOffset = avroFileMeta.getSplitStartOffset();
    Long splitSize = avroFileMeta.getSplitSize();
    if (splitStartOffset == null || splitSize == null) {
        throw new IOException("avro split range is not set, split start offset: "
                + splitStartOffset + ", split size: " + splitSize);
    }

    JobConf job = new JobConf();
    projectionSchema(job, avroFileMeta);
    // Single split definition: restrict the reader to [start offset, start offset + size).
    FileSplit fileSplit = new FileSplit(path, splitStartOffset, splitSize, job);
    dataReader = new AvroRecordReader<>(job, fileSplit);
    LOG.debug("success open avro data reader.");
}
Expand Down

0 comments on commit e8b3894

Please sign in to comment.