[Feature] Select outfile support parquet format (apache#5938)

`SELECT ... INTO OUTFILE` currently only supports exporting data in CSV format.
This patch extends the feature to support the Parquet format.

Usage:
Local file:
```
SELECT citycode FROM table1 INTO OUTFILE "file:///root/doris/" FORMAT AS PARQUET PROPERTIES 
("schema"="required,int32,siteid;", "parquet.compression"="snappy");
```

Broker file:
```
SELECT siteid FROM table1 INTO OUTFILE "hdfs://host/test_sql_prc_2019_02_19/" FORMAT AS PARQUET
PROPERTIES ( 
"broker.name" = "hdfs_broker",
"broker.hadoop.security.authentication" = "kerberos",
"broker.kerberos_principal" = "test",
"broker.kerberos_keytab_content" = "base64" ,
"schema"="required,int32,siteid;"
);
```

The `schema` property is required; it defines the schema of the Parquet file as one `repetition,type,name` triplet per column, with columns separated by semicolons.
Properties prefixed with `parquet.` are passed through as Parquet file options, such as `compression`, `version`, and `enable_dictionary`.
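For illustration only, here is a minimal sketch of how a schema string in this format could be split into per-column `{repetition, type, name}` triplets. The second column and its `byte_array` type are made-up examples, and in the patch itself the split result reaches the backend already parsed, as the `std::vector<std::vector<std::string>>` field on `ResultFileOptions` shown below:

```
#include <sstream>
#include <string>
#include <vector>

// Illustrative sketch: split "required,int32,siteid;optional,byte_array,cityname;"
// into {repetition, type, name} triplets, one per column.
std::vector<std::vector<std::string>> split_schema_string(const std::string& schema) {
    std::vector<std::vector<std::string>> columns;
    std::stringstream column_stream(schema);
    std::string column;
    while (std::getline(column_stream, column, ';')) {
        if (column.empty()) {
            continue; // a trailing ';' produces an empty token
        }
        std::vector<std::string> parts;
        std::stringstream part_stream(column);
        std::string part;
        while (std::getline(part_stream, part, ',')) {
            parts.push_back(part);
        }
        columns.push_back(parts); // e.g. {"required", "int32", "siteid"}
    }
    return columns;
}
```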
xinghuayu007 authored Jun 10, 2021
1 parent 4d64612 commit e245aee
Showing 9 changed files with 621 additions and 43 deletions.
389 changes: 379 additions & 10 deletions be/src/exec/parquet_writer.cpp

Large diffs are not rendered by default.

39 changes: 33 additions & 6 deletions be/src/exec/parquet_writer.h
@@ -36,16 +36,17 @@
#include "gen_cpp/PaloBrokerService_types.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/Types_types.h"
#include "runtime/tuple.h"
#include "runtime/row_batch.h"
#include "exprs/expr_context.h"

namespace doris {

class ExprContext;
class FileWriter;
class RowBatch;

class ParquetOutputStream : public arrow::io::OutputStream {
public:
ParquetOutputStream(FileWriter* file_writer);
ParquetOutputStream(FileWriter* file_writer, const int64_t& written_len);
virtual ~ParquetOutputStream();

arrow::Status Write(const void* data, int64_t nbytes) override;
@@ -55,26 +56,52 @@ class ParquetOutputStream : public arrow::io::OutputStream {

bool closed() const override { return _is_closed; }

int64_t get_written_len();

void set_written_len(int64_t written_len);

private:
FileWriter* _file_writer; // not owned
int64_t _cur_pos; // current write position
int64_t _cur_pos = 0; // current write position
bool _is_closed = false;
int64_t _written_len = 0;
};
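The new `_written_len` bookkeeping suggests that `Write()` forwards bytes to the wrapped `FileWriter` and accumulates a running byte count, which `FileResultWriter` later reads to decide when to split output files. A minimal sketch of that accounting, assuming `FileWriter` exposes a `write(buf, buf_len, &written_len)` method; this is not the committed implementation:

```
// Sketch only; relies on the declarations in parquet_writer.h above.
arrow::Status ParquetOutputStream::Write(const void* data, int64_t nbytes) {
    size_t written = 0;
    Status st = _file_writer->write(reinterpret_cast<const uint8_t*>(data),
                                    nbytes, &written); // assumed FileWriter API
    if (!st.ok()) {
        return arrow::Status::IOError("write to file writer failed");
    }
    _cur_pos += nbytes;     // advance the stream position
    _written_len += nbytes; // running total used for file splitting
    return arrow::Status::OK();
}
```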

// a wrapper of parquet output stream
class ParquetWriterWrapper {
public:
ParquetWriterWrapper(FileWriter* file_writer,
const std::vector<ExprContext*>& output_expr_ctxs);
const std::vector<ExprContext*>& output_expr_ctxs,
const std::map<std::string, std::string>& properties,
const std::vector<std::vector<std::string>>& schema);
virtual ~ParquetWriterWrapper();

Status write(const RowBatch& row_batch);

Status init_parquet_writer();

Status _write_one_row(TupleRow* row);

void close();

void parse_properties(const std::map<std::string, std::string>& propertie_map);

Status parse_schema(const std::vector<std::vector<std::string>>& schema);

parquet::RowGroupWriter* get_rg_writer();

int64_t written_len();

private:
ParquetOutputStream* _outstream;
std::shared_ptr<ParquetOutputStream> _outstream;
std::shared_ptr<parquet::WriterProperties> _properties;
std::shared_ptr<parquet::schema::GroupNode> _schema;
std::unique_ptr<parquet::ParquetFileWriter> _writer;
const std::vector<ExprContext*>& _output_expr_ctxs;
std::vector<std::vector<std::string>> _str_schema;
int64_t _cur_writed_rows = 0;
parquet::RowGroupWriter* _rg_writer;
const int64_t _max_row_per_group = 10;
};

} // namespace doris
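The `_cur_writed_rows` / `_max_row_per_group` pair and `get_rg_writer()` hint at how rows are batched into Parquet row groups. Below is a sketch of that rotation, plus a single-type `_write_one_row`, assuming Arrow's buffered row-group API (`AppendBufferedRowGroup`, `RowGroupWriter::column`) and `int32` columns as in the usage example; the committed `parquet_writer.cpp` dispatches on the actual expression types:

```
#include <parquet/api/writer.h>

// Sketch only; relies on the declarations in parquet_writer.h above.
parquet::RowGroupWriter* ParquetWriterWrapper::get_rg_writer() {
    if (_rg_writer == nullptr) {
        _rg_writer = _writer->AppendBufferedRowGroup(); // first row group
    }
    if (_cur_writed_rows >= _max_row_per_group) {
        _rg_writer->Close();                            // flush the full group
        _rg_writer = _writer->AppendBufferedRowGroup(); // start the next one
        _cur_writed_rows = 0;
    }
    return _rg_writer;
}

Status ParquetWriterWrapper::_write_one_row(TupleRow* row) {
    parquet::RowGroupWriter* rg_writer = get_rg_writer();
    for (size_t i = 0; i < _output_expr_ctxs.size(); ++i) {
        void* value = _output_expr_ctxs[i]->get_value(row);
        int16_t def_level = (value == nullptr) ? 0 : 1; // null handling for optional fields
        int32_t val = (value == nullptr) ? 0 : *reinterpret_cast<int32_t*>(value);
        auto* col = static_cast<parquet::Int32Writer*>(rg_writer->column(i));
        col->WriteBatch(1, &def_level, nullptr, value == nullptr ? nullptr : &val);
    }
    ++_cur_writed_rows;
    return Status::OK();
}
```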
22 changes: 15 additions & 7 deletions be/src/runtime/file_result_writer.cpp
@@ -112,20 +112,20 @@ Status FileResultWriter::_create_file_writer(const std::string& file_name) {
_file_opts->broker_properties, file_name, 0 /*start offset*/);
}
RETURN_IF_ERROR(_file_writer->open());

switch (_file_opts->file_format) {
case TFileFormatType::FORMAT_CSV_PLAIN:
// just use file writer is enough
break;
case TFileFormatType::FORMAT_PARQUET:
_parquet_writer = new ParquetWriterWrapper(_file_writer, _output_expr_ctxs);
_parquet_writer = new ParquetWriterWrapper(_file_writer, _output_expr_ctxs,
_file_opts->file_properties, _file_opts->schema);
break;
default:
return Status::InternalError(
strings::Substitute("unsupported file format: $0", _file_opts->file_format));
}
LOG(INFO) << "create file for exporting query result. file name: " << file_name
<< ". query id: " << print_id(_state->query_id());
<< ". query id: " << print_id(_state->query_id()) << " format:" << _file_opts->file_format;
return Status::OK();
}

@@ -167,7 +167,7 @@ Status FileResultWriter::append_row_batch(const RowBatch* batch) {

SCOPED_TIMER(_append_row_batch_timer);
if (_parquet_writer != nullptr) {
RETURN_IF_ERROR(_parquet_writer->write(*batch));
RETURN_IF_ERROR(_write_parquet_file(*batch));
} else {
RETURN_IF_ERROR(_write_csv_file(*batch));
}
@@ -176,6 +176,13 @@
return Status::OK();
}

Status FileResultWriter::_write_parquet_file(const RowBatch& batch) {
RETURN_IF_ERROR(_parquet_writer->write(batch));
// split file if exceed limit
RETURN_IF_ERROR(_create_new_file_if_exceed_size());
return Status::OK();
}

Status FileResultWriter::_write_csv_file(const RowBatch& batch) {
int num_rows = batch.num_rows();
for (int i = 0; i < num_rows; ++i) {
@@ -345,11 +352,12 @@ Status FileResultWriter::_create_new_file_if_exceed_size() {
Status FileResultWriter::_close_file_writer(bool done, bool only_close) {
if (_parquet_writer != nullptr) {
_parquet_writer->close();
_current_written_bytes = _parquet_writer->written_len();
COUNTER_UPDATE(_written_data_bytes, _current_written_bytes);
delete _parquet_writer;
_parquet_writer = nullptr;
if (!done) {
//TODO(cmy): implement parquet writer later
}
delete _file_writer;
_file_writer = nullptr;
} else if (_file_writer != nullptr) {
_file_writer->close();
delete _file_writer;
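For context, `_create_new_file_if_exceed_size()` (called from `_write_parquet_file` above) splits the output once the current file grows past a size limit. A sketch of that check, assuming a `_max_file_size_bytes` limit and that `_close_file_writer(done=false, ...)` internally opens the next file in the sequence; the flag semantics here are assumptions, not the committed code:

```
// Sketch only: threshold name and close/reopen flow are assumptions.
Status FileResultWriter::_create_new_file_if_exceed_size() {
    if (_current_written_bytes < _max_file_size_bytes) {
        return Status::OK(); // current file still under the size limit
    }
    _current_written_bytes = 0;
    // done=false: close the current file and open the next one
    // (file_name_0, file_name_1, ...) for subsequent rows.
    RETURN_IF_ERROR(_close_file_writer(false /*done*/, false /*only_close*/));
    return Status::OK();
}
```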
9 changes: 9 additions & 0 deletions be/src/runtime/file_result_writer.h
@@ -40,6 +40,8 @@ struct ResultFileOptions {
std::vector<TNetworkAddress> broker_addresses;
std::map<std::string, std::string> broker_properties;
std::string success_file_name = "";
std::vector<std::vector<std::string>> schema;
std::map<std::string, std::string> file_properties;

ResultFileOptions(const TResultFileSinkOptions& t_opt) {
file_path = t_opt.file_path;
@@ -60,6 +62,12 @@
if (t_opt.__isset.success_file_name) {
success_file_name = t_opt.success_file_name;
}
if (t_opt.__isset.schema) {
schema = t_opt.schema;
}
if (t_opt.__isset.file_properties) {
file_properties = t_opt.file_properties;
}
}
};

@@ -82,6 +90,7 @@

private:
Status _write_csv_file(const RowBatch& batch);
Status _write_parquet_file(const RowBatch& batch);
Status _write_one_row_as_csv(TupleRow* row);

// if buffer exceed the limit, write the data buffered in _plain_text_outstream via file_writer