Skip to content

Commit

Permalink
[feature-wip](new-scan) Support stream load with csv in new scan framework (apache#13354)
Browse files Browse the repository at this point in the history

1. Refactor the file reader creation in FileFactory, for simplicity.
    Previously, FileFactory had too many `create_file_reader` interfaces.
    Now unified into two categories: the interface used by the previous BrokerScanNode,
    and the interface used by the new FileScanNode.
    And separate the creation methods of readers that read `StreamLoadPipe` and other readers that read files.

2. Modify the StreamLoadPlanner on the FE side to support using ExternalFileScanNode

3. Now, for the generic reader, the file reader is created inside the reader instead of being passed in from the outside.

4. Add some test cases for CSV stream load; the behavior is the same as with the old broker scanner.
  • Loading branch information
morningman authored Oct 17, 2022
1 parent c114d87 commit dbf71ed
Show file tree
Hide file tree
Showing 58 changed files with 3,666 additions and 561 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*.iml
*.swp
*.jar
*.gz
*.log
*.so.tmp
*.flattened-pom.xml
Expand Down
21 changes: 13 additions & 8 deletions be/src/exec/broker_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,16 @@ Status BrokerScanner::open_file_reader() {
}
}

RETURN_IF_ERROR(FileFactory::create_file_reader(range.file_type, _state->exec_env(), _profile,
_broker_addresses, _params.properties, range,
start_offset, _cur_file_reader));
return _cur_file_reader->open();
if (range.file_type == TFileType::FILE_STREAM) {
RETURN_IF_ERROR(FileFactory::create_pipe_reader(range.load_id, _cur_file_reader_s));
_real_reader = _cur_file_reader_s.get();
} else {
RETURN_IF_ERROR(FileFactory::create_file_reader(
range.file_type, _state->exec_env(), _profile, _broker_addresses,
_params.properties, range, start_offset, _cur_file_reader));
_real_reader = _cur_file_reader.get();
}
return _real_reader->open();
}

Status BrokerScanner::create_decompressor(TFileFormatType::type type) {
Expand Down Expand Up @@ -215,12 +221,11 @@ Status BrokerScanner::open_line_reader() {
case TFileFormatType::FORMAT_CSV_LZ4FRAME:
case TFileFormatType::FORMAT_CSV_LZOP:
case TFileFormatType::FORMAT_CSV_DEFLATE:
_cur_line_reader =
new PlainTextLineReader(_profile, _cur_file_reader.get(), _cur_decompressor, size,
_line_delimiter, _line_delimiter_length);
_cur_line_reader = new PlainTextLineReader(_profile, _real_reader, _cur_decompressor, size,
_line_delimiter, _line_delimiter_length);
break;
case TFileFormatType::FORMAT_PROTO:
_cur_line_reader = new PlainBinaryLineReader(_cur_file_reader.get());
_cur_line_reader = new PlainBinaryLineReader(_real_reader);
break;
default: {
return Status::InternalError("Unknown format type, cannot init line reader, type={}",
Expand Down
7 changes: 6 additions & 1 deletion be/src/exec/broker_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,12 @@ class BrokerScanner : public BaseScanner {
int _line_delimiter_length;

// Reader
std::shared_ptr<FileReader> _cur_file_reader;
// _cur_file_reader_s is for stream load pipe reader,
// and _cur_file_reader is for other file reader.
// TODO: refactor this to use only shared_ptr or unique_ptr
std::unique_ptr<FileReader> _cur_file_reader;
std::shared_ptr<FileReader> _cur_file_reader_s;
FileReader* _real_reader;
LineReader* _cur_line_reader;
Decompressor* _cur_decompressor;
bool _cur_line_reader_eof;
Expand Down
25 changes: 16 additions & 9 deletions be/src/exec/json_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ JsonScanner::JsonScanner(RuntimeState* state, RuntimeProfile* profile,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
: BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
_cur_file_reader(nullptr),
_cur_file_reader_s(nullptr),
_real_reader(nullptr),
_cur_line_reader(nullptr),
_cur_json_reader(nullptr),
_cur_reader_eof(false),
Expand All @@ -61,7 +63,7 @@ Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool*
SCOPED_TIMER(_read_timer);
// Get one line
while (!_scanner_eof) {
if (!_cur_file_reader || _cur_reader_eof) {
if (!_real_reader || _cur_reader_eof) {
RETURN_IF_ERROR(open_next_reader());
// If there isn't any more reader, break this
if (_scanner_eof) {
Expand Down Expand Up @@ -127,11 +129,17 @@ Status JsonScanner::open_file_reader() {
_read_json_by_line = range.read_json_by_line;
}

RETURN_IF_ERROR(FileFactory::create_file_reader(range.file_type, _state->exec_env(), _profile,
_broker_addresses, _params.properties, range,
start_offset, _cur_file_reader));
if (range.file_type == TFileType::FILE_STREAM) {
RETURN_IF_ERROR(FileFactory::create_pipe_reader(range.load_id, _cur_file_reader_s));
_real_reader = _cur_file_reader_s.get();
} else {
RETURN_IF_ERROR(FileFactory::create_file_reader(
range.file_type, _state->exec_env(), _profile, _broker_addresses,
_params.properties, range, start_offset, _cur_file_reader));
_real_reader = _cur_file_reader.get();
}
_cur_reader_eof = false;
return _cur_file_reader->open();
return _real_reader->open();
}

Status JsonScanner::open_line_reader() {
Expand All @@ -148,7 +156,7 @@ Status JsonScanner::open_line_reader() {
} else {
_skip_next_line = false;
}
_cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader.get(), nullptr, size,
_cur_line_reader = new PlainTextLineReader(_profile, _real_reader, nullptr, size,
_line_delimiter, _line_delimiter_length);
_cur_reader_eof = false;
return Status::OK();
Expand All @@ -173,9 +181,8 @@ Status JsonScanner::open_json_reader() {
new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string,
fuzzy_parse, &_scanner_eof, nullptr, _cur_line_reader);
} else {
_cur_json_reader =
new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string,
fuzzy_parse, &_scanner_eof, _cur_file_reader.get());
_cur_json_reader = new JsonReader(_state, _counter, _profile, strip_outer_array,
num_as_string, fuzzy_parse, &_scanner_eof, _real_reader);
}

RETURN_IF_ERROR(_cur_json_reader->init(jsonpath, json_root));
Expand Down
7 changes: 6 additions & 1 deletion be/src/exec/json_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ class JsonScanner : public BaseScanner {
int _line_delimiter_length;

// Reader
std::shared_ptr<FileReader> _cur_file_reader;
// _cur_file_reader_s is for stream load pipe reader,
// and _cur_file_reader is for other file reader.
// TODO: refactor this to use only shared_ptr or unique_ptr
std::unique_ptr<FileReader> _cur_file_reader;
std::shared_ptr<FileReader> _cur_file_reader_s;
FileReader* _real_reader;
LineReader* _cur_line_reader;
JsonReader* _cur_json_reader;
bool _cur_reader_eof;
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/plain_text_line_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class PlainTextLineReader : public LineReader {
RuntimeProfile* _profile;
FileReader* _file_reader;
Decompressor* _decompressor;
// the min length that should be read.
// -1 means endless(for stream load)
// and only valid if the content is uncompressed
size_t _min_length;
size_t _total_read_bytes;
std::string _line_delimiter;
Expand Down
60 changes: 35 additions & 25 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,39 +71,47 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit
TStreamLoadPutResult k_stream_load_put_result;
#endif

static TFileFormatType::type parse_format(const std::string& format_str,
const std::string& compress_type) {
static void parse_format(const std::string& format_str, const std::string& compress_type_str,
TFileFormatType::type* format_type,
TFileCompressType::type* compress_type) {
if (format_str.empty()) {
return parse_format("CSV", compress_type);
parse_format("CSV", compress_type_str, format_type, compress_type);
return;
}
TFileFormatType::type format_type = TFileFormatType::FORMAT_UNKNOWN;
*compress_type = TFileCompressType::PLAIN;
*format_type = TFileFormatType::FORMAT_UNKNOWN;
if (iequal(format_str, "CSV")) {
if (compress_type.empty()) {
format_type = TFileFormatType::FORMAT_CSV_PLAIN;
}
if (iequal(compress_type, "GZ")) {
format_type = TFileFormatType::FORMAT_CSV_GZ;
} else if (iequal(compress_type, "LZO")) {
format_type = TFileFormatType::FORMAT_CSV_LZO;
} else if (iequal(compress_type, "BZ2")) {
format_type = TFileFormatType::FORMAT_CSV_BZ2;
} else if (iequal(compress_type, "LZ4FRAME")) {
format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME;
} else if (iequal(compress_type, "LZOP")) {
format_type = TFileFormatType::FORMAT_CSV_LZOP;
} else if (iequal(compress_type, "DEFLATE")) {
format_type = TFileFormatType::FORMAT_CSV_DEFLATE;
if (compress_type_str.empty()) {
*format_type = TFileFormatType::FORMAT_CSV_PLAIN;
} else if (iequal(compress_type_str, "GZ")) {
*format_type = TFileFormatType::FORMAT_CSV_GZ;
*compress_type = TFileCompressType::GZ;
} else if (iequal(compress_type_str, "LZO")) {
*format_type = TFileFormatType::FORMAT_CSV_LZO;
*compress_type = TFileCompressType::LZO;
} else if (iequal(compress_type_str, "BZ2")) {
*format_type = TFileFormatType::FORMAT_CSV_BZ2;
*compress_type = TFileCompressType::BZ2;
} else if (iequal(compress_type_str, "LZ4")) {
*format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME;
*compress_type = TFileCompressType::LZ4FRAME;
} else if (iequal(compress_type_str, "LZOP")) {
*format_type = TFileFormatType::FORMAT_CSV_LZOP;
*compress_type = TFileCompressType::LZO;
} else if (iequal(compress_type_str, "DEFLATE")) {
*format_type = TFileFormatType::FORMAT_CSV_DEFLATE;
*compress_type = TFileCompressType::DEFLATE;
}
} else if (iequal(format_str, "JSON")) {
if (compress_type.empty()) {
format_type = TFileFormatType::FORMAT_JSON;
if (compress_type_str.empty()) {
*format_type = TFileFormatType::FORMAT_JSON;
}
} else if (iequal(format_str, "PARQUET")) {
format_type = TFileFormatType::FORMAT_PARQUET;
*format_type = TFileFormatType::FORMAT_PARQUET;
} else if (iequal(format_str, "ORC")) {
format_type = TFileFormatType::FORMAT_ORC;
*format_type = TFileFormatType::FORMAT_ORC;
}
return format_type;
return;
}

static bool is_format_support_streaming(TFileFormatType::type format) {
Expand Down Expand Up @@ -275,7 +283,8 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct
//treat as CSV
format_str = BeConsts::CSV;
}
ctx->format = parse_format(format_str, http_req->header(HTTP_COMPRESS_TYPE));
parse_format(format_str, http_req->header(HTTP_COMPRESS_TYPE), &ctx->format,
&ctx->compress_type);
if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) {
return Status::InternalError("unknown data format, format={}",
http_req->header(HTTP_FORMAT_KEY));
Expand Down Expand Up @@ -387,6 +396,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
request.tbl = ctx->table;
request.txnId = ctx->txn_id;
request.formatType = ctx->format;
request.__set_compress_type(ctx->compress_type);
request.__set_header_type(ctx->header_type);
request.__set_loadId(ctx->id.to_thrift());
if (ctx->use_streaming) {
Expand Down
Loading

0 comments on commit dbf71ed

Please sign in to comment.