diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/JobMaster.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/JobMaster.java index 6115e4d50e88..87124afdbd2d 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/JobMaster.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/JobMaster.java @@ -67,7 +67,8 @@ public JobMaster(Map confMap) { runtimeContext = new JobMasterRuntimeContext(streamingConfig); // load checkpoint if is recover - if (Ray.getRuntimeContext().wasCurrentActorRestarted()) { + if (!Ray.getRuntimeContext().isSingleProcess() && Ray.getRuntimeContext() + .wasCurrentActorRestarted()) { loadMasterCheckpoint(); } diff --git a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/JobMasterTest.java b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/JobMasterTest.java index 76658e1ea671..3951911bb364 100644 --- a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/JobMasterTest.java +++ b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/JobMasterTest.java @@ -1,10 +1,25 @@ package io.ray.streaming.runtime.master; +import io.ray.api.Ray; +import io.ray.streaming.runtime.BaseUnitTest; import java.util.HashMap; import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -public class JobMasterTest { +public class JobMasterTest extends BaseUnitTest { + + @BeforeMethod + public void init() { + // ray init + Ray.init(); + } + + @AfterMethod + public void tearDown() { + Ray.shutdown(); + } @Test public void testCreation() { diff --git a/streaming/src/data_reader.cc b/streaming/src/data_reader.cc index cf723820147b..2d481fabf7de 100644 --- a/streaming/src/data_reader.cc +++ b/streaming/src/data_reader.cc @@ -159,6 +159,9 @@ StreamingStatus DataReader::GetMessageFromChannel(ConsumerChannelInfo &channel_i channel_map_[channel_info.channel_id]->ConsumeItemFromChannel( message->data, message->data_size, wait_time_ms); + STREAMING_LOG(DEBUG) << "ConsumeItemFromChannel done, bytes=" + << Util::Byte2hex(message->data, message->data_size); + channel_info.get_queue_item_times++; if (!message->data) { RETURN_IF_NOT_OK(reliability_helper_->HandleNoValidItem(channel_info)); @@ -265,8 +268,9 @@ StreamingStatus DataReader::StashNextMessageAndPop(std::shared_ptr & // Get the first message. message = reader_merger_->top(); - STREAMING_LOG(DEBUG) << "Messages to be poped=" << *message - << ", merger size=" << reader_merger_->size(); + STREAMING_LOG(DEBUG) << "Messages to be popped=" << *message + << ", merger size=" << reader_merger_->size() + << ", bytes=" << Util::Byte2hex(message->data, message->data_size); // Then stash next message from its from queue. std::shared_ptr new_msg = std::make_shared(); @@ -275,11 +279,23 @@ StreamingStatus DataReader::StashNextMessageAndPop(std::shared_ptr & new_msg->last_barrier_id = channel_info.barrier_id; reader_merger_->push(new_msg); STREAMING_LOG(DEBUG) << "New message pushed=" << *new_msg - << ", merger size=" << reader_merger_->size(); + << ", merger size=" << reader_merger_->size() + << ", bytes=" << Util::Byte2hex(new_msg->data, new_msg->data_size); + // Barrier's message ID is equal to last message's ID. + // We will mark last message's ID as consumed in GetBundle. + // So barrier might be erased by streaming queue. We make a hack way here to + // copy barrier's data from streaming queue. TODO: There should be a more elegant way to + // do this. + if (new_msg->meta->IsBarrier()) { + uint8_t *origin_data = new_msg->data; + new_msg->Realloc(new_msg->data_size); + memcpy(new_msg->data, origin_data, new_msg->data_size); + } // Pop message. reader_merger_->pop(); - STREAMING_LOG(DEBUG) << "Message poped, msg=" << *message; + STREAMING_LOG(DEBUG) << "Message popped, msg=" << *message + << ", bytes=" << Util::Byte2hex(message->data, message->data_size); // Record some metrics. channel_info.last_queue_item_delay = @@ -397,6 +413,7 @@ StreamingStatus DataReader::GetBundle(const uint32_t timeout_ms, STREAMING_LOG(INFO) << "merger vector item=" << bundle->from; } } + RETURN_IF_NOT_OK(GetMergedMessageBundle(message, is_valid_break, timeout_ms)); if (!is_valid_break) { empty_bundle_cnt++; @@ -420,6 +437,8 @@ void DataReader::GetOffsetInfo( } void DataReader::NotifyConsumedItem(ConsumerChannelInfo &channel_info, uint64_t offset) { + STREAMING_LOG(DEBUG) << "NotifyConsumedItem, offset=" << offset + << ", channel_id=" << channel_info.channel_id; channel_map_[channel_info.channel_id]->NotifyChannelConsumed(offset); } diff --git a/streaming/src/data_writer.cc b/streaming/src/data_writer.cc index e28ba26eb391..ff0496ccbf30 100644 --- a/streaming/src/data_writer.cc +++ b/streaming/src/data_writer.cc @@ -62,16 +62,21 @@ void DataWriter::Run() { uint64_t DataWriter::WriteMessageToBufferRing(const ObjectID &q_id, uint8_t *data, uint32_t data_size, StreamingMessageType message_type) { - STREAMING_LOG(DEBUG) << "WriteMessageToBufferRing q_id: " << q_id - << " data_size: " << data_size - << ", message_type=" << static_cast(message_type) - << ", data=" << Util::Byte2hex(data, data_size); // TODO(lingxuan.zlx): currently, unsafe in multithreads ProducerChannelInfo &channel_info = channel_info_map_[q_id]; // Write message id stands for current lastest message id and differs from // channel.current_message_id if it's barrier message. uint64_t &write_message_id = channel_info.current_message_id; - write_message_id++; + if (message_type == StreamingMessageType::Message) { + write_message_id++; + } + + STREAMING_LOG(DEBUG) << "WriteMessageToBufferRing q_id: " << q_id + << " data_size: " << data_size + << ", message_type=" << static_cast(message_type) + << ", data=" << Util::Byte2hex(data, data_size) + << ", current_message_id=" << write_message_id; + auto &ring_buffer_ptr = channel_info.writer_ring_buffer; while (ring_buffer_ptr->IsFull() && runtime_context_->GetRuntimeStatus() == RuntimeStatus::Running) { @@ -345,8 +350,6 @@ bool DataWriter::CollectFromRingBuffer(ProducerChannelInfo &channel_info, << static_cast(message_ptr->GetMessageType()); break; } - // ClassBytesSize = DataSize + MetaDataSize - // bundle_buffer_size += message_ptr->GetDataSize(); bundle_buffer_size += message_total_size; message_list.push_back(message_ptr); buffer_ptr->Pop(); diff --git a/streaming/src/ring_buffer/ring_buffer.cc b/streaming/src/ring_buffer/ring_buffer.cc index 6012f4a058e8..b9c1d2139d46 100644 --- a/streaming/src/ring_buffer/ring_buffer.cc +++ b/streaming/src/ring_buffer/ring_buffer.cc @@ -1,4 +1,4 @@ -#include "ring_buffer.h" +#include "ring_buffer/ring_buffer.h" #include "util/streaming_logging.h" diff --git a/streaming/src/test/message_serialization_tests.cc b/streaming/src/test/message_serialization_tests.cc index 14dfc0232df8..f38fc223cc58 100644 --- a/streaming/src/test/message_serialization_tests.cc +++ b/streaming/src/test/message_serialization_tests.cc @@ -16,7 +16,7 @@ TEST(StreamingSerializationTest, streaming_message_serialization_test) { uint8_t *bytes = new uint8_t[message_length]; message->ToBytes(bytes); StreamingMessagePtr new_message = StreamingMessage::FromBytes(bytes); - EXPECT_EQ(std::memcmp(new_message->RawData(), data, 3), 0); + EXPECT_EQ(std::memcmp(new_message->Payload(), data, 3), 0); delete[] bytes; } @@ -81,9 +81,9 @@ TEST(StreamingSerializationTest, streaming_message_barrier_bundle_serialization_ EXPECT_TRUE(s_item->ClassBytesSize() == m_item->ClassBytesSize()); EXPECT_TRUE(s_item->GetMessageType() == m_item->GetMessageType()); EXPECT_TRUE(s_item->GetMessageId() == m_item->GetMessageId()); - EXPECT_TRUE(s_item->GetDataSize() == m_item->GetDataSize()); + EXPECT_TRUE(s_item->PayloadSize() == m_item->PayloadSize()); EXPECT_TRUE( - std::memcmp(s_item->RawData(), m_item->RawData(), m_item->GetDataSize()) == 0); + std::memcmp(s_item->Payload(), m_item->Payload(), m_item->PayloadSize()) == 0); EXPECT_TRUE(*(s_item.get()) == (*(m_item.get()))); delete[] bundle_bytes; diff --git a/streaming/src/test/mock_actor.cc b/streaming/src/test/mock_actor.cc index 4f15fafd3699..3a7646758e26 100644 --- a/streaming/src/test/mock_actor.cc +++ b/streaming/src/test/mock_actor.cc @@ -1,4 +1,5 @@ #define BOOST_BIND_NO_PLACEHOLDERS +#include "common/status.h" #include "data_reader.h" #include "data_writer.h" #include "gtest/gtest.h" @@ -8,8 +9,7 @@ #include "ray/common/test_util.h" #include "ray/core_worker/context.h" #include "ray/core_worker/core_worker.h" -#include "ring_buffer.h" -#include "status.h" +#include "ring_buffer/ring_buffer.h" using namespace std::placeholders; const uint32_t MESSAGE_BOUND_SIZE = 10000; @@ -126,9 +126,13 @@ class StreamingQueueWriterTestSuite : public StreamingQueueTestSuite { for (uint32_t j = 0; j < buffer_len; ++j) { data[j] = j % 128; } - + STREAMING_LOG(DEBUG) << "Write data to queue, count=" << i + << ", queue_id=" << q_id; writer_client->WriteMessageToBufferRing(q_id, data, buffer_len, StreamingMessageType::Message); + if (i % 10 == 0) { + writer_client->BroadcastBarrier(i / 10, nullptr, 0); + } } ++i; } @@ -159,7 +163,8 @@ class StreamingQueueReaderTestSuite : public StreamingQueueTestSuite { for (auto &q_id : queue_id_vec) { queue_last_cp_id[q_id] = 0; } - STREAMING_LOG(INFO) << "Start read message bundle"; + STREAMING_LOG(INFO) << "Start read message bundle, queue_id_size=" + << queue_id_vec.size(); while (true) { std::shared_ptr msg; StreamingStatus st = reader_client->GetBundle(100, msg); @@ -173,8 +178,13 @@ class StreamingQueueReaderTestSuite : public StreamingQueueTestSuite { << "read null pointer message, queue id => " << msg->from.Hex(); if (msg->meta->GetBundleType() == StreamingMessageBundleType::Barrier) { - STREAMING_LOG(DEBUG) << "barrier message recevied => " - << msg->meta->GetMessageBundleTs(); + StreamingBarrierHeader barrier_header; + StreamingMessage::GetBarrierIdFromRawData(msg->data + kMessageHeaderSize, + &barrier_header); + STREAMING_LOG(DEBUG) << "barrier message recevied, time=" + << msg->meta->GetMessageBundleTs() + << ", barrier_id=" << barrier_header.barrier_id + << ", data=" << Util::Byte2hex(msg->data, msg->data_size); std::unordered_map *offset_map; reader_client->GetOffsetInfo(offset_map); @@ -206,12 +216,12 @@ class StreamingQueueReaderTestSuite : public StreamingQueueTestSuite { uint32_t buff_len = i % DEFAULT_STREAMING_MESSAGE_BUFFER_SIZE; if (i > MESSAGE_BOUND_SIZE) break; - EXPECT_EQ(buff_len, item->GetDataSize()); + EXPECT_EQ(buff_len, item->PayloadSize()); uint8_t *compared_data = new uint8_t[buff_len]; - for (uint32_t j = 0; j < item->GetDataSize(); ++j) { + for (uint32_t j = 0; j < item->PayloadSize(); ++j) { compared_data[j] = j % 128; } - EXPECT_EQ(std::memcmp(compared_data, item->RawData(), item->GetDataSize()), 0); + EXPECT_EQ(std::memcmp(compared_data, item->Payload(), item->PayloadSize()), 0); delete[] compared_data; } STREAMING_LOG(DEBUG) << "Received message count => " << recevied_message_cnt; diff --git a/streaming/src/test/mock_transfer_tests.cc b/streaming/src/test/mock_transfer_tests.cc index 62085ac7366a..da0f232bc053 100644 --- a/streaming/src/test/mock_transfer_tests.cc +++ b/streaming/src/test/mock_transfer_tests.cc @@ -77,7 +77,7 @@ TEST_F(StreamingTransferTest, exchange_single_channel_test) { StreamingMessageBundlePtr bundle_ptr = StreamingMessageBundle::FromBytes(msg->data); auto &message_list = bundle_ptr->GetMessageList(); auto &message = message_list.front(); - EXPECT_EQ(std::memcmp(message->RawData(), data, data_size), 0); + EXPECT_EQ(std::memcmp(message->Payload(), data, data_size), 0); } TEST_F(StreamingTransferTest, exchange_multichannel_test) { @@ -94,7 +94,7 @@ TEST_F(StreamingTransferTest, exchange_multichannel_test) { StreamingMessageBundlePtr bundle_ptr = StreamingMessageBundle::FromBytes(msg->data); auto &message_list = bundle_ptr->GetMessageList(); auto &message = message_list.front(); - EXPECT_EQ(std::memcmp(message->RawData(), data, data_size), 0); + EXPECT_EQ(std::memcmp(message->Payload(), data, data_size), 0); } } @@ -125,7 +125,7 @@ TEST_F(StreamingTransferTest, exchange_consumed_test) { int index = 0; for (auto &message : read_message_list) { func(index++); - EXPECT_EQ(std::memcmp(message->RawData(), data.get(), data_size), 0); + EXPECT_EQ(std::memcmp(message->Payload(), data.get(), data_size), 0); } write_thread.join(); } @@ -180,7 +180,7 @@ TEST_F(StreamingTransferTest, flow_control_test) { int index = 0; for (auto &message : read_message_list) { func(index++); - EXPECT_EQ(std::memcmp(message->RawData(), data.get(), data_size), 0); + EXPECT_EQ(std::memcmp(message->Payload(), data.get(), data_size), 0); } write_thread.join(); } diff --git a/streaming/src/test/queue_tests_base.h b/streaming/src/test/queue_tests_base.h index 28be67981ddb..cb168e078c95 100644 --- a/streaming/src/test/queue_tests_base.h +++ b/streaming/src/test/queue_tests_base.h @@ -87,11 +87,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { args.emplace_back(new TaskArgByValue(std::make_shared( msg.ToBytes(), nullptr, std::vector(), true))); std::unordered_map resources; -<<<<<<< HEAD - TaskOptions options(0, resources); -======= TaskOptions options{"", 0, resources}; ->>>>>>> 6a78ba9752dc7f17b0e4b7423898c0facf777d3d std::vector return_ids; RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython("", "", "init", "")}; @@ -107,7 +103,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { args.emplace_back(new TaskArgByValue( std::make_shared(buffer, nullptr, std::vector(), true))); std::unordered_map resources; - TaskOptions options(0, resources); + TaskOptions options("", 0, resources); std::vector return_ids; RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "", test, "execute_test", "")}; @@ -123,11 +119,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { args.emplace_back(new TaskArgByValue( std::make_shared(buffer, nullptr, std::vector(), true))); std::unordered_map resources; -<<<<<<< HEAD - TaskOptions options(1, resources); -======= TaskOptions options{"", 1, resources}; ->>>>>>> 6a78ba9752dc7f17b0e4b7423898c0facf777d3d std::vector return_ids; RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "", "", "check_current_test_status", "")}; diff --git a/streaming/src/test/ring_buffer_tests.cc b/streaming/src/test/ring_buffer_tests.cc index 833eb149cd80..1a3b05c319b6 100644 --- a/streaming/src/test/ring_buffer_tests.cc +++ b/streaming/src/test/ring_buffer_tests.cc @@ -5,7 +5,7 @@ #include "gtest/gtest.h" #include "message/message.h" #include "ray/util/logging.h" -#include "ring_buffer.h" +#include "ring_buffer/ring_buffer.h" using namespace ray; using namespace ray::streaming; @@ -28,8 +28,8 @@ TEST(StreamingRingBufferTest, streaming_message_ring_buffer_test) { while (!ring_buffer.IsEmpty()) { StreamingMessagePtr message_ptr = ring_buffer.Front(); ring_buffer.Pop(); - EXPECT_EQ(message_ptr->GetDataSize(), 3); - EXPECT_EQ(*(message_ptr->RawData()), th++); + EXPECT_EQ(message_ptr->PayloadSize(), 3); + EXPECT_EQ(*(message_ptr->Payload()), th++); } } } @@ -52,7 +52,7 @@ TEST(StreamingRingBufferTest, spsc_test) { while (ring_buffer.IsEmpty()) { } auto &msg = ring_buffer.Front(); - EXPECT_EQ(std::memcmp(msg->RawData(), &count, sizeof(size_t)), 0); + EXPECT_EQ(std::memcmp(msg->Payload(), &count, sizeof(size_t)), 0); ring_buffer.Pop(); count++; } @@ -78,7 +78,7 @@ TEST(StreamingRingBufferTest, mutex_test) { while (ring_buffer.IsEmpty()) { } auto msg = ring_buffer.Front(); - EXPECT_EQ(std::memcmp(msg->RawData(), &count, sizeof(size_t)), 0); + EXPECT_EQ(std::memcmp(msg->Payload(), &count, sizeof(size_t)), 0); ring_buffer.Pop(); count++; } diff --git a/streaming/src/test/run_streaming_queue_test.sh b/streaming/src/test/run_streaming_queue_test.sh index 752c95b2b629..52a0a18b7f35 100755 --- a/streaming/src/test/run_streaming_queue_test.sh +++ b/streaming/src/test/run_streaming_queue_test.sh @@ -53,7 +53,9 @@ STREAMING_TEST_WORKER_EXEC="$RAY_ROOT/bazel-bin/streaming/streaming_test_worker" GCS_SERVER_EXEC="$RAY_ROOT/bazel-bin/gcs_server" # clear env +set +e pgrep "plasma|DefaultDriver|DefaultWorker|AppStarter|redis|http_server|job_agent" | xargs kill -9 &> /dev/null +set -e # Run tests. diff --git a/streaming/src/test/streaming_queue_tests.cc b/streaming/src/test/streaming_queue_tests.cc index f45e2a45cdcd..a8ab4007f6f3 100644 --- a/streaming/src/test/streaming_queue_tests.cc +++ b/streaming/src/test/streaming_queue_tests.cc @@ -7,7 +7,7 @@ #include "queue/queue_client.h" #include "ray/common/test_util.h" #include "ray/core_worker/core_worker.h" -#include "ring_buffer.h" +#include "ring_buffer/ring_buffer.h" #include "test/queue_tests_base.h" using namespace std::placeholders;