Skip to content

Commit

Permalink
[fix](merge-on-write) handle create rowset error to avoid null pointe…
Browse files Browse the repository at this point in the history
…r exception (apache#30474)
  • Loading branch information
sollhui authored Jan 29, 2024
1 parent de536a1 commit a571e4c
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 9 deletions.
18 changes: 10 additions & 8 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,9 @@ Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
(_context.partial_update_info && _context.partial_update_info->is_partial_update)) {
return Status::OK();
}
auto rowset = _build_tmp();
auto* beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
RowsetSharedPtr rowset_ptr;
RETURN_IF_ERROR(_build_tmp(rowset_ptr));
auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset_ptr.get());
std::vector<segment_v2::SegmentSharedPtr> segments;
RETURN_IF_ERROR(beta_rowset->load_segments(segment_id, segment_id + 1, &segments));
std::vector<RowsetSharedPtr> specified_rowsets;
Expand All @@ -183,7 +184,7 @@ Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
}
OlapStopWatch watch;
RETURN_IF_ERROR(BaseTablet::calc_delete_bitmap(
_context.tablet, rowset, segments, specified_rowsets,
_context.tablet, rowset_ptr, segments, specified_rowsets,
_context.mow_context->delete_bitmap, _context.mow_context->max_version, nullptr));
size_t total_rows = std::accumulate(
segments.begin(), segments.end(), 0,
Expand Down Expand Up @@ -648,19 +649,20 @@ void BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta) {
rowset_meta->set_creation_time(time(nullptr));
}

RowsetSharedPtr BaseBetaRowsetWriter::_build_tmp() {
Status BaseBetaRowsetWriter::_build_tmp(RowsetSharedPtr& rowset_ptr) {
std::shared_ptr<RowsetMeta> tmp_rs_meta = std::make_shared<RowsetMeta>();
tmp_rs_meta->init(_rowset_meta.get());
_build_rowset_meta(tmp_rs_meta.get());

RowsetSharedPtr rowset;
auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir,
tmp_rs_meta, &rowset);
tmp_rs_meta, &rowset_ptr);
DBUG_EXECUTE_IF("BaseBetaRowsetWriter::_build_tmp.create_rowset_failed",
{ status = Status::InternalError("create rowset failed"); });
if (!status.ok()) {
LOG(WARNING) << "rowset init failed when build new rowset, res=" << status;
return nullptr;
return status;
}
return rowset;
return Status::OK();
}

Status BaseBetaRowsetWriter::_create_file_writer(std::string path, io::FileWriterPtr& file_writer) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class BaseBetaRowsetWriter : public RowsetWriter {
virtual Status _check_segment_number_limit();
virtual int64_t _num_seg() const;
// build a tmp rowset for load segment to calc delete_bitmap for this segment
RowsetSharedPtr _build_tmp();
Status _build_tmp(RowsetSharedPtr& rowset_ptr);

std::atomic<int32_t> _num_segment; // number of consecutive flushed segments
roaring::Roaring _segment_set; // bitmap set to record flushed segment id
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_rowset_writer_fault", "nonConcurrent") {
sql """ DROP TABLE IF EXISTS `baseall` """
sql """
CREATE TABLE IF NOT EXISTS `baseall` (
`k0` boolean null comment "",
`k1` tinyint(4) null comment "",
`k2` smallint(6) null comment "",
`k3` int(11) null comment "",
`k4` bigint(20) null comment "",
`k5` decimal(9, 3) null comment "",
`k6` char(5) null comment "",
`k10` date null comment "",
`k11` datetime null comment "",
`k7` varchar(20) null comment "",
`k8` double null comment "",
`k9` float null comment "",
`k12` string null comment "",
`k13` largeint(40) null comment ""
) engine=olap
UNIQUE KEY (k0)
DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
"""

GetDebugPoint().clearDebugPointsForAllBEs()
def injection = "BaseBetaRowsetWriter::_build_tmp.create_rowset_failed"
try {
GetDebugPoint().enableDebugPointForAllBEs(injection)
streamLoad {
table "baseall"
db "regression_test_fault_injection_p0"
set 'column_separator', ','
file "baseall.txt"
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("fail", json.Status.toLowerCase())
}
}
} catch(Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains(error_msg))
} finally {
GetDebugPoint().disableDebugPointForAllBEs(injection)
}
sql """ DROP TABLE IF EXISTS `baseall` """
}

0 comments on commit a571e4c

Please sign in to comment.