Revert "[BugFix] json stream load failed with Transfer-Encoding: chun…
Browse files Browse the repository at this point in the history
…ked set #5330 (#5399)" (#5493)

This reverts commit 661de2f.
rickif authored Apr 25, 2022
1 parent 3a5d1fe commit 2a5c43f
Showing 4 changed files with 54 additions and 88 deletions.
10 changes: 10 additions & 0 deletions be/src/exec/file_reader.h
@@ -40,6 +40,16 @@ class FileReader {
virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) = 0;
virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) = 0;

/**
* This interface is used to read a whole message, for example a message from Kafka.
*
* If EOF is reached, Status::OK is returned with the length set to 0 and buf set to NULL;
* otherwise the bytes that were read are returned.
*/
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* buf_cap, size_t* buf_sz,
size_t padding = 0) {
return Status::NotSupported("Not support");
};
virtual int64_t size() = 0;
virtual Status seek(int64_t position) = 0;
virtual Status tell(int64_t* position) = 0;
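For context (not part of the diff): the default implementation restored here returns a "Not supported" status, so only readers that override read_one_message, such as StreamLoadPipe, can serve whole-message reads. A minimal caller sketch, assuming the FileReader and Status declarations above; the helper name is hypothetical:

// Minimal caller sketch (not StarRocks code): read one complete message from a
// FileReader, assuming the FileReader and Status declarations from the diff above.
#include <cstddef>
#include <cstdint>
#include <memory>

Status read_whole_message(FileReader* reader, std::unique_ptr<uint8_t[]>* buf, size_t* buf_sz) {
    size_t buf_cap = 0;
    *buf_sz = 0;
    // Readers that do not override read_one_message() fall back to the default
    // above and return a "Not supported" error; the caller must then assemble
    // the message itself via read()/readat().
    Status st = reader->read_one_message(buf, &buf_cap, buf_sz, /*padding=*/0);
    if (!st.ok()) {
        return st;
    }
    // Per the interface contract, EOF is signalled by Status::OK with *buf_sz == 0.
    return Status::OK();
}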
3 changes: 2 additions & 1 deletion be/src/http/action/stream_load.cpp
@@ -349,7 +349,8 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
request.__set_loadId(ctx->id.to_thrift());
if (ctx->use_streaming) {
auto pipe =
std::make_shared<StreamLoadPipe>(1024 * 1024 /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */);
std::make_shared<StreamLoadPipe>(1024 * 1024 /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
ctx->body_bytes /* total_length */);
RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe));
request.fileType = TFileType::FILE_STREAM;
ctx->body_sink = pipe;
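The restored constructor call passes ctx->body_bytes as the pipe's expected total length, which is typically derived from the Content-Length request header; with Transfer-Encoding: chunked that header is absent, which is the situation the reverted patch (#5399) tried to handle. A hedged, self-contained sketch of that distinction (the helper and the header map are illustrative, not the actual handler code):

// Illustrative sketch only (not the actual handler): whether the total body
// size is known up front depends on the request headers.
#include <cstdint>
#include <map>
#include <string>

int64_t resolve_total_length(const std::map<std::string, std::string>& headers) {
    // With "Transfer-Encoding: chunked" there is no Content-Length header,
    // so the expected size is unknown and -1 is used as the sentinel.
    auto it = headers.find("Content-Length");
    if (it == headers.end()) {
        return -1;
    }
    return std::stoll(it->second);
}

// Usage with the constructor call restored above:
//   auto pipe = std::make_shared<StreamLoadPipe>(1024 * 1024 /* max_buffered_bytes */,
//                                                64 * 1024 /* min_chunk_size */,
//                                                resolve_total_length(headers));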
68 changes: 36 additions & 32 deletions be/src/runtime/stream_load/stream_load_pipe.h
@@ -36,8 +36,9 @@ namespace starrocks {
// Data in the pipe is stored in chunks.
class StreamLoadPipe : public MessageBodySink, public FileReader {
public:
StreamLoadPipe(size_t max_buffered_bytes = 1024 * 1024, size_t min_chunk_size = 64 * 1024)
: _max_buffered_bytes(max_buffered_bytes), _min_chunk_size(min_chunk_size) {}
StreamLoadPipe(size_t max_buffered_bytes = 1024 * 1024, size_t min_chunk_size = 64 * 1024,
int64_t total_length = -1)
: _max_buffered_bytes(max_buffered_bytes), _min_chunk_size(min_chunk_size), _total_length(total_length) {}
~StreamLoadPipe() override = default;

Status open() override { return Status::OK(); }
@@ -81,42 +81,37 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
return _append(buf);
}

/* read_one_message returns the data written by a single append call.
* buf: the buffer used to return the data; it is expanded if its capacity is not enough.
* buf_cap: the capacity of the buffer; it is reset if the capacity is not enough.
* buf_sz: the actual size of the data to return.
* padding: the extra space reserved in the buffer capacity.
*/
Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* buf_cap, size_t* buf_sz, size_t padding) {
std::unique_lock<std::mutex> l(_lock);
while (!_cancelled && !_finished && _buf_queue.empty()) {
_get_cond.wait(l);
}
// cancelled
if (_cancelled) {
return Status::InternalError("cancelled");
}

// finished
if (_buf_queue.empty()) {
DCHECK(_finished);
// If _total_length == -1, this should be a Kafka routine load task,
// just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data.
// Otherwise, this should be a stream load task that needs to read the specified amount of data.
Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* buf_cap, size_t* buf_sz, size_t padding) override {
if (_total_length < -1) {
std::stringstream ss;
ss << "invalid, _total_length is: " << _total_length;
return Status::InternalError(ss.str());
} else if (_total_length == 0) {
// no data
*buf_sz = 0;
return Status::OK();
}
auto raw = _buf_queue.front();
auto raw_sz = raw->remaining();

if (*buf_cap < raw_sz + padding) {
buf->reset(new uint8_t[raw_sz + padding]);
*buf_cap = raw_sz + padding;
if (_total_length == -1) {
return _read_next_buffer(buf, buf_cap, buf_sz, padding);
}
raw->get_bytes((char*)(buf->get()), raw_sz);
*buf_sz = raw_sz;

_buf_queue.pop_front();
_buffered_bytes -= raw->limit;
_put_cond.notify_one();
return Status::OK();
// _total_length > 0, read the entire data

if (*buf_cap < _total_length + padding) {
buf->reset(new uint8_t[_total_length + padding]);
*buf_cap = _total_length + padding;
}
*buf_sz = _total_length;
bool eof = false;
Status st = read(buf->get(), buf_sz, &eof);
if (eof) {
*buf_sz = 0;
}
return st;
}

Status read(uint8_t* data, size_t* data_size, bool* eof) override {
@@ -248,6 +244,14 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
size_t _buffered_bytes{0};
size_t _max_buffered_bytes;
size_t _min_chunk_size;
// The total amount of data expected to be read.
// In some scenarios, such as loading JSON-format data through stream load,
// the data must be read completely before it can be parsed,
// so the total size of the data needs to be known in advance.
// The default is -1, which means the data arrives as a stream
// and the length is unknown.
// size_t is unsigned, so int64_t is used here to allow -1.
int64_t _total_length = -1;
std::deque<ByteBufferPtr> _buf_queue;
std::condition_variable _put_cond;
std::condition_variable _get_cond;
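The comments above describe two read paths: with _total_length == -1 the pipe hands back one appended buffer per call (the Kafka routine-load case), while with _total_length > 0 it reads the entire body in a single call (the stream-load case this revert restores). A minimal usage sketch of the latter path, assuming the StreamLoadPipe and Status declarations in this file; the function and variable names are illustrative:

// Illustrative sketch: a producer appends the whole JSON body into the pipe
// while a consumer reads it back as one message, relying on _total_length.
#include <memory>
#include <string>
#include <thread>

void demo_read_one_message() {
    const std::string body = R"({"k1": 1, "k2": "v"})";
    // The total length is known up front, e.g. taken from Content-Length.
    StreamLoadPipe pipe(1024 * 1024, 64 * 1024, static_cast<int64_t>(body.size()));

    std::thread producer([&pipe, &body] {
        pipe.append(body.data(), body.size());
        pipe.finish();
    });

    std::unique_ptr<uint8_t[]> buf;
    size_t buf_cap = 0;
    size_t buf_sz = 0;
    // Because _total_length > 0, this blocks until the whole body has been
    // appended and returns it in one call; buf_sz == 0 would indicate EOF.
    Status st = pipe.read_one_message(&buf, &buf_cap, &buf_sz, /*padding=*/0);
    // On success, st.ok() holds and buf_sz == body.size().

    producer.join();
}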
61 changes: 6 additions & 55 deletions be/test/runtime/stream_load_pipe_test.cpp
@@ -25,7 +25,6 @@

#include <thread>

#include "testutil/parallel_test.h"
#include "util/monotime.h"

namespace starrocks {
@@ -37,7 +36,7 @@ class StreamLoadPipeTest : public testing::Test {
void SetUp() override {}
};

PARALLEL_TEST(StreamLoadPipeTest, append_buffer) {
TEST_F(StreamLoadPipeTest, append_buffer) {
StreamLoadPipe pipe(66, 64);

auto appender = [&pipe] {
@@ -74,7 +73,7 @@ PARALLEL_TEST(StreamLoadPipeTest, append_buffer) {
t1.join();
}

PARALLEL_TEST(StreamLoadPipeTest, append_bytes) {
TEST_F(StreamLoadPipeTest, append_bytes) {
StreamLoadPipe pipe(66, 64);

auto appender = [&pipe] {
@@ -104,7 +103,7 @@ PARALLEL_TEST(StreamLoadPipeTest, append_bytes) {
t1.join();
}

PARALLEL_TEST(StreamLoadPipeTest, append_bytes2) {
TEST_F(StreamLoadPipeTest, append_bytes2) {
StreamLoadPipe pipe(66, 64);

auto appender = [&pipe] {
@@ -143,7 +142,7 @@ PARALLEL_TEST(StreamLoadPipeTest, append_bytes2) {
t1.join();
}

PARALLEL_TEST(StreamLoadPipeTest, append_mix) {
TEST_F(StreamLoadPipeTest, append_mix) {
StreamLoadPipe pipe(66, 64);

auto appender = [&pipe] {
@@ -202,7 +201,7 @@ PARALLEL_TEST(StreamLoadPipeTest, append_mix) {
t1.join();
}

PARALLEL_TEST(StreamLoadPipeTest, cancel) {
TEST_F(StreamLoadPipeTest, cancel) {
StreamLoadPipe pipe(66, 64);

auto appender = [&pipe] {
@@ -224,7 +223,7 @@ PARALLEL_TEST(StreamLoadPipeTest, cancel) {
t1.join();
}

PARALLEL_TEST(StreamLoadPipeTest, close) {
TEST_F(StreamLoadPipeTest, close) {
StreamLoadPipe pipe(66, 64);

auto appender = [&pipe] {
@@ -260,52 +259,4 @@ PARALLEL_TEST(StreamLoadPipeTest, close) {
t1.join();
}

PARALLEL_TEST(StreamLoadPipeTest, read_one_message) {
StreamLoadPipe pipe(66, 64);

auto appender = [&pipe] {
int k = 0;
auto byte_buf = ByteBuffer::allocate(64);
char buf[64];
for (int j = 0; j < 64; ++j) {
buf[j] = '0' + (k++ % 10);
}
byte_buf->put_bytes(buf, 64);
byte_buf->flip();
// 1st append
pipe.append(byte_buf);

// 2nd append
pipe.append(buf, sizeof(buf));

pipe.finish();
};
std::thread t1(appender);

std::unique_ptr<uint8_t[]> buf;
size_t buf_cap = 0;
size_t buf_sz = 0;
// 1st message
auto st = pipe.read_one_message(&buf, &buf_cap, &buf_sz, 0);
ASSERT_TRUE(st.ok());
ASSERT_EQ(64, buf_sz);
for (int i = 0; i < buf_sz; ++i) {
ASSERT_EQ('0' + (i % 10), buf[i]);
}

// 2nd message
st = pipe.read_one_message(&buf, &buf_cap, &buf_sz, 0);
ASSERT_TRUE(st.ok());
ASSERT_EQ(64, buf_sz);
for (int i = 0; i < buf_sz; ++i) {
ASSERT_EQ('0' + (i % 10), buf[i]);
}

st = pipe.read_one_message(&buf, &buf_cap, &buf_sz, 0);
ASSERT_TRUE(st.ok());
ASSERT_EQ(0, buf_sz);

t1.join();
}

} // namespace starrocks
