-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Added support to skip unparseable records in the csv record reader #11487
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,12 +18,20 @@ | |
| */ | ||
| package org.apache.pinot.plugin.inputformat.csv; | ||
|
|
||
| import java.io.BufferedReader; | ||
| import java.io.File; | ||
| import java.io.FileReader; | ||
| import java.io.IOException; | ||
| import java.io.Reader; | ||
| import java.io.StringReader; | ||
| import java.util.HashMap; | ||
| import java.util.Iterator; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.NoSuchElementException; | ||
| import java.util.Set; | ||
| import javax.annotation.Nullable; | ||
| import javax.annotation.concurrent.NotThreadSafe; | ||
| import org.apache.commons.csv.CSVFormat; | ||
| import org.apache.commons.csv.CSVParser; | ||
| import org.apache.commons.csv.CSVRecord; | ||
|
|
@@ -33,17 +41,31 @@ | |
| import org.apache.pinot.spi.data.readers.RecordReader; | ||
| import org.apache.pinot.spi.data.readers.RecordReaderConfig; | ||
| import org.apache.pinot.spi.data.readers.RecordReaderUtils; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
|
|
||
| /** | ||
| * Record reader for CSV file. | ||
| */ | ||
| @NotThreadSafe | ||
| public class CSVRecordReader implements RecordReader { | ||
| private static final Logger LOGGER = LoggerFactory.getLogger(CSVRecordReader.class); | ||
|
|
||
| private File _dataFile; | ||
| private CSVFormat _format; | ||
| private CSVParser _parser; | ||
| private Iterator<CSVRecord> _iterator; | ||
| private CSVRecordExtractor _recordExtractor; | ||
| private Map<String, Integer> _headerMap = new HashMap<>(); | ||
|
|
||
| // line iterator specific variables | ||
| private boolean _useLineIterator = false; | ||
| private boolean _skipHeaderRecord = false; | ||
| private boolean _isHeaderProvided = false; | ||
| private long _skippedLinesCount; | ||
| private BufferedReader _bufferedReader; | ||
| private String _nextLine; | ||
|
|
||
| public CSVRecordReader() { | ||
| } | ||
|
|
@@ -83,20 +105,33 @@ public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable Re | |
| } | ||
| char delimiter = config.getDelimiter(); | ||
| format = format.withDelimiter(delimiter); | ||
|
|
||
| if (config.isSkipUnParseableLines()) { | ||
| _useLineIterator = true; | ||
| } | ||
|
|
||
| String csvHeader = config.getHeader(); | ||
| if (csvHeader == null) { | ||
| format = format.withHeader(); | ||
| } else { | ||
| //validate header for the delimiter before splitting | ||
| validateHeaderForDelimiter(delimiter, csvHeader, format); | ||
| format = format.withHeader(StringUtils.split(csvHeader, delimiter)); | ||
| // do not validate header if using the line iterator | ||
|
||
| if (_useLineIterator) { | ||
| String[] header = StringUtils.split(csvHeader, delimiter); | ||
| setHeaderMap(header); | ||
|
||
| format = format.withHeader(header); | ||
| _isHeaderProvided = true; | ||
| } else { | ||
| // validate header for the delimiter before splitting | ||
| validateHeaderForDelimiter(delimiter, csvHeader, format); | ||
| format = format.withHeader(StringUtils.split(csvHeader, delimiter)); | ||
| } | ||
| } | ||
|
|
||
| format = format.withSkipHeaderRecord(config.isSkipHeader()); | ||
| _skipHeaderRecord = config.isSkipHeader(); | ||
| format = format.withCommentMarker(config.getCommentMarker()); | ||
| format = format.withEscape(config.getEscapeCharacter()); | ||
| format = format.withIgnoreEmptyLines(config.isIgnoreEmptyLines()); | ||
| format = format.withIgnoreSurroundingSpaces(config.isIgnoreSurroundingSpaces()); | ||
| format = format.withSkipHeaderRecord(config.isSkipHeader()); | ||
| format = format.withQuote(config.getQuoteCharacter()); | ||
|
|
||
| if (config.getQuoteMode() != null) { | ||
|
|
@@ -123,7 +158,7 @@ public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable Re | |
|
|
||
| CSVRecordExtractorConfig recordExtractorConfig = new CSVRecordExtractorConfig(); | ||
| recordExtractorConfig.setMultiValueDelimiter(multiValueDelimiter); | ||
| recordExtractorConfig.setColumnNames(_parser.getHeaderMap().keySet()); | ||
| recordExtractorConfig.setColumnNames(_headerMap.keySet()); | ||
| _recordExtractor.init(fieldsToRead, recordExtractorConfig); | ||
| } | ||
|
|
||
|
|
@@ -147,7 +182,12 @@ private boolean delimiterNotPresentInHeader(char delimiter, String csvHeader) { | |
|
|
||
| private void init() | ||
| throws IOException { | ||
| if (_useLineIterator) { | ||
| initLineIteratorResources(); | ||
| return; | ||
| } | ||
| _parser = _format.parse(RecordReaderUtils.getBufferedReader(_dataFile)); | ||
| _headerMap = _parser.getHeaderMap(); | ||
| _iterator = _parser.iterator(); | ||
| } | ||
|
|
||
|
|
@@ -161,36 +201,140 @@ private void init() | |
| public Map<String, Integer> getCSVHeaderMap() { | ||
| // if header row is not configured and input file doesn't contain a valid header record, the returned map would | ||
| // contain values from the first row in the input file. | ||
| return _parser.getHeaderMap(); | ||
| return _headerMap; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean hasNext() { | ||
| if (_useLineIterator) { | ||
| // When line iterator is used, the call to this method won't throw an exception. The default and the only iterator | ||
| // from commons-csv library can throw an exception upon calling the hasNext() method. The line iterator overcomes | ||
| // this limitation. | ||
| return _nextLine != null; | ||
| } | ||
| return _iterator.hasNext(); | ||
| } | ||
|
|
||
| @Override | ||
| public GenericRow next() { | ||
| public GenericRow next() | ||
| throws IOException { | ||
| return next(new GenericRow()); | ||
| } | ||
|
|
||
| @Override | ||
| public GenericRow next(GenericRow reuse) { | ||
| CSVRecord record = _iterator.next(); | ||
| _recordExtractor.extract(record, reuse); | ||
| public GenericRow next(GenericRow reuse) | ||
| throws IOException { | ||
| if (_useLineIterator) { | ||
| readNextLine(reuse); | ||
| } else { | ||
| CSVRecord record = _iterator.next(); | ||
| _recordExtractor.extract(record, reuse); | ||
| } | ||
| return reuse; | ||
| } | ||
|
|
||
| @Override | ||
| public void rewind() | ||
| throws IOException { | ||
| _parser.close(); | ||
| if (_useLineIterator) { | ||
| resetLineIteratorResources(); | ||
| } | ||
|
|
||
| if (_parser != null && !_parser.isClosed()) { | ||
| _parser.close(); | ||
| } | ||
|
|
||
| init(); | ||
| } | ||
|
|
||
| @Override | ||
| public void close() | ||
| throws IOException { | ||
| _parser.close(); | ||
| if (_useLineIterator) { | ||
| resetLineIteratorResources(); | ||
snleee marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| if (_parser != null && !_parser.isClosed()) { | ||
| _parser.close(); | ||
| } | ||
| } | ||
|
|
||
| private void readNextLine(GenericRow reuse) | ||
| throws IOException { | ||
| while (_nextLine != null) { | ||
| try (Reader reader = new StringReader(_nextLine)) { | ||
| try (CSVParser csvParser = _format.parse(reader)) { | ||
| List<CSVRecord> csvRecords = csvParser.getRecords(); | ||
| if (csvRecords != null && csvRecords.size() > 0) { | ||
| // There would be only one record as lines are read one after the other | ||
| CSVRecord record = csvRecords.get(0); | ||
| _recordExtractor.extract(record, reuse); | ||
| break; | ||
| } else { | ||
| // Can be thrown on: 1) Empty lines 2) Commented lines | ||
| throw new NoSuchElementException("Failed to find any records"); | ||
| } | ||
| } catch (Exception e) { | ||
| _skippedLinesCount++; | ||
| LOGGER.debug("Skipped input line: {} from file: {}", _nextLine, _dataFile, e); | ||
| // Find the next line that can be parsed | ||
| _nextLine = _bufferedReader.readLine(); | ||
snleee marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
| } | ||
| if (_nextLine != null) { | ||
| // Advance the pointer to the next line for future reading | ||
| _nextLine = _bufferedReader.readLine(); | ||
| } else { | ||
| throw new RuntimeException("No more parseable lines. Line iterator reached end of file."); | ||
| } | ||
| } | ||
|
|
||
| private void setHeaderMap(String[] header) { | ||
| int columnPos = 0; | ||
| for (String columnName : header) { | ||
| _headerMap.put(columnName, columnPos++); | ||
| } | ||
| } | ||
|
|
||
| private void initLineIteratorResources() | ||
| throws IOException { | ||
| _bufferedReader = new BufferedReader(new FileReader(_dataFile), 1024 * 32); // 32KB buffer size | ||
snleee marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| // When header is supplied by the client | ||
| if (_isHeaderProvided) { | ||
| if (_skipHeaderRecord) { | ||
| // When skip header config is set and header is supplied – skip the first line from the input file | ||
| _bufferedReader.readLine(); | ||
| // turn off the property so that it doesn't interfere with further parsing | ||
| _format = _format.withSkipHeaderRecord(false); | ||
| } | ||
| _nextLine = _bufferedReader.readLine(); | ||
| } else { | ||
| // Read the first line and set the header | ||
| String headerLine = _bufferedReader.readLine(); | ||
| String[] header = StringUtils.split(headerLine, _format.getDelimiter()); | ||
|
||
| setHeaderMap(header); | ||
| _format = _format.withHeader(header); | ||
| _nextLine = _bufferedReader.readLine(); | ||
| } | ||
| } | ||
|
|
||
| private void resetLineIteratorResources() | ||
| throws IOException { | ||
| _nextLine = null; | ||
|
|
||
| LOGGER.info("Total lines skipped in file: {} were: {}", _dataFile, _skippedLinesCount); | ||
| _skippedLinesCount = 0; | ||
|
|
||
| // if header is not provided by the client it would be rebuilt. When it's provided by the client it's initialized | ||
| // once in the constructor | ||
| if (!_isHeaderProvided) { | ||
| _headerMap.clear(); | ||
| } | ||
|
|
||
| if (_bufferedReader != null) { | ||
| _bufferedReader.close(); | ||
| } | ||
| } | ||
| } | ||

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does other record reader has this annotation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, they do not. I have added this as a good practice. This is purely for documentation.