
Demo of implementation to the local directory #6

Closed

Changes from all commits (32 commits)
06bf9f9
initial merge
ifilonenko Dec 19, 2018
e3e9d68
working version
ifilonenko Dec 21, 2018
458b2be
added shuffle location discovery
ifilonenko Dec 26, 2018
646f1bf
running tests on driver and executor logic
ifilonenko Dec 28, 2018
5555bc9
testing executor writing
ifilonenko Dec 28, 2018
7fabde7
added index file write and data read
ifilonenko Jan 2, 2019
109fbaa
fixing read issues
ifilonenko Jan 2, 2019
bce2ed0
investigating issue with correctness bug
ifilonenko Jan 7, 2019
28714b3
refactored executor specific logic and began fixing transport client …
ifilonenko Jan 8, 2019
d598e00
remove client issues
ifilonenko Jan 8, 2019
90f3804
added hashcode
ifilonenko Jan 9, 2019
7f30751
small changes to replica-based shuffle service implementation
ifilonenko Jan 9, 2019
cffc20c
solved read issue in terms of deserialization
ifilonenko Jan 9, 2019
c91574d
IT WORKSSSSSSSS
ifilonenko Jan 10, 2019
7f1b215
scratch
yifeih Jan 15, 2019
d0c8f29
attempt 1
yifeih Jan 15, 2019
c2231a0
resolving a few of the initial comments while still preserving correc…
ifilonenko Jan 16, 2019
45343fa
fix serialization
yifeih Jan 16, 2019
a301d24
basic cleanup
yifeih Jan 16, 2019
9a17589
Merge branch 'SPARK-25299-v3' of github.com:ifilonenko/spark into yh/…
yifeih Jan 16, 2019
3ba25ab
compiles
yifeih Jan 17, 2019
d7919f2
Bypass Merge sort works
yifeih Jan 17, 2019
0befe41
small refactors
yifeih Jan 18, 2019
4fba8d2
done refactoring
yifeih Jan 18, 2019
a5ee746
more cleanup
yifeih Jan 18, 2019
6e86ac0
more housekeeping
yifeih Jan 18, 2019
4a12c93
sweep sweep
yifeih Jan 18, 2019
40ab79f
Merge pull request #12 from yifeih/yh/ess-metadata-v1
ifilonenko Jan 18, 2019
75ecb66
Update ShuffleLocation to be part of the read API too
yifeih Jan 22, 2019
af58978
Changes to ByteBuffer and serialization logic
vanzin Jan 22, 2019
1381f55
resolve some comments regarding BlockManager and slight style
ifilonenko Jan 23, 2019
7c0fa1d
Fix UnsafeShuffleWriter (#15)
yifeih Jan 24, 2019

org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -34,6 +34,7 @@
import org.slf4j.LoggerFactory;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.StreamCallbackWithID;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.server.OneForOneStreamManager;
@@ -81,6 +82,22 @@ public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
handleMessage(msgObj, client, callback);
}

@Override
public StreamCallbackWithID receiveStream(
TransportClient client,
ByteBuffer messageHeader,
RpcResponseCallback callback) {
BlockTransferMessage header = BlockTransferMessage.Decoder.fromByteBuffer(messageHeader);
return handleStream(header, client, callback);
}

protected StreamCallbackWithID handleStream(
BlockTransferMessage header,
TransportClient client,
RpcResponseCallback callback) {
throw new UnsupportedOperationException("Unexpected message header: " + header);
[Review comment from Owner] This looks odd at least to me - why is this change needed?

[Review comment from Author] Oh, this is for the case in which a stream that is handled by the KubernetesExternalShuffleBlockResolver is malformed and would then be defaulted to this class via a super.handleStream() request. I can take this out, it isn't necessary.

}

protected void handleMessage(
BlockTransferMessage msgObj,
TransportClient client,
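
To make that fallback concrete, here is a minimal sketch (not part of this PR) of how a Kubernetes-specific handler could layer on the new handleStream() hook. The subclass name and the branch body are assumptions for illustration only; per the review thread, the real handling lives in the KubernetesExternalShuffleBlockResolver.

package org.apache.spark.network.shuffle;

import java.io.File;
import java.io.IOException;

import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.StreamCallbackWithID;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.UploadShufflePartitionStream;
import org.apache.spark.network.util.TransportConf;

// Hypothetical subclass, sketched for illustration; not part of this diff.
public class KubernetesShuffleBlockHandler extends ExternalShuffleBlockHandler {

  public KubernetesShuffleBlockHandler(TransportConf conf, File registeredExecutorFile)
      throws IOException {
    super(conf, registeredExecutorFile);
  }

  @Override
  protected StreamCallbackWithID handleStream(
      BlockTransferMessage header,
      TransportClient client,
      RpcResponseCallback callback) {
    if (header instanceof UploadShufflePartitionStream) {
      // Resolve the target shuffle file here and return a callback (such as
      // the FileWriterStreamCallback added below) that streams bytes into it.
    }
    // Any other header is malformed for this handler; defer to the parent,
    // which throws UnsupportedOperationException.
    return super.handleStream(header, client, callback);
  }
}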

org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -19,13 +19,13 @@

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -140,7 +140,8 @@ public void registerWithShuffleServer(
ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException {
checkInit();
try (TransportClient client = clientFactory.createUnmanagedClient(host, port)) {
ByteBuffer registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteBuffer();
ByteBuffer registerMessage =
new RegisterExecutor(appId, execId, executorInfo).toByteBuffer();
client.sendRpcSync(registerMessage, registrationTimeoutMs);
}
}

org/apache/spark/network/shuffle/FileWriterStreamCallback.java (new file)
@@ -0,0 +1,150 @@
package org.apache.spark.network.shuffle;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.StandardOpenOption;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.client.StreamCallbackWithID;

public class FileWriterStreamCallback implements StreamCallbackWithID {

private static final Logger logger = LoggerFactory.getLogger(FileWriterStreamCallback.class);

public enum FileType {
DATA("shuffle-data"),
INDEX("shuffle-index");

private final String typeString;

FileType(String typeString) {
this.typeString = typeString;
}

@Override
public String toString() {
return typeString;
}
}

private final String appId;
private final int shuffleId;
private final int mapId;
private final File file;
private final FileType fileType;
private WritableByteChannel fileOutputChannel = null;

public FileWriterStreamCallback(
String appId,
int shuffleId,
int mapId,
File file,
FileWriterStreamCallback.FileType fileType) {
this.appId = appId;
this.shuffleId = shuffleId;
this.mapId = mapId;
this.file = file;
this.fileType = fileType;
}

public void open() {
logger.info(
"Opening {} for remote writing. File type: {}", file.getAbsolutePath(), fileType);
if (fileOutputChannel != null) {
throw new IllegalStateException(
String.format(
"File %s for is already open for writing (type: %s).",
file.getAbsolutePath(),
fileType));
}
if (!file.exists()) {
try {
if (!file.getParentFile().isDirectory() && !file.getParentFile().mkdirs()) {
throw new IOException(
    String.format(
        "Failed to create shuffle file directory at %s (type: %s).",
        file.getParentFile().getAbsolutePath(), fileType));
}

if (!file.createNewFile()) {
throw new IOException(
String.format(
"Failed to create shuffle file (type: %s).", fileType));
}
} catch (IOException e) {
throw new RuntimeException(
String.format(
"Failed to create shuffle file at %s for backup (type: %s).",
file.getAbsolutePath(),
fileType),
e);
}
}
try {
// TODO encryption
fileOutputChannel = FileChannel.open(file.toPath(), StandardOpenOption.APPEND);
} catch (IOException e) {
throw new RuntimeException(
String.format(
"Failed to find file for writing at %s (type: %s).",
file.getAbsolutePath(),
fileType),
e);
}
}

@Override
public String getID() {
return String.format("%s-%d-%d-%s",
appId,
shuffleId,
mapId,
fileType);
}

@Override
public void onData(String streamId, ByteBuffer buf) throws IOException {
verifyShuffleFileOpenForWriting();
while (buf.hasRemaining()) {
fileOutputChannel.write(buf);
}
}

@Override
public void onComplete(String streamId) throws IOException {
logger.info(
"Finished writing {}. File type: {}", file.getAbsolutePath(), fileType);
fileOutputChannel.close();
}

@Override
public void onFailure(String streamId, Throwable cause) throws IOException {
logger.warn("Failed to write shuffle file at {} (type: %s).",
file.getAbsolutePath(),
fileType,
cause);
fileOutputChannel.close();
// TODO delete parent dirs too
if (!file.delete()) {
logger.warn(
"Failed to delete incomplete remote shuffle file at %s (type: %s)",
file.getAbsolutePath(),
fileType);
}
}

private void verifyShuffleFileOpenForWriting() {
if (fileOutputChannel == null) {
throw new RuntimeException(
String.format(
"Shuffle file at %s not open for writing (type: %s).",
file.getAbsolutePath(),
fileType));
}
}
}
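
For reference, a minimal usage sketch of the lifecycle this class implements (all identifiers below are invented; in the real flow they come from the stream header): construct the callback, open() it, let the transport layer feed chunks through onData(), then onComplete() closes the channel.

import java.io.File;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.spark.network.shuffle.FileWriterStreamCallback;

public class FileWriterStreamCallbackExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical identifiers, for illustration only.
    FileWriterStreamCallback writer = new FileWriterStreamCallback(
        "app-20190124-0001",                        // appId
        0,                                          // shuffleId
        7,                                          // mapId
        new File("/tmp/shuffle-demo/shuffle-data"),
        FileWriterStreamCallback.FileType.DATA);
    writer.open();
    // The transport layer calls onData once per received chunk...
    writer.onData(writer.getID(),
        ByteBuffer.wrap("partition bytes".getBytes(StandardCharsets.UTF_8)));
    // ...and onComplete when the stream ends, which closes the channel.
    writer.onComplete(writer.getID());
  }
}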

org/apache/spark/network/shuffle/k8s/KubernetesExternalShuffleClient.java (new file)
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffle.k8s;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.protocol.RegisterDriver;
import org.apache.spark.network.shuffle.protocol.ShuffleServiceHeartbeat;
import org.apache.spark.network.util.TransportConf;

/**
* A client for talking to the external shuffle service in Kubernetes.
*
* This is used by the Spark driver to register with each external shuffle service on the cluster.
* The reason the driver has to talk to the service is to clean up shuffle files reliably after
* the application exits. Kubernetes does not provide a great alternative for this, so Spark
* detects application termination itself via the heartbeats sent by this client.
*/
public class KubernetesExternalShuffleClient extends ExternalShuffleClient {
private static final Logger logger =
LoggerFactory.getLogger(KubernetesExternalShuffleClient.class);

private final ScheduledExecutorService heartbeaterThread =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("kubernetes-external-shuffle-client-heartbeater")
.build());

/**
* Creates a Kubernetes external shuffle client that wraps the {@link ExternalShuffleClient}.
* Please refer to docs on {@link ExternalShuffleClient} for more information.
*/
public KubernetesExternalShuffleClient(
TransportConf conf,
SecretKeyHolder secretKeyHolder,
boolean authEnabled,
long registrationTimeoutMs) {
super(conf, secretKeyHolder, authEnabled, registrationTimeoutMs);
}

public void registerDriverWithShuffleService(
String host,
int port,
long heartbeatTimeoutMs,
long heartbeatIntervalMs) throws IOException, InterruptedException {

checkInit();
ByteBuffer registerDriver = new RegisterDriver(appId, heartbeatTimeoutMs).toByteBuffer();
logger.info("Registering with external shuffle service at " + host + ":" + port);
TransportClient client = clientFactory.createClient(host, port);
client.sendRpc(registerDriver, new RegisterDriverCallback(client, heartbeatIntervalMs));
}

private class RegisterDriverCallback implements RpcResponseCallback {
private final TransportClient client;
private final long heartbeatIntervalMs;

private RegisterDriverCallback(TransportClient client, long heartbeatIntervalMs) {
this.client = client;
this.heartbeatIntervalMs = heartbeatIntervalMs;
}

@Override
public void onSuccess(ByteBuffer response) {
heartbeaterThread.scheduleAtFixedRate(
new Heartbeater(client), 0, heartbeatIntervalMs, TimeUnit.MILLISECONDS);
logger.info("Successfully registered app " + appId + " with external shuffle service.");
}

@Override
public void onFailure(Throwable e) {
logger.warn("Unable to register app " + appId + " with external shuffle service. " +
"Please manually remove shuffle data after driver exit. Error: " + e);
}
}

@Override
public void close() {
heartbeaterThread.shutdownNow();
super.close();
}

private class Heartbeater implements Runnable {

private final TransportClient client;

private Heartbeater(TransportClient client) {
this.client = client;
}

@Override
public void run() {
// TODO: Stop sending heartbeats if the shuffle service has lost the app due to timeout
client.send(new ShuffleServiceHeartbeat(appId).toByteBuffer());
}
}
}
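
And a short sketch of how a driver might wire this client up. The host, port, app id, and timeouts are made-up values, and conf/secretKeyHolder are assumed to come from the driver's existing transport setup; this is not code from the PR.

import java.io.IOException;

import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.shuffle.k8s.KubernetesExternalShuffleClient;
import org.apache.spark.network.util.TransportConf;

public class DriverRegistrationExample {
  // Hypothetical helper showing the call sequence.
  static void register(TransportConf conf, SecretKeyHolder secretKeyHolder)
      throws IOException, InterruptedException {
    KubernetesExternalShuffleClient client =
        new KubernetesExternalShuffleClient(conf, secretKeyHolder, false, 5000L);
    client.init("app-20190124-0001");
    // Register with one shuffle service; heartbeat every 10s so the service
    // can clean up this app's shuffle files 120s after heartbeats stop.
    client.registerDriverWithShuffleService("10.0.0.12", 7337, 120000L, 10000L);
  }
}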

org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
@@ -24,15 +24,15 @@
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat;
import org.apache.spark.network.shuffle.protocol.ShuffleServiceHeartbeat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
import org.apache.spark.network.shuffle.protocol.RegisterDriver;
import org.apache.spark.network.util.TransportConf;

/**

org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
@@ -23,8 +23,6 @@
import io.netty.buffer.Unpooled;

import org.apache.spark.network.protocol.Encodable;
import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat;

/**
* Messages handled by the {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler}, or
@@ -42,7 +40,8 @@ public abstract class BlockTransferMessage implements Encodable {
/** Preceding every serialized message is its type, which allows us to deserialize it. */
public enum Type {
OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4),
HEARTBEAT(5), UPLOAD_BLOCK_STREAM(6);
HEARTBEAT(5), UPLOAD_BLOCK_STREAM(6), UPLOAD_SHUFFLE_PARTITION_STREAM(7),
REGISTER_SHUFFLE_INDEX(8), OPEN_SHUFFLE_PARTITION(9), UPLOAD_SHUFFLE_INDEX(10);

private final byte id;

@@ -68,6 +67,10 @@ public static BlockTransferMessage fromByteBuffer(ByteBuffer msg) {
case 4: return RegisterDriver.decode(buf);
case 5: return ShuffleServiceHeartbeat.decode(buf);
case 6: return UploadBlockStream.decode(buf);
case 7: return UploadShufflePartitionStream.decode(buf);
case 8: return RegisterShuffleIndex.decode(buf);
case 9: return OpenShufflePartition.decode(buf);
case 10: return UploadShuffleIndex.decode(buf);
default: throw new IllegalArgumentException("Unknown message type: " + type);
}
}
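
On the wire, each message is a single type byte followed by the message's own fields, which is what lets fromByteBuffer dispatch to the four new cases above. A round-trip sketch using one of the pre-existing message types (the values are made up):

import java.nio.ByteBuffer;

import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.RegisterExecutor;

public class MessageRoundTripExample {
  public static void main(String[] args) {
    ExecutorShuffleInfo info =
        new ExecutorShuffleInfo(new String[] {"/tmp/spark-local"}, 64, "sort");
    BlockTransferMessage msg = new RegisterExecutor("app-1", "exec-1", info);
    // toByteBuffer() prepends the Type id byte (REGISTER_EXECUTOR = 2)...
    ByteBuffer wire = msg.toByteBuffer();
    // ...and the Decoder reads that byte back to pick the matching decode() case.
    BlockTransferMessage decoded = BlockTransferMessage.Decoder.fromByteBuffer(wire);
    assert decoded instanceof RegisterExecutor;
  }
}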