Skip to content

Commit

Permalink
[BugFix] Compression type of csv file (#35094)
Browse files Browse the repository at this point in the history
Why I'm doing:
The compression type of csv file may be various, like gz, bz2, etc. In a load process, the csv format not only needs the format info, but also needs the compression type. The compression type is obtained from the filename suffix.

What I'm doing:
This PR fixes the compression type of csv file.
Fixes https://github.com/StarRocks/StarRocksTest/issues/4749

Signed-off-by: ricky <rickif@qq.com>
  • Loading branch information
rickif authored Nov 15, 2023
1 parent fc28e70 commit 6a3447c
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -542,11 +542,10 @@ private TFileFormatType formatType(String fileFormat, String path) {
return TFileFormatType.FORMAT_PARQUET;
} else if (fileFormat.toLowerCase().equals("orc")) {
return TFileFormatType.FORMAT_ORC;
} else if (fileFormat.toLowerCase().equals("csv")) {
return TFileFormatType.FORMAT_CSV_PLAIN;
} else if (fileFormat.toLowerCase().equals("json")) {
return TFileFormatType.FORMAT_JSON;
}
// Attention: The compression type of csv format is from the suffix of filename.
}

String lowerCasePath = path.toLowerCase();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.starrocks.system.SystemInfoService;
import com.starrocks.thrift.TBrokerFileStatus;
import com.starrocks.thrift.TBrokerRangeDesc;
import com.starrocks.thrift.TFileFormatType;
import com.starrocks.thrift.TScanRangeLocations;
import com.starrocks.thrift.TUniqueId;
import mockit.Expectations;
Expand Down Expand Up @@ -392,5 +393,52 @@ public void testCreateScanRangeLocations(@Mocked GlobalStateMgr globalStateMgr,
rangeDescs = locationsList.get(0).scan_range.broker_scan_range.ranges;
Assert.assertEquals(1, rangeDescs.size());
Assert.assertEquals(0, rangeDescs.get(0).size);

// case 6
// csv file compression type
// result: CSV_PLAIN, CSV_GZ, CSV_BZ2, CSV_LZ4, CSV_DFLATE, CSV_ZSTD

// file groups
fileGroups = Lists.newArrayList();
files = Lists.newArrayList("hdfs://127.0.0.1:9001/file1", "hdfs://127.0.0.1:9001/file2.csv",
"hdfs://127.0.0.1:9001/file3.gz", "hdfs://127.0.0.1:9001/file4.bz2", "hdfs://127.0.0.1:9001/file5.lz4", "hdfs://127.0.0.1:9001/file6.deflate", "hdfs://127.0.0.1:9001/file7.zst");
desc =
new DataDescription("testTable", null, files, columnNames, null, null, "csv", false, null);
brokerFileGroup = new BrokerFileGroup(desc);
Deencapsulation.setField(brokerFileGroup, "columnSeparator", "\t");
Deencapsulation.setField(brokerFileGroup, "rowDelimiter", "\n");
Deencapsulation.setField(brokerFileGroup, "fileFormat", "csv");
fileGroups.add(brokerFileGroup);

// file status
fileStatusesList = Lists.newArrayList();
fileStatusList = Lists.newArrayList();
for (String file : files) {
fileStatusList.add(new TBrokerFileStatus(file, false, 1024, true));
}
fileStatusesList.add(fileStatusList);

analyzer = new Analyzer(GlobalStateMgr.getCurrentState(), new ConnectContext());
descTable = analyzer.getDescTbl();
tupleDesc = descTable.createTupleDescriptor("DestTableTuple");
scanNode = new FileScanNode(new PlanNodeId(0), tupleDesc, "FileScanNode", fileStatusesList, 2);
scanNode.setLoadInfo(jobId, txnId, table, brokerDesc, fileGroups, true, loadParallelInstanceNum);
scanNode.init(analyzer);
scanNode.finalizeStats(analyzer);

// check
locationsList = scanNode.getScanRangeLocations(0);
Assert.assertEquals(1, locationsList.size());


Assert.assertEquals(7, locationsList.get(0).scan_range.broker_scan_range.ranges.size());

Assert.assertEquals(TFileFormatType.FORMAT_CSV_PLAIN, locationsList.get(0).scan_range.broker_scan_range.ranges.get(0).format_type);
Assert.assertEquals(TFileFormatType.FORMAT_CSV_PLAIN, locationsList.get(0).scan_range.broker_scan_range.ranges.get(1).format_type);
Assert.assertEquals(TFileFormatType.FORMAT_CSV_GZ, locationsList.get(0).scan_range.broker_scan_range.ranges.get(2).format_type);
Assert.assertEquals(TFileFormatType.FORMAT_CSV_BZ2, locationsList.get(0).scan_range.broker_scan_range.ranges.get(3).format_type);
Assert.assertEquals(TFileFormatType.FORMAT_CSV_LZ4_FRAME, locationsList.get(0).scan_range.broker_scan_range.ranges.get(4).format_type);
Assert.assertEquals(TFileFormatType.FORMAT_CSV_DEFLATE, locationsList.get(0).scan_range.broker_scan_range.ranges.get(5).format_type);
Assert.assertEquals(TFileFormatType.FORMAT_CSV_ZSTD, locationsList.get(0).scan_range.broker_scan_range.ranges.get(6).format_type);
}
}
}

0 comments on commit 6a3447c

Please sign in to comment.