Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Streaming]Streaming queue support failover #8161

Merged
merged 15 commits into from
Aug 25, 2020
8 changes: 8 additions & 0 deletions streaming/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,14 @@ cc_test(
deps = test_common_deps,
)

cc_test(
name = "queue_protobuf_tests",
srcs = [
"src/test/queue_protobuf_tests.cc",
],
deps = test_common_deps,
)

python_proto_compile(
name = "streaming_py_proto",
deps = ["//streaming:streaming_proto"],
Expand Down
4 changes: 2 additions & 2 deletions streaming/python/runtime/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ def get_parameters(self):
def __init__(self):
self._parameters = []

def build_input_queue_parameters(self, queue_ids_dict):
self.build_parameters(queue_ids_dict,
def build_input_queue_parameters(self, from_actors):
self.build_parameters(from_actors,
self._java_writer_async_function_descriptor,
self._java_writer_sync_function_descriptor,
self._python_writer_async_function_descriptor,
Expand Down
71 changes: 48 additions & 23 deletions streaming/src/channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ StreamingStatus StreamingQueueProducer::CreateQueue() {
<< " data_size: " << channel_info_.queue_size;
auto upstream_handler = ray::streaming::UpstreamQueueMessageHandler::GetService();
if (upstream_handler->UpstreamQueueExists(channel_info_.channel_id)) {
RAY_LOG(INFO) << "StreamingQueueWriter::CreateQueue duplicate!!!";
STREAMING_LOG(INFO) << "StreamingQueueWriter::CreateQueue duplicate!!!";
MissiontoMars marked this conversation as resolved.
Show resolved Hide resolved
return StreamingStatus::OK;
}

Expand Down Expand Up @@ -99,8 +99,9 @@ StreamingStatus StreamingQueueProducer::NotifyChannelConsumed(uint64_t channel_o

StreamingStatus StreamingQueueProducer::ProduceItemToChannel(uint8_t *data,
uint32_t data_size) {
Status status =
PushQueueItem(channel_info_.current_seq_id + 1, data, data_size, current_time_ms());
/// TODO: Fix msg_id_start and msg_id_end
Status status = PushQueueItem(channel_info_.current_seq_id + 1, data, data_size,
current_time_ms(), 0, 0);

if (status.code() != StatusCode::OK) {
STREAMING_LOG(DEBUG) << channel_info_.channel_id << " => Queue is full"
Expand All @@ -119,19 +120,22 @@ StreamingStatus StreamingQueueProducer::ProduceItemToChannel(uint8_t *data,
}

Status StreamingQueueProducer::PushQueueItem(uint64_t seq_id, uint8_t *data,
uint32_t data_size, uint64_t timestamp) {
STREAMING_LOG(INFO) << "StreamingQueueProducer::PushQueueItem:"
<< " qid: " << channel_info_.channel_id << " seq_id: " << seq_id
<< " data_size: " << data_size;
Status status = queue_->Push(seq_id, data, data_size, timestamp, false);
uint32_t data_size, uint64_t timestamp,
uint64_t msg_id_start, uint64_t msg_id_end) {
STREAMING_LOG(DEBUG) << "StreamingQueueProducer::PushQueueItem:"
<< " qid: " << channel_info_.channel_id << " seq_id: " << seq_id
<< " data_size: " << data_size;
Status status =
queue_->Push(seq_id, data, data_size, timestamp, msg_id_start, msg_id_end, false);
if (status.IsOutOfMemory()) {
status = queue_->TryEvictItems();
if (!status.ok()) {
STREAMING_LOG(INFO) << "Evict fail.";
return status;
}

status = queue_->Push(seq_id, data, data_size, timestamp, false);
status =
queue_->Push(seq_id, data, data_size, timestamp, msg_id_start, msg_id_end, false);
}

queue_->Send();
Expand All @@ -148,24 +152,45 @@ StreamingQueueConsumer::~StreamingQueueConsumer() {
STREAMING_LOG(INFO) << "Consumer Destroy";
}

StreamingStatus StreamingQueueConsumer::CreateTransferChannel() {
StreamingQueueStatus StreamingQueueConsumer::GetQueue(
const ObjectID &queue_id, uint64_t start_msg_id,
const ChannelCreationParameter &init_param) {
STREAMING_LOG(INFO) << "GetQueue qid: " << queue_id << " start_msg_id: " << start_msg_id
<< " actor_id: " << init_param.actor_id;
auto downstream_handler = ray::streaming::DownstreamQueueMessageHandler::GetService();
STREAMING_LOG(INFO) << "GetQueue qid: " << channel_info_.channel_id
<< " start_seq_id: " << channel_info_.current_seq_id + 1;
if (downstream_handler->DownstreamQueueExists(channel_info_.channel_id)) {
RAY_LOG(INFO) << "StreamingQueueReader::GetQueue duplicate!!!";
return StreamingStatus::OK;
if (downstream_handler->DownstreamQueueExists(queue_id)) {
STREAMING_LOG(INFO) << "StreamingQueueReader:: Already got this queue.";
return StreamingQueueStatus::OK;
}

downstream_handler->SetPeerActorID(
channel_info_.channel_id, channel_info_.parameter.actor_id,
*channel_info_.parameter.async_function, *channel_info_.parameter.sync_function);
STREAMING_LOG(INFO) << "Create ReaderQueue " << channel_info_.channel_id
<< " pull from start_seq_id: " << channel_info_.current_seq_id + 1;
queue_ = downstream_handler->CreateDownstreamQueue(channel_info_.channel_id,
channel_info_.parameter.actor_id);
downstream_handler->SetPeerActorID(queue_id, channel_info_.parameter.actor_id,
*init_param.async_function,
*init_param.sync_function);
STREAMING_LOG(INFO) << "Create ReaderQueue " << queue_id
<< " pull from start_msg_id: " << start_msg_id;
queue_ = downstream_handler->CreateDownstreamQueue(queue_id, init_param.actor_id);
STREAMING_CHECK(queue_ != nullptr);

return StreamingStatus::OK;
bool is_first_pull;
return downstream_handler->PullQueue(queue_id, start_msg_id, is_first_pull);
}

TransferCreationStatus StreamingQueueConsumer::CreateTransferChannel() {
StreamingQueueStatus status =
GetQueue(channel_info_.channel_id, channel_info_.current_seq_id + 1,
channel_info_.parameter);

if (status == StreamingQueueStatus::OK) {
return TransferCreationStatus::PullOk;
} else if (status == StreamingQueueStatus::NoValidData) {
return TransferCreationStatus::FreshStarted;
} else if (status == StreamingQueueStatus::Timeout) {
return TransferCreationStatus::Timeout;
} else if (status == StreamingQueueStatus::DataLost) {
return TransferCreationStatus::DataLost;
}
STREAMING_LOG(FATAL) << "Invalid StreamingQueueStatus, status=" << status;
return TransferCreationStatus::Invalid;
}

StreamingStatus StreamingQueueConsumer::DestroyTransferChannel() {
Expand Down
20 changes: 16 additions & 4 deletions streaming/src/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@
namespace ray {
namespace streaming {

enum class TransferCreationStatus : uint32_t {
FreshStarted = 0,
PullOk = 1,
Timeout = 2,
DataLost = 3,
Invalid = 999,
};

struct StreamingQueueInfo {
uint64_t first_seq_id = 0;
uint64_t last_seq_id = 0;
Expand Down Expand Up @@ -98,7 +106,7 @@ class ConsumerChannel {
explicit ConsumerChannel(std::shared_ptr<Config> &transfer_config,
ConsumerChannelInfo &c_channel_info);
virtual ~ConsumerChannel() = default;
virtual StreamingStatus CreateTransferChannel() = 0;
virtual TransferCreationStatus CreateTransferChannel() = 0;
virtual StreamingStatus DestroyTransferChannel() = 0;
virtual StreamingStatus ClearTransferCheckpoint(uint64_t checkpoint_id,
uint64_t checkpoint_offset) = 0;
Expand Down Expand Up @@ -129,7 +137,7 @@ class StreamingQueueProducer : public ProducerChannel {
private:
StreamingStatus CreateQueue();
Status PushQueueItem(uint64_t seq_id, uint8_t *data, uint32_t data_size,
uint64_t timestamp);
uint64_t timestamp, uint64_t msg_id_start, uint64_t msg_id_end);

private:
std::shared_ptr<WriterQueue> queue_;
Expand All @@ -140,7 +148,7 @@ class StreamingQueueConsumer : public ConsumerChannel {
explicit StreamingQueueConsumer(std::shared_ptr<Config> &transfer_config,
ConsumerChannelInfo &c_channel_info);
~StreamingQueueConsumer() override;
StreamingStatus CreateTransferChannel() override;
TransferCreationStatus CreateTransferChannel() override;
StreamingStatus DestroyTransferChannel() override;
StreamingStatus ClearTransferCheckpoint(uint64_t checkpoint_id,
uint64_t checkpoint_offset) override;
Expand All @@ -149,6 +157,10 @@ class StreamingQueueConsumer : public ConsumerChannel {
uint32_t &data_size, uint32_t timeout) override;
StreamingStatus NotifyChannelConsumed(uint64_t offset_id) override;

private:
StreamingQueueStatus GetQueue(const ObjectID &queue_id, uint64_t start_msg_id,
const ChannelCreationParameter &init_param);

private:
std::shared_ptr<ReaderQueue> queue_;
};
Expand Down Expand Up @@ -183,7 +195,7 @@ class MockConsumer : public ConsumerChannel {
explicit MockConsumer(std::shared_ptr<Config> &transfer_config,
ConsumerChannelInfo &c_channel_info)
: ConsumerChannel(transfer_config, c_channel_info){};
StreamingStatus CreateTransferChannel() override { return StreamingStatus::OK; }
TransferCreationStatus CreateTransferChannel() override { return TransferCreationStatus::PullOk; }
StreamingStatus DestroyTransferChannel() override { return StreamingStatus::OK; }
StreamingStatus ClearTransferCheckpoint(uint64_t checkpoint_id,
uint64_t checkpoint_offset) override {
Expand Down
4 changes: 2 additions & 2 deletions streaming/src/data_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ StreamingStatus DataReader::InitChannel() {
}

channel_map_.emplace(input_channel, channel);
StreamingStatus status = channel->CreateTransferChannel();
if (StreamingStatus::OK != status) {
TransferCreationStatus status = channel->CreateTransferChannel();
if (TransferCreationStatus::PullOk != status) {
STREAMING_LOG(ERROR) << "Initialize queue failed, id => " << input_channel;
}
}
Expand Down
58 changes: 43 additions & 15 deletions streaming/src/protobuf/streaming_queue.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,41 +9,45 @@ enum StreamingQueueMessageType {
StreamingQueueNotificationMsgType = 3;
StreamingQueueTestInitMsgType = 4;
StreamingQueueTestCheckStatusRspMsgType = 5;
StreamingQueuePullRequestMsgType = 6;
StreamingQueuePullResponseMsgType = 7;
StreamingQueueResendDataMsgType = 8;
}

enum StreamingQueueError {
OK = 0;
QUEUE_NOT_EXIST = 1;
NO_VALID_DATA_TO_PULL = 2;
DATA_LOST = 2;
NO_VALID_DATA = 3;
}

message StreamingQueueDataMsg {
message MessageCommon {
bytes src_actor_id = 1;
bytes dst_actor_id = 2;
bytes queue_id = 3;
uint64 seq_id = 4;
}

message StreamingQueueDataMsg {
MessageCommon common = 1;
uint64 seq_id = 2;
uint64 msg_id_start = 3;
uint64 msg_id_end = 4;
uint64 length = 5;
bool raw = 6;
}

message StreamingQueueCheckMsg {
bytes src_actor_id = 1;
bytes dst_actor_id = 2;
bytes queue_id = 3;
MessageCommon common = 1;
}

message StreamingQueueCheckRspMsg {
bytes src_actor_id = 1;
bytes dst_actor_id = 2;
bytes queue_id = 3;
StreamingQueueError err_code = 4;
MessageCommon common = 1;
StreamingQueueError err_code = 2;
}

message StreamingQueueNotificationMsg {
bytes src_actor_id = 1;
bytes dst_actor_id = 2;
bytes queue_id = 3;
uint64 seq_id = 4;
MessageCommon common = 1;
uint64 seq_id = 2;
}

// for test
Expand All @@ -67,4 +71,28 @@ message StreamingQueueTestInitMsg {
message StreamingQueueTestCheckStatusRspMsg {
string test_name = 1;
bool status = 2;
}
}

message StreamingQueuePullRequestMsg {
MissiontoMars marked this conversation as resolved.
Show resolved Hide resolved
MessageCommon common = 1;
uint64 msg_id = 2;
}

message StreamingQueuePullResponseMsg {
MessageCommon common = 1;
uint64 seq_id = 2;
uint64 msg_id = 3;
StreamingQueueError err_code = 4;
bool is_upstream_first_pull = 5;
}

message StreamingQueueResendDataMsg {
MessageCommon common = 1;
uint64 first_seq_id = 2;
uint64 last_seq_id = 3;
uint64 seq_id = 4;
uint64 msg_id_start = 5;
uint64 msg_id_end = 6;
uint64 length = 7;
bool raw = 8;
}
Loading