---
layout: page
displayTitle: Uniffle Shuffle Client Guide
title: Uniffle Shuffle Client Guide
description: Uniffle Shuffle Client Guide
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---
Uniffle is designed as a unified shuffle engine for multiple computing frameworks, including Apache Spark, Apache Hadoop and Apache Tez. Uniffle provides pluggable client plugins to enable remote shuffle in Spark, MapReduce and Tez.

Refer to the following documents on how to deploy Uniffle client plugins with Spark, MapReduce and Tez. Client-specific configurations are also listed in each document.
Client | Link |
---|---|
Spark | Deploy Spark Client Plugin & Configurations |
MapReduce | Deploy MapReduce Client Plugin & Configurations |
Tez | Deploy Tez Client Plugin & Configurations |
The important client configurations are listed below. These configurations are shared by all types of clients.
Property Name | Default | Description |
---|---|---|
<client_type>.rss.coordinator.quorum | - | Coordinator quorum |
<client_type>.rss.writer.buffer.size | 3m | Buffer size for single partition data |
<client_type>.rss.storage.type | - | Supports MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS |
<client_type>.rss.client.read.buffer.size | 14m | The max data size read from storage |
<client_type>.rss.client.send.threadPool.size | 5 | The thread pool size for sending shuffle data to shuffle servers |
<client_type>.rss.client.assignment.tags | - | The comma-separated list of tags for deciding the assignment of shuffle servers. Notice that SHUFFLE_SERVER_VERSION is always used as an assignment tag whether this conf is set or not |
<client_type>.rss.client.data.commit.pool.size | The number of assigned shuffle servers | The thread pool size for sending commit requests to shuffle servers |
<client_type>.rss.client.assignment.shuffle.nodes.max | -1 | The number of required assignment shuffle servers. If it is less than or equal to 0, or greater than the coordinator's config "rss.coordinator.shuffle.nodes.max", the value of "rss.coordinator.shuffle.nodes.max" is used by default |
<client_type>.rss.client.io.compression.codec | lz4 | The compression codec used to compress the shuffle data. The default codec is lz4. Other options are ZSTD and SNAPPY. |
<client_type>.rss.client.io.compression.zstd.level | 3 | The zstd compression level, the default level is 3 |
<client_type>.rss.client.shuffle.data.distribution.type | NORMAL | The type of partition shuffle data distribution, including normal and local_order. The default value is normal. Currently this config is only valid for Spark 3.x |
<client_type>.rss.estimate.task.concurrency.dynamic.factor | 1.0 | A value between 0 and 1 used to estimate task concurrency. When the client is Spark, it represents how likely the resources between spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors are to be allocated; when the client is MapReduce, it represents how likely the resources of map and reduce are to be satisfied. Effective when <client_type>.rss.estimate.server.assignment.enabled=true or the coordinator's rss.coordinator.select.partition.strategy is CONTINUOUS. |
<client_type>.rss.estimate.server.assignment.enabled | false | Supported for MapReduce and Spark. Whether to enable estimating the number of ShuffleServers to allocate based on the number of concurrent tasks. |
<client_type>.rss.estimate.task.concurrency.per.server | 80 | Takes effect when <client_type>.rss.estimate.server.assignment.enabled=true; how many tasks are concurrently assigned to a single ShuffleServer. |
<client_type>.rss.client.max.concurrency.of.per-partition.write | - | The maximum number of files that can be written concurrently for a single partition. This value is only respected by the remote shuffle server if it is greater than 0. |
<client_type>.rss.client.rpc.timeout.ms | 60000 | The timeout value in milliseconds for gRPC and Netty Type RPC Clients, including ShuffleServerClient and ShuffleManagerClient. |
<client_type>.rss.client.rpc.maxAttempts | 3 | When we fail to send RPC calls, we will retry for maxAttempts times. |
<client_type>.rss.client.rpc.netty.pageSize | 4096 | The value of pageSize for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. |
<client_type>.rss.client.rpc.netty.maxOrder | 3 | The value of maxOrder for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. |
<client_type>.rss.client.rpc.netty.smallCacheSize | 1024 | The value of smallCacheSize for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. |
Notice:

- <client_type> should be mapreduce, tez or spark.
- <client_type>.rss.coordinator.quorum is compulsory; other configurations are optional when coordinator dynamic configuration is enabled.
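As a minimal sketch (assuming a Spark client; the coordinator hosts, port and chosen values are placeholders, not defaults), these properties could be set in spark-defaults.conf or passed via --conf to spark-submit:

```
# Placeholder coordinator quorum; replace with your coordinator host:port list
spark.rss.coordinator.quorum coordinator1:19999,coordinator2:19999
# Storage type must match what the shuffle servers are deployed with
spark.rss.storage.type MEMORY_LOCALFILE_HDFS
# Optional tuning; values taken from the defaults in the table above
spark.rss.writer.buffer.size 3m
spark.rss.client.read.buffer.size 14m
```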
Uniffle supports a client-side quorum protocol to tolerate shuffle server crashes. This feature is client-side behaviour, in which the shuffle writer sends each block to multiple servers, and shuffle readers can fetch block data from any one of those servers. Since sending multiple replicas of blocks increases resource consumption and can reduce shuffle performance, it is designed as an optional feature.
Property Name | Default | Description |
---|---|---|
<client_type>.rss.data.replica | 1 | The maximum number of servers that each block can be sent to by the client in the quorum protocol |
<client_type>.rss.data.replica.write | 1 | The minimum number of servers that each block must be successfully sent to by the client |
<client_type>.rss.data.replica.read | 1 | The minimum number of servers from which metadata must be successfully fetched by the client |
Notice:

- spark.rss.data.replica.write + spark.rss.data.replica.read > spark.rss.data.replica
Recommended examples:

- Performance First (default):
  - spark.rss.data.replica 1
  - spark.rss.data.replica.write 1
  - spark.rss.data.replica.read 1
- Fault-tolerant First:
  - spark.rss.data.replica 3
  - spark.rss.data.replica.write 2
  - spark.rss.data.replica.read 2
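For illustration only, the fault-tolerant profile above could be passed on the spark-submit command line; the same keys apply to MapReduce or Tez jobs with the mapreduce. or tez. prefix instead of spark. (the rest of the command is elided):

```
spark-submit \
  --conf spark.rss.data.replica=3 \
  --conf spark.rss.data.replica.write=2 \
  --conf spark.rss.data.replica.read=2 \
  ...
```

Note that this profile satisfies the constraint above: 2 + 2 > 3.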
Property Name | Default | Description |
---|---|---|
<client_type>.rss.client.type | GRPC | Supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using GRPC_NETTY to enable Netty on the client side for better stability and performance. |
<client_type>.rss.client.netty.io.mode | NIO | Netty EventLoopGroup backend, available options: NIO, EPOLL. |
<client_type>.rss.client.netty.client.connection.timeout.ms | 600000 | Connection active timeout. |
<client_type>.rss.client.netty.client.threads | 0 | Number of threads used in the client thread pool. The default is 0, in which case Netty uses (available logical cores * 2) threads. |
<client_type>.rss.client.netty.client.threads.ratio | 2.0 | The number of threads used in the client thread pool will be (<client_type>.rss.client.netty.client.connections.per.peer * <client_type>.rss.client.netty.client.threads.ratio). This is only effective when <client_type>.rss.client.netty.client.threads is not explicitly set. |
<client_type>.rss.client.netty.client.prefer.direct.bufs | true | If true, we will prefer allocating off-heap byte buffers within Netty. |
<client_type>.rss.client.netty.client.pooled.allocator.enabled | true | If true, we will use PooledByteBufAllocator to allocate byte buffers within Netty, otherwise we'll use UnpooledByteBufAllocator. |
<client_type>.rss.client.netty.client.shared.allocator.enabled | true | A flag indicating whether to share the ByteBuf allocators between the different Netty channels when enabling Netty. If enabled then only three ByteBuf allocators are created: one PooledByteBufAllocator where caching is allowed, one PooledByteBufAllocator where not and one UnpooledByteBufAllocator. When disabled, a new allocator is created for each transport client. |
<client_type>.rss.client.netty.client.connections.per.peer | 2 | Suppose there are 100 executors and spark.rss.client.netty.client.connections.per.peer = 2; then each ShuffleServer will establish a total of (100 * 2) connections with its clients. |
<client_type>.rss.client.netty.client.receive.buffer | 0 | Receive buffer size (SO_RCVBUF). Note: the optimal size for the receive and send buffers is latency * network_bandwidth. Assuming latency = 1ms and network_bandwidth = 10Gbps, the buffer size should be ~1.25MB. The default is 0, in which case the operating system automatically estimates the receive buffer size based on default settings. |
<client_type>.rss.client.netty.client.send.buffer | 0 | Send buffer size (SO_SNDBUF). |
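As a sketch rather than a recommended profile, switching a Spark client to the Netty transport could combine the client type switch with a few of the Netty options above (the values shown are illustrative):

```
# Enable the Netty transport recommended above
spark.rss.client.type GRPC_NETTY
# EPOLL is only available on Linux; keep the NIO default elsewhere
spark.rss.client.netty.io.mode EPOLL
# Illustrative connection and buffer choices, not tuned recommendations
spark.rss.client.netty.client.connections.per.peer 2
spark.rss.client.netty.client.prefer.direct.bufs true
```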