Skip to content
Merged
3 changes: 2 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ ydb/core/kqp/ut/olap KqpOlapJson.CompactionVariants
ydb/core/kqp/ut/olap KqpOlapJson.DuplicationCompactionVariants
ydb/core/kqp/ut/olap KqpOlapJson.SwitchAccessorCompactionVariants
ydb/core/kqp/ut/olap KqpOlapWrite.TierDraftsGCWithRestart
ydb/core/kqp/ut/olap KqpOlapSysView.StatsSysViewEnumStringBytes

ydb/core/kqp/ut/olap [*/*] chunk chunk
ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable+ColumnStore
ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable-ColumnStore
Expand Down Expand Up @@ -130,6 +130,7 @@ ydb/tests/olap/ttl_tiering [data_migration_when_alter_ttl.py] chunk chunk
ydb/tests/olap/ttl_tiering [ttl_delete_s3.py] chunk chunk
ydb/tests/olap/ttl_tiering data_migration_when_alter_ttl.py.TestDataMigrationWhenAlterTtl.test
ydb/tests/olap/ttl_tiering sole chunk chunk
ydb/tests/olap/ttl_tiering ttl_unavailable_s3.py.TestUnavailableS3.test
ydb/tests/olap/ttl_tiering ttl_delete_s3.py.TestDeleteS3Ttl.test_delete_s3_tiering
ydb/tests/olap/ttl_tiering ttl_delete_s3.py.TestDeleteTtl.test_ttl_delete
ydb/tests/sql/large sole chunk chunk
Expand Down
10 changes: 7 additions & 3 deletions ydb/core/formats/arrow/accessor/abstract/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,13 @@ class IChunkedArray {
return Addresses.size();
}

ui32 GetLocalIndex(const ui32 position) const {
AFL_VERIFY(Contains(position))("pos", position)("start", GlobalStartPosition);
return position - GlobalStartPosition;
ui32 GetLocalIndex(const ui32 global) const {
AFL_VERIFY(Contains(global))("pos", global)("start", GlobalStartPosition);
return global - GlobalStartPosition;
}

ui32 GetGlobalIndex(const ui32 local) const {
return local + GlobalStartPosition;
}

bool Contains(const ui32 position) const {
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/formats/arrow/accessor/abstract/constructor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,12 @@ TConstructorContainer TConstructorContainer::GetDefaultConstructor() {
return result;
}

TString IConstructor::SerializeToString(const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const {
AFL_VERIFY(columnData);
AFL_VERIFY(columnData->GetType() == Type)("column", columnData->GetType())("current", Type);
AFL_VERIFY(columnData->GetDataType()->Equals(externalInfo.GetColumnType()))("column", columnData->GetDataType()->ToString())(
"external", externalInfo.GetColumnType()->ToString());
return DoSerializeToString(columnData, externalInfo);
}

}
6 changes: 1 addition & 5 deletions ydb/core/formats/arrow/accessor/abstract/constructor.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,7 @@ class IConstructor {

virtual ~IConstructor() = default;

TString SerializeToString(const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const {
AFL_VERIFY(columnData);
AFL_VERIFY(columnData->GetType() == Type)("column", columnData->GetType())("current", Type);
return DoSerializeToString(columnData, externalInfo);
}
TString SerializeToString(const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const;

bool IsEqualWithSameTypeTo(const IConstructor& item) const {
return DoIsEqualWithSameTypeTo(item);
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/formats/arrow/accessor/sparsed/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ class TSparsedArray: public IChunkedArray {
const std::shared_ptr<arrow::Scalar>& defaultValue, const std::shared_ptr<arrow::DataType>& type, const ui32 recordsCount);

public:
static EType GetTypeStatic() {
return EType::SparsedArray;
}

virtual void Reallocate() override;

static std::shared_ptr<TSparsedArray> BuildFalseArrayUI8(const ui32 recordsCount) {
Expand Down Expand Up @@ -223,6 +227,10 @@ class TSparsedArray: public IChunkedArray {
return Record;
}

const TSparsedArrayChunk& GetSparsedChunk() const {
return Record;
}

virtual std::shared_ptr<arrow::Scalar> DoGetScalar(const ui32 index) const override {
return Record.GetScalar(index);
}
Expand Down
19 changes: 16 additions & 3 deletions ydb/core/formats/arrow/save_load/loader.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "loader.h"

#include <ydb/library/formats/arrow/switch/switch_type.h>
#include <ydb/library/formats/arrow/validation/validation.h>

namespace NKikimr::NArrow::NAccessor {
Expand Down Expand Up @@ -28,8 +29,7 @@ const std::shared_ptr<arrow::Field>& TColumnLoader::GetField() const {
return ResultField;
}

TChunkConstructionData TColumnLoader::BuildAccessorContext(
const ui32 recordsCount, const std::optional<ui32>& notNullCount) const {
TChunkConstructionData TColumnLoader::BuildAccessorContext(const ui32 recordsCount, const std::optional<ui32>& notNullCount) const {
return TChunkConstructionData(recordsCount, DefaultValue, ResultField->type(), Serializer.GetObjectPtr(), notNullCount);
}

Expand All @@ -38,7 +38,8 @@ TConclusion<std::shared_ptr<IChunkedArray>> TColumnLoader::ApplyConclusion(
return BuildAccessor(dataStr, BuildAccessorContext(recordsCount, notNullCount));
}

std::shared_ptr<IChunkedArray> TColumnLoader::ApplyVerified(const TString& dataStr, const ui32 recordsCount, const std::optional<ui32>& notNullCount) const {
std::shared_ptr<IChunkedArray> TColumnLoader::ApplyVerified(
const TString& dataStr, const ui32 recordsCount, const std::optional<ui32>& notNullCount) const {
return BuildAccessor(dataStr, BuildAccessorContext(recordsCount, notNullCount)).DetachResult();
}

Expand All @@ -63,4 +64,16 @@ bool TColumnLoader::IsEqualTo(const TColumnLoader& item) const {
return true;
}

std::optional<NSplitter::TSimpleSerializationStat> TColumnLoader::TryBuildColumnStat() const {
std::optional<NSplitter::TSimpleSerializationStat> result;
SwitchType(ResultField->type()->id(), [&](const auto switcher) {
if constexpr (switcher.IsCType) {
using CType = typename decltype(switcher)::ValueType;
result = NSplitter::TSimpleSerializationStat(std::max<ui32>(1, sizeof(CType) / 2), 1, sizeof(CType));
}
return true;
});
return result;
}

} // namespace NKikimr::NArrow::NAccessor
8 changes: 5 additions & 3 deletions ydb/core/formats/arrow/save_load/loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/core/formats/arrow/serializer/abstract.h>

#include <ydb/library/accessor/accessor.h>
#include <ydb/library/formats/arrow/splitter/stats.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>

Expand All @@ -19,15 +20,16 @@ class TColumnLoader {
TConclusion<std::shared_ptr<IChunkedArray>> BuildAccessor(const TString& originalData, const TChunkConstructionData& chunkData) const;

public:
std::optional<NSplitter::TSimpleSerializationStat> TryBuildColumnStat() const;

std::shared_ptr<IChunkedArray> BuildDefaultAccessor(const ui32 recordsCount) const;

bool IsEqualTo(const TColumnLoader& item) const;

TString DebugString() const;

TColumnLoader(const NSerialization::TSerializerContainer& serializer,
const NAccessor::TConstructorContainer& accessorConstructor, const std::shared_ptr<arrow::Field>& resultField,
const std::shared_ptr<arrow::Scalar>& defaultValue, const ui32 columnId);
TColumnLoader(const NSerialization::TSerializerContainer& serializer, const NAccessor::TConstructorContainer& accessorConstructor,
const std::shared_ptr<arrow::Field>& resultField, const std::shared_ptr<arrow::Scalar>& defaultValue, const ui32 columnId);

ui32 GetColumnId() const {
return ColumnId;
Expand Down
61 changes: 61 additions & 0 deletions ydb/core/kqp/ut/olap/combinatory/abstract.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,66 @@
#include "abstract.h"
#include <util/string/join.h>

namespace NKikimr::NKqp {

TConclusionStatus ICommand::DeserializeFromString(const TString& description) {
try {
auto lines = StringSplitter(description).SplitBySet("\n").ToList<TString>();
const std::set<TString> props = GetCommandProperties();
TString currentProperty;
std::vector<TString> freeArguments;
THashMap<TString, TString> properties;
for (auto&& l : lines) {
l = Strip(l);
if (!l) {
continue;
}
for (auto&& c : props) {
if (l.StartsWith(c)) {
currentProperty = c;
l = Strip(l.substr(c.size()));
}
if (l.StartsWith(":") || l.StartsWith("=")) {
l = Strip(l.substr(1));
}
}
if (!l) {
continue;
}
if (!currentProperty) {
freeArguments.emplace_back(l);
} else {
properties[currentProperty] += l;
}
}

TPropertiesCollection collection(freeArguments, properties);
auto result = DeserializeProperties(collection);
if (result.IsFail()) {
return TConclusionStatus::Fail(result.GetErrorMessage() + ":\n" + collection.DebugString());
}
return TConclusionStatus::Success();
} catch (...) {
return TConclusionStatus::Fail("exception on ICommand::DeserializeFromString: " + CurrentExceptionMessage());
}
}

TString TPropertiesCollection::DebugString() const {
TStringBuilder sb;
sb << "FREE_ARGUMENTS(" << FreeArguments.size() << "):" << Endl;
for (auto&& i : FreeArguments) {
sb << " " << i << Endl;
}

sb << "PROPERTIES(" << Properties.size() << "):" << Endl;
for (auto&& i : Properties) {
sb << " " << i.first << ":" << i.second << Endl;
}
return sb;
}

TString TPropertiesCollection::JoinFreeArguments(const TString& delimiter /*= "\n"*/) const {
return JoinSeq(delimiter, FreeArguments);
}

} // namespace NKikimr::NKqp
61 changes: 61 additions & 0 deletions ydb/core/kqp/ut/olap/combinatory/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,74 @@

namespace NKikimr::NKqp {

class TPropertiesCollection {
private:
std::vector<TString> FreeArguments;
THashMap<TString, TString> Properties;

public:
TString DebugString() const;

TString JoinFreeArguments(const TString& delimiter = "\n") const;

TPropertiesCollection(const std::vector<TString>& freeArgs, const THashMap<TString, TString>& props)
: FreeArguments(freeArgs)
, Properties(props) {
}

std::optional<TString> GetOptional(const TString& propertyName) const {
auto it = Properties.find(propertyName);
if (it == Properties.end()) {
return std::nullopt;
}
return it->second;
}

TString GetVerified(const TString& propertyName) const {
auto it = Properties.find(propertyName);
AFL_VERIFY(it != Properties.end())("name", propertyName)("props", DebugString());
return it->second;
}

ui32 GetFreeArgumentsCount() const {
return FreeArguments.size();
}

std::optional<TString> GetFreeArgumentOptional(const ui32 idx) const {
if (idx < FreeArguments.size()) {
return std::nullopt;
}
return FreeArguments[idx];
}

TString GetFreeArgumentVerified(const ui32 idx) const {
AFL_VERIFY(idx < FreeArguments.size())("idx", idx)("props", DebugString());
return FreeArguments[idx];
}
};

class ICommand {
private:
virtual TConclusionStatus DoExecute(TKikimrRunner& kikimr) = 0;
virtual std::set<TString> DoGetCommandProperties() const {
return {};
}
virtual TConclusionStatus DoDeserializeProperties(const TPropertiesCollection& /*props*/) {
return TConclusionStatus::Success();
}
TConclusionStatus DeserializeProperties(const TPropertiesCollection& props) {
return DoDeserializeProperties(props);
}

public:
virtual ~ICommand() = default;

std::set<TString> GetCommandProperties() const {
return DoGetCommandProperties();
}

TConclusionStatus DeserializeFromString(const TString& description);

TConclusionStatus Execute(TKikimrRunner& kikimr) {
return DoExecute(kikimr);
}
Expand Down
52 changes: 28 additions & 24 deletions ydb/core/kqp/ut/olap/combinatory/bulk_upsert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,38 @@

namespace NKikimr::NKqp {

bool TBulkUpsertCommand::DeserializeFromString(const TString& info) {
auto lines = StringSplitter(info).SplitBySet("\n").SkipEmpty().ToList<TString>();
if (lines.size() < 2 || lines.size() > 3) {
return false;
TConclusionStatus TBulkUpsertCommand::DoExecute(TKikimrRunner& kikimr) {
if (ArrowBatch->num_rows() < PartsCount) {
return TConclusionStatus::Fail(
"not enough records(" + ::ToString(ArrowBatch->num_rows()) + ") for split in " + ::ToString(PartsCount) + " chunks");
}
TableName = Strip(lines[0]);
ArrowBatch = Base64Decode(Strip(lines[1]));
AFL_VERIFY(!!ArrowBatch);
if (lines.size() == 3) {
if (!Ydb::StatusIds_StatusCode_Parse(Strip(lines[2]), &ExpectedCode)) {
return false;
}
// if (lines[2] == "SUCCESS") {
// } else if (lines[2] = "INTERNAL_ERROR") {
// ExpectedCode = Ydb::StatusIds::INTERNAL_ERROR;
// } else if (lines[2] == "BAD_REQUEST") {
// ExpectedCode = Ydb::StatusIds::BAD_REQUEST;
// } else {
// return false;
// }
ui32 cursor = 0;
for (ui32 i = 0; i < PartsCount; ++i) {
const ui32 size = (i + 1 != PartsCount) ? (ArrowBatch->num_rows() / PartsCount) : (ArrowBatch->num_rows() - cursor);
TLocalHelper lHelper(kikimr);
lHelper.SendDataViaActorSystem(TableName, ArrowBatch->Slice(cursor, size), ExpectedCode);
cursor += size;
}
return true;
AFL_VERIFY(cursor == ArrowBatch->num_rows());
return TConclusionStatus::Success();
}

TConclusionStatus TBulkUpsertCommand::DoExecute(TKikimrRunner& kikimr) {
TLocalHelper lHelper(kikimr);
lHelper.SendDataViaActorSystem(
TableName, NArrow::TStatusValidator::GetValid(NArrow::NSerialization::TNativeSerializer().Deserialize(ArrowBatch)), ExpectedCode);
TConclusionStatus TBulkUpsertCommand::DoDeserializeProperties(const TPropertiesCollection& props) {
if (props.GetFreeArgumentsCount() != 2) {
return TConclusionStatus::Fail("incorrect free arguments count for BULK_UPSERTcommand");
}
TableName = props.GetFreeArgumentVerified(0);
ArrowBatch = NArrow::TStatusValidator::GetValid(NArrow::NSerialization::TNativeSerializer().Deserialize(Base64Decode(props.GetFreeArgumentVerified(1))));
if (auto value = props.GetOptional("EXPECT_STATUS")) {
if (!Ydb::StatusIds_StatusCode_Parse(*value, &ExpectedCode)) {
return TConclusionStatus::Fail("cannot parse EXPECT_STATUS from " + *value);
}
}
if (auto value = props.GetOptional("PARTS_COUNT")) {
if (!TryFromString<ui32>(*value, PartsCount)) {
return TConclusionStatus::Fail("cannot parse PARTS_COUNT from " + *value);
}
}
return TConclusionStatus::Success();
}

Expand Down
12 changes: 9 additions & 3 deletions ydb/core/kqp/ut/olap/combinatory/bulk_upsert.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,26 @@

#include <ydb/library/conclusion/status.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>

namespace NKikimr::NKqp {

class TBulkUpsertCommand: public ICommand {
private:
TString TableName;
TString ArrowBatch;
std::shared_ptr<arrow::RecordBatch> ArrowBatch;
Ydb::StatusIds_StatusCode ExpectedCode = Ydb::StatusIds::SUCCESS;
ui32 PartsCount = 1;

virtual std::set<TString> DoGetCommandProperties() const override {
return { "EXPECT_STATUS", "PARTS_COUNT" };
}
virtual TConclusionStatus DoDeserializeProperties(const TPropertiesCollection& props) override;

public:
TBulkUpsertCommand() = default;

virtual TConclusionStatus DoExecute(TKikimrRunner& kikimr) override;

bool DeserializeFromString(const TString& info);
};

} // namespace NKikimr::NKqp
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/olap/combinatory/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ TConclusionStatus TOneCompactionCommand::DoExecute(TKikimrRunner& /*kikimr*/) {
TConclusionStatus TWaitCompactionCommand::DoExecute(TKikimrRunner& /*kikimr*/) {
auto controller = NYDBTest::TControllers::GetControllerAs<NYDBTest::NColumnShard::TController>();
AFL_VERIFY(controller);
controller->WaitCompactions(TDuration::Seconds(5));
controller->WaitCompactions(TDuration::Seconds(15));
return TConclusionStatus::Success();
}

Expand Down
Loading
Loading