Skip to content

Commit

Permalink
[Streaming] Test build fixed (#10617)
Browse files Browse the repository at this point in the history
  • Loading branch information
lixin-wei authored Sep 8, 2020
1 parent ca8792e commit 2b95e71
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public JobMaster(Map<String, String> confMap) {
runtimeContext = new JobMasterRuntimeContext(streamingConfig);

// load checkpoint if is recover
if (Ray.getRuntimeContext().wasCurrentActorRestarted()) {
if (!Ray.getRuntimeContext().isSingleProcess() && Ray.getRuntimeContext()
.wasCurrentActorRestarted()) {
loadMasterCheckpoint();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,25 @@
package io.ray.streaming.runtime.master;

import io.ray.api.Ray;
import io.ray.streaming.runtime.BaseUnitTest;
import java.util.HashMap;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class JobMasterTest {
public class JobMasterTest extends BaseUnitTest {

@BeforeMethod
public void init() {
// ray init
Ray.init();
}

@AfterMethod
public void tearDown() {
Ray.shutdown();
}

@Test
public void testCreation() {
Expand Down
27 changes: 23 additions & 4 deletions streaming/src/data_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ StreamingStatus DataReader::GetMessageFromChannel(ConsumerChannelInfo &channel_i
channel_map_[channel_info.channel_id]->ConsumeItemFromChannel(
message->data, message->data_size, wait_time_ms);

STREAMING_LOG(DEBUG) << "ConsumeItemFromChannel done, bytes="
<< Util::Byte2hex(message->data, message->data_size);

channel_info.get_queue_item_times++;
if (!message->data) {
RETURN_IF_NOT_OK(reliability_helper_->HandleNoValidItem(channel_info));
Expand Down Expand Up @@ -265,8 +268,9 @@ StreamingStatus DataReader::StashNextMessageAndPop(std::shared_ptr<DataBundle> &

// Get the first message.
message = reader_merger_->top();
STREAMING_LOG(DEBUG) << "Messages to be poped=" << *message
<< ", merger size=" << reader_merger_->size();
STREAMING_LOG(DEBUG) << "Messages to be popped=" << *message
<< ", merger size=" << reader_merger_->size()
<< ", bytes=" << Util::Byte2hex(message->data, message->data_size);

// Then stash next message from its from queue.
std::shared_ptr<DataBundle> new_msg = std::make_shared<DataBundle>();
Expand All @@ -275,11 +279,23 @@ StreamingStatus DataReader::StashNextMessageAndPop(std::shared_ptr<DataBundle> &
new_msg->last_barrier_id = channel_info.barrier_id;
reader_merger_->push(new_msg);
STREAMING_LOG(DEBUG) << "New message pushed=" << *new_msg
<< ", merger size=" << reader_merger_->size();
<< ", merger size=" << reader_merger_->size()
<< ", bytes=" << Util::Byte2hex(new_msg->data, new_msg->data_size);
// Barrier's message ID is equal to last message's ID.
// We will mark last message's ID as consumed in GetBundle.
// So barrier might be erased by streaming queue. We make a hack way here to
// copy barrier's data from streaming queue. TODO: There should be a more elegant way to
// do this.
if (new_msg->meta->IsBarrier()) {
uint8_t *origin_data = new_msg->data;
new_msg->Realloc(new_msg->data_size);
memcpy(new_msg->data, origin_data, new_msg->data_size);
}

// Pop message.
reader_merger_->pop();
STREAMING_LOG(DEBUG) << "Message poped, msg=" << *message;
STREAMING_LOG(DEBUG) << "Message popped, msg=" << *message
<< ", bytes=" << Util::Byte2hex(message->data, message->data_size);

// Record some metrics.
channel_info.last_queue_item_delay =
Expand Down Expand Up @@ -397,6 +413,7 @@ StreamingStatus DataReader::GetBundle(const uint32_t timeout_ms,
STREAMING_LOG(INFO) << "merger vector item=" << bundle->from;
}
}

RETURN_IF_NOT_OK(GetMergedMessageBundle(message, is_valid_break, timeout_ms));
if (!is_valid_break) {
empty_bundle_cnt++;
Expand All @@ -420,6 +437,8 @@ void DataReader::GetOffsetInfo(
}

void DataReader::NotifyConsumedItem(ConsumerChannelInfo &channel_info, uint64_t offset) {
STREAMING_LOG(DEBUG) << "NotifyConsumedItem, offset=" << offset
<< ", channel_id=" << channel_info.channel_id;
channel_map_[channel_info.channel_id]->NotifyChannelConsumed(offset);
}

Expand Down
17 changes: 10 additions & 7 deletions streaming/src/data_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,21 @@ void DataWriter::Run() {
uint64_t DataWriter::WriteMessageToBufferRing(const ObjectID &q_id, uint8_t *data,
uint32_t data_size,
StreamingMessageType message_type) {
STREAMING_LOG(DEBUG) << "WriteMessageToBufferRing q_id: " << q_id
<< " data_size: " << data_size
<< ", message_type=" << static_cast<uint32_t>(message_type)
<< ", data=" << Util::Byte2hex(data, data_size);
// TODO(lingxuan.zlx): currently, unsafe in multithreads
ProducerChannelInfo &channel_info = channel_info_map_[q_id];
// Write message id stands for current lastest message id and differs from
// channel.current_message_id if it's barrier message.
uint64_t &write_message_id = channel_info.current_message_id;
write_message_id++;
if (message_type == StreamingMessageType::Message) {
write_message_id++;
}

STREAMING_LOG(DEBUG) << "WriteMessageToBufferRing q_id: " << q_id
<< " data_size: " << data_size
<< ", message_type=" << static_cast<uint32_t>(message_type)
<< ", data=" << Util::Byte2hex(data, data_size)
<< ", current_message_id=" << write_message_id;

auto &ring_buffer_ptr = channel_info.writer_ring_buffer;
while (ring_buffer_ptr->IsFull() &&
runtime_context_->GetRuntimeStatus() == RuntimeStatus::Running) {
Expand Down Expand Up @@ -345,8 +350,6 @@ bool DataWriter::CollectFromRingBuffer(ProducerChannelInfo &channel_info,
<< static_cast<uint32_t>(message_ptr->GetMessageType());
break;
}
// ClassBytesSize = DataSize + MetaDataSize
// bundle_buffer_size += message_ptr->GetDataSize();
bundle_buffer_size += message_total_size;
message_list.push_back(message_ptr);
buffer_ptr->Pop();
Expand Down
2 changes: 1 addition & 1 deletion streaming/src/ring_buffer/ring_buffer.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "ring_buffer.h"
#include "ring_buffer/ring_buffer.h"

#include "util/streaming_logging.h"

Expand Down
6 changes: 3 additions & 3 deletions streaming/src/test/message_serialization_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ TEST(StreamingSerializationTest, streaming_message_serialization_test) {
uint8_t *bytes = new uint8_t[message_length];
message->ToBytes(bytes);
StreamingMessagePtr new_message = StreamingMessage::FromBytes(bytes);
EXPECT_EQ(std::memcmp(new_message->RawData(), data, 3), 0);
EXPECT_EQ(std::memcmp(new_message->Payload(), data, 3), 0);
delete[] bytes;
}

Expand Down Expand Up @@ -81,9 +81,9 @@ TEST(StreamingSerializationTest, streaming_message_barrier_bundle_serialization_
EXPECT_TRUE(s_item->ClassBytesSize() == m_item->ClassBytesSize());
EXPECT_TRUE(s_item->GetMessageType() == m_item->GetMessageType());
EXPECT_TRUE(s_item->GetMessageId() == m_item->GetMessageId());
EXPECT_TRUE(s_item->GetDataSize() == m_item->GetDataSize());
EXPECT_TRUE(s_item->PayloadSize() == m_item->PayloadSize());
EXPECT_TRUE(
std::memcmp(s_item->RawData(), m_item->RawData(), m_item->GetDataSize()) == 0);
std::memcmp(s_item->Payload(), m_item->Payload(), m_item->PayloadSize()) == 0);
EXPECT_TRUE(*(s_item.get()) == (*(m_item.get())));

delete[] bundle_bytes;
Expand Down
28 changes: 19 additions & 9 deletions streaming/src/test/mock_actor.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#define BOOST_BIND_NO_PLACEHOLDERS
#include "common/status.h"
#include "data_reader.h"
#include "data_writer.h"
#include "gtest/gtest.h"
Expand All @@ -8,8 +9,7 @@
#include "ray/common/test_util.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/core_worker.h"
#include "ring_buffer.h"
#include "status.h"
#include "ring_buffer/ring_buffer.h"
using namespace std::placeholders;

const uint32_t MESSAGE_BOUND_SIZE = 10000;
Expand Down Expand Up @@ -126,9 +126,13 @@ class StreamingQueueWriterTestSuite : public StreamingQueueTestSuite {
for (uint32_t j = 0; j < buffer_len; ++j) {
data[j] = j % 128;
}

STREAMING_LOG(DEBUG) << "Write data to queue, count=" << i
<< ", queue_id=" << q_id;
writer_client->WriteMessageToBufferRing(q_id, data, buffer_len,
StreamingMessageType::Message);
if (i % 10 == 0) {
writer_client->BroadcastBarrier(i / 10, nullptr, 0);
}
}
++i;
}
Expand Down Expand Up @@ -159,7 +163,8 @@ class StreamingQueueReaderTestSuite : public StreamingQueueTestSuite {
for (auto &q_id : queue_id_vec) {
queue_last_cp_id[q_id] = 0;
}
STREAMING_LOG(INFO) << "Start read message bundle";
STREAMING_LOG(INFO) << "Start read message bundle, queue_id_size="
<< queue_id_vec.size();
while (true) {
std::shared_ptr<DataBundle> msg;
StreamingStatus st = reader_client->GetBundle(100, msg);
Expand All @@ -173,8 +178,13 @@ class StreamingQueueReaderTestSuite : public StreamingQueueTestSuite {
<< "read null pointer message, queue id => " << msg->from.Hex();

if (msg->meta->GetBundleType() == StreamingMessageBundleType::Barrier) {
STREAMING_LOG(DEBUG) << "barrier message recevied => "
<< msg->meta->GetMessageBundleTs();
StreamingBarrierHeader barrier_header;
StreamingMessage::GetBarrierIdFromRawData(msg->data + kMessageHeaderSize,
&barrier_header);
STREAMING_LOG(DEBUG) << "barrier message recevied, time="
<< msg->meta->GetMessageBundleTs()
<< ", barrier_id=" << barrier_header.barrier_id
<< ", data=" << Util::Byte2hex(msg->data, msg->data_size);
std::unordered_map<ray::ObjectID, ConsumerChannelInfo> *offset_map;
reader_client->GetOffsetInfo(offset_map);

Expand Down Expand Up @@ -206,12 +216,12 @@ class StreamingQueueReaderTestSuite : public StreamingQueueTestSuite {
uint32_t buff_len = i % DEFAULT_STREAMING_MESSAGE_BUFFER_SIZE;
if (i > MESSAGE_BOUND_SIZE) break;

EXPECT_EQ(buff_len, item->GetDataSize());
EXPECT_EQ(buff_len, item->PayloadSize());
uint8_t *compared_data = new uint8_t[buff_len];
for (uint32_t j = 0; j < item->GetDataSize(); ++j) {
for (uint32_t j = 0; j < item->PayloadSize(); ++j) {
compared_data[j] = j % 128;
}
EXPECT_EQ(std::memcmp(compared_data, item->RawData(), item->GetDataSize()), 0);
EXPECT_EQ(std::memcmp(compared_data, item->Payload(), item->PayloadSize()), 0);
delete[] compared_data;
}
STREAMING_LOG(DEBUG) << "Received message count => " << recevied_message_cnt;
Expand Down
8 changes: 4 additions & 4 deletions streaming/src/test/mock_transfer_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ TEST_F(StreamingTransferTest, exchange_single_channel_test) {
StreamingMessageBundlePtr bundle_ptr = StreamingMessageBundle::FromBytes(msg->data);
auto &message_list = bundle_ptr->GetMessageList();
auto &message = message_list.front();
EXPECT_EQ(std::memcmp(message->RawData(), data, data_size), 0);
EXPECT_EQ(std::memcmp(message->Payload(), data, data_size), 0);
}

TEST_F(StreamingTransferTest, exchange_multichannel_test) {
Expand All @@ -94,7 +94,7 @@ TEST_F(StreamingTransferTest, exchange_multichannel_test) {
StreamingMessageBundlePtr bundle_ptr = StreamingMessageBundle::FromBytes(msg->data);
auto &message_list = bundle_ptr->GetMessageList();
auto &message = message_list.front();
EXPECT_EQ(std::memcmp(message->RawData(), data, data_size), 0);
EXPECT_EQ(std::memcmp(message->Payload(), data, data_size), 0);
}
}

Expand Down Expand Up @@ -125,7 +125,7 @@ TEST_F(StreamingTransferTest, exchange_consumed_test) {
int index = 0;
for (auto &message : read_message_list) {
func(index++);
EXPECT_EQ(std::memcmp(message->RawData(), data.get(), data_size), 0);
EXPECT_EQ(std::memcmp(message->Payload(), data.get(), data_size), 0);
}
write_thread.join();
}
Expand Down Expand Up @@ -180,7 +180,7 @@ TEST_F(StreamingTransferTest, flow_control_test) {
int index = 0;
for (auto &message : read_message_list) {
func(index++);
EXPECT_EQ(std::memcmp(message->RawData(), data.get(), data_size), 0);
EXPECT_EQ(std::memcmp(message->Payload(), data.get(), data_size), 0);
}
write_thread.join();
}
Expand Down
10 changes: 1 addition & 9 deletions streaming/src/test/queue_tests_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
args.emplace_back(new TaskArgByValue(std::make_shared<RayObject>(
msg.ToBytes(), nullptr, std::vector<ObjectID>(), true)));
std::unordered_map<std::string, double> resources;
<<<<<<< HEAD
TaskOptions options(0, resources);
=======
TaskOptions options{"", 0, resources};
>>>>>>> 6a78ba9752dc7f17b0e4b7423898c0facf777d3d
std::vector<ObjectID> return_ids;
RayFunction func{ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::BuildPython("", "", "init", "")};
Expand All @@ -107,7 +103,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
args.emplace_back(new TaskArgByValue(
std::make_shared<RayObject>(buffer, nullptr, std::vector<ObjectID>(), true)));
std::unordered_map<std::string, double> resources;
TaskOptions options(0, resources);
TaskOptions options("", 0, resources);
std::vector<ObjectID> return_ids;
RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"", test, "execute_test", "")};
Expand All @@ -123,11 +119,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
args.emplace_back(new TaskArgByValue(
std::make_shared<RayObject>(buffer, nullptr, std::vector<ObjectID>(), true)));
std::unordered_map<std::string, double> resources;
<<<<<<< HEAD
TaskOptions options(1, resources);
=======
TaskOptions options{"", 1, resources};
>>>>>>> 6a78ba9752dc7f17b0e4b7423898c0facf777d3d
std::vector<ObjectID> return_ids;
RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"", "", "check_current_test_status", "")};
Expand Down
10 changes: 5 additions & 5 deletions streaming/src/test/ring_buffer_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include "gtest/gtest.h"
#include "message/message.h"
#include "ray/util/logging.h"
#include "ring_buffer.h"
#include "ring_buffer/ring_buffer.h"

using namespace ray;
using namespace ray::streaming;
Expand All @@ -28,8 +28,8 @@ TEST(StreamingRingBufferTest, streaming_message_ring_buffer_test) {
while (!ring_buffer.IsEmpty()) {
StreamingMessagePtr message_ptr = ring_buffer.Front();
ring_buffer.Pop();
EXPECT_EQ(message_ptr->GetDataSize(), 3);
EXPECT_EQ(*(message_ptr->RawData()), th++);
EXPECT_EQ(message_ptr->PayloadSize(), 3);
EXPECT_EQ(*(message_ptr->Payload()), th++);
}
}
}
Expand All @@ -52,7 +52,7 @@ TEST(StreamingRingBufferTest, spsc_test) {
while (ring_buffer.IsEmpty()) {
}
auto &msg = ring_buffer.Front();
EXPECT_EQ(std::memcmp(msg->RawData(), &count, sizeof(size_t)), 0);
EXPECT_EQ(std::memcmp(msg->Payload(), &count, sizeof(size_t)), 0);
ring_buffer.Pop();
count++;
}
Expand All @@ -78,7 +78,7 @@ TEST(StreamingRingBufferTest, mutex_test) {
while (ring_buffer.IsEmpty()) {
}
auto msg = ring_buffer.Front();
EXPECT_EQ(std::memcmp(msg->RawData(), &count, sizeof(size_t)), 0);
EXPECT_EQ(std::memcmp(msg->Payload(), &count, sizeof(size_t)), 0);
ring_buffer.Pop();
count++;
}
Expand Down
2 changes: 2 additions & 0 deletions streaming/src/test/run_streaming_queue_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ STREAMING_TEST_WORKER_EXEC="$RAY_ROOT/bazel-bin/streaming/streaming_test_worker"
GCS_SERVER_EXEC="$RAY_ROOT/bazel-bin/gcs_server"

# clear env
set +e
pgrep "plasma|DefaultDriver|DefaultWorker|AppStarter|redis|http_server|job_agent" | xargs kill -9 &> /dev/null
set -e

# Run tests.

Expand Down
2 changes: 1 addition & 1 deletion streaming/src/test/streaming_queue_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include "queue/queue_client.h"
#include "ray/common/test_util.h"
#include "ray/core_worker/core_worker.h"
#include "ring_buffer.h"
#include "ring_buffer/ring_buffer.h"
#include "test/queue_tests_base.h"

using namespace std::placeholders;
Expand Down

0 comments on commit 2b95e71

Please sign in to comment.