Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add a control framework between FE and BE through heartbeat #2247 #2364

Merged
merged 9 commits into from
Dec 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "olap/utils.h"
#include "service/backend_options.h"
#include "util/thrift_server.h"
#include "runtime/heartbeat_flags.h"

using std::fstream;
using std::nothrow;
Expand Down Expand Up @@ -147,6 +148,11 @@ Status HeartbeatServer::_heartbeat(
_master_info->__set_http_port(master_info.http_port);
}

if (master_info.__isset.heartbeat_flags) {
HeartbeatFlags* heartbeat_flags = ExecEnv::GetInstance()->heartbeat_flags();
heartbeat_flags->update(master_info.heartbeat_flags);
}

if (need_report) {
LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately";
_olap_engine->report_notify(true);
Expand Down
10 changes: 6 additions & 4 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include "runtime/mem_tracker.h"
#include "common/resource_tls.h"
#include "agent/cgroups_mgr.h"
#include "runtime/exec_env.h"
#include "runtime/heartbeat_flags.h"

using std::deque;
using std::list;
Expand Down Expand Up @@ -976,7 +978,7 @@ bool SchemaChangeWithSorting::process(
_temp_delta_versions.second),
rowset_reader->version_hash(),
new_tablet,
rowset_reader->rowset()->rowset_meta()->rowset_type(),
StorageEngine::instance()->default_rowset_type(),
&rowset)) {
LOG(WARNING) << "failed to sorting internally.";
result = false;
Expand Down Expand Up @@ -1033,7 +1035,7 @@ bool SchemaChangeWithSorting::process(
Version(_temp_delta_versions.second, _temp_delta_versions.second),
rowset_reader->version_hash(),
new_tablet,
rowset_reader->rowset()->rowset_meta()->rowset_type(),
StorageEngine::instance()->default_rowset_type(),
&rowset)) {
LOG(WARNING) << "failed to sorting internally.";
result = false;
Expand Down Expand Up @@ -1470,7 +1472,7 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(
writer_context.tablet_id = new_tablet->tablet_id();
writer_context.partition_id = (*base_rowset)->partition_id();
writer_context.tablet_schema_hash = new_tablet->schema_hash();
writer_context.rowset_type = (*base_rowset)->rowset_meta()->rowset_type();
writer_context.rowset_type = StorageEngine::instance()->default_rowset_type();
writer_context.rowset_path_prefix = new_tablet->tablet_path();
writer_context.tablet_schema = &(new_tablet->tablet_schema());
writer_context.rowset_state = PREPARED;
Expand Down Expand Up @@ -1696,7 +1698,7 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
writer_context.partition_id = new_tablet->partition_id();
writer_context.tablet_schema_hash = new_tablet->schema_hash();
// linked schema change can't change rowset type, therefore we preserve rowset type in schema change now
writer_context.rowset_type = rs_reader->rowset()->rowset_meta()->rowset_type();
writer_context.rowset_type = StorageEngine::instance()->default_rowset_type();
writer_context.rowset_path_prefix = new_tablet->tablet_path();
writer_context.tablet_schema = &(new_tablet->tablet_schema());
writer_context.rowset_state = VISIBLE;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ StorageEngine::StorageEngine(const EngineOptions& options)
_rowset_id_generator(new UniqueRowsetIdGenerator(options.backend_uid)),
_memtable_flush_executor(nullptr),
_default_rowset_type(ALPHA_ROWSET),
_compaction_rowset_type(ALPHA_ROWSET) {
_compaction_rowset_type(ALPHA_ROWSET),
_heartbeat_flags(nullptr) {
if (_s_instance == nullptr) {
_s_instance = this;
}
Expand Down
21 changes: 19 additions & 2 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include "olap/txn_manager.h"
#include "olap/task/engine_task.h"
#include "olap/rowset/rowset_id_generator.h"
#include "runtime/heartbeat_flags.h"

namespace doris {

Expand Down Expand Up @@ -198,9 +199,23 @@ class StorageEngine {

MemTableFlushExecutor* memtable_flush_executor() { return _memtable_flush_executor; }

RowsetTypePB default_rowset_type() const { return _default_rowset_type; }
RowsetTypePB default_rowset_type() const {
if (_heartbeat_flags != nullptr && _heartbeat_flags->is_set_default_rowset_type_to_beta()) {
return BETA_ROWSET;
}
return _default_rowset_type;
}

RowsetTypePB compaction_rowset_type() const { return _compaction_rowset_type; }
RowsetTypePB compaction_rowset_type() const {
if (_heartbeat_flags != nullptr && _heartbeat_flags->is_set_default_rowset_type_to_beta()) {
return BETA_ROWSET;
}
return _compaction_rowset_type;
}

void set_heartbeat_flags(HeartbeatFlags* heartbeat_flags) {
_heartbeat_flags = heartbeat_flags;
}

private:

Expand Down Expand Up @@ -363,6 +378,8 @@ class StorageEngine {
// used to control the the process of converting old data
RowsetTypePB _compaction_rowset_type;

HeartbeatFlags* _heartbeat_flags;

DISALLOW_COPY_AND_ASSIGN(StorageEngine);
};

Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class FrontendServiceClient;
class TPaloBrokerServiceClient;
class TExtDataSourceServiceClient;
template<class T> class ClientCache;
class HeartbeatFlags;

// Execution environment for queries/plan fragments.
// Contains all required global structures, and handles to
Expand Down Expand Up @@ -129,6 +130,7 @@ class ExecEnv {

StreamLoadExecutor* stream_load_executor() { return _stream_load_executor; }
RoutineLoadTaskExecutor* routine_load_task_executor() { return _routine_load_task_executor; }
HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; }

private:
Status _init(const std::vector<StorePath>& store_paths);
Expand Down Expand Up @@ -178,6 +180,7 @@ class ExecEnv {
StreamLoadExecutor* _stream_load_executor = nullptr;
RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr;
SmallFileMgr* _small_file_mgr = nullptr;
HeartbeatFlags* _heartbeat_flags = nullptr;
};


Expand Down
4 changes: 3 additions & 1 deletion be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
#include "gen_cpp/TPaloBrokerService.h"
#include "gen_cpp/TExtDataSourceService.h"
#include "gen_cpp/HeartbeatService_types.h"
#include "runtime/heartbeat_flags.h"

namespace doris {

Expand Down Expand Up @@ -121,7 +122,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_init_mem_tracker();

RETURN_IF_ERROR(_load_channel_mgr->init(_mem_tracker->limit()));

_heartbeat_flags = new HeartbeatFlags();
return Status::OK();
}

Expand Down Expand Up @@ -229,6 +230,7 @@ void ExecEnv::_destory() {
delete _stream_load_executor;
delete _routine_load_task_executor;
delete _external_scan_context_mgr;
delete _heartbeat_flags;
_metrics = nullptr;
}

Expand Down
46 changes: 46 additions & 0 deletions be/src/runtime/heartbeat_flags.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <atomic>

#include "gen_cpp/HeartbeatService_constants.h"

namespace doris {

// This class is for parse control flags from heartbeat message
// between FE and BE.
class HeartbeatFlags {
public:
HeartbeatFlags(uint64_t origin_flags) : _flags(origin_flags) { }

HeartbeatFlags() : HeartbeatFlags(0) { }

void update(uint64_t flags) {
_flags = flags;
}

bool is_set_default_rowset_type_to_beta() {
return _flags & g_HeartbeatService_constants.IS_SET_DEFAULT_ROWSET_TO_BETA_BIT;
}

private:
std::atomic<uint64_t> _flags;
};

}
2 changes: 2 additions & 0 deletions be/src/service/doris_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "common/resource_tls.h"
#include "util/thrift_rpc_helper.h"
#include "util/uid_util.h"
#include "runtime/heartbeat_flags.h"

static void help(const char*);

Expand Down Expand Up @@ -166,6 +167,7 @@ int main(int argc, char** argv) {
auto exec_env = doris::ExecEnv::GetInstance();
doris::ExecEnv::init(exec_env, paths);
exec_env->set_storage_engine(engine);
engine->set_heartbeat_flags(exec_env->heartbeat_flags());

doris::ThriftRpcHelper::setup(exec_env);
doris::ThriftServer* be_server = nullptr;
Expand Down
1 change: 1 addition & 0 deletions be/test/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ ADD_BE_TEST(user_function_cache_test)
ADD_BE_TEST(kafka_consumer_pipe_test)
ADD_BE_TEST(routine_load_task_executor_test)
ADD_BE_TEST(small_file_mgr_test)
ADD_BE_TEST(heartbeat_flags_test)

ADD_BE_TEST(result_queue_mgr_test)
ADD_BE_TEST(memory_scratch_sink_test)
Expand Down
42 changes: 42 additions & 0 deletions be/test/runtime/heartbeat_flags_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gtest/gtest.h>

#include "runtime/heartbeat_flags.h"

namespace doris {

class HeartbeatFlagsTest : public testing::Test {
private:
HeartbeatFlags _flags;
};

TEST_F(HeartbeatFlagsTest, normal) {
ASSERT_FALSE(_flags.is_set_default_rowset_type_to_beta());
_flags.update(1);
ASSERT_TRUE(_flags.is_set_default_rowset_type_to_beta());
_flags.update(2);
ASSERT_FALSE(_flags.is_set_default_rowset_type_to_beta());
}

}

int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
4 changes: 4 additions & 0 deletions docs/documentation/cn/administrator-guide/variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -306,3 +306,7 @@ SET forward_to_master = concat('tr', 'u', 'e');
* `wait_timeout`

用于设置空闲连接的连接时长。当一个空闲连接在该时长内与 Doris 没有任何交互,则 Doris 会主动断开这个链接。默认为 8 小时,单位为秒。

* `default_rowset_type`

用于设置计算节点存储引擎默认的存储格式。当前支持的存储格式包括:alpha/beta。
4 changes: 4 additions & 0 deletions docs/documentation/en/administrator-guide/variables_EN.md
Original file line number Diff line number Diff line change
Expand Up @@ -307,3 +307,7 @@ SET forward_to_master = concat('tr', 'u', 'e');
* `wait_timeout`

The length of the connection used to set up an idle connection. When an idle connection does not interact with Doris for that length of time, Doris will actively disconnect the link. The default is 8 hours, in seconds.

* `default_rowset_type`

Used for setting the default storage format of Backends storage engine. Valid options: alpha/beta
12 changes: 9 additions & 3 deletions fe/src/main/java/org/apache/doris/analysis/SetVar.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,10 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.doris.system.HeartbeatFlags;

// change one variable.
public class SetVar {
private static final Logger LOG = LogManager.getLogger(SetVar.class);

private String variable;
private Expr value;
Expand Down Expand Up @@ -135,6 +133,14 @@ else if (result instanceof IntLiteral) {
throw new AnalysisException("Invalid resource group, now we support {low, normal, high}.");
}
}
if (variable.equalsIgnoreCase(SessionVariable.DEFAULT_ROWSET_TYPE)) {
if (type != SetType.GLOBAL) {
throw new AnalysisException("default_rowset_type must be global. use set global");
}
if (result != null && !HeartbeatFlags.isValidRowsetType(result.getStringValue())) {
throw new AnalysisException("Invalid rowset type, now we support {alpha, beta}.");
}
}
}

public String toSql() {
Expand Down
5 changes: 5 additions & 0 deletions fe/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class SessionVariable implements Serializable, Writable {
* Using only the exec_mem_limit variable does not make a good distinction of memory limit between the two parts.
*/
public static final String LOAD_MEM_LIMIT = "load_mem_limit";
public static final String DEFAULT_ROWSET_TYPE = "default_rowset_type";

// max memory used on every backend.
@VariableMgr.VarAttr(name = EXEC_MEM_LIMIT)
Expand Down Expand Up @@ -213,6 +214,10 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = LOAD_MEM_LIMIT)
private long loadMemLimit = 0L;

// the default rowset type flag which will be passed to Backends througth heartbeat
@VariableMgr.VarAttr(name = DEFAULT_ROWSET_TYPE)
public static String defaultRowsetType = "alpha";

public long getMaxExecMemByte() {
return maxExecMemByte;
}
Expand Down
52 changes: 52 additions & 0 deletions fe/src/main/java/org/apache/doris/system/HeartbeatFlags.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.system;

import org.apache.doris.analysis.SysVariableDesc;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.thrift.HeartbeatServiceConstants;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// This class is to manage the control flag in heartbeat message between FE and BE.
// The control flag is for FE to control some behaviors of BE.
// Now the flag is represented by 64-bit long type, each bit can be used to control
// one behavior. The first bit is used for set default rowset type to beta flag.
public class HeartbeatFlags {
private static final Logger LOG = LogManager.getLogger(HeartbeatFlags.class);

public static boolean isValidRowsetType(String rowsetType) {
return rowsetType.equalsIgnoreCase("alpha") || rowsetType.equalsIgnoreCase("beta");
}

public long getHeartbeatFlags () {
long heartbeatFlags = 0;
try {
String defaultRowsetType = VariableMgr.getValue(null, new SysVariableDesc(SessionVariable.DEFAULT_ROWSET_TYPE));
if (defaultRowsetType.equalsIgnoreCase("beta")) {
heartbeatFlags |= HeartbeatServiceConstants.IS_SET_DEFAULT_ROWSET_TO_BETA_BIT;
}
} catch (AnalysisException e) {
LOG.warn("parse default rowset type failed.error:{}", e);
}

return heartbeatFlags;
}
}
Loading