diff --git a/be/src/exec/file_reader.h b/be/src/exec/file_reader.h index 21a541056b87a9..3c3f22e391a603 100644 --- a/be/src/exec/file_reader.h +++ b/be/src/exec/file_reader.h @@ -40,6 +40,16 @@ class FileReader { virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) = 0; virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) = 0; + /** + * This interface is used read a whole message, For example: read a message from kafka. + * + * if read eof then return Status::OK and length is set 0 and buf is set NULL, + * other return readed bytes. + */ + virtual Status read_one_message(std::unique_ptr* buf, size_t* buf_cap, size_t* buf_sz, + size_t padding = 0) { + return Status::NotSupported("Not support"); + }; virtual int64_t size() = 0; virtual Status seek(int64_t position) = 0; virtual Status tell(int64_t* position) = 0; diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 42713711d7e374..9b36ae8c6ce084 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -349,7 +349,8 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* request.__set_loadId(ctx->id.to_thrift()); if (ctx->use_streaming) { auto pipe = - std::make_shared(1024 * 1024 /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */); + std::make_shared(1024 * 1024 /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, + ctx->body_bytes /* total_length */); RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe)); request.fileType = TFileType::FILE_STREAM; ctx->body_sink = pipe; diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h index 49346fdd93fa3f..a763cfbb8fa981 100644 --- a/be/src/runtime/stream_load/stream_load_pipe.h +++ b/be/src/runtime/stream_load/stream_load_pipe.h @@ -36,8 +36,9 @@ namespace starrocks { // Data in pip is stored in chunks. class StreamLoadPipe : public MessageBodySink, public FileReader { public: - StreamLoadPipe(size_t max_buffered_bytes = 1024 * 1024, size_t min_chunk_size = 64 * 1024) - : _max_buffered_bytes(max_buffered_bytes), _min_chunk_size(min_chunk_size) {} + StreamLoadPipe(size_t max_buffered_bytes = 1024 * 1024, size_t min_chunk_size = 64 * 1024, + int64_t total_length = -1) + : _max_buffered_bytes(max_buffered_bytes), _min_chunk_size(min_chunk_size), _total_length(total_length) {} ~StreamLoadPipe() override = default; Status open() override { return Status::OK(); } @@ -81,42 +82,37 @@ class StreamLoadPipe : public MessageBodySink, public FileReader { return _append(buf); } - /* read_one_messages returns data that is written by append in one time. - * buf: the buffer to return data, and would be expaneded if the capacity is not enough. - * buf_cap: the capacity of buffer, and would be reset if the capacity is not enough. - * buf_sz: the actual size of data to return. - * padding: the extra space reserved in the buffer capacity. - */ - Status read_one_message(std::unique_ptr* buf, size_t* buf_cap, size_t* buf_sz, size_t padding) { - std::unique_lock l(_lock); - while (!_cancelled && !_finished && _buf_queue.empty()) { - _get_cond.wait(l); - } - // cancelled - if (_cancelled) { - return Status::InternalError("cancelled"); - } - - // finished - if (_buf_queue.empty()) { - DCHECK(_finished); + // If _total_length == -1, this should be a Kafka routine load task, + // just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data. + // Otherwise, this should be a stream load task that needs to read the specified amount of data. + Status read_one_message(std::unique_ptr* buf, size_t* buf_cap, size_t* buf_sz, size_t padding) override { + if (_total_length < -1) { + std::stringstream ss; + ss << "invalid, _total_length is: " << _total_length; + return Status::InternalError(ss.str()); + } else if (_total_length == 0) { + // no data *buf_sz = 0; return Status::OK(); } - auto raw = _buf_queue.front(); - auto raw_sz = raw->remaining(); - if (*buf_cap < raw_sz + padding) { - buf->reset(new uint8_t[raw_sz + padding]); - *buf_cap = raw_sz + padding; + if (_total_length == -1) { + return _read_next_buffer(buf, buf_cap, buf_sz, padding); } - raw->get_bytes((char*)(buf->get()), raw_sz); - *buf_sz = raw_sz; - _buf_queue.pop_front(); - _buffered_bytes -= raw->limit; - _put_cond.notify_one(); - return Status::OK(); + // _total_length > 0, read the entire data + + if (*buf_cap < _total_length + padding) { + buf->reset(new uint8_t[_total_length + padding]); + *buf_cap = _total_length + padding; + } + *buf_sz = _total_length; + bool eof = false; + Status st = read(buf->get(), buf_sz, &eof); + if (eof) { + *buf_sz = 0; + } + return st; } Status read(uint8_t* data, size_t* data_size, bool* eof) override { @@ -248,6 +244,14 @@ class StreamLoadPipe : public MessageBodySink, public FileReader { size_t _buffered_bytes{0}; size_t _max_buffered_bytes; size_t _min_chunk_size; + // The total amount of data expected to be read. + // In some scenarios, such as loading json format data through stream load, + // the data needs to be completely read before it can be parsed, + // so the total size of the data needs to be known. + // The default is -1, which means that the data arrives in a stream + // and the length is unknown. + // size_t is unsigned, so use int64_t + int64_t _total_length = -1; std::deque _buf_queue; std::condition_variable _put_cond; std::condition_variable _get_cond; diff --git a/be/test/runtime/stream_load_pipe_test.cpp b/be/test/runtime/stream_load_pipe_test.cpp index 048cb93e832753..813e0b5e740c31 100644 --- a/be/test/runtime/stream_load_pipe_test.cpp +++ b/be/test/runtime/stream_load_pipe_test.cpp @@ -25,7 +25,6 @@ #include -#include "testutil/parallel_test.h" #include "util/monotime.h" namespace starrocks { @@ -37,7 +36,7 @@ class StreamLoadPipeTest : public testing::Test { void SetUp() override {} }; -PARALLEL_TEST(StreamLoadPipeTest, append_buffer) { +TEST_F(StreamLoadPipeTest, append_buffer) { StreamLoadPipe pipe(66, 64); auto appender = [&pipe] { @@ -74,7 +73,7 @@ PARALLEL_TEST(StreamLoadPipeTest, append_buffer) { t1.join(); } -PARALLEL_TEST(StreamLoadPipeTest, append_bytes) { +TEST_F(StreamLoadPipeTest, append_bytes) { StreamLoadPipe pipe(66, 64); auto appender = [&pipe] { @@ -104,7 +103,7 @@ PARALLEL_TEST(StreamLoadPipeTest, append_bytes) { t1.join(); } -PARALLEL_TEST(StreamLoadPipeTest, append_bytes2) { +TEST_F(StreamLoadPipeTest, append_bytes2) { StreamLoadPipe pipe(66, 64); auto appender = [&pipe] { @@ -143,7 +142,7 @@ PARALLEL_TEST(StreamLoadPipeTest, append_bytes2) { t1.join(); } -PARALLEL_TEST(StreamLoadPipeTest, append_mix) { +TEST_F(StreamLoadPipeTest, append_mix) { StreamLoadPipe pipe(66, 64); auto appender = [&pipe] { @@ -202,7 +201,7 @@ PARALLEL_TEST(StreamLoadPipeTest, append_mix) { t1.join(); } -PARALLEL_TEST(StreamLoadPipeTest, cancel) { +TEST_F(StreamLoadPipeTest, cancel) { StreamLoadPipe pipe(66, 64); auto appender = [&pipe] { @@ -224,7 +223,7 @@ PARALLEL_TEST(StreamLoadPipeTest, cancel) { t1.join(); } -PARALLEL_TEST(StreamLoadPipeTest, close) { +TEST_F(StreamLoadPipeTest, close) { StreamLoadPipe pipe(66, 64); auto appender = [&pipe] { @@ -260,52 +259,4 @@ PARALLEL_TEST(StreamLoadPipeTest, close) { t1.join(); } -PARALLEL_TEST(StreamLoadPipeTest, read_one_message) { - StreamLoadPipe pipe(66, 64); - - auto appender = [&pipe] { - int k = 0; - auto byte_buf = ByteBuffer::allocate(64); - char buf[64]; - for (int j = 0; j < 64; ++j) { - buf[j] = '0' + (k++ % 10); - } - byte_buf->put_bytes(buf, 64); - byte_buf->flip(); - // 1st append - pipe.append(byte_buf); - - // 2nd append - pipe.append(buf, sizeof(buf)); - - pipe.finish(); - }; - std::thread t1(appender); - - std::unique_ptr buf; - size_t buf_cap = 0; - size_t buf_sz = 0; - // 1st message - auto st = pipe.read_one_message(&buf, &buf_cap, &buf_sz, 0); - ASSERT_TRUE(st.ok()); - ASSERT_EQ(64, buf_sz); - for (int i = 0; i < buf_sz; ++i) { - ASSERT_EQ('0' + (i % 10), buf[i]); - } - - // 2nd message - st = pipe.read_one_message(&buf, &buf_cap, &buf_sz, 0); - ASSERT_TRUE(st.ok()); - ASSERT_EQ(64, buf_sz); - for (int i = 0; i < buf_sz; ++i) { - ASSERT_EQ('0' + (i % 10), buf[i]); - } - - st = pipe.read_one_message(&buf, &buf_cap, &buf_sz, 0); - ASSERT_TRUE(st.ok()); - ASSERT_EQ(0, buf_sz); - - t1.join(); -} - } // namespace starrocks