Skip to content

Commit

Permalink
Add native_max_local_exchange_partition_count session property
Browse files Browse the repository at this point in the history
Maps to the max_local_exchange_partition_count Velox query property
introduced in facebookincubator/velox#11292
  • Loading branch information
arhimondr committed Oct 29, 2024
1 parent a4fe924 commit 2834ecc
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 0 deletions.
9 changes: 9 additions & 0 deletions presto-docs/src/main/sphinx/presto_cpp/properties-session.rst
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,12 @@ The maximum size in bytes for the task's buffered output. The buffer is shared a
The maximum bytes to buffer per PartitionedOutput operator to avoid creating tiny SerializedPages.
For PartitionedOutputNode::Kind::kPartitioned, PartitionedOutput operator would buffer up to that number of
bytes / number of destinations for each destination before producing a SerializedPage. Default is 32MB.

``native_max_local_exchange_partition_count``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``bigint``
* **Default value:** ``4294967296``

Maximum number of partitions created by a local exchange.
Affects concurrency for pipelines containing LocalPartitionNode.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class NativeWorkerSessionPropertyProvider
public static final String NATIVE_QUERY_TRACE_NODE_IDS = "native_query_trace_node_ids";
public static final String NATIVE_QUERY_TRACE_MAX_BYTES = "native_query_trace_max_bytes";
public static final String NATIVE_QUERY_TRACE_REG_EXP = "native_query_trace_task_reg_exp";
public static final String NATIVE_MAX_LOCAL_EXCHANGE_PARTITION_COUNT = "native_max_local_exchange_partition_count";
private final List<PropertyMetadata<?>> sessionProperties;

@Inject
Expand Down Expand Up @@ -232,6 +233,12 @@ public NativeWorkerSessionPropertyProvider(FeaturesConfig featuresConfig)
"would buffer up to that number of bytes / number of destinations for each destination before " +
"producing a SerializedPage.",
24L << 20,
!nativeExecution),
integerProperty(
NATIVE_MAX_LOCAL_EXCHANGE_PARTITION_COUNT,
"Maximum number of partitions created by a local exchange. " +
"Affects concurrency for pipelines containing LocalPartitionNode",
null,
!nativeExecution));
}

Expand Down
9 changes: 9 additions & 0 deletions presto-native-execution/presto_cpp/main/SessionProperties.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,15 @@ SessionProperties::SessionProperties() {
// Overrides velox default value. Set it to 1 second to be aligned with
// Presto Java.
std::to_string(1000));

addSessionProperty(
kMaxLocalExchangePartitionCount,
"Maximum number of partitions created by a local exchange."
"Affects concurrency for pipelines containing LocalPartitionNode",
BIGINT(),
false,
QueryConfig::kMaxLocalExchangePartitionCount,
std::to_string(c.maxLocalExchangePartitionCount()));
}

const std::unordered_map<std::string, std::shared_ptr<SessionProperty>>&
Expand Down
5 changes: 5 additions & 0 deletions presto-native-execution/presto_cpp/main/SessionProperties.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ class SessionProperties {
static constexpr const char* kMaxPartitionedOutputBufferSize =
"native_max_page_partitioning_buffer_size";

/// Maximum number of partitions created by a local exchange.
/// Affects concurrency for pipelines containing LocalPartitionNode.
static constexpr const char* kMaxLocalExchangePartitionCount =
"native_max_local_exchange_partition_count";

SessionProperties();

const std::unordered_map<std::string, std::shared_ptr<SessionProperty>>&
Expand Down

0 comments on commit 2834ecc

Please sign in to comment.