Skip to content

Commit

Permalink
Add session properties for scale writer query configs
Browse files Browse the repository at this point in the history
  • Loading branch information
duxiao1212 authored and arhimondr committed Dec 16, 2024
1 parent 05bc56c commit 8f3cd13
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 5 deletions.
44 changes: 43 additions & 1 deletion presto-docs/src/main/sphinx/presto_cpp/properties-session.rst
Original file line number Diff line number Diff line change
Expand Up @@ -376,4 +376,46 @@ The shard id to be traced. If not specified, all shards will be matched.
* **Default value:** ``""``

The regular expression to match a task for tracing. It will be deprecated if there is
no issue with native_query_trace_fragment_id and native_query_trace_shard_id.
no issue with native_query_trace_fragment_id and native_query_trace_shard_id.

``native_scaled_writer_rebalance_max_memory_usage_ratio``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``double``
* **Minimum value:** ``0``
* **Maximum value:** ``1``
* **Default value:** ``0.7``

The max ratio of a query used memory to its max capacity, and the scale
writer exchange stops scaling writer processing if the query's current
memory usage exceeds this ratio. The value is in the range of (0, 1].

``native_scaled_writer_max_partitions_per_writer``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``integer``
* **Default value:** ``128``

The max number of logical table partitions that can be assigned to a
single table writer thread. The logical table partition is used by local
exchange writer for writer scaling, and multiple physical table
partitions can be mapped to the same logical table partition based on the
hash value of calculated partitioned ids.

``native_scaled_writer_min_partition_processed_bytes_rebalance_threshold``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``bigint``
* **Default value:** ``134217728``

Minimum amount of data processed by a logical table partition to trigger
writer scaling if it is detected as overloaded by scale writer exchange.

``native_scaled_writer_min_processed_bytes_rebalance_threshold``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``bigint``
* **Default value:** ``268435456``

Minimum amount of data processed by all the logical table partitions to
trigger skewed partition rebalancing by scale writer exchange.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;

import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.doubleProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.longProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty;
Expand Down Expand Up @@ -68,6 +69,10 @@ public class NativeWorkerSessionPropertyProvider
public static final String NATIVE_PREFIXSORT_NORMALIZED_KEY_MAX_BYTES = "native_prefixsort_normalized_key_max_bytes";
public static final String NATIVE_PREFIXSORT_MIN_ROWS = "native_prefixsort_min_rows";
public static final String NATIVE_OP_TRACE_DIR_CREATE_CONFIG = "native_op_trace_directory_create_config";
public static final String NATIVE_SCALED_WRITER_REBALANCE_MAX_MEMORY_USAGE_RATIO = "native_scaled_writer_rebalance_max_memory_usage_ratio";
public static final String NATIVE_SCALED_WRITER_MAX_PARTITIONS_PER_WRITER = "native_scaled_writer_max_partitions_per_writer";
public static final String NATIVE_SCALED_WRITER_MIN_PARTITION_PROCESSED_BYTES_REBALANCE_THRESHOLD = "native_scaled_writer_min_partition_processed_bytes_rebalance_threshold";
public static final String NATIVE_SCALED_WRITER_MIN_PROCESSED_BYTES_REBALANCE_THRESHOLD = "native_scaled_writer_min_processed_bytes_rebalance_threshold";
private final List<PropertyMetadata<?>> sessionProperties;

@Inject
Expand Down Expand Up @@ -133,7 +138,7 @@ public NativeWorkerSessionPropertyProvider(FeaturesConfig featuresConfig)
longProperty(
NATIVE_WRITER_FLUSH_THRESHOLD_BYTES,
"Native Execution only. Minimum memory footprint size required to reclaim memory from a file " +
"writer by flushing its buffered data to disk.",
"writer by flushing its buffered data to disk.",
96L << 20,
false),
booleanProperty(
Expand Down Expand Up @@ -276,6 +281,34 @@ public NativeWorkerSessionPropertyProvider(FeaturesConfig featuresConfig)
"Minimum number of rows to use prefix-sort. " +
"The default value (130) has been derived using micro-benchmarking.",
130,
!nativeExecution),
doubleProperty(
NATIVE_SCALED_WRITER_REBALANCE_MAX_MEMORY_USAGE_RATIO,
"The max ratio of a query used memory to its max capacity, " +
"and the scale writer exchange stops scaling writer processing if the query's current " +
"memory usage exceeds this ratio. The value is in the range of (0, 1].",
0.7,
!nativeExecution),
integerProperty(
NATIVE_SCALED_WRITER_MAX_PARTITIONS_PER_WRITER,
"The max number of logical table partitions that can be assigned to a " +
"single table writer thread. The logical table partition is used by local " +
"exchange writer for writer scaling, and multiple physical table " +
"partitions can be mapped to the same logical table partition based on the " +
"hash value of calculated partitioned ids",
128,
!nativeExecution),
longProperty(
NATIVE_SCALED_WRITER_MIN_PARTITION_PROCESSED_BYTES_REBALANCE_THRESHOLD,
"Minimum amount of data processed by a logical table partition " +
"to trigger writer scaling if it is detected as overloaded by scale writer exchange.",
128L << 20,
!nativeExecution),
longProperty(
NATIVE_SCALED_WRITER_MIN_PROCESSED_BYTES_REBALANCE_THRESHOLD,
"Minimum amount of data processed by all the logical table partitions " +
"to trigger skewed partition rebalancing by scale writer exchange.",
256L << 20,
!nativeExecution));
}

Expand Down
41 changes: 41 additions & 0 deletions presto-native-execution/presto_cpp/main/SessionProperties.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,47 @@ SessionProperties::SessionProperties() {
false,
QueryConfig::kPrefixSortMinRows,
std::to_string(c.prefixSortMinRows()));

addSessionProperty(
kScaleWriterRebalanceMaxMemoryUsageRatio,
"The max ratio of a query used memory to its max capacity, "
"and the scale writer exchange stops scaling writer processing if the query's current "
"memory usage exceeds this ratio. The value is in the range of (0, 1].",
DOUBLE(),
false,
QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio,
std::to_string(c.scaleWriterRebalanceMaxMemoryUsageRatio()));

addSessionProperty(
kScaleWriterMaxPartitionsPerWriter,
"The max number of logical table partitions that can be assigned to a "
"single table writer thread. The logical table partition is used by local "
"exchange writer for writer scaling, and multiple physical table "
"partitions can be mapped to the same logical table partition based on the "
"hash value of calculated partitioned ids.",
INTEGER(),
false,
QueryConfig::kScaleWriterMaxPartitionsPerWriter,
std::to_string(c.scaleWriterMaxPartitionsPerWriter()));

addSessionProperty(
kScaleWriterMinPartitionProcessedBytesRebalanceThreshold,
"Minimum amount of data processed by a logical table partition "
"to trigger writer scaling if it is detected as overloaded by scale writer exchange.",
BIGINT(),
false,
QueryConfig::kScaleWriterMinPartitionProcessedBytesRebalanceThreshold,
std::to_string(
c.scaleWriterMinPartitionProcessedBytesRebalanceThreshold()));

addSessionProperty(
kScaleWriterMinProcessedBytesRebalanceThreshold,
"Minimum amount of data processed by all the logical table partitions "
"to trigger skewed partition rebalancing by scale writer exchange.",
BIGINT(),
false,
QueryConfig::kScaleWriterMinProcessedBytesRebalanceThreshold,
std::to_string(c.scaleWriterMinProcessedBytesRebalanceThreshold()));
}

const std::unordered_map<std::string, std::shared_ptr<SessionProperty>>&
Expand Down
27 changes: 26 additions & 1 deletion presto-native-execution/presto_cpp/main/SessionProperties.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,31 @@ class SessionProperties {
static constexpr const char* kSelectiveNimbleReaderEnabled =
"native_selective_nimble_reader_enabled";

/// The max ratio of a query used memory to its max capacity, and the scale
/// writer exchange stops scaling writer processing if the query's current
/// memory usage exceeds this ratio. The value is in the range of (0, 1].
static constexpr const char* kScaleWriterRebalanceMaxMemoryUsageRatio =
"native_scaled_writer_rebalance_max_memory_usage_ratio";

/// The max number of logical table partitions that can be assigned to a
/// single table writer thread. The logical table partition is used by local
/// exchange writer for writer scaling, and multiple physical table
/// partitions can be mapped to the same logical table partition based on the
/// hash value of calculated partitioned ids.
static constexpr const char* kScaleWriterMaxPartitionsPerWriter =
"native_scaled_writer_max_partitions_per_writer";

/// Minimum amount of data processed by a logical table partition to trigger
/// writer scaling if it is detected as overloaded by scale writer exchange.
static constexpr const char*
kScaleWriterMinPartitionProcessedBytesRebalanceThreshold =
"native_scaled_writer_min_partition_processed_bytes_rebalance_threshold";

/// Minimum amount of data processed by all the logical table partitions to
/// trigger skewed partition rebalancing by scale writer exchange.
static constexpr const char* kScaleWriterMinProcessedBytesRebalanceThreshold =
"native_scaled_writer_min_processed_bytes_rebalance_threshold";

/// Enable timezone-less timestamp conversions.
static constexpr const char* kLegacyTimestamp = "legacy_timestamp";

Expand Down Expand Up @@ -246,7 +271,7 @@ class SessionProperties {
/// prefix keys, which might have potential risk of running out of server
/// memory.
static constexpr const char* kSpillPrefixSortEnabled =
"spill_prefixsort_enabled";
"native_spill_prefixsort_enabled";

/// Maximum number of bytes to use for the normalized key in prefix-sort. Use
/// 0 to disable prefix-sort.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,21 @@ TEST_F(SessionPropertiesTest, validateMapping) {
const std::vector<std::string> names = {
SessionProperties::kLegacyTimestamp,
SessionProperties::kDriverCpuTimeSliceLimitMs,
SessionProperties::kSpillCompressionCodec};
SessionProperties::kSpillCompressionCodec,
SessionProperties::kScaleWriterRebalanceMaxMemoryUsageRatio,
SessionProperties::kScaleWriterMaxPartitionsPerWriter,
SessionProperties::
kScaleWriterMinPartitionProcessedBytesRebalanceThreshold,
SessionProperties::kScaleWriterMinProcessedBytesRebalanceThreshold};
const std::vector<std::string> veloxConfigNames = {
core::QueryConfig::kAdjustTimestampToTimezone,
core::QueryConfig::kDriverCpuTimeSliceLimitMs,
core::QueryConfig::kSpillCompressionKind};
core::QueryConfig::kSpillCompressionKind,
core::QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio,
core::QueryConfig::kScaleWriterMaxPartitionsPerWriter,
core::QueryConfig::
kScaleWriterMinPartitionProcessedBytesRebalanceThreshold,
core::QueryConfig::kScaleWriterMinProcessedBytesRebalanceThreshold};
auto sessionProperties = SessionProperties().getSessionProperties();
const auto len = names.size();
for (auto i = 0; i < len; i++) {
Expand Down

0 comments on commit 8f3cd13

Please sign in to comment.