Skip to content

Commit

Permalink
[Feature] Support select *** into outfile without broker
Browse files Browse the repository at this point in the history
Signed-off-by: xyz <a997647204@gmail.com>
  • Loading branch information
xiaoyong-z committed Aug 3, 2022
1 parent 7b005f8 commit 87927b4
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 29 deletions.
9 changes: 7 additions & 2 deletions be/src/fs/fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace starrocks {
class RandomAccessFile;
class WritableFile;
class SequentialFile;
// Forward-declared with `struct` so the class-key matches the definition
// (`struct ResultFileOptions`) and does not trigger mismatched-tag warnings.
struct ResultFileOptions;
struct WritableFileOptions;
struct RandomAccessFileOptions;

Expand All @@ -37,10 +38,14 @@ struct SpaceInfo {
};

// Options passed to a FileSystem when it is created.
//
// Every field is a raw, non-owning pointer and defaults to nullptr; this
// struct never frees what they point to. A single constructor with three
// defaulted arguments is kept: it accepts every call the former two-argument
// form accepted, and avoids the ambiguity that two all-default overloads
// would create.
struct FSOptions {
    FSOptions(const TBrokerScanRangeParams* scan_range_params = nullptr, const TExportSink* export_sink = nullptr,
              const ResultFileOptions* result_file_options = nullptr)
            : scan_range_params(scan_range_params),
              export_sink(export_sink),
              result_file_options(result_file_options) {}

    // Scan-range parameters, or nullptr when not supplied.
    const TBrokerScanRangeParams* scan_range_params = nullptr;
    // Export sink description, or nullptr when not supplied.
    const TExportSink* export_sink = nullptr;
    // Options of a file result sink, or nullptr when not supplied.
    const ResultFileOptions* result_file_options = nullptr;
};

class FileSystem {
Expand Down
5 changes: 5 additions & 0 deletions be/src/fs/fs_hdfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <atomic>

#include "gutil/strings/substitute.h"
#include "runtime/file_result_writer.h"
#include "runtime/hdfs/hdfs_fs_cache.h"
#include "udf/java/utils.h"
#include "util/hdfs_util.h"
Expand Down Expand Up @@ -327,6 +328,10 @@ StatusOr<std::unique_ptr<WritableFile>> HdfsFileSystem::new_writable_file(const
flags |= O_CREAT;

int hdfs_write_buffer_size = 0;
// result_file_options and export_sink cannot both be non-nullptr at the same time
if (_options.result_file_options != nullptr) {
hdfs_write_buffer_size = _options.result_file_options->write_buffer_size_kb;
}
if (_options.export_sink != nullptr && _options.export_sink->__isset.hdfs_write_buffer_size_kb) {
hdfs_write_buffer_size = _options.export_sink->hdfs_write_buffer_size_kb;
}
Expand Down
39 changes: 28 additions & 11 deletions be/src/runtime/file_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "formats/csv/converter.h"
#include "formats/csv/output_stream.h"
#include "fs/fs_broker.h"
#include "fs/fs_posix.h"
#include "gen_cpp/InternalService_types.h"
#include "runtime/runtime_state.h"
#include "util/date_func.h"
Expand All @@ -38,16 +39,7 @@ namespace starrocks {

FileResultWriter::FileResultWriter(const ResultFileOptions* file_opts,
const std::vector<ExprContext*>& output_expr_ctxs, RuntimeProfile* parent_profile)
: _file_opts(file_opts), _output_expr_ctxs(output_expr_ctxs), _parent_profile(parent_profile) {
if (_file_opts->is_local_file) {
_fs = FileSystem::Default();
} else {
// TODO(@c1oudman) Do you only need first element of broker addresses?
_fs = new BrokerFileSystem(*_file_opts->broker_addresses.begin(), _file_opts->broker_properties,
config::broker_write_timeout_seconds * 1000);
_owned_fs.reset(_fs);
}
}
: _file_opts(file_opts), _output_expr_ctxs(output_expr_ctxs), _parent_profile(parent_profile) {}

FileResultWriter::~FileResultWriter() {
_close_file_writer(true);
Expand All @@ -57,7 +49,6 @@ Status FileResultWriter::init(RuntimeState* state) {
_state = state;
_init_profile();

RETURN_IF_ERROR(_create_file_writer());
return Status::OK();
}

Expand All @@ -74,6 +65,24 @@ void FileResultWriter::_init_profile() {
Status FileResultWriter::_create_file_writer() {
std::string file_name = _get_next_file_name();
WritableFileOptions opts{.sync_on_close = false, .mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE};

if (_fs == nullptr) {
if (_file_opts->is_local_file) {
_fs = new_fs_posix();
} else {
if (_file_opts->use_broker) {
_fs.reset(new BrokerFileSystem(*_file_opts->broker_addresses.begin(), _file_opts->broker_properties,
config::broker_write_timeout_seconds * 1000));
} else {
ASSIGN_OR_RETURN(_fs, FileSystem::CreateUniqueFromString(_file_opts->file_path,
FSOptions(nullptr, nullptr, _file_opts)));
}
}
}
if (_fs == nullptr) {
return Status::InternalError(
strings::Substitute("file system initialize failed for file $0", _file_opts->file_path));
}
ASSIGN_OR_RETURN(auto writable_file, _fs->new_writable_file(opts, file_name));

switch (_file_opts->file_format) {
Expand All @@ -93,6 +102,14 @@ Status FileResultWriter::_create_file_writer() {
return Status::OK();
}

// Creates the file writer for the first output file.
//
// The writer is created here rather than in init() because libhdfs depends on
// JNI: init() runs in a bthread environment, which conflicts with JNI, while
// open() runs in a pthread environment, which does not.
Status FileResultWriter::open(RuntimeState* state) {
    return _create_file_writer();
}

// file name format as: my_prefix_0.csv
std::string FileResultWriter::_get_next_file_name() {
std::stringstream ss;
Expand Down
17 changes: 15 additions & 2 deletions be/src/runtime/file_result_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ struct ResultFileOptions {
size_t max_file_size_bytes = 1 * 1024 * 1024 * 1024; // 1GB
std::vector<TNetworkAddress> broker_addresses;
std::map<std::string, std::string> broker_properties;
// Write buffer size in KB. Stays 0 unless the sink options carry
// hdfs_write_buffer_size_kb; initialized so that readers never see an
// indeterminate value.
int write_buffer_size_kb = 0;
// Non-owning. When set, it points into the TResultFileSinkOptions passed to
// the constructor, so that object must outlive this one. nullptr when the
// sink options carry no hdfs_properties.
const THdfsProperties* hdfs_properties = nullptr;
// True only when the sink options explicitly set use_broker to true.
bool use_broker = false;

ResultFileOptions(const TResultFileSinkOptions& t_opt) {
file_path = t_opt.file_path;
Expand All @@ -56,6 +59,16 @@ struct ResultFileOptions {
broker_addresses = t_opt.broker_addresses;
is_local_file = false;
}
if (t_opt.__isset.hdfs_write_buffer_size_kb) {
write_buffer_size_kb = t_opt.hdfs_write_buffer_size_kb;
}
if (t_opt.__isset.hdfs_properties) {
hdfs_properties = &t_opt.hdfs_properties;
is_local_file = false;
}
if (t_opt.__isset.use_broker) {
use_broker = t_opt.use_broker;
}
if (t_opt.__isset.broker_properties) {
broker_properties = t_opt.broker_properties;
}
Expand All @@ -73,6 +86,7 @@ class FileResultWriter final : public ResultWriter {
Status init(RuntimeState* state) override;
Status append_chunk(vectorized::Chunk* chunk) override;
Status close() override;
Status open(RuntimeState* state) override;

private:
void _init_profile();
Expand All @@ -90,8 +104,7 @@ class FileResultWriter final : public ResultWriter {
const ResultFileOptions* _file_opts;
const std::vector<ExprContext*>& _output_expr_ctxs;

FileSystem* _fs;
std::unique_ptr<FileSystem> _owned_fs;
std::unique_ptr<FileSystem> _fs;
std::unique_ptr<FileBuilder> _file_builder;

// the suffix idx of export file name, start at 0
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/result_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ Status ResultSink::prepare(RuntimeState* state) {
}

// Opens the sink: first the result writer, then the output expressions.
// Returns the first non-OK status encountered, otherwise OK.
Status ResultSink::open(RuntimeState* state) {
    // The writer goes first; if it fails the expressions are not opened.
    RETURN_IF_ERROR(_writer->open(state));
    RETURN_IF_ERROR(Expr::open(_output_expr_ctxs, state));
    return Status::OK();
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/result_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class ResultWriter {

virtual Status init(RuntimeState* state) = 0;

// Optional hook for writers that need extra setup. The default
// implementation does nothing and returns OK.
virtual Status open(RuntimeState* state) {
    return Status::OK();
}

// convert one chunk to mysql result and
// append this chunk to the result sink
virtual Status append_chunk(vectorized::Chunk* chunk) = 0;
Expand Down
46 changes: 38 additions & 8 deletions fe/fe-core/src/main/java/com/starrocks/analysis/OutFileClause.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.Config;
import com.starrocks.common.UserException;
import com.starrocks.common.util.ParseUtil;
import com.starrocks.common.util.PrintableMap;
import com.starrocks.fs.HdfsUtil;
import com.starrocks.thrift.TFileFormatType;
import com.starrocks.thrift.THdfsProperties;
import com.starrocks.thrift.TResultFileSinkOptions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -46,6 +50,8 @@ public class OutFileClause implements ParseNode {
private static final String PROP_COLUMN_SEPARATOR = "column_separator";
private static final String PROP_LINE_DELIMITER = "line_delimiter";
private static final String PROP_MAX_FILE_SIZE = "max_file_size";
private static final String VIEW_FS_PREFIX = "viewfs://";
private static final String HDFS_PREFIX = "hdfs://";

private static final long DEFAULT_MAX_FILE_SIZE_BYTES = 1024 * 1024 * 1024L; // 1GB
private static final long MIN_FILE_SIZE_BYTES = 5 * 1024 * 1024L; // 5MB
Expand Down Expand Up @@ -117,7 +123,7 @@ private void analyzeProperties() throws AnalysisException {
}

Set<String> processedPropKeys = Sets.newHashSet();
getBrokerProperties(processedPropKeys);
getBrokerProperties(filePath, processedPropKeys);
if (brokerDesc == null) {
return;
}
Expand Down Expand Up @@ -153,12 +159,20 @@ private void analyzeProperties() throws AnalysisException {
}
}

private void getBrokerProperties(Set<String> processedPropKeys) {
private void getBrokerProperties(String filePath, Set<String> processedPropKeys) {
boolean outfile_without_broker = false;
if (!properties.containsKey(PROP_BROKER_NAME)) {
return;
if (filePath.startsWith(HDFS_PREFIX) || filePath.startsWith(VIEW_FS_PREFIX)) {
outfile_without_broker = true;
} else {
return;
}
}
String brokerName = null;
if (!outfile_without_broker) {
brokerName = properties.get(PROP_BROKER_NAME);
processedPropKeys.add(PROP_BROKER_NAME);
}
String brokerName = properties.get(PROP_BROKER_NAME);
processedPropKeys.add(PROP_BROKER_NAME);

Map<String, String> brokerProps = Maps.newHashMap();
Iterator<Map.Entry<String, String>> iter = properties.entrySet().iterator();
Expand All @@ -169,8 +183,11 @@ private void getBrokerProperties(Set<String> processedPropKeys) {
processedPropKeys.add(entry.getKey());
}
}

brokerDesc = new BrokerDesc(brokerName, brokerProps);
if (!outfile_without_broker) {
brokerDesc = new BrokerDesc(brokerName, brokerProps);
} else {
brokerDesc = new BrokerDesc(brokerProps);
}
}

private boolean isCsvFormat() {
Expand All @@ -197,14 +214,27 @@ public String toSql() {
return sb.toString();
}

public TResultFileSinkOptions toSinkOptions() {
public TResultFileSinkOptions toSinkOptions() throws AnalysisException {
TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions(filePath, fileFormatType);
if (isCsvFormat()) {
sinkOptions.setColumn_separator(columnSeparator);
sinkOptions.setRow_delimiter(rowDelimiter);
}
sinkOptions.setMax_file_size_bytes(maxFileSizeBytes);
if (brokerDesc != null) {
if (!brokerDesc.hasBroker()) {
sinkOptions.setUse_broker(false);
sinkOptions.setHdfs_write_buffer_size_kb(Config.hdfs_write_buffer_size_kb);
THdfsProperties hdfsProperties = new THdfsProperties();
try {
HdfsUtil.getTProperties(filePath, brokerDesc, hdfsProperties);
} catch (UserException e) {
throw new AnalysisException(e.getMessage());
}
sinkOptions.setHdfs_properties(hdfsProperties);
} else {
sinkOptions.setUse_broker(true);
}
sinkOptions.setBroker_properties(brokerDesc.getProperties());
// broker_addresses of sinkOptions will be set in Coordinator.
// Because we need to choose the nearest broker with the result sink node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.starrocks.analysis.OutFileClause;
import com.starrocks.common.AnalysisException;
import com.starrocks.thrift.TDataSink;
import com.starrocks.thrift.TDataSinkType;
import com.starrocks.thrift.TExplainLevel;
Expand Down Expand Up @@ -83,14 +84,14 @@ public boolean isOutputFileSink() {
}

public boolean needBroker() {
return !Strings.isNullOrEmpty(brokerName);
return fileSinkOptions.isSetUse_broker() && fileSinkOptions.use_broker;
}

/**
 * Returns the broker name stored on this sink.
 *
 * @return the value of the {@code brokerName} field; may be {@code null}
 */
public String getBrokerName() {
    return this.brokerName;
}

public void setOutfileInfo(OutFileClause outFileClause) {
public void setOutfileInfo(OutFileClause outFileClause) throws AnalysisException {
sinkType = TResultSinkType.FILE;
fileSinkOptions = outFileClause.toSinkOptions();
brokerName = outFileClause.getBrokerDesc() == null ? null : outFileClause.getBrokerDesc().getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private void unLock(Map<String, Database> dbs) {

// if query stmt has OUTFILE clause, set info into ResultSink.
// this should be done after fragments are generated.
private void setOutfileSink(QueryStatement queryStmt, ExecPlan plan) {
private void setOutfileSink(QueryStatement queryStmt, ExecPlan plan) throws AnalysisException {
if (!queryStmt.hasOutFileClause()) {
return;
}
Expand Down
13 changes: 10 additions & 3 deletions gensrc/thrift/DataSinks.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,14 @@ struct TResultFileSinkOptions {
4: optional string row_delimiter // only for csv
5: optional i64 max_file_size_bytes
6: optional list<Types.TNetworkAddress> broker_addresses; // only for remote file
7: optional map<string, string> broker_properties // only for remote file
7: optional map<string, string> broker_properties // only for remote file.
// If use_broker is set, we will write to hdfs through the broker
// If use_broker is not set, we will write through libhdfs/S3 directly
8: optional bool use_broker = false
// hdfs_write_buffer_size_kb for writing through lib hdfs directly
9: optional i32 hdfs_write_buffer_size_kb = 0
// properties from hdfs-site.xml, core-site.xml and load_properties
10: optional PlanNodes.THdfsProperties hdfs_properties
}

struct TMemoryScratchSink {
Expand Down Expand Up @@ -130,9 +137,9 @@ struct TExportSink {

// If use_broker is set, we will write to hdfs through the broker
// If use_broker is not set, we will write through libhdfs/S3 directly
7: optional bool use_broker = false;
7: optional bool use_broker = false
// hdfs_write_buffer_size_kb for writing through lib hdfs directly
8: optional i32 hdfs_write_buffer_size_kb = 0;
8: optional i32 hdfs_write_buffer_size_kb = 0
// properties from hdfs-site.xml, core-site.xml and load_properties
9: optional PlanNodes.THdfsProperties hdfs_properties

Expand Down

0 comments on commit 87927b4

Please sign in to comment.