Skip to content

Commit

Permalink
[improve](group commit) Add a swicth to wait internal group commit lo… (
Browse files Browse the repository at this point in the history
apache#26734)

* [improve](group commit) Add a swicth to make internal group commit load finish

* modify group commit tvf plan
  • Loading branch information
mymeiyi authored and 胥剑旭 committed Dec 14, 2023
1 parent 4339e4f commit fc3fa42
Show file tree
Hide file tree
Showing 16 changed files with 62 additions and 62 deletions.
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,7 @@ DEFINE_String(group_commit_replay_wal_dir, "./wal");
DEFINE_Int32(group_commit_replay_wal_retry_num, "10");
DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5");
DEFINE_Int32(group_commit_sync_wal_batch, "10");
DEFINE_Bool(wait_internal_group_commit_finish, "false");

// the count of thread to group commit insert
DEFINE_Int32(group_commit_insert_threads, "10");
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1143,6 +1143,7 @@ DECLARE_String(group_commit_replay_wal_dir);
DECLARE_Int32(group_commit_replay_wal_retry_num);
DECLARE_Int32(group_commit_replay_wal_retry_interval_seconds);
DECLARE_Int32(group_commit_sync_wal_batch);
DECLARE_Bool(wait_internal_group_commit_finish);

// This config can be set to limit thread number in group commit insert thread pool.
DECLARE_mInt32(group_commit_insert_threads);
Expand Down
3 changes: 2 additions & 1 deletion be/src/http/action/http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ int HttpStreamAction::on_header(HttpRequest* req) {
ctx->load_type = TLoadType::MANUL_LOAD;
ctx->load_src_type = TLoadSourceType::RAW;

ctx->group_commit = iequal(req->header(HTTP_GROUP_COMMIT), "true");
ctx->group_commit = iequal(req->header(HTTP_GROUP_COMMIT), "true") ||
config::wait_internal_group_commit_finish;

ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true" ? true : false;

Expand Down
5 changes: 3 additions & 2 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,9 @@ int StreamLoadAction::on_header(HttpRequest* req) {
url_decode(req->param(HTTP_TABLE_KEY), &ctx->table);
ctx->label = req->header(HTTP_LABEL_KEY);
Status st = Status::OK();
if (iequal(req->header(HTTP_GROUP_COMMIT), "true")) {
if (!ctx->label.empty()) {
if (iequal(req->header(HTTP_GROUP_COMMIT), "true") ||
config::wait_internal_group_commit_finish) {
if (iequal(req->header(HTTP_GROUP_COMMIT), "true") && !ctx->label.empty()) {
st = Status::InternalError("label and group_commit can't be set at the same time");
}
ctx->group_commit = true;
Expand Down
38 changes: 22 additions & 16 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace doris {

Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::FutureBlock> block) {
DCHECK(block->get_schema_version() == schema_version);
std::unique_lock l(*_mutex);
std::unique_lock l(mutex);
RETURN_IF_ERROR(_status);
while (*_all_block_queues_bytes > config::group_commit_max_queue_size) {
_put_cond.wait_for(
Expand All @@ -49,7 +49,7 @@ Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::FutureBlock> block)
Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, bool* eos) {
*find_block = false;
*eos = false;
std::unique_lock l(*_mutex);
std::unique_lock l(mutex);
if (!need_commit) {
auto left_milliseconds = config::group_commit_interval_ms -
std::chrono::duration_cast<std::chrono::milliseconds>(
Expand Down Expand Up @@ -99,15 +99,15 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo
}

void LoadBlockQueue::remove_load_id(const UniqueId& load_id) {
std::unique_lock l(*_mutex);
std::unique_lock l(mutex);
if (_load_ids.find(load_id) != _load_ids.end()) {
_load_ids.erase(load_id);
_get_cond.notify_all();
}
}

Status LoadBlockQueue::add_load_id(const UniqueId& load_id) {
std::unique_lock l(*_mutex);
std::unique_lock l(mutex);
if (need_commit) {
return Status::InternalError("block queue is set need commit, id=" +
load_instance_id.to_string());
Expand All @@ -118,7 +118,7 @@ Status LoadBlockQueue::add_load_id(const UniqueId& load_id) {

void LoadBlockQueue::cancel(const Status& st) {
DCHECK(!st.ok());
std::unique_lock l(*_mutex);
std::unique_lock l(mutex);
_status = st;
while (!_block_queue.empty()) {
{
Expand Down Expand Up @@ -250,7 +250,8 @@ Status GroupCommitTable::_create_group_commit_load(
<< ", is_pipeline=" << is_pipeline;
{
load_block_queue = std::make_shared<LoadBlockQueue>(
instance_id, label, txn_id, schema_version, _all_block_queues_bytes);
instance_id, label, txn_id, schema_version, _all_block_queues_bytes,
result.wait_internal_group_commit_finish);
std::unique_lock l(_lock);
_load_block_queues.emplace(instance_id, load_block_queue);
_need_plan_fragment = false;
Expand All @@ -269,16 +270,6 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
const std::string& label, int64_t txn_id,
const TUniqueId& instance_id, Status& status,
bool prepare_failed, RuntimeState* state) {
{
std::lock_guard<doris::Mutex> l(_lock);
if (prepare_failed || !status.ok()) {
auto it = _load_block_queues.find(instance_id);
if (it != _load_block_queues.end()) {
it->second->cancel(status);
}
}
_load_block_queues.erase(instance_id);
}
Status st;
Status result_status;
if (status.ok()) {
Expand Down Expand Up @@ -317,6 +308,21 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
10000L);
result_status = Status::create(result.status);
}
{
std::lock_guard<doris::Mutex> l(_lock);
auto it = _load_block_queues.find(instance_id);
if (it != _load_block_queues.end()) {
auto& load_block_queue = it->second;
if (prepare_failed || !status.ok()) {
load_block_queue->cancel(status);
}
if (load_block_queue->wait_internal_group_commit_finish) {
std::unique_lock l2(load_block_queue->mutex);
load_block_queue->internal_group_commit_finish_cv.notify_all();
}
}
_load_block_queues.erase(instance_id);
}
if (!st.ok()) {
LOG(WARNING) << "request finish error, db_id=" << db_id << ", table_id=" << table_id
<< ", label=" << label << ", txn_id=" << txn_id
Expand Down
9 changes: 6 additions & 3 deletions be/src/runtime/group_commit_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,15 @@ class LoadBlockQueue {
public:
LoadBlockQueue(const UniqueId& load_instance_id, std::string& label, int64_t txn_id,
int64_t schema_version,
std::shared_ptr<std::atomic_size_t> all_block_queues_bytes)
std::shared_ptr<std::atomic_size_t> all_block_queues_bytes,
bool wait_internal_group_commit_finish)
: load_instance_id(load_instance_id),
label(label),
txn_id(txn_id),
schema_version(schema_version),
wait_internal_group_commit_finish(wait_internal_group_commit_finish),
_start_time(std::chrono::steady_clock::now()),
_all_block_queues_bytes(all_block_queues_bytes) {
_mutex = std::make_shared<doris::Mutex>();
_single_block_queue_bytes = std::make_shared<std::atomic_size_t>(0);
};

Expand All @@ -60,11 +61,13 @@ class LoadBlockQueue {
int64_t txn_id;
int64_t schema_version;
bool need_commit = false;
bool wait_internal_group_commit_finish = false;
doris::Mutex mutex;
doris::ConditionVariable internal_group_commit_finish_cv;

private:
std::chrono::steady_clock::time_point _start_time;

std::shared_ptr<doris::Mutex> _mutex;
doris::ConditionVariable _put_cond;
doris::ConditionVariable _get_cond;
// the set of load ids of all blocks in this queue
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/stream_load/new_load_stream_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class NewLoadStreamMgr {
_stream_map.emplace(id, stream);
}

LOG(INFO) << "put stream load pipe: " << id;
VLOG_NOTICE << "put stream load pipe: " << id;
return Status::OK();
}

Expand All @@ -63,15 +63,15 @@ class NewLoadStreamMgr {
return iter->second;
}
}
LOG(INFO) << "stream load pipe does not exist: " << id;
VLOG_NOTICE << "stream load pipe does not exist: " << id;
return nullptr;
}

void remove(const UniqueId& id) {
std::lock_guard<std::mutex> l(_lock);
if (auto iter = _stream_map.find(id); iter != _stream_map.end()) {
_stream_map.erase(iter);
LOG(INFO) << "remove stream load pipe: " << id;
VLOG_NOTICE << "remove stream load pipe: " << id;
}
}

Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/sink/group_commit_block_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) {
loaded_rows);
state->set_num_rows_load_total(loaded_rows + state->num_rows_load_unselected() +
state->num_rows_load_filtered());
if (_load_block_queue && _load_block_queue->wait_internal_group_commit_finish) {
std::unique_lock l(_load_block_queue->mutex);
_load_block_queue->internal_group_commit_finish_cv.wait(l);
}
return Status::OK();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,12 @@ public class Config extends ConfigBase {
"Default timeout for insert load job, in seconds."})
public static int insert_load_default_timeout_second = 14400; // 4 hour

@ConfField(mutable = true, masterOnly = true, description = {
"等内部攒批真正写入完成才返回;insert into和stream load默认开启攒批",
"Wait for the internal batch to be written before returning; "
+ "insert into and stream load use group commit by default."})
public static boolean wait_internal_group_commit_finish = false;

@ConfField(mutable = true, masterOnly = true, description = {"Stream load 的默认超时时间,单位是秒。",
"Default timeout for stream load job, in seconds."})
public static int stream_load_default_timeout_second = 86400 * 3; // 3days
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.OlapTable;
Expand Down Expand Up @@ -248,21 +249,8 @@ public void getTables(Analyzer analyzer, Map<Long, TableIf> tableMap, Set<String
OlapTable olapTable = (OlapTable) table;
tblName.setDb(olapTable.getDatabase().getFullName());
tblName.setTbl(olapTable.getName());
if (olapTable.getDeleteSignColumn() != null) {
List<Column> columns = Lists.newArrayList(olapTable.getBaseSchema(false));
// The same order as GroupCommitTableValuedFunction#getTableColumns
// delete sign col
columns.add(olapTable.getDeleteSignColumn());
// version col
Column versionColumn = olapTable.getFullSchema().stream().filter(Column::isVersionColumn).findFirst()
.orElse(null);
if (versionColumn != null) {
columns.add(versionColumn);
}
// sequence col
if (olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() == null) {
columns.add(olapTable.getSequenceCol());
}
if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS) {
List<Column> columns = Lists.newArrayList(olapTable.getBaseSchema(true));
targetColumnNames = columns.stream().map(c -> c.getName()).collect(Collectors.toList());
}
}
Expand Down Expand Up @@ -1086,7 +1074,7 @@ public void analyzeGroupCommit(Analyzer analyzer) {
LOG.warn("analyze group commit failed", e);
return;
}
if (ConnectContext.get().getSessionVariable().enableInsertGroupCommit
if (ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit()
&& targetTable instanceof OlapTable
&& !ConnectContext.get().isTxnModel()
&& getQueryStmt() instanceof SelectStmt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
}

OlapTableSink sink = ((OlapTableSink) planner.getFragments().get(0).getSink());
if (ctx.getSessionVariable().enableInsertGroupCommit) {
if (ctx.getSessionVariable().isEnableInsertGroupCommit()) {
// group commit
if (analyzeGroupCommit(sink, physicalOlapTableSink)) {
handleGroupCommit(ctx, sink, physicalOlapTableSink);
Expand Down Expand Up @@ -421,7 +421,7 @@ private void handleGroupCommit(ConnectContext ctx, OlapTableSink sink,
}

private boolean analyzeGroupCommit(OlapTableSink sink, PhysicalOlapTableSink<?> physicalOlapTableSink) {
return ConnectContext.get().getSessionVariable().enableInsertGroupCommit
return ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit()
&& physicalOlapTableSink.getTargetTable() instanceof OlapTable
&& !ConnectContext.get().isTxnModel()
&& sink.getFragment().getPlanRoot() instanceof UnionNode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2918,4 +2918,8 @@ public void checkSqlDialect(String sqlDialect) {
throw new UnsupportedOperationException("sqlDialect value is invalid, the invalid value is " + sqlDialect);
}
}

public boolean isEnableInsertGroupCommit() {
return enableInsertGroupCommit || Config.wait_internal_group_commit_finish;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,7 @@ private void parseByLegacy() throws AnalysisException, DdlException {

analyzeVariablesInStmt();
}
if (context.getSessionVariable().enableInsertGroupCommit && parsedStmt instanceof NativeInsertStmt) {
if (context.getSessionVariable().isEnableInsertGroupCommit() && parsedStmt instanceof NativeInsertStmt) {
NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) parsedStmt;
nativeInsertStmt.analyzeGroupCommit(new Analyzer(context.getEnv(), context));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2169,6 +2169,7 @@ private void httpStreamPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResu
result.setDbId(parsedStmt.getTargetTable().getDatabase().getId());
result.setTableId(parsedStmt.getTargetTable().getId());
result.setBaseSchemaVersion(((OlapTable) parsedStmt.getTargetTable()).getBaseSchemaVersion());
result.setWaitInternalGroupCommitFinish(Config.wait_internal_group_commit_finish);
} catch (UserException e) {
LOG.warn("exec sql error", e);
throw new UserException("exec sql error" + e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,27 +66,10 @@ public List<Column> getTableColumns() throws AnalysisException {
throw new AnalysisException("Only support OLAP table, but table type of table_id "
+ tableId + " is " + table.getType());
}
List<Column> tableColumns = table.getBaseSchema(false);
List<Column> tableColumns = table.getBaseSchema(true);
for (int i = 1; i <= tableColumns.size(); i++) {
fileColumns.add(new Column("c" + i, tableColumns.get(i - 1).getType(), true));
}
OlapTable olapTable = (OlapTable) table;
// delete sign column
Column deleteSignColumn = olapTable.getDeleteSignColumn();
if (deleteSignColumn != null) {
fileColumns.add(new Column("c" + (fileColumns.size() + 1), deleteSignColumn.getType(), true));
}
// version column
Column versionColumn = olapTable.getFullSchema().stream().filter(Column::isVersionColumn).findFirst()
.orElse(null);
if (versionColumn != null) {
fileColumns.add(new Column("c" + (fileColumns.size() + 1), deleteSignColumn.getType(), true));
}
// sequence column
if (olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() == null) {
Column sequenceCol = olapTable.getSequenceCol();
fileColumns.add(new Column("c" + (fileColumns.size() + 1), sequenceCol.getType(), true));
}
return fileColumns;
}

Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ struct TStreamLoadPutResult {
4: optional i64 base_schema_version
5: optional i64 db_id
6: optional i64 table_id
7: optional bool wait_internal_group_commit_finish = false
}

struct TStreamLoadMultiTablePutResult {
Expand Down

0 comments on commit fc3fa42

Please sign in to comment.