Skip to content

Commit

Permalink
[Feature] Support light schema change of adding and dropping columns (S…
Browse files Browse the repository at this point in the history
…tarRocks#26246)

Fixes StarRocks#24341
This PR enables light schema change to be supported in

---------

Signed-off-by: zhangqiang <qiangzh95@gmail.com>
Co-authored-by: jijundu <jijundu@tencent.com>
Co-authored-by: zhangqiang <qiangzh95@gmail.com>
  • Loading branch information
3 people authored Aug 24, 2023
1 parent ae58f0b commit 52caf55
Show file tree
Hide file tree
Showing 191 changed files with 2,969 additions and 961 deletions.
4 changes: 2 additions & 2 deletions be/src/connector/binlog_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ BinlogMetaFieldMap BinlogDataSource::_build_binlog_meta_fields(ColumnId start_ci
}

StatusOr<Schema> BinlogDataSource::_build_binlog_schema() {
BinlogMetaFieldMap binlog_meta_map = _build_binlog_meta_fields(_tablet->tablet_schema().num_columns());
BinlogMetaFieldMap binlog_meta_map = _build_binlog_meta_fields(_tablet->tablet_schema()->num_columns());
std::vector<uint32_t> data_column_cids;
std::vector<uint32_t> meta_column_slot_index;
Fields meta_fields;
Expand Down Expand Up @@ -211,7 +211,7 @@ StatusOr<Schema> BinlogDataSource::_build_binlog_schema() {
return Status::InternalError("failed to build binlog schema, no materialized data slot!");
}

const TabletSchema& tablet_schema = _tablet->tablet_schema();
const TabletSchemaCSPtr& tablet_schema = _tablet->tablet_schema();
Schema schema = ChunkHelper::convert_schema(tablet_schema, data_column_cids);
for (int32_t i = 0; i < meta_column_slot_index.size(); i++) {
uint32_t index = meta_column_slot_index[i];
Expand Down
6 changes: 3 additions & 3 deletions be/src/connector/lake_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ Status LakeDataSource::init_reader_params(const std::vector<OlapScanRange*>& key
std::vector<uint32_t>& reader_columns) {
const TLakeScanNode& thrift_lake_scan_node = _provider->_t_lake_scan_node;
bool skip_aggregation = thrift_lake_scan_node.is_preaggregation;
auto parser = _obj_pool.add(new PredicateParser(*_tablet_schema));
auto parser = _obj_pool.add(new PredicateParser(_tablet_schema));
_params.is_pipeline = true;
_params.reader_type = READER_QUERY;
_params.skip_aggregation = skip_aggregation;
Expand Down Expand Up @@ -458,14 +458,14 @@ Status LakeDataSource::init_tablet_reader(RuntimeState* runtime_state) {
RETURN_IF_ERROR(init_unused_output_columns(thrift_lake_scan_node.unused_output_column_name));
RETURN_IF_ERROR(init_scanner_columns(scanner_columns));
RETURN_IF_ERROR(init_reader_params(_scanner_ranges, scanner_columns, reader_columns));
starrocks::Schema child_schema = ChunkHelper::convert_schema(*_tablet_schema, reader_columns);
starrocks::Schema child_schema = ChunkHelper::convert_schema(_tablet_schema, reader_columns);

ASSIGN_OR_RETURN(auto tablet, ExecEnv::GetInstance()->lake_tablet_manager()->get_tablet(_scan_range.tablet_id));
ASSIGN_OR_RETURN(_reader, tablet.new_reader(_version, std::move(child_schema)));
if (reader_columns.size() == scanner_columns.size()) {
_prj_iter = _reader;
} else {
starrocks::Schema output_schema = ChunkHelper::convert_schema(*_tablet_schema, scanner_columns);
starrocks::Schema output_schema = ChunkHelper::convert_schema(_tablet_schema, scanner_columns);
_prj_iter = new_projection_iterator(output_schema, _reader);
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ StatusOr<bool> OlapScanNode::_could_tablet_internal_parallel(
StatusOr<bool> OlapScanNode::_could_split_tablet_physically(const std::vector<TScanRangeParams>& scan_ranges) const {
// Keys type needn't merge or aggregate.
ASSIGN_OR_RETURN(TabletSharedPtr first_tablet, get_tablet(&(scan_ranges[0].scan_range.internal_scan_range)));
KeysType keys_type = first_tablet->tablet_schema().keys_type();
KeysType keys_type = first_tablet->tablet_schema()->keys_type();
const auto skip_aggr = thrift_olap_scan_node().is_preaggregation;
bool is_keys_type_matched = keys_type == PRIMARY_KEYS || keys_type == DUP_KEYS ||
((keys_type == UNIQUE_KEYS || keys_type == AGG_KEYS) && skip_aggr);
Expand Down
33 changes: 23 additions & 10 deletions be/src/exec/pipeline/scan/olap_chunk_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ Status OlapChunkSource::_init_reader_params(const std::vector<std::unique_ptr<Ol
std::vector<uint32_t>& reader_columns) {
const TOlapScanNode& thrift_olap_scan_node = _scan_node->thrift_olap_scan_node();
bool skip_aggregation = thrift_olap_scan_node.is_preaggregation;
auto parser = _obj_pool.add(new PredicateParser(_tablet->tablet_schema()));
auto parser = _obj_pool.add(new PredicateParser(_tablet_schema));
_params.is_pipeline = true;
_params.reader_type = READER_QUERY;
_params.skip_aggregation = skip_aggregation;
Expand Down Expand Up @@ -211,11 +211,11 @@ Status OlapChunkSource::_init_reader_params(const std::vector<std::unique_ptr<Ol
if (skip_aggregation) {
reader_columns = scanner_columns;
} else {
for (size_t i = 0; i < _tablet->num_key_columns(); i++) {
for (size_t i = 0; i < _tablet_schema->num_key_columns(); i++) {
reader_columns.push_back(i);
}
for (auto index : scanner_columns) {
if (!_tablet->tablet_schema().column(index).is_key()) {
if (!_tablet_schema->column(index).is_key()) {
reader_columns.push_back(index);
}
}
Expand All @@ -229,7 +229,7 @@ Status OlapChunkSource::_init_reader_params(const std::vector<std::unique_ptr<Ol
Status OlapChunkSource::_init_scanner_columns(std::vector<uint32_t>& scanner_columns) {
for (auto slot : *_slots) {
DCHECK(slot->is_materialized());
int32_t index = _tablet->field_index(slot->col_name());
int32_t index = _tablet_schema->field_index(slot->col_name());
if (index < 0) {
std::stringstream ss;
ss << "invalid field name: " << slot->col_name();
Expand All @@ -253,7 +253,7 @@ Status OlapChunkSource::_init_scanner_columns(std::vector<uint32_t>& scanner_col

Status OlapChunkSource::_init_unused_output_columns(const std::vector<std::string>& unused_output_columns) {
for (const auto& col_name : unused_output_columns) {
int32_t index = _tablet->field_index(col_name);
int32_t index = _tablet_schema->field_index(col_name);
if (index < 0) {
std::stringstream ss;
ss << "invalid field name: " << col_name;
Expand Down Expand Up @@ -294,20 +294,33 @@ Status OlapChunkSource::_init_olap_reader(RuntimeState* runtime_state) {
std::vector<uint32_t> reader_columns;

RETURN_IF_ERROR(_get_tablet(_scan_range));

auto tablet_schema_ptr = _tablet->tablet_schema();
_tablet_schema = TabletSchema::copy(tablet_schema_ptr);

// if column_desc come from fe, reset tablet schema
if (!_scan_node->thrift_olap_scan_node().columns_desc.empty() &&
_scan_node->thrift_olap_scan_node().columns_desc[0].col_unique_id >= 0) {
_tablet_schema->clear_columns();
for (const auto& column_desc : _scan_node->thrift_olap_scan_node().columns_desc) {
_tablet_schema->append_column(TabletColumn(column_desc));
}
}

RETURN_IF_ERROR(_init_global_dicts(&_params));
RETURN_IF_ERROR(_init_unused_output_columns(thrift_olap_scan_node.unused_output_column_name));
RETURN_IF_ERROR(_init_scanner_columns(scanner_columns));
RETURN_IF_ERROR(_init_reader_params(_scan_ctx->key_ranges(), scanner_columns, reader_columns));
const TabletSchema& tablet_schema = _tablet->tablet_schema();
starrocks::Schema child_schema = ChunkHelper::convert_schema(tablet_schema, reader_columns);

starrocks::Schema child_schema = ChunkHelper::convert_schema(_tablet_schema, reader_columns);
RETURN_IF_ERROR(_init_column_access_paths(&child_schema));

_reader = std::make_shared<TabletReader>(_tablet, Version(_morsel->from_version(), _version),
std::move(child_schema), _morsel->rowsets());
std::move(child_schema), _morsel->rowsets(), &_tablet_schema);
if (reader_columns.size() == scanner_columns.size()) {
_prj_iter = _reader;
} else {
starrocks::Schema output_schema = ChunkHelper::convert_schema(tablet_schema, scanner_columns);
starrocks::Schema output_schema = ChunkHelper::convert_schema(_tablet_schema, scanner_columns);
_prj_iter = new_projection_iterator(output_schema, _reader);
}

Expand Down Expand Up @@ -349,7 +362,7 @@ Status OlapChunkSource::_init_global_dicts(TabletReaderParams* params) {
auto iter = global_dict_map.find(slot->id());
if (iter != global_dict_map.end()) {
auto& dict_map = iter->second.first;
int32_t index = _tablet->field_index(slot->col_name());
int32_t index = _tablet_schema->field_index(slot->col_name());
DCHECK(index >= 0);
global_dict->emplace(index, const_cast<GlobalDictMap*>(&dict_map));
}
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/pipeline/scan/olap_chunk_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class OlapChunkSource final : public ChunkSource {

ObjectPool _obj_pool;
TabletSharedPtr _tablet;
std::shared_ptr<TabletSchema> _tablet_schema;
int64_t _version = 0;

RuntimeState* _runtime_state = nullptr;
Expand Down
14 changes: 14 additions & 0 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "column/column_helper.h"
#include "exprs/expr.h"
#include "runtime/mem_pool.h"
#include "storage/tablet_schema.h"
#include "types/constexpr.h"
#include "util/string_parser.hpp"

Expand Down Expand Up @@ -46,6 +47,9 @@ void OlapTableIndexSchema::to_protobuf(POlapTableIndexSchema* pindex) const {
for (auto slot : slots) {
pindex->add_columns(slot->col_name());
}
for (auto column : columns) {
column->to_schema_pb(pindex->add_columns_desc());
}
}

Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
Expand All @@ -69,6 +73,11 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
index->slots.emplace_back(it->second);
}
}
for (auto& pcolumn_desc : p_index.columns_desc()) {
TabletColumn* tc = _obj_pool.add(new TabletColumn());
tc->init_from_pb(pcolumn_desc);
index->columns.emplace_back(tc);
}
_indexes.emplace_back(index);
}

Expand Down Expand Up @@ -99,6 +108,11 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
index->slots.emplace_back(it->second);
}
}
for (auto& tcolumn_desc : t_index.columns_desc) {
TabletColumn* tc = _obj_pool.add(new TabletColumn());
tc->init_from_thrift(tcolumn_desc);
index->columns.emplace_back(tc);
}
_indexes.emplace_back(index);
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/descriptors.pb.h"
#include "runtime/descriptors.h"
#include "storage/tablet_schema.h"
#include "util/random.h"

namespace starrocks {
Expand All @@ -36,6 +37,7 @@ struct OlapTableIndexSchema {
int64_t index_id;
std::vector<SlotDescriptor*> slots;
int32_t schema_hash;
std::vector<TabletColumn*> columns;

void to_protobuf(POlapTableIndexSchema* pindex) const;
};
Expand Down
27 changes: 19 additions & 8 deletions be/src/exec/tablet_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,28 @@ Status TabletScanner::init(RuntimeState* runtime_state, const TabletScannerParam

RETURN_IF_ERROR(Expr::clone_if_not_exists(runtime_state, &_pool, *params.conjunct_ctxs, &_conjunct_ctxs));
RETURN_IF_ERROR(_get_tablet(params.scan_range));

auto tablet_schema_ptr = _tablet->tablet_schema();
_tablet_schema = TabletSchema::copy(tablet_schema_ptr);

// if column_desc come from fe, reset tablet schema
if (!_parent->_olap_scan_node.columns_desc.empty() && _parent->_olap_scan_node.columns_desc[0].col_unique_id >= 0) {
_tablet_schema->clear_columns();
for (const auto& column_desc : _parent->_olap_scan_node.columns_desc) {
_tablet_schema->append_column(TabletColumn(column_desc));
}
}

RETURN_IF_ERROR(_init_unused_output_columns(*params.unused_output_columns));
RETURN_IF_ERROR(_init_return_columns());
RETURN_IF_ERROR(_init_global_dicts());
RETURN_IF_ERROR(_init_reader_params(params.key_ranges));
const TabletSchema& tablet_schema = _tablet->tablet_schema();
Schema child_schema = ChunkHelper::convert_schema(tablet_schema, _reader_columns);
Schema child_schema = ChunkHelper::convert_schema(_tablet_schema, _reader_columns);
_reader = std::make_shared<TabletReader>(_tablet, Version(0, _version), std::move(child_schema));
if (_reader_columns.size() == _scanner_columns.size()) {
_prj_iter = _reader;
} else {
Schema output_schema = ChunkHelper::convert_schema(tablet_schema, _scanner_columns);
Schema output_schema = ChunkHelper::convert_schema(_tablet_schema, _scanner_columns);
_prj_iter = new_projection_iterator(output_schema, _reader);
}

Expand Down Expand Up @@ -181,11 +192,11 @@ Status TabletScanner::_init_reader_params(const std::vector<OlapScanRange*>* key
if (_skip_aggregation) {
_reader_columns = _scanner_columns;
} else {
for (size_t i = 0; i < _tablet->num_key_columns(); i++) {
for (size_t i = 0; i < _tablet_schema->num_key_columns(); i++) {
_reader_columns.push_back(i);
}
for (auto index : _scanner_columns) {
if (!_tablet->tablet_schema().column(index).is_key()) {
if (!_tablet_schema->column(index).is_key()) {
_reader_columns.push_back(index);
}
}
Expand All @@ -202,7 +213,7 @@ Status TabletScanner::_init_return_columns() {
if (!slot->is_materialized()) {
continue;
}
int32_t index = _tablet->field_index(slot->col_name());
int32_t index = _tablet_schema->field_index(slot->col_name());
if (index < 0) {
auto msg = strings::Substitute("Invalid column name: $0", slot->col_name());
LOG(WARNING) << msg;
Expand All @@ -224,7 +235,7 @@ Status TabletScanner::_init_return_columns() {

Status TabletScanner::_init_unused_output_columns(const std::vector<std::string>& unused_output_columns) {
for (const auto& col_name : unused_output_columns) {
int32_t index = _tablet->field_index(col_name);
int32_t index = _tablet_schema->field_index(col_name);
if (index < 0) {
auto msg = strings::Substitute("Invalid column name: $0", col_name);
LOG(WARNING) << msg;
Expand All @@ -248,7 +259,7 @@ Status TabletScanner::_init_global_dicts() {
auto iter = global_dict_map.find(slot->id());
if (iter != global_dict_map.end()) {
auto& dict_map = iter->second.first;
int32_t index = _tablet->field_index(slot->col_name());
int32_t index = _tablet_schema->field_index(slot->col_name());
DCHECK(index >= 0);
global_dict->emplace(index, const_cast<GlobalDictMap*>(&dict_map));
}
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/tablet_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class TabletScanner {
std::shared_ptr<TabletReader> _reader;

TabletSharedPtr _tablet;
TabletSchemaSPtr _tablet_schema;
int64_t _version = 0;

// output columns of `this` TabletScanner, i.e, the final output columns of `get_chunk`.
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/descriptors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ SlotDescriptor::SlotDescriptor(SlotId id, std::string name, TypeDescriptor type)
_parent(0),
_null_indicator_offset(0, 0),
_col_name(std::move(name)),
_col_unique_id(-1),
_slot_idx(0),
_slot_size(_type.get_slot_size()),
_is_materialized(false),
Expand All @@ -77,6 +78,7 @@ SlotDescriptor::SlotDescriptor(const TSlotDescriptor& tdesc)
_parent(tdesc.parent),
_null_indicator_offset(tdesc.nullIndicatorByte, tdesc.nullIndicatorBit),
_col_name(tdesc.colName),
_col_unique_id(tdesc.col_unique_id),
_slot_idx(tdesc.slotIdx),
_slot_size(_type.get_slot_size()),
_is_materialized(tdesc.isMaterialized),
Expand All @@ -89,6 +91,7 @@ SlotDescriptor::SlotDescriptor(const PSlotDescriptor& pdesc)
_parent(pdesc.parent()),
_null_indicator_offset(pdesc.null_indicator_byte(), pdesc.null_indicator_bit()),
_col_name(pdesc.col_name()),
_col_unique_id(-1),
_slot_idx(pdesc.slot_idx()),
_slot_size(_type.get_slot_size()),
_is_materialized(pdesc.is_materialized()),
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/descriptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ class SlotDescriptor {

std::string debug_string() const;

int32_t col_unique_id() const { return _col_unique_id; }

private:
friend class DescriptorTbl;
friend class TupleDescriptor;
Expand All @@ -119,6 +121,7 @@ class SlotDescriptor {
const TupleId _parent;
const NullIndicatorOffset _null_indicator_offset;
const std::string _col_name;
const int32_t _col_unique_id;

// the idx of the slot in the tuple descriptor (0-based).
// this is provided by the FE
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/local_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ Status LocalTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& pa
options.timeout_ms = params.timeout_ms();
options.write_quorum = params.write_quorum();
options.miss_auto_increment_column = params.miss_auto_increment_column();
options.ptable_schema_param = params.schema();
if (params.is_replicated_storage()) {
for (auto& replica : tablet.replicas()) {
options.replicas.emplace_back(replica);
Expand Down
4 changes: 2 additions & 2 deletions be/src/script/script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ static std::string tablet_set_tablet_state(Tablet& tablet, int state) {
}

static const TabletSchema& tablet_tablet_schema(Tablet& tablet) {
return tablet.tablet_schema();
return tablet.unsafe_tablet_schema_ref();
}

static uint64_t tablet_tablet_id(Tablet& tablet) {
Expand Down Expand Up @@ -421,7 +421,7 @@ class StorageEngineRef {
{
auto& cls = m.klass<Rowset>("Rowset");
REG_METHOD(Rowset, rowset_id_str);
REG_METHOD(Rowset, schema);
REG_METHOD(Rowset, schema_ref);
REG_METHOD(Rowset, start_version);
REG_METHOD(Rowset, end_version);
REG_METHOD(Rowset, creation_time);
Expand Down
21 changes: 19 additions & 2 deletions be/src/storage/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,20 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
bool equal(int64_t tablet_id, int32_t schema_hash);

// properties encapsulated in TabletSchema
const TabletSchema& tablet_schema() const;
virtual const TabletSchema& unsafe_tablet_schema_ref() const;

virtual const TabletSchemaCSPtr tablet_schema() const;

bool set_tablet_schema_into_rowset_meta() {
bool flag = false;
for (const RowsetMetaSharedPtr& rowset_meta : _tablet_meta->all_rs_metas()) {
if (!rowset_meta->get_meta_pb().has_tablet_schema()) {
rowset_meta->set_tablet_schema(tablet_schema());
flag = true;
}
}
return flag;
}

protected:
virtual void on_shutdown() {}
Expand Down Expand Up @@ -172,8 +185,12 @@ inline bool BaseTablet::equal(int64_t id, int32_t hash) {
return tablet_id() == id && schema_hash() == hash;
}

inline const TabletSchema& BaseTablet::tablet_schema() const {
inline const TabletSchema& BaseTablet::unsafe_tablet_schema_ref() const {
return _tablet_meta->tablet_schema();
}

inline const TabletSchemaCSPtr BaseTablet::tablet_schema() const {
return _tablet_meta->tablet_schema_ptr();
}

} /* namespace starrocks */
Loading

0 comments on commit 52caf55

Please sign in to comment.