Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions include/internal/basic_csv_parser.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "basic_csv_parser.hpp"

#include <system_error>

namespace csv {
namespace internals {
CSV_INLINE size_t get_file_size(csv::string_view filename) {
Expand Down Expand Up @@ -237,11 +239,18 @@ namespace csv {
this->reset_data_ptr();

// Create memory map
size_t length = std::min(this->source_size - this->mmap_pos, bytes);
const size_t offset = this->mmap_pos;
const size_t length = std::min(this->source_size - offset, bytes);
std::error_code error;
this->data_ptr->_data = std::make_shared<mio::basic_mmap_source<char>>(mio::make_mmap_source(this->_filename, this->mmap_pos, length, error));
auto mmap = mio::make_mmap_source(this->_filename, offset, length, error);
if (error) {
std::string msg = "Memory mapping failed during CSV parsing: file='" + this->_filename
+ "' offset=" + std::to_string(offset)
+ " length=" + std::to_string(length);
throw std::system_error(error, msg);
}
this->data_ptr->_data = std::make_shared<mio::basic_mmap_source<char>>(std::move(mmap));
this->mmap_pos += length;
if (error) throw error;

auto mmap_ptr = (mio::basic_mmap_source<char>*)(this->data_ptr->_data.get());

Expand Down
41 changes: 29 additions & 12 deletions include/internal/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,18 @@ namespace csv {
// Tell read_row() to listen for CSV rows
this->records->notify_all();

this->parser->set_output(*this->records);
this->parser->next(bytes);
try {
this->parser->set_output(*this->records);
this->parser->next(bytes);

if (!this->header_trimmed) {
this->trim_header();
if (!this->header_trimmed) {
this->trim_header();
}
}
catch (...) {
// Never allow exceptions to escape the worker thread, or std::terminate will be invoked.
// Store the exception and rethrow from the consumer thread (read_row / iterator).
this->set_read_csv_exception(std::current_exception());
}

// Tell read_row() to stop waiting
Expand All @@ -272,19 +279,29 @@ namespace csv {
CSV_INLINE bool CSVReader::read_row(CSVRow &row) {
while (true) {
if (this->records->empty()) {
if (this->records->is_waitable())
if (this->records->is_waitable()) {
// Reading thread is currently active => wait for it to populate records
this->records->wait();
else if (this->parser->eof())
continue;
}

// Reading thread is not active
if (this->read_csv_worker.joinable())
this->read_csv_worker.join();

// If the worker thread failed, rethrow the error here
this->rethrow_read_csv_exception_if_any();

if (this->parser->eof())
// End of file and no more records
return false;
else {
// Reading thread is not active => start another one
if (this->read_csv_worker.joinable())
this->read_csv_worker.join();

this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
}
// Start another reading thread
// Mark as waitable before starting the thread to avoid a race where
// read_row() observes is_waitable()==false immediately after thread creation.
this->records->notify_all();
this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
continue;
}
else if (this->records->front().size() != this->n_cols &&
this->_format.variable_column_policy != VariableColumnPolicy::KEEP) {
Expand Down
24 changes: 24 additions & 0 deletions include/internal/csv_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <algorithm>
#include <deque>
#include <exception>
#include <fstream>
#include <iterator>
#include <memory>
Expand Down Expand Up @@ -229,10 +230,33 @@ namespace csv {
std::thread read_csv_worker; /**< Worker thread for read_csv() */
///@}

/** If the worker thread throws, store it here and rethrow on the consumer thread. */
std::exception_ptr read_csv_exception = nullptr;
std::mutex read_csv_exception_lock;

void set_read_csv_exception(std::exception_ptr eptr) {
std::lock_guard<std::mutex> lock(this->read_csv_exception_lock);
this->read_csv_exception = std::move(eptr);
}

std::exception_ptr take_read_csv_exception() {
std::lock_guard<std::mutex> lock(this->read_csv_exception_lock);
auto eptr = this->read_csv_exception;
this->read_csv_exception = nullptr;
return eptr;
}

void rethrow_read_csv_exception_if_any() {
if (auto eptr = this->take_read_csv_exception()) {
std::rethrow_exception(eptr);
}
}

/** Read initial chunk to get metadata */
void initial_read() {
this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
this->read_csv_worker.join();
this->rethrow_read_csv_exception_if_any();
}

void trim_header();
Expand Down
1 change: 1 addition & 0 deletions include/internal/csv_reader_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace csv {
if (this->records->empty()) {
this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
this->read_csv_worker.join();
this->rethrow_read_csv_exception_if_any();

// Still empty => return end iterator
if (this->records->empty()) return this->end();
Expand Down
Loading