Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@
*/
package org.apache.pinot.plugin.inputformat.csv;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
Expand All @@ -33,17 +41,31 @@
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
import org.apache.pinot.spi.data.readers.RecordReaderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Record reader for CSV file.
*/
@NotThreadSafe
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does other record reader has this annotation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, they do not. I have added this as a good practice. This is purely for documentation.

public class CSVRecordReader implements RecordReader {
private static final Logger LOGGER = LoggerFactory.getLogger(CSVRecordReader.class);

private File _dataFile;
private CSVFormat _format;
private CSVParser _parser;
private Iterator<CSVRecord> _iterator;
private CSVRecordExtractor _recordExtractor;
private Map<String, Integer> _headerMap = new HashMap<>();

// line iterator specific variables
private boolean _useLineIterator = false;
private boolean _skipHeaderRecord = false;
private boolean _isHeaderProvided = false;
private long _skippedLinesCount;
private BufferedReader _bufferedReader;
private String _nextLine;

public CSVRecordReader() {
}
Expand Down Expand Up @@ -83,20 +105,33 @@ public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable Re
}
char delimiter = config.getDelimiter();
format = format.withDelimiter(delimiter);

if (config.isSkipUnParseableLines()) {
_useLineIterator = true;
}

String csvHeader = config.getHeader();
if (csvHeader == null) {
format = format.withHeader();
} else {
//validate header for the delimiter before splitting
validateHeaderForDelimiter(delimiter, csvHeader, format);
format = format.withHeader(StringUtils.split(csvHeader, delimiter));
// do not validate header if using the line iterator
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that there's no harm in running validateHeaderForDelimiter? It's reading the first line and check the delimiter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following problems exist with the current validation.

  1. It calls iterator.hasNext() which is what we are trying to avoid in the first place
  2. It checks if the record has multiple values. User can pass in header and the record can have a single line which is valid. Hence, this check is not valid.
  3. It checks if delimiter is present in header. This is also not valid if the file is like this.
id
100

As all these checks are harmful, I have not made use of it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in that case, we should change the validation code. My concern is more on the feature parity. Running validation on the existing approach while not running it on line iterator will give us inconsistent behavior.

if (_useLineIterator) {
String[] header = StringUtils.split(csvHeader, delimiter);
setHeaderMap(header);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assumes that the header will be in good format. Why we are not doing the validation in this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yet to determine what would be the correct validation for a header line. Existing validation does not look correct.

format = format.withHeader(header);
_isHeaderProvided = true;
} else {
// validate header for the delimiter before splitting
validateHeaderForDelimiter(delimiter, csvHeader, format);
format = format.withHeader(StringUtils.split(csvHeader, delimiter));
}
}

format = format.withSkipHeaderRecord(config.isSkipHeader());
_skipHeaderRecord = config.isSkipHeader();
format = format.withCommentMarker(config.getCommentMarker());
format = format.withEscape(config.getEscapeCharacter());
format = format.withIgnoreEmptyLines(config.isIgnoreEmptyLines());
format = format.withIgnoreSurroundingSpaces(config.isIgnoreSurroundingSpaces());
format = format.withSkipHeaderRecord(config.isSkipHeader());
format = format.withQuote(config.getQuoteCharacter());

if (config.getQuoteMode() != null) {
Expand All @@ -123,7 +158,7 @@ public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable Re

CSVRecordExtractorConfig recordExtractorConfig = new CSVRecordExtractorConfig();
recordExtractorConfig.setMultiValueDelimiter(multiValueDelimiter);
recordExtractorConfig.setColumnNames(_parser.getHeaderMap().keySet());
recordExtractorConfig.setColumnNames(_headerMap.keySet());
_recordExtractor.init(fieldsToRead, recordExtractorConfig);
}

Expand All @@ -147,7 +182,12 @@ private boolean delimiterNotPresentInHeader(char delimiter, String csvHeader) {

private void init()
throws IOException {
if (_useLineIterator) {
initLineIteratorResources();
return;
}
_parser = _format.parse(RecordReaderUtils.getBufferedReader(_dataFile));
_headerMap = _parser.getHeaderMap();
_iterator = _parser.iterator();
}

Expand All @@ -161,36 +201,140 @@ private void init()
public Map<String, Integer> getCSVHeaderMap() {
// if header row is not configured and input file doesn't contain a valid header record, the returned map would
// contain values from the first row in the input file.
return _parser.getHeaderMap();
return _headerMap;
}

@Override
public boolean hasNext() {
if (_useLineIterator) {
// When line iterator is used, the call to this method won't throw an exception. The default and the only iterator
// from commons-csv library can throw an exception upon calling the hasNext() method. The line iterator overcomes
// this limitation.
return _nextLine != null;
}
return _iterator.hasNext();
}

@Override
public GenericRow next() {
public GenericRow next()
throws IOException {
return next(new GenericRow());
}

@Override
public GenericRow next(GenericRow reuse) {
CSVRecord record = _iterator.next();
_recordExtractor.extract(record, reuse);
public GenericRow next(GenericRow reuse)
throws IOException {
if (_useLineIterator) {
readNextLine(reuse);
} else {
CSVRecord record = _iterator.next();
_recordExtractor.extract(record, reuse);
}
return reuse;
}

@Override
public void rewind()
throws IOException {
_parser.close();
if (_useLineIterator) {
resetLineIteratorResources();
}

if (_parser != null && !_parser.isClosed()) {
_parser.close();
}

init();
}

@Override
public void close()
throws IOException {
_parser.close();
if (_useLineIterator) {
resetLineIteratorResources();
}

if (_parser != null && !_parser.isClosed()) {
_parser.close();
}
}

private void readNextLine(GenericRow reuse)
throws IOException {
while (_nextLine != null) {
try (Reader reader = new StringReader(_nextLine)) {
try (CSVParser csvParser = _format.parse(reader)) {
List<CSVRecord> csvRecords = csvParser.getRecords();
if (csvRecords != null && csvRecords.size() > 0) {
// There would be only one record as lines are read one after the other
CSVRecord record = csvRecords.get(0);
_recordExtractor.extract(record, reuse);
break;
} else {
// Can be thrown on: 1) Empty lines 2) Commented lines
throw new NoSuchElementException("Failed to find any records");
}
} catch (Exception e) {
_skippedLinesCount++;
LOGGER.debug("Skipped input line: {} from file: {}", _nextLine, _dataFile, e);
// Find the next line that can be parsed
_nextLine = _bufferedReader.readLine();
}
}
}
if (_nextLine != null) {
// Advance the pointer to the next line for future reading
_nextLine = _bufferedReader.readLine();
} else {
throw new RuntimeException("No more parseable lines. Line iterator reached end of file.");
}
}

private void setHeaderMap(String[] header) {
int columnPos = 0;
for (String columnName : header) {
_headerMap.put(columnName, columnPos++);
}
}

private void initLineIteratorResources()
throws IOException {
_bufferedReader = new BufferedReader(new FileReader(_dataFile), 1024 * 32); // 32KB buffer size

// When header is supplied by the client
if (_isHeaderProvided) {
if (_skipHeaderRecord) {
// When skip header config is set and header is supplied – skip the first line from the input file
_bufferedReader.readLine();
// turn off the property so that it doesn't interfere with further parsing
_format = _format.withSkipHeaderRecord(false);
}
_nextLine = _bufferedReader.readLine();
} else {
// Read the first line and set the header
String headerLine = _bufferedReader.readLine();
String[] header = StringUtils.split(headerLine, _format.getDelimiter());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we use the same approach as the regular csv for reading the header? I don't think that we need the custom handling here? (header is anyway the first line of the file. So, we can first consume header using the shared code and then diverge?)

Otherwise, I would see different behavior for parsing the header in some edge cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I doubt if the regular csv parser respects this config option at all. I see it marked as TODO in the library version that we are using.
image

The line iterator respects this property and following is the behavior:

  1. If header is supplied by the client and skip header is not set – it is considered that the input file has just data records.
  2. if header is supplied by the client and skip header is set – it is considered that the input file contains a header record but the user wishes to override it.
  3. if header is not provided – first line is considered as the header.

Copy link
Contributor

@snleee snleee Sep 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's investigate the following and address in the follow-up PR.

  • check if newer library version has the proper wiring for these options
  • we should keep the same behavior across 2 approaches.
  • do the same investigation on validateHeaderForDelimiter
  • add the unit tests for these configs (skip header)

setHeaderMap(header);
_format = _format.withHeader(header);
_nextLine = _bufferedReader.readLine();
}
}

private void resetLineIteratorResources()
throws IOException {
_nextLine = null;

LOGGER.info("Total lines skipped in file: {} were: {}", _dataFile, _skippedLinesCount);
_skippedLinesCount = 0;

// if header is not provided by the client it would be rebuilt. When it's provided by the client it's initialized
// once in the constructor
if (!_isHeaderProvided) {
_headerMap.clear();
}

if (_bufferedReader != null) {
_bufferedReader.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class CSVRecordReaderConfig implements RecordReaderConfig {
private Character _escapeCharacter; // Default is null
private String _nullStringValue;
private boolean _skipHeader;
private boolean _skipUnParseableLines = false;
private boolean _ignoreEmptyLines = true;
private boolean _ignoreSurroundingSpaces = true;
private Character _quoteCharacter = '"';
Expand Down Expand Up @@ -76,6 +77,14 @@ public void setMultiValueDelimiter(char multiValueDelimiter) {
_multiValueDelimiter = multiValueDelimiter;
}

public boolean isSkipUnParseableLines() {
return _skipUnParseableLines;
}

public void setSkipUnParseableLines(boolean skipUnParseableLines) {
_skipUnParseableLines = skipUnParseableLines;
}

public boolean isMultiValueDelimiterEnabled() {
return _multiValueDelimiterEnabled;
}
Expand Down
Loading