Skip to content

Commit

Permalink
[Streaming] Fault Tolerance Implementation (#10595)
Browse files Browse the repository at this point in the history
  • Loading branch information
lixin-wei authored Sep 5, 2020
1 parent 31f8ce4 commit f31ee84
Show file tree
Hide file tree
Showing 161 changed files with 7,056 additions and 1,224 deletions.
73 changes: 69 additions & 4 deletions streaming/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ proto_library(
srcs = ["src/protobuf/streaming.proto"],
strip_import_prefix = "src",
visibility = ["//visibility:public"],
deps = [
"@com_google_protobuf//:any_proto",
],
)

proto_library(
Expand All @@ -22,7 +25,10 @@ proto_library(
srcs = ["src/protobuf/remote_call.proto"],
strip_import_prefix = "src",
visibility = ["//visibility:public"],
deps = ["streaming_proto"],
deps = [
"streaming_proto",
"@com_google_protobuf//:any_proto",
],
)

cc_proto_library(
Expand Down Expand Up @@ -70,9 +76,10 @@ cc_library(
"src/util/*.h",
]),
copts = COPTS,
strip_include_prefix = "src",
includes = ["src"],
visibility = ["//visibility:public"],
deps = [
"ray_common.so",
"ray_util.so",
"@boost//:any",
"@com_google_googletest//:gtest",
Expand Down Expand Up @@ -143,6 +150,62 @@ cc_library(
}),
)

cc_library(
name = "streaming_channel",
srcs = glob(["src/channel/*.cc"]),
hdrs = glob(["src/channel/*.h"]),
copts = COPTS,
visibility = ["//visibility:public"],
deps = [
":streaming_common",
":streaming_message",
":streaming_queue",
":streaming_ring_buffer",
":streaming_util",
],
)

cc_library(
name = "streaming_reliability",
srcs = glob(["src/reliability/*.cc"]),
hdrs = glob(["src/reliability/*.h"]),
copts = COPTS,
includes = ["src/"],
visibility = ["//visibility:public"],
deps = [
":streaming_channel",
":streaming_message",
":streaming_util",
],
)

cc_library(
name = "streaming_ring_buffer",
srcs = glob(["src/ring_buffer/*.cc"]),
hdrs = glob(["src/ring_buffer/*.h"]),
copts = COPTS,
includes = ["src/"],
visibility = ["//visibility:public"],
deps = [
"core_worker_lib.so",
":ray_common.so",
":ray_util.so",
":streaming_message",
"@boost//:circular_buffer",
"@boost//:thread",
],
)

cc_library(
name = "streaming_common",
srcs = glob(["src/common/*.cc"]),
hdrs = glob(["src/common/*.h"]),
copts = COPTS,
includes = ["src/"],
visibility = ["//visibility:public"],
deps = [],
)

cc_library(
name = "streaming_lib",
srcs = glob([
Expand All @@ -159,11 +222,13 @@ cc_library(
deps = [
"ray_common.so",
"ray_util.so",
":streaming_channel",
":streaming_common",
":streaming_config",
":streaming_message",
":streaming_queue",
":streaming_reliability",
":streaming_util",
"@boost//:circular_buffer",
],
)

Expand Down Expand Up @@ -284,6 +349,7 @@ genrule(
mkdir -p "$$GENERATED_DIR"
touch "$$GENERATED_DIR/__init__.py"
sed -i -E 's/from streaming.src.protobuf/from ./' "$$GENERATED_DIR/remote_call_pb2.py"
sed -i -E 's/from protobuf/from ./' "$$GENERATED_DIR/remote_call_pb2.py"
date > $@
""",
local = 1,
Expand All @@ -298,7 +364,6 @@ cc_binary(
]),
copts = COPTS,
linkshared = 1,
linkstatic = 1,
visibility = ["//visibility:public"],
deps = [
":streaming_lib",
Expand Down
2 changes: 2 additions & 0 deletions streaming/java/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,12 @@ define_java_module(
":io_ray_ray_streaming-state",
"//java:io_ray_ray_api",
"//java:io_ray_ray_runtime",
"@maven//:commons_io_commons_io",
"@ray_streaming_maven//:com_github_davidmoten_flatbuffers_java",
"@ray_streaming_maven//:com_google_code_findbugs_jsr305",
"@ray_streaming_maven//:com_google_guava_guava",
"@ray_streaming_maven//:com_google_protobuf_protobuf_java",
"@ray_streaming_maven//:commons_collections_commons_collections",
"@ray_streaming_maven//:de_ruedigermoeller_fst",
"@ray_streaming_maven//:org_aeonbits_owner_owner",
"@ray_streaming_maven//:org_apache_commons_commons_lang3",
Expand Down
3 changes: 3 additions & 0 deletions streaming/java/checkstyle-suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,7 @@
<suppress checks="ClassTypeParameterName" files="StreamTask.java"/>
<!-- suppress check for flatbuffer-generated files. -->
<suppress checks=".*" files="io[\\/]ray[\\/]streaming[\\/]runtime[\\/]generated[\\/]"/>

<!-- suppress indention check for lambdas-->
<suppress checks="Indentation" files="FailoverCoordinator.java"/>
</suppressions>
1 change: 1 addition & 0 deletions streaming/java/dependencies.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def gen_streaming_java_deps():
"org.mockito:mockito-all:1.10.19",
"org.powermock:powermock-module-testng:1.6.6",
"org.powermock:powermock-api-mockito:1.6.6",
"commons-collections:commons-collections:3.2.1",
],
repositories = [
"https://repo.spring.io/plugins-release/",
Expand Down
42 changes: 42 additions & 0 deletions streaming/java/generate_jni_header_files.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/env bash

set -e
set -x

cd "$(dirname "$0")"

bazel build all_streaming_tests_deploy.jar

function generate_one()
{
file=${1//./_}.h
javah -classpath ../../bazel-bin/streaming/java/all_streaming_tests_deploy.jar "$1"

# prepend licence first
cat <<EOF > ../src/lib/java/"$file"
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
EOF
# then append the generated header file
cat "$file" >> ../src/lib/java/"$file"
rm -f "$file"
}

generate_one io.ray.streaming.runtime.transfer.channel.ChannelId
generate_one io.ray.streaming.runtime.transfer.DataReader
generate_one io.ray.streaming.runtime.transfer.DataWriter
generate_one io.ray.streaming.runtime.transfer.TransferHandler

rm -f io_ray_streaming_*.h
1 change: 0 additions & 1 deletion streaming/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
<guava.version>27.0.1-jre</guava.version>
<fst.version>2.57</fst.version>
</properties>

<profiles>
<profile>
<id>release</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,26 @@
*/
public interface Function extends Serializable {

/**
* This method will be called periodically by framework, you should return a a serializable
* object which represents function state, framework will help you to serialize this object, save
* it to storage, and load it back when in fail-over through.
* {@link Function#loadCheckpoint(Serializable)}.
*
* @return A serializable object which represents function state.
*/
default Serializable saveCheckpoint() {
return null;
}

/**
* This method will be called by framework when a worker died and been restarted.
* We will pass the last object you returned in {@link Function#saveCheckpoint()} when
* doing checkpoint, you are responsible to load this object back to you function.
*
* @param checkpointObject the last object you returned in {@link Function#saveCheckpoint()}
*/
default void loadCheckpoint(Serializable checkpointObject) {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
*/
public interface SourceFunction<T> extends Function {

void init(int parallel, int index);
void init(int parallelism, int index);

void run(SourceContext<T> ctx) throws Exception;
void fetch(SourceContext<T> ctx) throws Exception;

void close();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.ray.streaming.api.function.internal;

import io.ray.streaming.api.function.impl.SourceFunction;
import java.util.ArrayList;
import java.util.Collection;

/**
Expand All @@ -12,22 +11,25 @@
public class CollectionSourceFunction<T> implements SourceFunction<T> {

private Collection<T> values;
private boolean finished = false;

public CollectionSourceFunction(Collection<T> values) {
this.values = values;
}

@Override
public void init(int parallel, int index) {
public void init(int totalParallel, int currentIndex) {
}

@Override
public void run(SourceContext<T> ctx) throws Exception {
public void fetch(SourceContext<T> ctx) throws Exception {
if (finished) {
return;
}
for (T value : values) {
ctx.collect(value);
}
// empty collection
values = new ArrayList<>();
finished = true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.ray.streaming.message;


import java.util.Objects;

public class KeyRecord<K, T> extends Record<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,13 @@ public interface Operator extends Serializable {

ChainStrategy getChainStrategy();

/**
* See {@link Function#saveCheckpoint()}.
*/
Serializable saveCheckpoint();

/**
* See {@link Function#loadCheckpoint(Serializable)}.
*/
void loadCheckpoint(Serializable checkpointObject);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

public interface SourceOperator<T> extends Operator {

void run();
void fetch();

SourceContext<T> getSourceContext();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.ray.streaming.api.function.internal.Functions;
import io.ray.streaming.message.KeyRecord;
import io.ray.streaming.message.Record;
import java.io.Serializable;
import java.util.List;

public abstract class StreamOperator<F extends Function> implements Operator {
Expand Down Expand Up @@ -72,6 +73,16 @@ protected void collect(KeyRecord keyRecord) {
}
}

@Override
public Serializable saveCheckpoint() {
return function.saveCheckpoint();
}

@Override
public void loadCheckpoint(Serializable checkpointObject) {
function.loadCheckpoint(checkpointObject);
}

@Override
public String getName() {
return name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.ray.streaming.operator.SourceOperator;
import io.ray.streaming.operator.StreamOperator;
import io.ray.streaming.operator.TwoInputOperator;
import java.io.Serializable;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -85,6 +86,23 @@ public Operator getTailOperator() {
return tailOperator;
}

@Override
public Serializable saveCheckpoint() {
Object[] checkpoints = new Object[operators.size()];
for (int i = 0; i < operators.size(); ++i) {
checkpoints[i] = operators.get(i).saveCheckpoint();
}
return checkpoints;
}

@Override
public void loadCheckpoint(Serializable checkpointObject) {
Serializable[] checkpoints = (Serializable[]) checkpointObject;
for (int i = 0; i < operators.size(); ++i) {
operators.get(i).loadCheckpoint(checkpoints[i]);
}
}

private RuntimeContext createRuntimeContext(RuntimeContext runtimeContext, int index) {
return (RuntimeContext) Proxy.newProxyInstance(runtimeContext.getClass().getClassLoader(),
new Class[] {RuntimeContext.class},
Expand Down Expand Up @@ -125,8 +143,8 @@ static class ChainedSourceOperator<T> extends ChainedOperator
}

@Override
public void run() {
sourceOperator.run();
public void fetch() {
sourceOperator.fetch();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ public void open(List<Collector> collectorList, RuntimeContext runtimeContext) {
}

@Override
public void run() {
public void fetch() {
try {
this.function.run(this.sourceContext);
this.function.fetch(this.sourceContext);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Loading

0 comments on commit f31ee84

Please sign in to comment.