Skip to content

Commit

Permalink
Support using a local file to accelerate broker load
Browse files Browse the repository at this point in the history
  • Loading branch information
mchades committed Oct 9, 2022
1 parent 00ef1b7 commit 9f96381
Show file tree
Hide file tree
Showing 11 changed files with 116 additions and 23 deletions.
52 changes: 46 additions & 6 deletions be/src/exec/vectorized/file_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@
#include "fs/fs.h"
#include "fs/fs_broker.h"
#include "fs/fs_hdfs.h"
#include "fs/fs_util.h"
#include "gutil/strings/substitute.h"
#include "io/compressed_input_stream.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/stream_load/load_stream_mgr.h"
#include "storage/olap_define.h"
#include "storage/storage_engine.h"
#include "util/compression/stream_compression.h"

namespace starrocks::vectorized {
Expand Down Expand Up @@ -270,6 +273,34 @@ Status FileScanner::create_sequential_file(const TBrokerRangeDesc& range_desc, c
return Status::OK();
}

// Builds the path of a temporary local file located under one of the storage
// engine's stores. The result has the form
//   <store path><TMP_PREFIX>/<YYYYmmddHHMMSS>.<usec>.tmp
// where the timestamp is the current local time and <usec> is the microsecond
// field of a second clock sample.
// NOTE(review): assumes get_stores() returns at least one store, since the
// store index is computed modulo the number of stores -- confirm with callers.
std::string FileScanner::create_tmp_file_path() {
    // First clock sample: provides the human-readable timestamp and the store index.
    timeval now;
    gettimeofday(&now, nullptr);
    time_t now_sec = now.tv_sec;
    struct tm local_tm;
    localtime_r(&now_sec, &local_tm);
    char timestamp[64];
    strftime(timestamp, sizeof(timestamp), "%Y%m%d%H%M%S", &local_tm);

    auto stores = StorageEngine::instance()->get_stores();

    // Second clock sample: its microsecond part makes the file name vary between calls.
    gettimeofday(&now, nullptr);

    std::stringstream path;
    path << stores[now_sec % stores.size()]->path() << TMP_PREFIX;
    path << "/" << timestamp << "." << now.tv_usec << ".tmp";
    return path.str();
}

// Copies the whole content of |broker_file| into a freshly created local temporary file and
// returns a random-access handle to that local copy. The returned TempRandomAccessFile
// removes the temporary file when it is destroyed.
//
// Returns an error status if the temporary file cannot be created, if copying the remote
// content fails, if the local file cannot be closed, or if it cannot be reopened for reading.
// When copying or closing fails, the partially written temporary file is removed (best effort).
StatusOr<std::shared_ptr<TempRandomAccessFile>> FileScanner::create_tmp_file(std::unique_ptr<SequentialFile> broker_file) {
    std::string tmp_file_path = create_tmp_file_path();
    LOG(INFO) << "broker load cache file: " << tmp_file_path;

    ASSIGN_OR_RETURN(auto tmp_writable_file, FileSystem::Default()->new_writable_file(tmp_file_path));

    // Copy with a 10MB buffer. The result must be checked: a failed or partial copy would
    // otherwise be handed to the scanner as if it were the complete source file.
    auto copy_res = fs::copy(broker_file.get(), tmp_writable_file.get(), 10 * 1024 * 1024);
    if (!copy_res.ok()) {
        // Best-effort cleanup; the copy error is what gets reported to the caller.
        FileSystem::Default()->delete_file(tmp_file_path);
        return copy_res.status();
    }

    // Writable files may buffer appended data, so close the file before reading it back
    // to make sure everything has reached the file system.
    Status close_st = tmp_writable_file->close();
    if (!close_st.ok()) {
        FileSystem::Default()->delete_file(tmp_file_path);
        return close_st;
    }

    std::shared_ptr<RandomAccessFile> tmp_file;
    ASSIGN_OR_RETURN(tmp_file, FileSystem::Default()->new_random_access_file(tmp_file_path));
    return std::make_shared<TempRandomAccessFile>(tmp_file_path, tmp_file);
}

Status FileScanner::create_random_access_file(const TBrokerRangeDesc& range_desc, const TNetworkAddress& address,
const TBrokerScanRangeParams& params, CompressionTypePB compression,
std::shared_ptr<RandomAccessFile>* file) {
Expand All @@ -282,15 +313,24 @@ Status FileScanner::create_random_access_file(const TBrokerRangeDesc& range_desc
case TFileType::FILE_BROKER: {
if (params.__isset.use_broker && !params.use_broker) {
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateUniqueFromString(range_desc.path, FSOptions(&params)));
ASSIGN_OR_RETURN(auto file, fs->new_random_access_file(RandomAccessFileOptions(), range_desc.path));
src_file = std::shared_ptr<RandomAccessFile>(std::move(file));
break;
if (params.__isset.use_local_cache && params.use_local_cache) {
ASSIGN_OR_RETURN(auto file, fs->new_sequential_file(range_desc.path));
ASSIGN_OR_RETURN(src_file, create_tmp_file(std::move(file)));
} else {
ASSIGN_OR_RETURN(auto file, fs->new_random_access_file(RandomAccessFileOptions(), range_desc.path));
src_file = std::shared_ptr<RandomAccessFile>(std::move(file));
}
} else {
BrokerFileSystem fs_broker(address, params.properties);
ASSIGN_OR_RETURN(auto broker_file, fs_broker.new_random_access_file(range_desc.path));
src_file = std::shared_ptr<RandomAccessFile>(std::move(broker_file));
break;
if (params.__isset.use_local_cache && params.use_local_cache) {
ASSIGN_OR_RETURN(auto broker_file, fs_broker.new_sequential_file(range_desc.path));
ASSIGN_OR_RETURN(src_file, create_tmp_file(std::move(broker_file)));
} else {
ASSIGN_OR_RETURN(auto broker_file, fs_broker.new_random_access_file(range_desc.path));
src_file = std::shared_ptr<RandomAccessFile>(std::move(broker_file));
}
}
break;
}
case TFileType::FILE_STREAM:
return Status::NotSupported("Does not support create random-access file from file stream");
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/vectorized/file_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
namespace starrocks {
class SequentialFile;
class RandomAccessFile;
class TempRandomAccessFile;
} // namespace starrocks

namespace starrocks::vectorized {
Expand Down Expand Up @@ -58,6 +59,8 @@ class FileScanner {
// materialize is used to transform source chunk depicted by src_slot_descriptors into destination
// chunk depicted by dest_slot_descriptors
StatusOr<ChunkPtr> materialize(const starrocks::vectorized::ChunkPtr& src, starrocks::vectorized::ChunkPtr& cast);
std::string create_tmp_file_path();
StatusOr<std::shared_ptr<TempRandomAccessFile>> create_tmp_file(std::unique_ptr<SequentialFile> broker_file);

protected:
RuntimeState* _state;
Expand Down
20 changes: 19 additions & 1 deletion be/src/fs/fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
namespace starrocks {

class RandomAccessFile;
class TempRandomAccessFile;
class WritableFile;
class SequentialFile;
class ResultFileOptions;
Expand Down Expand Up @@ -252,7 +253,7 @@ class SequentialFile final : public io::InputStreamWrapper {
};

// A `RandomAccessFile` is an `io::SeekableInputStream` with a name.
class RandomAccessFile final : public io::SeekableInputStreamWrapper {
class RandomAccessFile : public io::SeekableInputStreamWrapper {
public:
explicit RandomAccessFile(std::shared_ptr<io::SeekableInputStream> stream, std::string name)
: io::SeekableInputStreamWrapper(stream.get(), kDontTakeOwnership),
Expand All @@ -268,6 +269,23 @@ class RandomAccessFile final : public io::SeekableInputStreamWrapper {
std::string _name;
};

// A `RandomAccessFile` that wraps another `RandomAccessFile` opened on a temporary file.
// The temporary file named |filename| is deleted through the default file system when
// this object is destroyed.
//
// Instances are not copyable: a copy would delete the same file a second time and
// invalidate the file still used by the other instance.
class TempRandomAccessFile : public RandomAccessFile {
public:
    // |filename| is the path of the temporary file to delete on destruction.
    // |file| is the already opened handle whose stream is used for reading.
    TempRandomAccessFile(std::string filename, std::shared_ptr<RandomAccessFile> file)
            : RandomAccessFile(file->stream(), filename), _filename(std::move(filename)), _file(std::move(file)) {}

    // Best-effort removal of the temporary file; a failure cannot be reported from a destructor.
    ~TempRandomAccessFile() { FileSystem::Default()->delete_file(_filename); }

    TempRandomAccessFile(const TempRandomAccessFile&) = delete;
    TempRandomAccessFile& operator=(const TempRandomAccessFile&) = delete;

    // Return the stream of the wrapped file.
    std::shared_ptr<io::SeekableInputStream> stream() { return _file->stream(); }

    // Return the name reported by the wrapped file.
    const std::string& filename() const { return _file->filename(); }

private:
    // Path of the temporary file removed in the destructor.
    std::string _filename;
    // The wrapped file; keeps the underlying stream alive for the lifetime of this object.
    std::shared_ptr<RandomAccessFile> _file;
};

// A file abstraction for sequential writing. The implementation
// must provide buffering since callers may append small fragments
// at a time to the file.
Expand Down
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/analysis/LoadStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class LoadStmt extends DdlStmt {
public static final String TIMEZONE = "timezone";
public static final String PARTIAL_UPDATE = "partial_update";
public static final String PRIORITY = "priority";
public static final String USE_LOCAL_CACHE = "use_local_cache";

// for load data from Baidu Object Store(BOS)
public static final String BOS_ENDPOINT = "bos_endpoint";
Expand Down Expand Up @@ -112,6 +113,7 @@ public class LoadStmt extends DdlStmt {
.add(TIMEZONE)
.add(PARTIAL_UPDATE)
.add(PRIORITY)
.add(USE_LOCAL_CACHE)
.build();

public LoadStmt(LabelName label, List<DataDescription> dataDescriptions,
Expand Down Expand Up @@ -256,6 +258,15 @@ public static void checkProperties(Map<String, String> properties) throws DdlExc
throw new DdlException(PRIORITY + " should in HIGHEST/HIGH/NORMAL/LOW/LOWEST");
}
}

// local cache
final String useLocalCache = properties.get(USE_LOCAL_CACHE);
if (useLocalCache != null) {
if (!useLocalCache.equalsIgnoreCase("true")
&& !useLocalCache.equalsIgnoreCase("false")) {
throw new DdlException(USE_LOCAL_CACHE + " is not a boolean");
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ private void createLoadingTask(Database db, BrokerPendingTaskAttachment attachme
LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc,
brokerFileGroups, getDeadlineMs(), loadMemLimit,
strictMode, transactionId, this, timezone, timeoutSecond, createTimestamp, partialUpdate,
sessionVariables, context, TLoadJobType.Broker, priority);
sessionVariables, context, TLoadJobType.Broker, priority, useLocalCache);
UUID uuid = UUID.randomUUID();
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
task.init(loadId, attachment.getFileStatusByTable(aggKey), attachment.getFileNumByTable(aggKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
// reuse deleteFlag as partialUpdate
// @Deprecated
// protected boolean deleteFlag = false;
protected boolean useLocalCache = false; // default is false

protected long createTimestamp = -1;
protected long loadStartTimestamp = -1;
Expand Down Expand Up @@ -230,7 +231,7 @@ public void initLoadProgress(TUniqueId loadId, Set<TUniqueId> fragmentIds, List<

public void updateProgess(Long beId, TUniqueId loadId, TUniqueId fragmentId,
long sinkRows, long sinkBytes, long sourceRows, long sourceBytes, boolean isDone) {
loadingStatus.getLoadStatistic().updateLoadProgress(beId, loadId, fragmentId, sinkRows,
loadingStatus.getLoadStatistic().updateLoadProgress(beId, loadId, fragmentId, sinkRows,
sinkBytes, sourceRows, sourceBytes, isDone);
}

Expand Down Expand Up @@ -323,6 +324,10 @@ protected void setJobProperties(Map<String, String> properties) throws DdlExcept
if (properties.containsKey(LoadStmt.PRIORITY)) {
priority = LoadPriority.priorityByName(properties.get(LoadStmt.PRIORITY));
}

if (properties.containsKey(LoadStmt.USE_LOCAL_CACHE)) {
useLocalCache = Boolean.parseBoolean(properties.get(LoadStmt.USE_LOCAL_CACHE));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class LoadLoadingTask extends LoadTask {
private final long timeoutS;
private final Map<String, String> sessionVariables;
private final TLoadJobType loadJobType;
private boolean useLocalCache;

private LoadingTaskPlanner planner;
private ConnectContext context;
Expand All @@ -78,7 +79,7 @@ public LoadLoadingTask(Database db, OlapTable table, BrokerDesc brokerDesc, List
long jobDeadlineMs, long execMemLimit, boolean strictMode,
long txnId, LoadTaskCallback callback, String timezone,
long timeoutS, long createTimestamp, boolean partialUpdate, Map<String, String> sessionVariables,
ConnectContext context, TLoadJobType loadJobType, int priority) {
ConnectContext context, TLoadJobType loadJobType, int priority, boolean useLocalCache) {
super(callback, TaskType.LOADING, priority);
this.db = db;
this.table = table;
Expand All @@ -97,12 +98,13 @@ public LoadLoadingTask(Database db, OlapTable table, BrokerDesc brokerDesc, List
this.sessionVariables = sessionVariables;
this.context = context;
this.loadJobType = loadJobType;
this.useLocalCache = useLocalCache;
}

public void init(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusList, int fileNum) throws UserException {
this.loadId = loadId;
planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, fileGroups,
strictMode, timezone, timeoutS, createTimestamp, partialUpdate);
strictMode, timezone, timeoutS, createTimestamp, partialUpdate, useLocalCache);
planner.plan(loadId, fileStatusList, fileNum);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public class LoadingTaskPlanner {
private final boolean partialUpdate;
private final int parallelInstanceNum;
private final long startTime;
private final boolean useLocalCache;

// Something useful
// ConnectContext here is just a dummy object to avoid some NPE problem, like ctx.getDatabase()
Expand All @@ -103,7 +104,7 @@ public class LoadingTaskPlanner {
public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table,
BrokerDesc brokerDesc, List<BrokerFileGroup> brokerFileGroups,
boolean strictMode, String timezone, long timeoutS,
long startTime, boolean partialUpdate) {
long startTime, boolean partialUpdate, boolean useLocalCache) {
this.loadJobId = loadJobId;
this.txnId = txnId;
this.dbId = dbId;
Expand All @@ -116,6 +117,7 @@ public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table
this.partialUpdate = partialUpdate;
this.parallelInstanceNum = Config.load_parallel_instance_num;
this.startTime = startTime;
this.useLocalCache = useLocalCache;
}

public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded)
Expand Down Expand Up @@ -175,6 +177,7 @@ public void buildDirectPlan(TUniqueId loadId, List<List<TBrokerFileStatus>> file
fileStatusesList, filesAdded);
scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, parallelInstanceNum);
scanNode.setUseVectorizedLoad(true);
scanNode.setUseLocalCache(useLocalCache);
scanNode.init(analyzer);
scanNode.finalizeStats(analyzer);
scanNodes.add(scanNode);
Expand Down Expand Up @@ -217,6 +220,7 @@ public void buildShufflePlan(TUniqueId loadId, List<List<TBrokerFileStatus>> fil
fileStatusesList, filesAdded);
scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, parallelInstanceNum);
scanNode.setUseVectorizedLoad(true);
scanNode.setUseLocalCache(useLocalCache);
scanNode.init(analyzer);
scanNode.finalizeStats(analyzer);
scanNodes.add(scanNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ public int compare(TBrokerFileStatus o1, TBrokerFileStatus o2) {
// 3. use vectorized engine
private boolean useVectorizedLoad;

private boolean useLocalCache;

private static class ParamCreateContext {
public BrokerFileGroup fileGroup;
public TBrokerScanRangeParams params;
Expand All @@ -160,6 +162,7 @@ public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
this.filesAdded = filesAdded;
this.parallelInstanceNum = 1;
this.useVectorizedLoad = false;
this.useLocalCache = false;
}

@Override
Expand Down Expand Up @@ -229,6 +232,10 @@ public void setUseVectorizedLoad(boolean useVectorizedLoad) {
this.useVectorizedLoad = useVectorizedLoad;
}

/**
 * Sets whether this scan node should request the local cache for its load.
 * The flag is forwarded to the scan range params through
 * {@code params.setUse_local_cache(...)} when {@code initParams} runs, so it
 * has to be set before the node is initialized to take effect.
 *
 * @param useLocalCache value to store in this node's {@code useLocalCache} field
 */
public void setUseLocalCache(boolean useLocalCache) {
this.useLocalCache = useLocalCache;
}

// Called from init, construct source tuple information
private void initParams(ParamCreateContext context)
throws UserException {
Expand Down Expand Up @@ -266,6 +273,7 @@ private void initParams(ParamCreateContext context)
params.setStrict_mode(strictMode);
params.setProperties(brokerDesc.getProperties());
params.setUse_broker(brokerDesc.hasBroker());
params.setUse_local_cache(useLocalCache);
initColumns(context);
initWhereExpr(fileGroup.getWhereExpr(), analyzer);
}
Expand Down
Loading

0 comments on commit 9f96381

Please sign in to comment.