58 changes: 45 additions & 13 deletions cpp/src/arrow/csv/parser.cc
@@ -35,14 +35,24 @@ using detail::ParsedValueDesc;

namespace {

-Status ParseError(const char* message) {
-return Status::Invalid("CSV parse error: ", message);
+template <typename... Args>
+Status ParseError(Args&&... args) {
+return Status::Invalid("CSV parse error: ", std::forward<Args>(args)...);
}

-Status MismatchingColumns(int32_t expected, int32_t actual) {
-char s[50];
-snprintf(s, sizeof(s), "Expected %d columns, got %d", expected, actual);
-return ParseError(s);
+Status MismatchingColumns(int32_t expected, int32_t actual, int64_t row_num,
+util::string_view row) {
+std::string ellipse;
+if (row.length() > 100) {
+row = row.substr(0, 96);
+ellipse = " ...";
+}
+if (row_num < 0) {
+return ParseError("Expected ", expected, " columns, got ", actual, ": ", row,
+ellipse);
+}
+return ParseError("Row #", row_num, ": Expected ", expected, " columns, got ", actual,
+": ", row, ellipse);
}
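// For illustration (hypothetical argument values, not part of the patch), the two
// branches above produce messages of the following shape, as concatenated by
// Status::Invalid:
//   MismatchingColumns(3, 5, -1, "a,b,c,d,e")
//     -> "Invalid: CSV parse error: Expected 3 columns, got 5: a,b,c,d,e"
//   MismatchingColumns(3, 5, 12, "a,b,c,d,e")
//     -> "Invalid: CSV parse error: Row #12: Expected 3 columns, got 5: a,b,c,d,e"
// Offending rows longer than 100 characters are truncated to 96 characters plus " ...".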

inline bool IsControlChar(uint8_t c) { return c < ' '; }
@@ -173,17 +183,24 @@ class PresizedValueDescWriter : public ValueDescWriter<PresizedValueDescWriter>
class BlockParserImpl {
public:
BlockParserImpl(MemoryPool* pool, ParseOptions options, int32_t num_cols,
-int32_t max_num_rows)
-: pool_(pool), options_(options), max_num_rows_(max_num_rows), batch_(num_cols) {}
+int64_t first_row, int32_t max_num_rows)
+: pool_(pool),
+options_(options),
+first_row_(first_row),
+max_num_rows_(max_num_rows),
+batch_(num_cols) {}

const DataBatch& parsed_batch() const { return batch_; }

int64_t first_row_num() const { return first_row_; }

template <typename SpecializedOptions, typename ValueDescWriter, typename DataWriter>
Status ParseLine(ValueDescWriter* values_writer, DataWriter* parsed_writer,
const char* data, const char* data_end, bool is_final,
const char** out_data) {
int32_t num_cols = 0;
char c;
const auto start = data;

DCHECK_GT(data_end, data);

@@ -299,7 +316,17 @@ class BlockParserImpl {
if (batch_.num_cols_ == -1) {
batch_.num_cols_ = num_cols;
} else {
-return MismatchingColumns(batch_.num_cols_, num_cols);
+// Find the end of the line without newline or carriage return
+auto end = data;
+if (*(end - 1) == '\n') {
+--end;
+}
+if (*(end - 1) == '\r') {
+--end;
+}
+return MismatchingColumns(batch_.num_cols_, num_cols,
+first_row_ < 0 ? -1 : first_row_ + batch_.num_rows_,
+util::string_view(start, end - start));
}
}
++batch_.num_rows_;
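// Worked example of the row arithmetic above (hypothetical numbers, not part of the
// patch): for a parser constructed with first_row = 50 that has already accepted two
// rows, a bad third row "x,y,z\r\n" is reported at
//   first_row_ + batch_.num_rows_ = 50 + 2 = 52
// and the trailing "\r\n" is trimmed via `start`/`end` before the text is echoed back,
// producing "CSV parse error: Row #52: Expected 2 columns, got 3: x,y,z".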
@@ -481,6 +508,7 @@ class BlockParserImpl {
protected:
MemoryPool* pool_;
const ParseOptions options_;
const int64_t first_row_;
// The maximum number of rows to parse from a block
int32_t max_num_rows_;

DataBatch batch_;
};

-BlockParser::BlockParser(ParseOptions options, int32_t num_cols, int32_t max_num_rows)
-: BlockParser(default_memory_pool(), options, num_cols, max_num_rows) {}
+BlockParser::BlockParser(ParseOptions options, int32_t num_cols, int64_t first_row,
+int32_t max_num_rows)
+: BlockParser(default_memory_pool(), options, num_cols, first_row, max_num_rows) {}

BlockParser::BlockParser(MemoryPool* pool, ParseOptions options, int32_t num_cols,
-int32_t max_num_rows)
-: impl_(new BlockParserImpl(pool, std::move(options), num_cols, max_num_rows)) {}
+int64_t first_row, int32_t max_num_rows)
+: impl_(new BlockParserImpl(pool, std::move(options), num_cols, first_row,
+max_num_rows)) {}

BlockParser::~BlockParser() {}

@@ -519,6 +549,8 @@ Status BlockParser::ParseFinal(util::string_view data, uint32_t* out_size) {

const DataBatch& BlockParser::parsed_batch() const { return impl_->parsed_batch(); }

int64_t BlockParser::first_row_num() const { return impl_->first_row_num(); }

int32_t SkipRows(const uint8_t* data, uint32_t size, int32_t num_rows,
const uint8_t** out_data) {
const auto end = data + size;
22 changes: 16 additions & 6 deletions cpp/src/arrow/csv/parser.h
@@ -63,19 +63,26 @@ class ARROW_EXPORT DataBatch {
uint32_t num_bytes() const { return parsed_size_; }

template <typename Visitor>
-Status VisitColumn(int32_t col_index, Visitor&& visit) const {
+Status VisitColumn(int32_t col_index, int64_t first_row, Visitor&& visit) const {
using detail::ParsedValueDesc;

int64_t row = first_row;
for (size_t buf_index = 0; buf_index < values_buffers_.size(); ++buf_index) {
const auto& values_buffer = values_buffers_[buf_index];
const auto values = reinterpret_cast<const ParsedValueDesc*>(values_buffer->data());
const auto max_pos =
static_cast<int32_t>(values_buffer->size() / sizeof(ParsedValueDesc)) - 1;
-for (int32_t pos = col_index; pos < max_pos; pos += num_cols_) {
+for (int32_t pos = col_index; pos < max_pos; pos += num_cols_, ++row) {
auto start = values[pos].offset;
auto stop = values[pos + 1].offset;
auto quoted = values[pos + 1].quoted;
-ARROW_RETURN_NOT_OK(visit(parsed_ + start, stop - start, quoted));
+Status status = visit(parsed_ + start, stop - start, quoted);
+if (ARROW_PREDICT_FALSE(!status.ok())) {
+if (first_row >= 0) {
+status = status.WithMessage("Row #", row, ": ", status.message());
+}
+ARROW_RETURN_NOT_OK(status);
+}
}
}
return Status::OK();
@@ -134,9 +141,9 @@ constexpr int32_t kMaxParserNumRows = 100000;
class ARROW_EXPORT BlockParser {
public:
explicit BlockParser(ParseOptions options, int32_t num_cols = -1,
-int32_t max_num_rows = kMaxParserNumRows);
+int64_t first_row = -1, int32_t max_num_rows = kMaxParserNumRows);
explicit BlockParser(MemoryPool* pool, ParseOptions options, int32_t num_cols = -1,
-int32_t max_num_rows = kMaxParserNumRows);
+int64_t first_row = -1, int32_t max_num_rows = kMaxParserNumRows);
~BlockParser();

/// \brief Parse a block of data
@@ -167,14 +174,17 @@ class ARROW_EXPORT BlockParser {
int32_t num_cols() const { return parsed_batch().num_cols(); }
/// \brief Return the total size in bytes of parsed data
uint32_t num_bytes() const { return parsed_batch().num_bytes(); }
/// \brief Return the row number of the first row in the block or -1 if unsupported
int64_t first_row_num() const;

/// \brief Visit parsed values in a column
///
/// The signature of the visitor is
/// Status(const uint8_t* data, uint32_t size, bool quoted)
template <typename Visitor>
Status VisitColumn(int32_t col_index, Visitor&& visit) const {
-return parsed_batch().VisitColumn(col_index, std::forward<Visitor>(visit));
+return parsed_batch().VisitColumn(col_index, first_row_num(),
+std::forward<Visitor>(visit));
}

template <typename Visitor>
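A brief usage sketch of the updated public API (hypothetical helper, not part of this patch): first_row is now the third constructor argument, which is why positional callers such as the MaxNumRows test below had to be updated, and VisitColumn decorates any error returned by the visitor with the absolute row number.

// Hypothetical sketch, assuming only the declarations above.
// Construct a parser for a block whose first line is row 100 of the overall file;
// passing first_row = -1 keeps the old, un-numbered error messages.
//   BlockParser parser(ParseOptions::Defaults(), /*num_cols=*/-1, /*first_row=*/100);
Status RejectEmptyValues(const BlockParser& parser) {
  // If the third visited row fails and the parser was built with first_row = 100,
  // the error surfaces as "Invalid: Row #102: Empty value".
  return parser.VisitColumn(0, [](const uint8_t*, uint32_t size, bool) -> Status {
    return size == 0 ? Status::Invalid("Empty value") : Status::OK();
  });
}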
71 changes: 64 additions & 7 deletions cpp/src/arrow/csv/parser_test.cc
@@ -20,6 +20,7 @@
#include <utility>
#include <vector>

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "arrow/csv/options.h"
@@ -295,7 +296,7 @@ TEST(BlockParser, Newlines) {

TEST(BlockParser, MaxNumRows) {
auto csv = MakeCSVData({"a\n", "b\n", "c\n", "d\n"});
-BlockParser parser(ParseOptions::Defaults(), -1, 3 /* max_num_rows */);
+BlockParser parser(ParseOptions::Defaults(), -1, 0, 3 /* max_num_rows */);

AssertParsePartial(parser, csv, 6);
AssertColumnsEq(parser, {{"a", "b", "c"}});
@@ -536,22 +537,37 @@ TEST(BlockParser, QuotesSpecial) {
TEST(BlockParser, MismatchingNumColumns) {
uint32_t out_size;
{
-BlockParser parser(ParseOptions::Defaults());
+BlockParser parser(ParseOptions::Defaults(), -1, 0 /* first_row */);
auto csv = MakeCSVData({"a,b\nc\n"});
Status st = Parse(parser, csv, &out_size);
-ASSERT_RAISES(Invalid, st);
+EXPECT_RAISES_WITH_MESSAGE_THAT(
+Invalid,
+testing::HasSubstr("CSV parse error: Row #1: Expected 2 columns, got 1: c"), st);
}
{
-BlockParser parser(ParseOptions::Defaults(), 2 /* num_cols */);
+BlockParser parser(ParseOptions::Defaults(), 2 /* num_cols */, 0 /* first_row */);
auto csv = MakeCSVData({"a\n"});
Status st = Parse(parser, csv, &out_size);
-ASSERT_RAISES(Invalid, st);
+EXPECT_RAISES_WITH_MESSAGE_THAT(
+Invalid,
+testing::HasSubstr("CSV parse error: Row #0: Expected 2 columns, got 1: a"), st);
}
{
-BlockParser parser(ParseOptions::Defaults(), 2 /* num_cols */);
+BlockParser parser(ParseOptions::Defaults(), 2 /* num_cols */, 50 /* first_row */);
auto csv = MakeCSVData({"a,b,c\n"});
Status st = Parse(parser, csv, &out_size);
-ASSERT_RAISES(Invalid, st);
+EXPECT_RAISES_WITH_MESSAGE_THAT(
+Invalid,
+testing::HasSubstr("CSV parse error: Row #50: Expected 2 columns, got 3: a,b,c"),
+st);
}
// No row number
{
BlockParser parser(ParseOptions::Defaults(), 2 /* num_cols */, -1);
auto csv = MakeCSVData({"a\n"});
Status st = Parse(parser, csv, &out_size);
EXPECT_RAISES_WITH_MESSAGE_THAT(
Invalid, testing::HasSubstr("CSV parse error: Expected 2 columns, got 1: a"), st);
}
}

@@ -623,5 +639,46 @@ TEST(BlockParser, QuotedEscape) {
}
}

TEST(BlockParser, RowNumberAppendedToError) {
auto options = ParseOptions::Defaults();
auto csv = "a,b,c\nd,e,f\ng,h,i\n";
{
BlockParser parser(options, -1, 0);
ASSERT_NO_FATAL_FAILURE(AssertParseOk(parser, csv));
int row = 0;
auto status = parser.VisitColumn(
0, [row](const uint8_t* data, uint32_t size, bool quoted) mutable -> Status {
return ++row == 2 ? Status::Invalid("Bad value") : Status::OK();
});
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr("Row #1: Bad value"),
status);
}

{
BlockParser parser(options, -1, 100);
ASSERT_NO_FATAL_FAILURE(AssertParseOk(parser, csv));
int row = 0;
auto status = parser.VisitColumn(
0, [row](const uint8_t* data, uint32_t size, bool quoted) mutable -> Status {
return ++row == 3 ? Status::Invalid("Bad value") : Status::OK();
});
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr("Row #102: Bad value"),
status);
}

// No first row specified should not append row information
{
BlockParser parser(options, -1, -1);
ASSERT_NO_FATAL_FAILURE(AssertParseOk(parser, csv));
int row = 0;
auto status = parser.VisitColumn(
0, [row](const uint8_t* data, uint32_t size, bool quoted) mutable -> Status {
return ++row == 3 ? Status::Invalid("Bad value") : Status::OK();
});
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::Not(testing::HasSubstr("Row")),
status);
}
}

} // namespace csv
} // namespace arrow
37 changes: 28 additions & 9 deletions cpp/src/arrow/csv/reader.cc
@@ -319,11 +319,13 @@ class ReaderMixin {
public:
ReaderMixin(io::IOContext io_context, std::shared_ptr<io::InputStream> input,
const ReadOptions& read_options, const ParseOptions& parse_options,
-const ConvertOptions& convert_options)
+const ConvertOptions& convert_options, bool count_rows)
: io_context_(std::move(io_context)),
read_options_(read_options),
parse_options_(parse_options),
convert_options_(convert_options),
count_rows_(count_rows),
num_rows_seen_(count_rows_ ? 1 : -1),
input_(std::move(input)) {}

protected:
@@ -344,11 +346,15 @@
" rows from CSV file, "
"either file is too short or header is larger than block size");
}
if (count_rows_) {
num_rows_seen_ = num_skipped_rows;
}
}

if (read_options_.column_names.empty()) {
// Parse one row (either to read column names or to know the number of columns)
-BlockParser parser(io_context_.pool(), parse_options_, num_csv_cols_, 1);
+BlockParser parser(io_context_.pool(), parse_options_, num_csv_cols_,
+num_rows_seen_, 1);
uint32_t parsed_size = 0;
RETURN_NOT_OK(parser.Parse(
util::string_view(reinterpret_cast<const char*>(data), data_end - data),
@@ -374,6 +380,9 @@
DCHECK_EQ(static_cast<size_t>(parser.num_cols()), column_names_.size());
// Skip parsed header row
data += parsed_size;
if (count_rows_) {
++num_rows_seen_;
}
}
} else {
column_names_ = read_options_.column_names;
@@ -466,8 +475,8 @@ class ReaderMixin {
const std::shared_ptr<Buffer>& block, int64_t block_index,
bool is_final) {
static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
-auto parser = std::make_shared<BlockParser>(io_context_.pool(), parse_options_,
-num_csv_cols_, max_num_rows);
+auto parser = std::make_shared<BlockParser>(
+io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows);

std::shared_ptr<Buffer> straddling;
std::vector<util::string_view> views;
@@ -490,6 +499,9 @@
} else {
RETURN_NOT_OK(parser->Parse(views, &parsed_size));
}
if (count_rows_) {
num_rows_seen_ += parser->num_rows();
}
return ParseResult{std::move(parser), static_cast<int64_t>(parsed_size)};
}
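// Worked example of the bookkeeping above (hypothetical block sizes, not part of the
// patch): num_rows_seen_ starts at 1; parsing the header row bumps it to 2, so the
// first data block's parser is created with first_row = 2. If that block parses 1000
// rows, num_rows_seen_ becomes 1002 and the next block reports errors starting at
// Row #1002, keeping row numbers aligned with the original file across blocks.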

@@ -500,6 +512,10 @@

// Number of columns in the CSV file
int32_t num_csv_cols_ = -1;
// Whether num_rows_seen_ tracks the number of rows seen in the CSV being parsed
bool count_rows_;
// Number of rows seen in the csv. Not used if count_rows is false
int64_t num_rows_seen_;
// Column names in the CSV file
std::vector<std::string> column_names_;
ConversionSchema conversion_schema_;
@@ -588,9 +604,9 @@ class BaseStreamingReader : public ReaderMixin, public csv::StreamingReader {
BaseStreamingReader(io::IOContext io_context, Executor* cpu_executor,
std::shared_ptr<io::InputStream> input,
const ReadOptions& read_options, const ParseOptions& parse_options,
-const ConvertOptions& convert_options)
+const ConvertOptions& convert_options, bool count_rows)
: ReaderMixin(io_context, std::move(input), read_options, parse_options,
-convert_options),
+convert_options, count_rows),
cpu_executor_(cpu_executor) {}

virtual Future<std::shared_ptr<csv::StreamingReader>> Init() = 0;
@@ -889,8 +905,9 @@ class AsyncThreadedTableReader
const ReadOptions& read_options,
const ParseOptions& parse_options,
const ConvertOptions& convert_options, Executor* cpu_executor)
// Count rows is currently not supported during parallel read
: BaseTableReader(std::move(io_context), input, read_options, parse_options,
-convert_options),
+convert_options, /*count_rows=*/false),
cpu_executor_(cpu_executor) {}

~AsyncThreadedTableReader() override {
@@ -992,7 +1009,8 @@ Result<std::shared_ptr<TableReader>> MakeTableReader(
io_context, input, read_options, parse_options, convert_options, cpu_executor);
} else {
reader = std::make_shared<SerialTableReader>(io_context, input, read_options,
-parse_options, convert_options);
+parse_options, convert_options,
+/*count_rows=*/true);
}
RETURN_NOT_OK(reader->Init());
return reader;
@@ -1004,7 +1022,8 @@ Future<std::shared_ptr<StreamingReader>> MakeStreamingReader(
const ParseOptions& parse_options, const ConvertOptions& convert_options) {
std::shared_ptr<BaseStreamingReader> reader;
reader = std::make_shared<SerialStreamingReader>(
-io_context, cpu_executor, input, read_options, parse_options, convert_options);
+io_context, cpu_executor, input, read_options, parse_options, convert_options,
+/*count_rows=*/true);
return reader->Init();
}
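Finally, a minimal end-to-end sketch (hypothetical input and helper, not part of this patch) of how the row numbers surface through the serial table reader; the exact wording follows the parser changes above:

#include <memory>

#include "arrow/buffer.h"
#include "arrow/csv/api.h"
#include "arrow/io/memory.h"

arrow::Status ReadBrokenCsv() {
  auto read_options = arrow::csv::ReadOptions::Defaults();
  read_options.use_threads = false;  // serial reader, so row counting stays enabled
  auto input = std::make_shared<arrow::io::BufferReader>(
      arrow::Buffer::FromString("a,b,c\n1,2,3\n4,5\n"));
  ARROW_ASSIGN_OR_RAISE(
      auto reader,
      arrow::csv::TableReader::Make(arrow::io::default_io_context(), input, read_options,
                                    arrow::csv::ParseOptions::Defaults(),
                                    arrow::csv::ConvertOptions::Defaults()));
  // Expected to fail with something like
  //   "Invalid: CSV parse error: Row #3: Expected 3 columns, got 2: 4,5"
  // (with use_threads = true the threaded reader sets count_rows to false and the
  //  "Row #" prefix is omitted).
  return reader->Read().status();
}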
