Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions contrib/pax_storage/src/cpp/access/pax_dml_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void CPaxDmlStateLocal::Reset() { cbdb::pax_memory_context = nullptr; }
CPaxDmlStateLocal::CPaxDmlStateLocal()
: last_oid_(InvalidOid), cb_{.func = DmlStateResetCallback, .arg = NULL} {}

std::shared_ptr<CPaxDmlStateLocal::DmlStateValue>
pg_attribute_always_inline std::shared_ptr<CPaxDmlStateLocal::DmlStateValue>
CPaxDmlStateLocal::RemoveDmlState(const Oid &oid) {
std::shared_ptr<CPaxDmlStateLocal::DmlStateValue> value;

Expand All @@ -121,7 +121,7 @@ CPaxDmlStateLocal::RemoveDmlState(const Oid &oid) {
return value;
}

std::shared_ptr<CPaxDmlStateLocal::DmlStateValue>
pg_attribute_always_inline std::shared_ptr<CPaxDmlStateLocal::DmlStateValue>
CPaxDmlStateLocal::FindDmlState(const Oid &oid) {
Assert(OidIsValid(oid));

Expand Down
6 changes: 0 additions & 6 deletions contrib/pax_storage/src/cpp/comm/cbdb_wrappers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,6 @@ void cbdb::MemoryCtxRegisterResetCallback(MemoryContext context,
CBDB_WRAP_END;
}

Oid cbdb::RelationGetRelationId(Relation rel) {
CBDB_WRAP_START;
{ return RelationGetRelid(rel); }
CBDB_WRAP_END;
}

#ifdef RUN_GTEST
Datum cbdb::DatumFromCString(const char *src, size_t length) {
CBDB_WRAP_START;
Expand Down
6 changes: 4 additions & 2 deletions contrib/pax_storage/src/cpp/comm/cbdb_wrappers.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ void MemoryCtxDelete(MemoryContext memory_context);
void MemoryCtxRegisterResetCallback(MemoryContext context,
MemoryContextCallback *cb);

Oid RelationGetRelationId(Relation rel);

static inline void *DatumToPointer(Datum d) noexcept {
return DatumGetPointer(d);
}
Expand Down Expand Up @@ -164,6 +162,10 @@ static inline float8 DatumToFloat8(Datum d) noexcept {
return DatumGetFloat8(d);
}

static pg_attribute_always_inline Oid RelationGetRelationId(Relation rel) noexcept {
return RelationGetRelid(rel);
}

BpChar *BpcharInput(const char *s, size_t len, int32 atttypmod);
VarChar *VarcharInput(const char *s, size_t len, int32 atttypmod);
text *CstringToText(const char *s, size_t len);
Expand Down
6 changes: 5 additions & 1 deletion contrib/pax_storage/src/cpp/storage/orc/orc_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,11 @@ std::vector<std::pair<int, Datum>> OrcWriter::PrepareWriteTuple(
// Numeric always need ensure that with the 4B header, otherwise it will
// be converted twice in the vectorization path.
if (required_stats_cols[i] || VARATT_IS_COMPRESSED(tts_value_vl) ||
VARATT_IS_EXTERNAL(tts_value_vl) || attrs->atttypid == NUMERICOID) {
VARATT_IS_EXTERNAL(tts_value_vl)
#ifdef VEC_BUILD
|| attrs->atttypid == NUMERICOID
#endif
) {
// still detoast the origin toast
detoast_vl = cbdb::PgDeToastDatum(tts_value_vl);
Assert(detoast_vl != nullptr);
Expand Down
19 changes: 16 additions & 3 deletions contrib/pax_storage/src/cpp/storage/pax.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
#include "storage/vec/pax_vec_reader.h"
#endif

#define PAX_SPLIT_STRATEGY_CHECK_INTERVAL (16)

namespace paxc {
class IndexUpdaterInternal {
public:
Expand Down Expand Up @@ -280,14 +282,25 @@ void TableWriter::Open() {
// insert tuple into the aux table before inserting any tuples.
cbdb::InsertMicroPartitionPlaceHolder(RelationGetRelid(relation_),
current_blockno_);
cur_physical_size_ = 0;
}

void TableWriter::WriteTuple(TupleTableSlot *slot) {
Assert(writer_);
Assert(strategy_);
// should check split strategy before write tuple
// otherwise, may got a empty file in the disk
if (strategy_->ShouldSplit(writer_->PhysicalSize(), num_tuples_)) {
// Because of the CTID constraint, we have to strictly enforce the accuracy of
// the tuple count and make sure it doesn't exceed
// PAX_MAX_NUM_TUPLES_PER_FILE. That's why we kept this precise check here.

// On the other hand,the biggest performance hit here is the PhysicalSize()
// function.So to reduce the overhead of calling it so often,
// we only update the file size every PAX_SPLIT_STRATEGY_CHECK_INTERVAL
// tuples.
if ((num_tuples_ % PAX_SPLIT_STRATEGY_CHECK_INTERVAL) == 0) {
cur_physical_size_ = writer_->PhysicalSize();
}

if (strategy_->ShouldSplit(cur_physical_size_, num_tuples_)) {
writer_->Close();
writer_ = nullptr;
Open();
Expand Down
1 change: 1 addition & 0 deletions contrib/pax_storage/src/cpp/storage/pax.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class TableWriter {
std::vector<std::tuple<ColumnEncoding_Kind, int>> encoding_opts_;

bool is_dfs_table_space_;
size_t cur_physical_size_ = 0;
};

class TableReader final {
Expand Down
36 changes: 32 additions & 4 deletions contrib/pax_storage/src/cpp/storage/vec/pax_porc_adpater.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

#ifdef VEC_BUILD

#include <stdlib.h>

#include "comm/vec_numeric.h"
#include "storage/columns/pax_column_traits.h"
#include "storage/toast/pax_toast.h"
Expand All @@ -38,6 +40,22 @@
#endif
namespace pax {

static inline struct varlena *VarlenaShortTo4B(struct varlena *attr) {
Assert(attr != nullptr);
Assert(VARATT_IS_SHORT(attr));
Size data_size = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT;
Size new_size = data_size + VARHDRSZ;

struct varlena *new_attr =
reinterpret_cast<struct varlena *>(malloc(new_size));

Assert(new_attr != nullptr);

SET_VARSIZE(new_attr, new_size);
memcpy(VARDATA(new_attr), VARDATA_SHORT(attr), data_size);
return new_attr;
}

static void CopyFixedRawBufferWithNull(
PaxColumn *column, std::shared_ptr<Bitmap8> visibility_map_bitset,
size_t bitset_index_begin, size_t range_begin, size_t range_lens,
Expand Down Expand Up @@ -235,16 +253,22 @@ static void CopyDecimalBuffer(PaxColumn *column,
out_data_buffer->Brush(type_len);
} else {
Numeric numeric;
bool should_free = false;
size_t num_len = 0;
std::tie(buffer, buffer_len) =
column->GetBuffer(data_index_begin + non_null_offset);

auto vl = (struct varlena *)DatumGetPointer(buffer);
Assert(!(VARATT_IS_EXTERNAL(vl) || VARATT_IS_COMPRESSED(vl) ||
VARATT_IS_SHORT(vl)));
Assert(!(VARATT_IS_EXTERNAL(vl) || VARATT_IS_COMPRESSED(vl)));
num_len = VARSIZE_ANY_EXHDR(vl);
// direct cast
numeric = (Numeric)(buffer);
// it has been detoasted in OrcWriter::PrepareWriteTuple, except numeric
// type with short header should be detoasted to 4B header
if (unlikely(VARATT_IS_SHORT(vl))) {
numeric = VarlenaShortTo4B(vl);
should_free = true;
} else { // direct cast
numeric = (Numeric)(buffer);
}

char *dest_buff = out_data_buffer->GetAvailableBuffer();
Assert(out_data_buffer->Available() >= (size_t)type_len);
Expand All @@ -253,6 +277,10 @@ static void CopyDecimalBuffer(PaxColumn *column,
(int64 *)(dest_buff + sizeof(int64)));
out_data_buffer->Brush(type_len);
non_null_offset++;

if (should_free) {
free(numeric);
}
}
}

Expand Down
Loading