From 512276d2b45e25f367d577e098d2137fa3d724eb Mon Sep 17 00:00:00 2001 From: Jimmy Lu Date: Fri, 30 Aug 2024 12:15:28 -0700 Subject: [PATCH] Allow file name to be supplied to non-bucketing table write node Summary: For some non-Presto/Spark use cases, we need to use `PlanBuilder` to construct a plan and supply the file name directly to the table write node. Differential Revision: D62032439 --- velox/connectors/hive/HiveDataSink.cpp | 10 ++++++++-- velox/connectors/hive/HiveDataSink.h | 10 +++++++++- velox/exec/tests/TableWriteTest.cpp | 27 ++++++++++++++++++++++++++ velox/exec/tests/utils/PlanBuilder.cpp | 12 ++++++++---- velox/exec/tests/utils/PlanBuilder.h | 10 ++++++++-- 5 files changed, 60 insertions(+), 9 deletions(-) diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index c0d5cb46b9fa..80fe115d1118 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -898,10 +898,15 @@ HiveWriterParameters HiveDataSink::getWriterParameters( std::pair HiveDataSink::getWriterFileNames( std::optional bucketId) const { std::string targetFileName; + const bool generateFileName = + insertTableHandle_->locationHandle()->targetFileName().empty(); if (bucketId.has_value()) { + VELOX_CHECK(generateFileName); // TODO: add hive.file_renaming_enabled support. targetFileName = computeBucketedFileName( connectorQueryCtx_->queryId(), bucketId.value()); + } else if (!generateFileName) { + targetFileName = insertTableHandle_->locationHandle()->targetFileName(); } else { // targetFileName includes planNodeId and Uuid. As a result, different // table writers run by the same task driver or the same table writer @@ -916,8 +921,9 @@ std::pair HiveDataSink::getWriterFileNames( const std::string writeFileName = isCommitRequired() ? 
fmt::format(".tmp.velox.{}_{}", targetFileName, makeUuid()) : targetFileName; - if (insertTableHandle_->tableStorageFormat() == - dwio::common::FileFormat::PARQUET) { + if (generateFileName && + insertTableHandle_->tableStorageFormat() == + dwio::common::FileFormat::PARQUET) { return { fmt::format("{}{}", targetFileName, ".parquet"), fmt::format("{}{}", writeFileName, ".parquet")}; diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h index a916316bda45..233f5a8991b9 100644 --- a/velox/connectors/hive/HiveDataSink.h +++ b/velox/connectors/hive/HiveDataSink.h @@ -47,8 +47,10 @@ class LocationHandle : public ISerializable { LocationHandle( std::string targetPath, std::string writePath, - TableType tableType) + TableType tableType, + std::string targetFileName = "") : targetPath_(std::move(targetPath)), + targetFileName_(std::move(targetFileName)), writePath_(std::move(writePath)), tableType_(tableType) {} @@ -56,6 +58,10 @@ class LocationHandle : public ISerializable { return targetPath_; } + const std::string& targetFileName() const { + return targetFileName_; + } + const std::string& writePath() const { return writePath_; } @@ -79,6 +85,8 @@ class LocationHandle : public ISerializable { private: // Target directory path. const std::string targetPath_; + // If non-empty, use this name instead of generating our own. + const std::string targetFileName_; // Staging directory path. const std::string writePath_; // Whether the table to be written is new, already existing or temporary. 
diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp index c54ebd3e443b..c1e718478500 100644 --- a/velox/exec/tests/TableWriteTest.cpp +++ b/velox/exec/tests/TableWriteTest.cpp @@ -1104,6 +1104,33 @@ TEST_F(BasicTableWriteTest, roundTrip) { assertEqualResults({data}, {copy}); } +TEST_F(BasicTableWriteTest, targetFileName) { + constexpr const char* kFileName = "test.dwrf"; + auto data = makeRowVector({makeFlatVector(10, folly::identity)}); + auto directory = TempDirectoryPath::create(); + auto plan = PlanBuilder() + .values({data}) + .tableWrite( + directory->getPath(), + dwio::common::FileFormat::DWRF, + {}, + nullptr, + kFileName) + .planNode(); + auto results = AssertQueryBuilder(plan).copyResults(pool()); + auto* details = results->childAt(TableWriteTraits::kFragmentChannel) + ->asUnchecked>(); + auto detail = folly::parseJson(details->valueAt(1)); + auto fileWriteInfos = detail["fileWriteInfos"]; + ASSERT_EQ(1, fileWriteInfos.size()); + ASSERT_EQ(fileWriteInfos[0]["writeFileName"].asString(), kFileName); + plan = PlanBuilder().tableScan(asRowType(data->type())).planNode(); + AssertQueryBuilder(plan) + .split(makeHiveConnectorSplit( + fmt::format("{}/{}", directory->getPath(), kFileName))) + .assertResults(data); +} + class PartitionedTableWriterTest : public TableWriteTest, public testing::WithParamInterface { diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index 8ec7dc813a91..59b719458024 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -341,7 +341,8 @@ PlanBuilder& PlanBuilder::tableWrite( const std::string& outputDirectoryPath, const dwio::common::FileFormat fileFormat, const std::vector& aggregates, - const std::shared_ptr& options) { + const std::shared_ptr& options, + const std::string& outputFileName) { return tableWrite( outputDirectoryPath, {}, @@ -352,7 +353,8 @@ PlanBuilder& PlanBuilder::tableWrite( aggregates, 
kHiveDefaultConnectorId, {}, - options); + options, + outputFileName); } PlanBuilder& PlanBuilder::tableWrite( @@ -405,7 +407,8 @@ PlanBuilder& PlanBuilder::tableWrite( const std::vector& aggregates, const std::string_view& connectorId, const std::unordered_map& serdeParameters, - const std::shared_ptr& options) { + const std::shared_ptr& options, + const std::string& outputFileName) { VELOX_CHECK_NOT_NULL(planNode_, "TableWrite cannot be the source node"); auto rowType = planNode_->outputType(); @@ -428,7 +431,8 @@ PlanBuilder& PlanBuilder::tableWrite( auto locationHandle = std::make_shared( outputDirectoryPath, outputDirectoryPath, - connector::hive::LocationHandle::TableType::kNew); + connector::hive::LocationHandle::TableType::kNew, + outputFileName); std::shared_ptr bucketProperty; if (!partitionBy.empty() && bucketCount != 0) { bucketProperty = diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index 4c4e74c66b2a..ec1863c7e228 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -383,6 +383,8 @@ class PlanBuilder { /// @param aggregates Aggregations for column statistics collection during /// @param polymorphic options object to be passed to the writer. /// write, supported aggregation types vary for different column types. + /// @param outputFileName Optional file name of the output. If specified + /// (non-empty), use it instead of generating the file name in Velox. /// For example: /// Boolean: count, countIf. /// NumericType/Date/Timestamp: min, max, approx_distinct, count. @@ -393,7 +395,8 @@ class PlanBuilder { const dwio::common::FileFormat fileFormat = dwio::common::FileFormat::DWRF, const std::vector& aggregates = {}, - const std::shared_ptr& options = nullptr); + const std::shared_ptr& options = nullptr, + const std::string& outputFileName = ""); /// Adds a TableWriteNode to write all input columns into a partitioned Hive /// table without compression. 
@@ -447,6 +450,8 @@ class PlanBuilder { /// @param connectorId Name used to register the connector. /// @param serdeParameters Additional parameters passed to the writer. /// @param Option objects passed to the writer. + /// @param outputFileName Optional file name of the output. If specified + /// (non-empty), use it instead of generating the file name in Velox. PlanBuilder& tableWrite( const std::string& outputDirectoryPath, const std::vector& partitionBy, @@ -459,7 +464,8 @@ class PlanBuilder { const std::vector& aggregates = {}, const std::string_view& connectorId = kHiveDefaultConnectorId, const std::unordered_map& serdeParameters = {}, - const std::shared_ptr& options = nullptr); + const std::shared_ptr& options = nullptr, + const std::string& outputFileName = ""); /// Add a TableWriteMergeNode. PlanBuilder& tableWriteMerge(