Skip to content

Commit 133b1a9

Browse files
bkietzlidavidm
authored andcommitted
ARROW-10440: [C++][Dataset] Visit FileWriters before Finish
This enables collection of paths written to during writing of a FileSystemDataset Closes apache#10573 from bkietz/10440-Add-a-callback-to-visit-f Authored-by: Benjamin Kietzman <bengilgit@gmail.com> Signed-off-by: David Li <li.davidm96@gmail.com>
1 parent da841cc commit 133b1a9

File tree

9 files changed

+87
-46
lines changed

9 files changed

+87
-46
lines changed

cpp/src/arrow/dataset/file_base.cc

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,8 @@ class WriteQueue {
418418

419419
ARROW_ASSIGN_OR_RAISE(
420420
writer_, write_options.format()->MakeWriter(std::move(destination), schema_,
421-
write_options.file_write_options));
421+
write_options.file_write_options,
422+
{write_options.filesystem, path}));
422423
return Status::OK();
423424
}
424425

@@ -445,15 +446,15 @@ struct WriteState {
445446
std::unordered_map<std::string, std::unique_ptr<WriteQueue>> queues;
446447
};
447448

448-
Status WriteNextBatch(WriteState& state, const std::shared_ptr<Fragment>& fragment,
449+
Status WriteNextBatch(WriteState* state, const std::shared_ptr<Fragment>& fragment,
449450
std::shared_ptr<RecordBatch> batch) {
450-
ARROW_ASSIGN_OR_RAISE(auto groups, state.write_options.partitioning->Partition(batch));
451+
ARROW_ASSIGN_OR_RAISE(auto groups, state->write_options.partitioning->Partition(batch));
451452
batch.reset(); // drop to hopefully conserve memory
452453

453-
if (groups.batches.size() > static_cast<size_t>(state.write_options.max_partitions)) {
454+
if (groups.batches.size() > static_cast<size_t>(state->write_options.max_partitions)) {
454455
return Status::Invalid("Fragment would be written into ", groups.batches.size(),
455456
" partitions. This exceeds the maximum of ",
456-
state.write_options.max_partitions);
457+
state->write_options.max_partitions);
457458
}
458459

459460
std::unordered_set<WriteQueue*> need_flushed;
@@ -462,20 +463,20 @@ Status WriteNextBatch(WriteState& state, const std::shared_ptr<Fragment>& fragme
462463
and_(std::move(groups.expressions[i]), fragment->partition_expression());
463464
auto batch = std::move(groups.batches[i]);
464465

465-
ARROW_ASSIGN_OR_RAISE(auto part,
466-
state.write_options.partitioning->Format(partition_expression));
466+
ARROW_ASSIGN_OR_RAISE(
467+
auto part, state->write_options.partitioning->Format(partition_expression));
467468

468469
WriteQueue* queue;
469470
{
470471
// lookup the queue to which batch should be appended
471-
auto queues_lock = state.mutex.Lock();
472+
auto queues_lock = state->mutex.Lock();
472473

473474
queue = internal::GetOrInsertGenerated(
474-
&state.queues, std::move(part),
475+
&state->queues, std::move(part),
475476
[&](const std::string& emplaced_part) {
476477
// lookup in `queues` also failed,
477478
// generate a new WriteQueue
478-
size_t queue_index = state.queues.size() - 1;
479+
size_t queue_index = state->queues.size() - 1;
479480

480481
return internal::make_unique<WriteQueue>(emplaced_part, queue_index,
481482
batch->schema());
@@ -489,12 +490,12 @@ Status WriteNextBatch(WriteState& state, const std::shared_ptr<Fragment>& fragme
489490

490491
// flush all touched WriteQueues
491492
for (auto queue : need_flushed) {
492-
RETURN_NOT_OK(queue->Flush(state.write_options));
493+
RETURN_NOT_OK(queue->Flush(state->write_options));
493494
}
494495
return Status::OK();
495496
}
496497

497-
Status WriteInternal(const ScanOptions& scan_options, WriteState& state,
498+
Status WriteInternal(const ScanOptions& scan_options, WriteState* state,
498499
ScanTaskVector scan_tasks) {
499500
// Store a mapping from partitions (represened by their formatted partition expressions)
500501
// to a WriteQueue which flushes batches into that partition's output file. In principle
@@ -544,7 +545,7 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
544545
#pragma warning(disable : 4996)
545546
#endif
546547

547-
// TODO: (ARROW-11782/ARROW-12288) Remove calls to Scan()
548+
// TODO(ARROW-11782/ARROW-12288) Remove calls to Scan()
548549
ARROW_ASSIGN_OR_RAISE(auto scan_task_it, scanner->Scan());
549550
ARROW_ASSIGN_OR_RAISE(ScanTaskVector scan_tasks, scan_task_it.ToVector());
550551

@@ -555,11 +556,14 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
555556
#endif
556557

557558
WriteState state(write_options);
558-
RETURN_NOT_OK(WriteInternal(*scanner->options(), state, std::move(scan_tasks)));
559+
RETURN_NOT_OK(WriteInternal(*scanner->options(), &state, std::move(scan_tasks)));
559560

560561
auto task_group = scanner->options()->TaskGroup();
561562
for (const auto& part_queue : state.queues) {
562-
task_group->Append([&] { return part_queue.second->writer()->Finish(); });
563+
task_group->Append([&] {
564+
RETURN_NOT_OK(write_options.writer_pre_finish(part_queue.second->writer().get()));
565+
return part_queue.second->writer()->Finish();
566+
});
563567
}
564568
return task_group->Finish();
565569
}

cpp/src/arrow/dataset/file_base.h

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,8 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this<FileForma
175175
/// \brief Create a writer for this format.
176176
virtual Result<std::shared_ptr<FileWriter>> MakeWriter(
177177
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
178-
std::shared_ptr<FileWriteOptions> options) const = 0;
178+
std::shared_ptr<FileWriteOptions> options,
179+
fs::FileLocator destination_locator) const = 0;
179180

180181
/// \brief Get default write options for this format.
181182
virtual std::shared_ptr<FileWriteOptions> DefaultWriteOptions() = 0;
@@ -313,19 +314,23 @@ class ARROW_DS_EXPORT FileWriter {
313314
const std::shared_ptr<FileFormat>& format() const { return options_->format(); }
314315
const std::shared_ptr<Schema>& schema() const { return schema_; }
315316
const std::shared_ptr<FileWriteOptions>& options() const { return options_; }
317+
const fs::FileLocator& destination() const { return destination_locator_; }
316318

317319
protected:
318320
FileWriter(std::shared_ptr<Schema> schema, std::shared_ptr<FileWriteOptions> options,
319-
std::shared_ptr<io::OutputStream> destination)
321+
std::shared_ptr<io::OutputStream> destination,
322+
fs::FileLocator destination_locator)
320323
: schema_(std::move(schema)),
321324
options_(std::move(options)),
322-
destination_(destination) {}
325+
destination_(std::move(destination)),
326+
destination_locator_(std::move(destination_locator)) {}
323327

324328
virtual Status FinishInternal() = 0;
325329

326330
std::shared_ptr<Schema> schema_;
327331
std::shared_ptr<FileWriteOptions> options_;
328332
std::shared_ptr<io::OutputStream> destination_;
333+
fs::FileLocator destination_locator_;
329334
};
330335

331336
/// \brief Options for writing a dataset.
@@ -349,6 +354,12 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
349354
/// {i} will be replaced by an auto incremented integer.
350355
std::string basename_template;
351356

357+
/// Callback to be invoked against all FileWriters before
358+
/// they are finalized with FileWriter::Finish().
359+
std::function<Status(FileWriter*)> writer_pre_finish = [](FileWriter*) {
360+
return Status::OK();
361+
};
362+
352363
const std::shared_ptr<FileFormat>& format() const {
353364
return file_write_options->format();
354365
}

cpp/src/arrow/dataset/file_csv.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {
6767

6868
Result<std::shared_ptr<FileWriter>> MakeWriter(
6969
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
70-
std::shared_ptr<FileWriteOptions> options) const override {
70+
std::shared_ptr<FileWriteOptions> options,
71+
fs::FileLocator destination_locator) const override {
7172
return Status::NotImplemented("writing fragment of CsvFileFormat");
7273
}
7374

cpp/src/arrow/dataset/file_ipc.cc

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,8 @@ std::shared_ptr<FileWriteOptions> IpcFileFormat::DefaultWriteOptions() {
258258

259259
Result<std::shared_ptr<FileWriter>> IpcFileFormat::MakeWriter(
260260
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
261-
std::shared_ptr<FileWriteOptions> options) const {
261+
std::shared_ptr<FileWriteOptions> options,
262+
fs::FileLocator destination_locator) const {
262263
if (!Equals(*options->format())) {
263264
return Status::TypeError("Mismatching format/write options.");
264265
}
@@ -274,14 +275,16 @@ Result<std::shared_ptr<FileWriter>> IpcFileFormat::MakeWriter(
274275

275276
return std::shared_ptr<FileWriter>(
276277
new IpcFileWriter(std::move(destination), std::move(writer), std::move(schema),
277-
std::move(ipc_options)));
278+
std::move(ipc_options), std::move(destination_locator)));
278279
}
279280

280281
IpcFileWriter::IpcFileWriter(std::shared_ptr<io::OutputStream> destination,
281282
std::shared_ptr<ipc::RecordBatchWriter> writer,
282283
std::shared_ptr<Schema> schema,
283-
std::shared_ptr<IpcFileWriteOptions> options)
284-
: FileWriter(std::move(schema), std::move(options), std::move(destination)),
284+
std::shared_ptr<IpcFileWriteOptions> options,
285+
fs::FileLocator destination_locator)
286+
: FileWriter(std::move(schema), std::move(options), std::move(destination),
287+
std::move(destination_locator)),
285288
batch_writer_(std::move(writer)) {}
286289

287290
Status IpcFileWriter::Write(const std::shared_ptr<RecordBatch>& batch) {

cpp/src/arrow/dataset/file_ipc.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {
6767

6868
Result<std::shared_ptr<FileWriter>> MakeWriter(
6969
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
70-
std::shared_ptr<FileWriteOptions> options) const override;
70+
std::shared_ptr<FileWriteOptions> options,
71+
fs::FileLocator destination_locator) const override;
7172

7273
std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override;
7374
};
@@ -107,7 +108,8 @@ class ARROW_DS_EXPORT IpcFileWriter : public FileWriter {
107108
IpcFileWriter(std::shared_ptr<io::OutputStream> destination,
108109
std::shared_ptr<ipc::RecordBatchWriter> writer,
109110
std::shared_ptr<Schema> schema,
110-
std::shared_ptr<IpcFileWriteOptions> options);
111+
std::shared_ptr<IpcFileWriteOptions> options,
112+
fs::FileLocator destination_locator);
111113

112114
Status FinishInternal() override;
113115

cpp/src/arrow/dataset/file_parquet.cc

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ class ParquetScanTask : public ScanTask {
7070
reader_(std::move(reader)),
7171
pre_buffer_once_(std::move(pre_buffer_once)),
7272
pre_buffer_row_groups_(std::move(pre_buffer_row_groups)),
73-
io_context_(io_context),
73+
io_context_(std::move(io_context)),
7474
cache_options_(cache_options) {}
7575

7676
Result<RecordBatchIterator> Execute() override {
@@ -540,7 +540,8 @@ std::shared_ptr<FileWriteOptions> ParquetFileFormat::DefaultWriteOptions() {
540540

541541
Result<std::shared_ptr<FileWriter>> ParquetFileFormat::MakeWriter(
542542
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
543-
std::shared_ptr<FileWriteOptions> options) const {
543+
std::shared_ptr<FileWriteOptions> options,
544+
fs::FileLocator destination_locator) const {
544545
if (!Equals(*options->format())) {
545546
return Status::TypeError("Mismatching format/write options");
546547
}
@@ -552,14 +553,17 @@ Result<std::shared_ptr<FileWriter>> ParquetFileFormat::MakeWriter(
552553
*schema, default_memory_pool(), destination, parquet_options->writer_properties,
553554
parquet_options->arrow_writer_properties, &parquet_writer));
554555

555-
return std::shared_ptr<FileWriter>(new ParquetFileWriter(
556-
std::move(destination), std::move(parquet_writer), std::move(parquet_options)));
556+
return std::shared_ptr<FileWriter>(
557+
new ParquetFileWriter(std::move(destination), std::move(parquet_writer),
558+
std::move(parquet_options), std::move(destination_locator)));
557559
}
558560

559561
ParquetFileWriter::ParquetFileWriter(std::shared_ptr<io::OutputStream> destination,
560562
std::shared_ptr<parquet::arrow::FileWriter> writer,
561-
std::shared_ptr<ParquetFileWriteOptions> options)
562-
: FileWriter(writer->schema(), std::move(options), std::move(destination)),
563+
std::shared_ptr<ParquetFileWriteOptions> options,
564+
fs::FileLocator destination_locator)
565+
: FileWriter(writer->schema(), std::move(options), std::move(destination),
566+
std::move(destination_locator)),
563567
parquet_writer_(std::move(writer)) {}
564568

565569
Status ParquetFileWriter::Write(const std::shared_ptr<RecordBatch>& batch) {

cpp/src/arrow/dataset/file_parquet.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
128128

129129
Result<std::shared_ptr<FileWriter>> MakeWriter(
130130
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
131-
std::shared_ptr<FileWriteOptions> options) const override;
131+
std::shared_ptr<FileWriteOptions> options,
132+
fs::FileLocator destination_locator) const override;
132133

133134
std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override;
134135
};
@@ -252,7 +253,8 @@ class ARROW_DS_EXPORT ParquetFileWriter : public FileWriter {
252253
private:
253254
ParquetFileWriter(std::shared_ptr<io::OutputStream> destination,
254255
std::shared_ptr<parquet::arrow::FileWriter> writer,
255-
std::shared_ptr<ParquetFileWriteOptions> options);
256+
std::shared_ptr<ParquetFileWriteOptions> options,
257+
fs::FileLocator destination_locator);
256258

257259
Status FinishInternal() override;
258260

cpp/src/arrow/dataset/file_test.cc

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,22 +87,23 @@ constexpr int kNumScanTasks = 2;
8787
constexpr int kBatchesPerScanTask = 2;
8888
constexpr int kRowsPerBatch = 1024;
8989
class MockFileFormat : public FileFormat {
90-
virtual std::string type_name() const { return "mock"; }
91-
virtual bool Equals(const FileFormat& other) const { return false; }
92-
virtual Result<bool> IsSupported(const FileSource& source) const { return true; }
93-
virtual Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const {
90+
std::string type_name() const override { return "mock"; }
91+
bool Equals(const FileFormat& other) const override { return false; }
92+
Result<bool> IsSupported(const FileSource& source) const override { return true; }
93+
Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override {
9494
return Status::NotImplemented("Not needed for test");
9595
}
96-
virtual Result<std::shared_ptr<FileWriter>> MakeWriter(
96+
Result<std::shared_ptr<FileWriter>> MakeWriter(
9797
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
98-
std::shared_ptr<FileWriteOptions> options) const {
98+
std::shared_ptr<FileWriteOptions> options,
99+
fs::FileLocator destination_locator) const override {
99100
return Status::NotImplemented("Not needed for test");
100101
}
101-
virtual std::shared_ptr<FileWriteOptions> DefaultWriteOptions() { return nullptr; }
102+
std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override { return nullptr; }
102103

103-
virtual Result<ScanTaskIterator> ScanFile(
104+
Result<ScanTaskIterator> ScanFile(
104105
const std::shared_ptr<ScanOptions>& options,
105-
const std::shared_ptr<FileFragment>& file) const {
106+
const std::shared_ptr<FileFragment>& file) const override {
106107
auto sch = schema({field("i32", int32())});
107108
ScanTaskVector scan_tasks;
108109
for (int i = 0; i < kNumScanTasks; i++) {

cpp/src/arrow/dataset/test_util.h

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ class FileFormatFixtureMixin : public ::testing::Test {
486486
EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink());
487487

488488
if (!options) options = format->DefaultWriteOptions();
489-
EXPECT_OK_AND_ASSIGN(auto writer, format->MakeWriter(sink, schema, options));
489+
EXPECT_OK_AND_ASSIGN(auto writer, format->MakeWriter(sink, schema, options, {}));
490490
ARROW_EXPECT_OK(writer->Write(GetRecordBatchReader(schema).get()));
491491
ARROW_EXPECT_OK(writer->Finish());
492492
EXPECT_OK_AND_ASSIGN(auto written, sink->Finish());
@@ -722,7 +722,8 @@ class DummyFileFormat : public FileFormat {
722722

723723
Result<std::shared_ptr<FileWriter>> MakeWriter(
724724
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
725-
std::shared_ptr<FileWriteOptions> options) const override {
725+
std::shared_ptr<FileWriteOptions> options,
726+
fs::FileLocator destination_locator) const override {
726727
return Status::NotImplemented("writing fragment of DummyFileFormat");
727728
}
728729

@@ -770,7 +771,8 @@ class JSONRecordBatchFileFormat : public FileFormat {
770771

771772
Result<std::shared_ptr<FileWriter>> MakeWriter(
772773
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
773-
std::shared_ptr<FileWriteOptions> options) const override {
774+
std::shared_ptr<FileWriteOptions> options,
775+
fs::FileLocator destination_locator) const override {
774776
return Status::NotImplemented("writing fragment of JSONRecordBatchFileFormat");
775777
}
776778

@@ -1057,8 +1059,12 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin {
10571059
void SetWriteOptions(std::shared_ptr<FileWriteOptions> file_write_options) {
10581060
write_options_.file_write_options = file_write_options;
10591061
write_options_.filesystem = fs_;
1060-
write_options_.base_dir = "new_root/";
1062+
write_options_.base_dir = "/new_root/";
10611063
write_options_.basename_template = "dat_{i}";
1064+
write_options_.writer_pre_finish = [this](FileWriter* writer) {
1065+
visited_paths_.push_back(writer->destination().path);
1066+
return Status::OK();
1067+
};
10621068
}
10631069

10641070
void DoWrite(std::shared_ptr<Partitioning> desired_partitioning) {
@@ -1210,11 +1216,17 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin {
12101216
for (const auto& file_contents : expected_files_) {
12111217
expected_paths.insert(file_contents.first);
12121218
}
1219+
1220+
// expect the written filesystem to contain precisely the paths we expected
12131221
for (auto path : checked_pointer_cast<FileSystemDataset>(written_)->files()) {
12141222
actual_paths.insert(std::move(path));
12151223
}
12161224
EXPECT_THAT(actual_paths, testing::UnorderedElementsAreArray(expected_paths));
12171225

1226+
// Additionally, the writer producing each written file was visited and its path
1227+
// collected. That should match the expected paths as well
1228+
EXPECT_THAT(visited_paths_, testing::UnorderedElementsAreArray(expected_paths));
1229+
12181230
ASSERT_OK_AND_ASSIGN(auto written_fragments_it, written_->GetFragments());
12191231
for (auto maybe_fragment : written_fragments_it) {
12201232
ASSERT_OK_AND_ASSIGN(auto fragment, maybe_fragment);
@@ -1257,6 +1269,7 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin {
12571269
PathAndContent expected_files_;
12581270
std::shared_ptr<Schema> expected_physical_schema_;
12591271
std::shared_ptr<Dataset> written_;
1272+
std::vector<std::string> visited_paths_;
12601273
FileSystemDatasetWriteOptions write_options_;
12611274
std::shared_ptr<ScanOptions> scan_options_;
12621275
};

0 commit comments

Comments
 (0)