[#1538] feat(spark): report blockIds to spark driver optionally #1677

Merged · 11 commits · May 11, 2024
@@ -37,6 +37,13 @@

public class RssSparkConfig {

  public static final ConfigOption<Boolean> RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED =
      ConfigOptions.key("rss.blockId.selfManagementEnabled")
          .booleanType()
          .defaultValue(false)
          .withDescription(
              "Whether to enable blockId self management on the Spark driver side. Defaults to false.");

  public static final ConfigOption<Long> RSS_CLIENT_SEND_SIZE_LIMITATION =
      ConfigOptions.key("rss.client.send.size.limit")
          .longType()
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.shuffle;

import java.util.List;
import java.util.Map;

import org.apache.commons.collections.CollectionUtils;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;

/** This class manages the shuffle data blockIds on the Spark driver side. */
public class BlockIdManager {
Contributor: I have a slight concern about performance. We once stored all blockIds on a single shuffle server, and it performed poorly because the bitmap operations are slow.

Member Author: Yes, I will try this and run it in production to verify the performance.

  private static final Logger LOGGER = LoggerFactory.getLogger(BlockIdManager.class);

  // shuffleId -> partitionId -> blockIds
  private Map<Integer, Map<Integer, Roaring64NavigableMap>> blockIds;

  public BlockIdManager() {
    this.blockIds = JavaUtils.newConcurrentMap();
  }

  public void add(int shuffleId, int partitionId, List<Long> ids) {
    if (CollectionUtils.isEmpty(ids)) {
      return;
    }
    Map<Integer, Roaring64NavigableMap> partitionedBlockIds =
        blockIds.computeIfAbsent(shuffleId, (k) -> JavaUtils.newConcurrentMap());
    partitionedBlockIds.compute(
        partitionId,
        (id, bitmap) -> {
          Roaring64NavigableMap store = bitmap == null ? Roaring64NavigableMap.bitmapOf() : bitmap;
          ids.stream().forEach(x -> store.add(x));
          return store;
        });
  }

  public Roaring64NavigableMap get(int shuffleId, int partitionId) {
    Map<Integer, Roaring64NavigableMap> partitionedBlockIds = blockIds.get(shuffleId);
    if (partitionedBlockIds == null || partitionedBlockIds.isEmpty()) {
      return Roaring64NavigableMap.bitmapOf();
    }

    Roaring64NavigableMap idMap = partitionedBlockIds.get(partitionId);
    if (idMap == null || idMap.isEmpty()) {
      return Roaring64NavigableMap.bitmapOf();
    }

    return RssUtils.cloneBitMap(idMap);
  }

  public boolean remove(int shuffleId) {
    blockIds.remove(shuffleId);
    return true;
  }
}
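A minimal usage sketch for the manager above (the example class name is made up; it is not part of the patch): blockIds are accumulated per shuffle and partition, readers get a defensive copy, and remove() drops a whole shuffle.

import java.util.Arrays;

import org.roaringbitmap.longlong.Roaring64NavigableMap;

import org.apache.uniffle.shuffle.BlockIdManager;

public class BlockIdManagerUsageSketch {
  public static void main(String[] args) {
    BlockIdManager manager = new BlockIdManager();

    // Two tasks report blockIds for shuffle 0, partition 1; the bitmaps are merged.
    manager.add(0, 1, Arrays.asList(1L, 2L, 3L));
    manager.add(0, 1, Arrays.asList(4L, 5L));

    // get() returns a cloned bitmap, so callers cannot mutate the driver's state.
    Roaring64NavigableMap bitmap = manager.get(0, 1);
    System.out.println(bitmap.getLongCardinality()); // prints 5

    // Once the shuffle is unregistered, all of its bitmaps are dropped.
    manager.remove(0);
  }
}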
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.shuffle;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.roaringbitmap.longlong.Roaring64NavigableMap;

import org.apache.uniffle.client.PartitionDataReplicaRequirementTracking;
import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.client.request.RssGetShuffleResultForMultiPartRequest;
import org.apache.uniffle.client.request.RssGetShuffleResultRequest;
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.BlockIdLayout;

/**
 * This class delegates the blockId reporting and retrieval operations from the shuffle server
 * side to the Spark driver side.
 */
public class BlockIdSelfManagedShuffleWriteClient extends ShuffleWriteClientImpl {
  private ShuffleManagerClient shuffleManagerClient;

  public BlockIdSelfManagedShuffleWriteClient(
      RssShuffleClientFactory.ExtendWriteClientBuilder builder) {
    super(builder);

    if (builder.getShuffleManagerClient() == null) {
      throw new RssException("Illegal empty shuffleManagerClient. This should not happen");
    }
    this.shuffleManagerClient = builder.getShuffleManagerClient();
  }

  @Override
  public void reportShuffleResult(
      Map<ShuffleServerInfo, Map<Integer, Set<Long>>> serverToPartitionToBlockIds,
      String appId,
      int shuffleId,
      long taskAttemptId,
      int bitmapNum) {
    Map<Integer, List<Long>> partitionToBlockIds = new HashMap<>();
    for (Map<Integer, Set<Long>> k : serverToPartitionToBlockIds.values()) {
      for (Map.Entry<Integer, Set<Long>> entry : k.entrySet()) {
        int partitionId = entry.getKey();
        partitionToBlockIds
            .computeIfAbsent(partitionId, x -> new ArrayList<>())
            .addAll(entry.getValue());
      }
    }

    RssReportShuffleResultRequest request =
        new RssReportShuffleResultRequest(
            appId, shuffleId, taskAttemptId, partitionToBlockIds, bitmapNum);
    shuffleManagerClient.reportShuffleResult(request);
  }

  @Override
  public Roaring64NavigableMap getShuffleResult(
      String clientType,
      Set<ShuffleServerInfo> shuffleServerInfoSet,
      String appId,
      int shuffleId,
      int partitionId) {
    RssGetShuffleResultRequest request =
        new RssGetShuffleResultRequest(appId, shuffleId, partitionId, BlockIdLayout.DEFAULT);
    return shuffleManagerClient.getShuffleResult(request).getBlockIdBitmap();
  }

  @Override
  public Roaring64NavigableMap getShuffleResultForMultiPart(
      String clientType,
      Map<ShuffleServerInfo, Set<Integer>> serverToPartitions,
      String appId,
      int shuffleId,
      Set<Integer> failedPartitions,
      PartitionDataReplicaRequirementTracking replicaRequirementTracking) {
    Set<Integer> partitionIds =
        serverToPartitions.values().stream().flatMap(x -> x.stream()).collect(Collectors.toSet());
    RssGetShuffleResultForMultiPartRequest request =
        new RssGetShuffleResultForMultiPartRequest(
            appId, shuffleId, partitionIds, BlockIdLayout.DEFAULT);
    return shuffleManagerClient.getShuffleResultForMultiPart(request).getBlockIdBitmap();
  }
}
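To make the reportShuffleResult override above concrete: it discards the server dimension and merges blockIds per partition before issuing a single RssReportShuffleResultRequest to the driver. Below is a small standalone sketch of that flattening over toy data (hypothetical class name, not part of the patch).

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ReportFlatteningSketch {
  public static void main(String[] args) {
    // Toy input: two servers hold blocks of partition 0 (as happens with replica writes).
    Map<String, Map<Integer, Set<Long>>> serverToPartitionToBlockIds = new HashMap<>();
    Map<Integer, Set<Long>> serverA = new HashMap<>();
    serverA.put(0, new HashSet<>(Arrays.asList(1L, 2L)));
    Map<Integer, Set<Long>> serverB = new HashMap<>();
    serverB.put(0, new HashSet<>(Arrays.asList(2L, 3L)));
    serverB.put(1, new HashSet<>(Arrays.asList(9L)));
    serverToPartitionToBlockIds.put("server-a", serverA);
    serverToPartitionToBlockIds.put("server-b", serverB);

    // Same flattening as reportShuffleResult: drop the server dimension and merge
    // blockIds per partition before sending one request to the driver.
    Map<Integer, List<Long>> partitionToBlockIds = new HashMap<>();
    for (Map<Integer, Set<Long>> perServer : serverToPartitionToBlockIds.values()) {
      for (Map.Entry<Integer, Set<Long>> entry : perServer.entrySet()) {
        partitionToBlockIds
            .computeIfAbsent(entry.getKey(), x -> new ArrayList<>())
            .addAll(entry.getValue());
      }
    }
    // Duplicates such as 2L (reported by both replicas) collapse later in the driver's bitmap.
    System.out.println(partitionToBlockIds); // e.g. {0=[1, 2, 2, 3], 1=[9]} (order may vary)
  }
}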
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.shuffle;

import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;

public class RssShuffleClientFactory extends ShuffleClientFactory {

  private static final RssShuffleClientFactory INSTANCE = new RssShuffleClientFactory();

  public static RssShuffleClientFactory getInstance() {
    return INSTANCE;
  }

  public ShuffleWriteClient createShuffleWriteClient(ExtendWriteClientBuilder builder) {
    return builder.build();
  }

  public static ExtendWriteClientBuilder<?> newWriteBuilder() {
    return new ExtendWriteClientBuilder();
  }

  public static class ExtendWriteClientBuilder<T extends ExtendWriteClientBuilder<T>>
      extends WriteClientBuilder<T> {
    private boolean blockIdSelfManagedEnabled;
    private ShuffleManagerClient shuffleManagerClient;

    public boolean isBlockIdSelfManagedEnabled() {
      return blockIdSelfManagedEnabled;
    }

    public ShuffleManagerClient getShuffleManagerClient() {
      return shuffleManagerClient;
    }

    public T shuffleManagerClient(ShuffleManagerClient client) {
      this.shuffleManagerClient = client;
      return self();
    }

    public T blockIdSelfManagedEnabled(boolean blockIdSelfManagedEnabled) {
      this.blockIdSelfManagedEnabled = blockIdSelfManagedEnabled;
      return self();
    }

    @Override
    public ShuffleWriteClientImpl build() {
      if (blockIdSelfManagedEnabled) {
        return new BlockIdSelfManagedShuffleWriteClient(this);
      }
      return super.build();
    }
  }
}
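A hedged wiring sketch for the factory above (hypothetical helper class; the usual write-client builder settings such as client type and retry counts are omitted here and would still be required in practice):

import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.shuffle.RssShuffleClientFactory;

public class WriteClientWiringSketch {
  // With blockIdSelfManagedEnabled(true), build() yields a BlockIdSelfManagedShuffleWriteClient;
  // otherwise the regular ShuffleWriteClientImpl is created.
  public static ShuffleWriteClient newWriteClient(
      boolean blockIdSelfManaged, ShuffleManagerClient shuffleManagerClient) {
    return RssShuffleClientFactory.getInstance()
        .createShuffleWriteClient(
            RssShuffleClientFactory.newWriteBuilder()
                .blockIdSelfManagedEnabled(blockIdSelfManaged)
                .shuffleManagerClient(shuffleManagerClient));
  }
}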
@@ -52,6 +52,7 @@
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.shuffle.BlockIdManager;

import static org.apache.uniffle.common.config.RssClientConf.HADOOP_CONFIG_KEY_PREFIX;
import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REMOTE_STORAGE_USE_LOCAL_CONF_ENABLED;
@@ -61,6 +62,28 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac
  private AtomicBoolean isInitialized = new AtomicBoolean(false);
  private Method unregisterAllMapOutputMethod;
  private Method registerShuffleMethod;
  private volatile BlockIdManager blockIdManager;
  private Object blockIdManagerLock = new Object();

  public BlockIdManager getBlockIdManager() {
    if (blockIdManager == null) {
      synchronized (blockIdManagerLock) {
        if (blockIdManager == null) {
          blockIdManager = new BlockIdManager();
          LOG.info("BlockId manager has been initialized.");
        }
      }
    }
    return blockIdManager;
  }

  @Override
  public boolean unregisterShuffle(int shuffleId) {
    if (blockIdManager != null) {
      blockIdManager.remove(shuffleId);
    }
    return true;
  }

  /** See static overload of this method. */
  public abstract void configureBlockIdLayout(SparkConf sparkConf, RssConf rssConf);
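The driver-side consumer of getBlockIdManager() is not part of this diff; below is a hypothetical sketch of how the shuffle manager's RPC layer might feed reported blockIds into the lazily created manager and clean up on unregistration (class and method names are made up).

import java.util.List;

import org.apache.uniffle.shuffle.BlockIdManager;

// Assumed to live alongside RssShuffleManagerBase (same package), so no import is needed for it.
public class BlockIdReportHandlerSketch {
  private final RssShuffleManagerBase shuffleManager;

  public BlockIdReportHandlerSketch(RssShuffleManagerBase shuffleManager) {
    this.shuffleManager = shuffleManager;
  }

  // Called when the driver receives a task's reported blockIds for one partition.
  public void onBlockIdsReported(int shuffleId, int partitionId, List<Long> blockIds) {
    // getBlockIdManager() lazily creates the manager with double-checked locking.
    BlockIdManager manager = shuffleManager.getBlockIdManager();
    manager.add(shuffleId, partitionId, blockIds);
  }

  // Called when a shuffle is unregistered; the base class drops that shuffle's bitmaps.
  public void onShuffleUnregistered(int shuffleId) {
    shuffleManager.unregisterShuffle(shuffleId);
  }
}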
@@ -25,6 +25,7 @@
import org.apache.spark.shuffle.handle.ShuffleHandleInfo;

import org.apache.uniffle.common.ReceivingFailureServer;
import org.apache.uniffle.shuffle.BlockIdManager;

/**
* This is a proxy interface that mainly delegates the un-registration of shuffles to the
@@ -82,4 +83,6 @@ boolean reassignAllShuffleServersForWholeStage(

  MutableShuffleHandleInfo reassignOnBlockSendFailure(
      int shuffleId, Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers);

  BlockIdManager getBlockIdManager();
}