[Streaming] fix streaming ci #9675

Merged · 42 commits · Sep 8, 2020
Note: the diff below shows the changes from 30 of the 42 commits.

Commits
c3f0ae5  fix replace proto import (chaokunyang, Jul 24, 2020)
c3bf18c  fix ArgByRef (chaokunyang, Jul 27, 2020)
d73a4d1  add explicit for TaskArgByValue (chaokunyang, Jul 28, 2020)
6d3e9c8  print binary for id check (chaokunyang, Jul 28, 2020)
e8715a5  fix arg by ref (chaokunyang, Jul 28, 2020)
accc059  refine wait file written (chaokunyang, Jul 28, 2020)
128988b  enforce protobuf version (chaokunyang, Aug 12, 2020)
c95c756  fix clear_object_ref (chaokunyang, Aug 12, 2020)
1670c7b  using wait_for_condition (chaokunyang, Aug 12, 2020)
d6de502  add comments for change import path (chaokunyang, Aug 12, 2020)
8b5f8f6  enforce protobuf version (chaokunyang, Aug 12, 2020)
9a5341e  revert protobuf version (chaokunyang, Aug 12, 2020)
0d39a8a  fix replace proto import (chaokunyang, Jul 24, 2020)
957c231  fix ArgByRef (chaokunyang, Jul 27, 2020)
0d5a561  add explicit for TaskArgByValue (chaokunyang, Jul 28, 2020)
c498cf7  print binary for id check (chaokunyang, Jul 28, 2020)
16daa11  fix arg by ref (chaokunyang, Jul 28, 2020)
eafc792  refine wait file written (chaokunyang, Jul 28, 2020)
eb0d371  enforce protobuf version (chaokunyang, Aug 12, 2020)
3baf8ac  fix clear_object_ref (chaokunyang, Aug 12, 2020)
12f6a13  using wait_for_condition (chaokunyang, Aug 12, 2020)
f6a0efd  add comments for change import path (chaokunyang, Aug 12, 2020)
d7102a1  enforce protobuf version (chaokunyang, Aug 12, 2020)
e842d16  revert protobuf version (chaokunyang, Aug 12, 2020)
115f726  Merge branch 'fix_streaming_ut' of https://github.com/ant-tech-allian… (Sep 4, 2020)
2120b60  test.cc (Sep 4, 2020)
17d4e4a  lint and refine name (chaokunyang, Sep 4, 2020)
12ef086  revert (chaokunyang, Sep 4, 2020)
75f4328  refine export symbols (chaokunyang, Sep 4, 2020)
75db61b  lint (chaokunyang, Sep 4, 2020)
359d17d  refine symbols export (chaokunyang, Sep 5, 2020)
2af9857  fix (chaokunyang, Sep 5, 2020)
e217239  lint (chaokunyang, Sep 5, 2020)
16c2a9f  lint (chaokunyang, Sep 6, 2020)
504a2ad  lint (chaokunyang, Sep 7, 2020)
28af33e  ld fixed (lixin-wei, Sep 7, 2020)
fba61d7  Merge branch 'master' of github.com:ray-project/ray into fix_streamin… (lixin-wei, Sep 8, 2020)
d8fe3cc  fix load_code_from_local (chaokunyang, Sep 8, 2020)
d2bba6f  stop ray cluster before test testHybridDataStream (chaokunyang, Sep 8, 2020)
19406b7  remove getNumWorkersPerProcess > 1 assert (chaokunyang, Sep 8, 2020)
c76ad5d  lint (chaokunyang, Sep 8, 2020)
e401318  fix ray init (chaokunyang, Sep 8, 2020)
23 changes: 23 additions & 0 deletions BUILD.bazel
@@ -367,6 +367,27 @@ cc_library(
     ],
 )
 
+cc_library(
+    name = "exported_lib",
+    srcs = glob(
+        [
+            "src/ray/exported/exported.cc",
+        ],
+    ),
+    hdrs = glob(
+        [
+            "src/ray/exported/exported.h",
+        ],
+    ),
+    copts = COPTS,
+    alwayslink = 1,
+    strip_include_prefix = "src",
+    visibility = ["//visibility:public"],
+    deps = [
+        ":core_worker_lib",
+    ],
+)
+
 cc_binary(
     name = "raylet",
     srcs = ["src/ray/raylet/main.cc"],
@@ -1701,6 +1722,7 @@ pyx_library(
         linkstatic = 1,
     ),
     deps = [
+        "//:exported_lib",
         "//:core_worker_lib",
         "//:global_state_accessor_lib",
         "//:ray_util",
@@ -1755,6 +1777,7 @@ cc_binary(
     linkstatic = 1,
     visibility = ["//java:__subpackages__"],
     deps = [
+        "//:exported_lib",
        "//:core_worker_lib",
        "//:global_state_accessor_lib",
        "//:src/ray/ray_exported_symbols.lds",
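The new `//:exported_lib` target wraps `src/ray/exported` with `alwayslink = 1`, so the linker keeps `ray::exported_::SubmitActorTask` in the Python and Java shared libraries even though nothing inside those libraries calls it; the `.lds` export scripts further down then make the symbol visible to code that loads the libraries at runtime. A minimal sketch of a downstream caller (the `ForwardMessage` wrapper is hypothetical, not part of this PR):

    // Hypothetical consumer of //:exported_lib (illustrative only).
    #include <memory>
    #include <vector>

    #include "ray/exported/exported.h"

    // Fire-and-forget send to a peer actor, requesting zero return values;
    // this mirrors how streaming's Transport::Send uses the exported entry point.
    void ForwardMessage(const ray::ActorID &peer,
                        std::shared_ptr<ray::LocalMemoryBuffer> payload,
                        ray::RayFunction &receiver) {
      std::vector<ray::ObjectID> return_ids;
      ray::exported_::SubmitActorTask(peer, std::move(payload), receiver,
                                      /*return_num=*/0, return_ids);
    }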
43 changes: 23 additions & 20 deletions src/ray/common/id.h
@@ -75,7 +75,8 @@ class BaseID {
  protected:
   BaseID(const std::string &binary) {
     RAY_CHECK(binary.size() == Size() || binary.size() == 0)
-        << "expected size is " << Size() << ", but got " << binary.size();
+        << "expected size is " << Size() << ", but got data " << binary << " of size "
+        << binary.size();
     std::memcpy(const_cast<uint8_t *>(this->Data()), binary.data(), binary.size());
   }
   // All IDs are immutable for hash evaluations. MutableData is only allow to use
@@ -341,24 +342,25 @@ std::ostream &operator<<(std::ostream &os, const TaskID &id);
 std::ostream &operator<<(std::ostream &os, const ObjectID &id);
 std::ostream &operator<<(std::ostream &os, const PlacementGroupID &id);
 
-#define DEFINE_UNIQUE_ID(type)                                                 \
-  class RAY_EXPORT type : public UniqueID {                                    \
-   public:                                                                     \
-    explicit type(const UniqueID &from) {                                      \
-      std::memcpy(&id_, from.Data(), kUniqueIDSize);                           \
-    }                                                                          \
-    type() : UniqueID() {}                                                     \
-    static type FromRandom() { return type(UniqueID::FromRandom()); }          \
-    static type FromBinary(const std::string &binary) { return type(binary); } \
-    static type Nil() { return type(UniqueID::Nil()); }                        \
-    static size_t Size() { return kUniqueIDSize; }                             \
-                                                                               \
-   private:                                                                    \
-    explicit type(const std::string &binary) {                                 \
-      RAY_CHECK(binary.size() == Size() || binary.size() == 0)                 \
-          << "expected size is " << Size() << ", but got " << binary.size();   \
-      std::memcpy(&id_, binary.data(), binary.size());                         \
-    }                                                                          \
+#define DEFINE_UNIQUE_ID(type)                                                   \
+  class RAY_EXPORT type : public UniqueID {                                      \
+   public:                                                                       \
+    explicit type(const UniqueID &from) {                                        \
+      std::memcpy(&id_, from.Data(), kUniqueIDSize);                             \
+    }                                                                            \
+    type() : UniqueID() {}                                                       \
+    static type FromRandom() { return type(UniqueID::FromRandom()); }            \
+    static type FromBinary(const std::string &binary) { return type(binary); }   \
+    static type Nil() { return type(UniqueID::Nil()); }                          \
+    static size_t Size() { return kUniqueIDSize; }                               \
+                                                                                 \
+   private:                                                                      \
+    explicit type(const std::string &binary) {                                   \
+      RAY_CHECK(binary.size() == Size() || binary.size() == 0)                   \
+          << "expected size is " << Size() << ", but got data " << binary << " of size " \
+          << binary.size();                                                      \
+      std::memcpy(&id_, binary.data(), binary.size());                           \
+    }                                                                            \
 };
 
 #include "ray/common/id_def.h"
@@ -385,7 +387,8 @@ T BaseID<T>::FromRandom() {
 template <typename T>
 T BaseID<T>::FromBinary(const std::string &binary) {
   RAY_CHECK(binary.size() == T::Size() || binary.size() == 0)
-      << "expected size is " << T::Size() << ", but got " << binary.size();
+      << "expected size is " << T::Size() << ", but got data " << binary << " of size "
+      << binary.size();
   T t;
   std::memcpy(t.MutableData(), binary.data(), binary.size());
   return t;
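The three hunks above make the same change in three places: when an ID is constructed from a binary string of the wrong length, the RAY_CHECK failure now prints the offending bytes along with their size (the "print binary for id check" commit), which makes the CI failures this PR chases easier to diagnose. A standalone sketch of the invariant being checked; the size constant here is illustrative, the real one lives in Ray's headers:

    #include <cassert>
    #include <string>

    // Illustrative stand-in for Ray's ID size constant.
    constexpr size_t kIdSize = 28;

    // An ID binary is valid only if it is empty (a Nil ID) or exactly kIdSize
    // bytes; anything else trips the RAY_CHECK shown in the hunks above.
    bool IsValidIdBinary(const std::string &binary) {
      return binary.empty() || binary.size() == kIdSize;
    }

    int main() {
      assert(IsValidIdBinary(""));                            // Nil ID
      assert(IsValidIdBinary(std::string(kIdSize, '\x01')));  // well-formed
      assert(!IsValidIdBinary("truncated"));                  // now logs the bytes too
    }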
2 changes: 1 addition & 1 deletion src/ray/common/task/task_spec.cc
@@ -120,7 +120,7 @@ ObjectID TaskSpecification::ReturnId(size_t return_index) const {
 }
 
 bool TaskSpecification::ArgByRef(size_t arg_index) const {
-  return message_->args(arg_index).object_ref().object_id() != "";
+  return message_->args(arg_index).has_object_ref();
 }
 
 ObjectID TaskSpecification::ArgId(size_t arg_index) const {
2 changes: 1 addition & 1 deletion src/ray/common/task/task_util.h
@@ -41,7 +41,7 @@ class TaskArgByValue : public TaskArg {
   ///
   /// \param[in] value Value of the argument.
   /// \return The task argument.
-  TaskArgByValue(const std::shared_ptr<RayObject> &value) : value_(value) {
+  explicit TaskArgByValue(const std::shared_ptr<RayObject> &value) : value_(value) {
     RAY_CHECK(value) << "Value can't be null.";
   }
 
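Marking the constructor `explicit` stops a `std::shared_ptr<RayObject>` from silently converting into a `TaskArgByValue` at a call site, which could otherwise wrap an object as a by-value argument without the author noticing. A minimal illustration with toy types (not Ray's):

    #include <memory>

    struct Value {};  // stand-in for RayObject

    struct ArgByValue {
      explicit ArgByValue(const std::shared_ptr<Value> &v) : value(v) {}
      std::shared_ptr<Value> value;
    };

    void Submit(const ArgByValue &) {}

    int main() {
      auto obj = std::make_shared<Value>();
      // Submit(obj);          // with `explicit`, this no longer compiles
      Submit(ArgByValue(obj));  // the conversion must be spelled out
    }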
2 changes: 1 addition & 1 deletion src/ray/core_worker/transport/dependency_resolver.cc
@@ -46,7 +46,7 @@ void InlineDependencies(
     if (!it->second->IsInPlasmaError()) {
       // The object has not been promoted to plasma. Inline the object by
       // clearing the reference and replacing it with the raw value.
-      mutable_arg->mutable_object_ref()->Clear();
+      mutable_arg->clear_object_ref();
       if (it->second->HasData()) {
         const auto &data = it->second->GetData();
         mutable_arg->set_data(data->Data(), data->Size());
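This one-liner and the `ArgByRef` change above are connected. For a singular protobuf message field, `mutable_object_ref()->Clear()` resets the submessage's contents but leaves the parent's presence bit set (calling `mutable_object_ref()` even sets it), so an inlined argument would still answer true to `has_object_ref()`. `clear_object_ref()` drops the presence bit together with the contents, which is exactly what the new `has_object_ref()`-based `ArgByRef` relies on. A sketch of the semantics, assuming protoc-generated classes for the hypothetical messages in the comment (not Ray's actual proto):

    #include <cassert>

    // Assume generated C++ for:
    //   message ObjectReference { bytes object_id = 1; }
    //   message TaskArg { ObjectReference object_ref = 1; bytes data = 2; }

    void DemonstratePresence(TaskArg &arg) {
      arg.mutable_object_ref()->set_object_id("oid");
      assert(arg.has_object_ref());

      // Clears the submessage's contents only; the field still counts as present.
      arg.mutable_object_ref()->Clear();
      assert(arg.has_object_ref());
      assert(arg.object_ref().object_id().empty());

      // Clears contents *and* presence; has_object_ref() is now false.
      arg.clear_object_ref();
      assert(!arg.has_object_ref());
    }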
34 changes: 34 additions & 0 deletions src/ray/exported/exported.cc
@@ -0,0 +1,34 @@
+#include "ray/core_worker/core_worker.h"
+#include "ray/exported/exported.h"
+
+namespace ray {
+namespace exported_ {
+
+void SubmitActorTask(const ActorID &peer_actor_id,
+                     std::shared_ptr<LocalMemoryBuffer> buffer, RayFunction &function,
+                     int return_num, std::vector<ObjectID> &return_ids) {
+  std::unordered_map<std::string, double> resources;
+  std::string name = function.GetFunctionDescriptor()->DefaultTaskName();
+  TaskOptions options{name, return_num, resources};
+
+  char meta_data[3] = {'R', 'A', 'W'};
+  std::shared_ptr<LocalMemoryBuffer> meta =
+      std::make_shared<LocalMemoryBuffer>((uint8_t *)meta_data, 3, true);
+
+  std::vector<std::unique_ptr<TaskArg>> args;
+  if (function.GetLanguage() == Language::PYTHON) {
+    auto dummy = "__RAY_DUMMY__";
+    std::shared_ptr<LocalMemoryBuffer> dummyBuffer =
+        std::make_shared<LocalMemoryBuffer>((uint8_t *)dummy, 13, true);
+    args.emplace_back(new TaskArgByValue(std::make_shared<RayObject>(
+        std::move(dummyBuffer), meta, std::vector<ObjectID>(), true)));
+  }
+  args.emplace_back(new TaskArgByValue(std::make_shared<RayObject>(
+      std::move(buffer), meta, std::vector<ObjectID>(), true)));
+
+  std::vector<std::shared_ptr<RayObject>> results;
+  CoreWorkerProcess::GetCoreWorker().SubmitActorTask(peer_actor_id, function, args,
+                                                     options, &return_ids);
+}
+}  // namespace exported_
+}  // namespace ray
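One subtlety carried over from the streaming transport (see transport.cc below): when the receiver is a Python actor, an extra `__RAY_DUMMY__` marker argument is prepended so the payload lands in the expected slot of the cross-language call, and the hard-coded `13` is just the length of that string. A slightly more defensive way to build the marker buffer, sketched against the same `LocalMemoryBuffer` constructor the diff uses:

    #include <cstring>
    #include <memory>

    #include "ray/common/buffer.h"

    // Sketch: derive the dummy marker's size instead of hard-coding 13.
    std::shared_ptr<ray::LocalMemoryBuffer> MakeDummyMarker() {
      static const char kDummy[] = "__RAY_DUMMY__";
      return std::make_shared<ray::LocalMemoryBuffer>(
          reinterpret_cast<uint8_t *>(const_cast<char *>(kDummy)),
          std::strlen(kDummy), /*copy_data=*/true);
    }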
14 changes: 14 additions & 0 deletions src/ray/exported/exported.h
@@ -0,0 +1,14 @@
+#pragma once
+#include "ray/common/buffer.h"
+#include "ray/common/id.h"
+#include "ray/core_worker/common.h"
+
+namespace ray {
+namespace exported_ {
+
+void SubmitActorTask(const ActorID &peer_actor_id,
+                     std::shared_ptr<LocalMemoryBuffer> buffer, RayFunction &function,
+                     int return_num, std::vector<ObjectID> &return_ids);
+
+}  // namespace exported_
+}  // namespace ray
2 changes: 2 additions & 0 deletions src/ray/ray_exported_symbols.lds
@@ -21,9 +21,11 @@
 *ray*TaskID*
 *ray*ActorID*
 *ray*ObjectID*
+*ray*ObjectReference*
 # Others
 *ray*CoreWorker*
 *PyInit*
 *init_raylet*
 *Java*
 *JNI_*
+*ray*exported_*
2 changes: 2 additions & 0 deletions src/ray/ray_version_script.lds
@@ -4,6 +4,7 @@
 # Ray ABI is not finalized, the exact set of exported (C/C++) APIs is subject to change.
 VERSION_1.0 {
   global:
+    *ray*exported_*;
     # common
     *ray*Language*;
     *ray*RayObject*;
@@ -23,6 +24,7 @@ VERSION_1.0 {
     *ray*TaskID*;
     *ray*ActorID*;
     *ray*ObjectID*;
+    *ray*ObjectReference*;
     # Others
     *ray*CoreWorker*;
     *PyInit*;
11 changes: 11 additions & 0 deletions streaming/BUILD.bazel
@@ -61,6 +61,14 @@ cc_binary(
     deps = ["//:core_worker_lib"],
 )
 
+cc_binary(
+    name = "exported_lib.so",
+    copts = COPTS,
+    linkshared = 1,
+    deps = ["//:exported_lib"],
+)
+
+
 cc_library(
     name = "streaming_util",
     srcs = glob([
@@ -139,6 +147,7 @@ cc_library(
         ],
         "//conditions:default": [
             "core_worker_lib.so",
+            "exported_lib.so",
         ],
     }),
 )
@@ -283,7 +292,9 @@ genrule(
     GENERATED_DIR="streaming/python/generated"
     mkdir -p "$$GENERATED_DIR"
     touch "$$GENERATED_DIR/__init__.py"
+    # Use this `sed` command to change the import path in the generated file.
+    sed -i -E 's/from streaming.src.protobuf/from ./' "$$GENERATED_DIR/remote_call_pb2.py"
     sed -i -E 's/from protobuf/from ./' "$$GENERATED_DIR/remote_call_pb2.py"
     date > $@
 """,
     local = 1,
30 changes: 21 additions & 9 deletions streaming/python/tests/test_hybrid_stream.py
@@ -1,8 +1,10 @@
 import json
+import os
+import subprocess
+
 import ray
 from ray.streaming import StreamingContext
-import subprocess
-import os
+from ray.test_utils import wait_for_condition
 
 
 def map_func1(x):
@@ -45,6 +47,7 @@ def sink_func(x):
         print("HybridStreamTest", x)
         with open(sink_file, "a") as f:
             f.write(str(x))
+            f.flush()
 
     ctx = StreamingContext.Builder().build()
     ctx.from_values("a", "b", "c") \
@@ -54,14 +57,23 @@ def sink_func(x):
         .as_python_stream() \
         .sink(sink_func)
     ctx.submit("HybridStreamTest")
-    import time
-    time.sleep(3)
+
+    def check_succeed():
+        if os.path.exists(sink_file):
+            import time
+            time.sleep(3)  # Wait for all data to be written.
+            with open(sink_file, "r") as f:
+                result = f.read()
+            assert "a" in result
+            assert "b" not in result
+            assert "c" in result
+            print("Execution succeed")
+            return True
+        return False
+
+    wait_for_condition(check_succeed, timeout=60, retry_interval_ms=1000)
     print("Execution succeed")
     ray.shutdown()
-    with open(sink_file, "r") as f:
-        result = f.read()
-    assert "a" in result
-    assert "b" not in result
-    assert "c" in result
+
 
 if __name__ == "__main__":
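The fixed `time.sleep(3)` after `ctx.submit` was the flaky part: on a slow CI machine three seconds may not be enough for the sink file to appear. `ray.test_utils.wait_for_condition` instead polls a predicate until a deadline. The same polling idea, sketched in C++ (not Ray's implementation):

    #include <chrono>
    #include <functional>
    #include <thread>

    // Poll `cond` every `retry_interval` until it returns true or `timeout`
    // elapses; the test's check_succeed plays the role of `cond`.
    bool WaitForCondition(const std::function<bool()> &cond,
                          std::chrono::milliseconds timeout,
                          std::chrono::milliseconds retry_interval) {
      const auto deadline = std::chrono::steady_clock::now() + timeout;
      while (std::chrono::steady_clock::now() < deadline) {
        if (cond()) return true;
        std::this_thread::sleep_for(retry_interval);
      }
      return false;
    }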
32 changes: 3 additions & 29 deletions streaming/src/queue/transport.cc
@@ -1,50 +1,24 @@
 #include "queue/transport.h"
 
 #include "queue/utils.h"
+#include "ray/exported/exported.h"
 
 namespace ray {
 namespace streaming {
 
 static constexpr int TASK_OPTION_RETURN_NUM_0 = 0;
 static constexpr int TASK_OPTION_RETURN_NUM_1 = 1;
 
-void Transport::SendInternal(std::shared_ptr<LocalMemoryBuffer> buffer,
-                             RayFunction &function, int return_num,
-                             std::vector<ObjectID> &return_ids) {
-  std::unordered_map<std::string, double> resources;
-  std::string name = function.GetFunctionDescriptor()->DefaultTaskName();
-  TaskOptions options{name, return_num, resources};
-
-  char meta_data[3] = {'R', 'A', 'W'};
-  std::shared_ptr<LocalMemoryBuffer> meta =
-      std::make_shared<LocalMemoryBuffer>((uint8_t *)meta_data, 3, true);
-
-  std::vector<std::unique_ptr<TaskArg>> args;
-  if (function.GetLanguage() == Language::PYTHON) {
-    auto dummy = "__RAY_DUMMY__";
-    std::shared_ptr<LocalMemoryBuffer> dummyBuffer =
-        std::make_shared<LocalMemoryBuffer>((uint8_t *)dummy, 13, true);
-    args.emplace_back(new TaskArgByValue(std::make_shared<RayObject>(
-        std::move(dummyBuffer), meta, std::vector<ObjectID>(), true)));
-  }
-  args.emplace_back(new TaskArgByValue(std::make_shared<RayObject>(
-      std::move(buffer), meta, std::vector<ObjectID>(), true)));
-
-  std::vector<std::shared_ptr<RayObject>> results;
-  CoreWorkerProcess::GetCoreWorker().SubmitActorTask(peer_actor_id_, function, args,
-                                                     options, &return_ids);
-}
-
 void Transport::Send(std::shared_ptr<LocalMemoryBuffer> buffer) {
   STREAMING_LOG(INFO) << "Transport::Send buffer size: " << buffer->Size();
   std::vector<ObjectID> return_ids;
-  SendInternal(std::move(buffer), async_func_, TASK_OPTION_RETURN_NUM_0, return_ids);
+  ray::exported_::SubmitActorTask(peer_actor_id_, std::move(buffer), async_func_, TASK_OPTION_RETURN_NUM_0, return_ids);
 }
 
 std::shared_ptr<LocalMemoryBuffer> Transport::SendForResult(
     std::shared_ptr<LocalMemoryBuffer> buffer, int64_t timeout_ms) {
   std::vector<ObjectID> return_ids;
-  SendInternal(buffer, sync_func_, TASK_OPTION_RETURN_NUM_1, return_ids);
+  ray::exported_::SubmitActorTask(peer_actor_id_, buffer, sync_func_, TASK_OPTION_RETURN_NUM_1, return_ids);
 
   std::vector<std::shared_ptr<RayObject>> results;
   Status get_st =
10 changes: 0 additions & 10 deletions streaming/src/queue/transport.h
@@ -47,16 +47,6 @@ class Transport {
   std::shared_ptr<LocalMemoryBuffer> SendForResultWithRetry(
       std::shared_ptr<LocalMemoryBuffer> buffer, int retry_cnt, int64_t timeout_ms);
 
- private:
-  /// Send buffer internal
-  /// \param[in] buffer buffer to be sent.
-  /// \param[in] function the function descriptor of peer's function.
-  /// \param[in] return_num return value number of the call.
-  /// \param[out] return_ids return ids from SubmitActorTask.
-  virtual void SendInternal(std::shared_ptr<LocalMemoryBuffer> buffer,
-                            RayFunction &function, int return_num,
-                            std::vector<ObjectID> &return_ids);
-
  private:
   WorkerID worker_id_;
   ActorID peer_actor_id_;