Skip to content

Commit e49ab88

Browse files
authored
Merge d5b2de6 into fbfd87d
2 parents fbfd87d + d5b2de6 commit e49ab88

File tree

921 files changed

+44976
-17010
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

921 files changed

+44976
-17010
lines changed

.github/config/muted_test.txt

-2
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ ydb-core-blobstorage-ut_blobstorage/SpaceCheckForDiskReassign::*
33
ydb-services-ydb-sdk_sessions_pool_ut/YdbSdkSessionsPool::StressTestSync10
44
ydb-tests-functional-kqp-kqp_query_session/KqpQuerySession::NoLocalAttach
55
ydb-core-blobstorage-ut_blobstorage/VDiskAssimilation::Test
6-
ydb-core-tx-columnshard-ut_schema/TColumnShardTestSchema::ForgetAfterFail
7-
ydb-core-tx-columnshard-ut_schema/TColumnShardTestSchema::RebootForgetAfterFail
86
ydb-library-yql-sql-pg-ut/PgSqlParsingAutoparam::AutoParamValues_DifferentTypes
97
ydb-core-blobstorage-ut_blobstorage/[6/10]*
108
ydb/core/blobstorage/ut_blobstorage/Defragmentation::DoesItWork

.github/config/muted_ya.txt

+6-7
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@ ydb/core/quoter/ut QuoterWithKesusTest.PrefetchCoefficient
1818
ydb/core/kafka_proxy/ut KafkaProtocol.CreatePartitionsScenario
1919
ydb/core/kafka_proxy/ut KafkaProtocol.ProduceScenario
2020
ydb/core/kqp/provider/ut KikimrIcGateway.TestLoadBasicSecretValueFromExternalDataSourceMetadata
21-
ydb/core/kqp/ut/federated_query/generic *
22-
ydb/core/kqp/ut/olap *
21+
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesActualization
22+
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.*
23+
ydb/core/kqp/ut/olap KqpOlap.ScanQueryOltpAndOlap
24+
ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
25+
ydb/core/kqp/ut/olap KqpOlap.YqlScriptOltpAndOlap
2326
ydb/core/kqp/ut/pg KqpPg.CreateIndex
2427
ydb/core/kqp/ut/query KqpLimits.QueryReplySize
2528
ydb/core/kqp/ut/query KqpQuery.QueryTimeout
@@ -32,9 +35,7 @@ ydb/core/kqp/ut/service KqpQueryService.QueryOnClosedSession
3235
ydb/core/kqp/ut/service KqpQueryServiceScripts.ForgetScriptExecutionRace
3336
ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad
3437
ydb/core/kqp/ut/service [38/50]*
35-
ydb/core/tx/columnshard/ut_schema TColumnShardTestSchema.ForgetAfterFail
36-
ydb/core/tx/columnshard/ut_schema TColumnShardTestSchema.RebootForgetAfterFail
37-
ydb/core/tx/columnshard/engines/ut *
38+
ydb/core/persqueue/ut TPQTest.*DirectRead*
3839
ydb/core/tx/coordinator/ut Coordinator.RestoreTenantConfiguration
3940
ydb/core/tx/datashard/ut_change_exchange Cdc.InitialScanDebezium
4041
ydb/core/tx/replication/ydb_proxy/ut YdbProxyTests.ReadTopic
@@ -80,8 +81,6 @@ ydb/tests/fq/s3 *
8081
ydb/tests/fq/yds *
8182
ydb/tests/functional/audit *
8283
ydb/tests/functional/blobstorage test_replication.py.TestReplicationAfterNodesRestart.test_replication*
83-
ydb/tests/functional/clickbench test.py.test_plans*
84-
ydb/tests/functional/clickbench test.py.test_run*
8584
ydb/tests/functional/kqp/kqp_indexes ConsistentIndexRead.InteractiveTx
8685
ydb/tests/functional/kqp/kqp_query_session KqpQuerySession.NoLocalAttach
8786
ydb/tests/functional/postgresql test_postgres.py.TestPostgresSuite.test_postgres_suite*

ydb/core/base/blobstorage.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -981,14 +981,16 @@ struct TEvBlobStorage {
981981
bool WrittenBeyondBarrier = false; // was this blob written beyond the barrier?
982982
mutable NLWTrace::TOrbit Orbit;
983983
std::shared_ptr<TExecutionRelay> ExecutionRelay;
984+
const TString StorageId;
984985

985986
TEvPutResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &id, const TStorageStatusFlags statusFlags,
986-
ui32 groupId, float approximateFreeSpaceShare)
987+
ui32 groupId, float approximateFreeSpaceShare, const TString& storageId = Default<TString>())
987988
: Status(status)
988989
, Id(id)
989990
, StatusFlags(statusFlags)
990991
, GroupId(groupId)
991992
, ApproximateFreeSpaceShare(approximateFreeSpaceShare)
993+
, StorageId(storageId)
992994
{}
993995

994996
TString Print(bool isFull) const {

ydb/core/formats/arrow/arrow_batch_builder.cpp

+12-6
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#include "arrow_batch_builder.h"
2+
#include "switch/switch_type.h"
23
#include <contrib/libs/apache/arrow/cpp/src/arrow/io/memory.h>
34
#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/reader.h>
4-
55
namespace NKikimr::NArrow {
66

77
namespace {
@@ -195,12 +195,18 @@ TArrowBatchBuilder::TArrowBatchBuilder(arrow::Compression::type codec, const std
195195
WriteOptions.use_threads = false;
196196
}
197197

198-
bool TArrowBatchBuilder::Start(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& ydbColumns) {
198+
arrow::Status TArrowBatchBuilder::Start(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& ydbColumns) {
199199
YdbSchema = ydbColumns;
200200
auto schema = MakeArrowSchema(ydbColumns, NotNullColumns);
201-
auto status = arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool(), RowsToReserve, &BatchBuilder);
201+
if (!schema.ok()) {
202+
return arrow::Status::FromArgs(schema.status().code(), "Cannot make arrow schema: ", schema.status().ToString());
203+
}
204+
auto status = arrow::RecordBatchBuilder::Make(*schema, arrow::default_memory_pool(), RowsToReserve, &BatchBuilder);
202205
NumRows = NumBytes = 0;
203-
return status.ok();
206+
if (!status.ok()) {
207+
return arrow::Status::FromArgs(schema.status().code(), "Cannot make arrow builder: ", status.ToString());
208+
}
209+
return arrow::Status::OK();
204210
}
205211

206212
void TArrowBatchBuilder::AppendCell(const TCell& cell, ui32 colNum) {
@@ -259,7 +265,7 @@ void TArrowBatchBuilder::ReserveData(ui32 columnNo, size_t size) {
259265
Y_ABORT_UNLESS(columnNo < YdbSchema.size());
260266
auto type = YdbSchema[columnNo].second;
261267

262-
SwitchYqlTypeToArrowType(type, [&](const auto& type) {
268+
Y_ABORT_UNLESS(SwitchYqlTypeToArrowType(type, [&](const auto& type) {
263269
using TWrap = std::decay_t<decltype(type)>;
264270
using TBuilder = typename arrow::TypeTraits<typename TWrap::T>::BuilderType;
265271

@@ -270,7 +276,7 @@ void TArrowBatchBuilder::ReserveData(ui32 columnNo, size_t size) {
270276
Y_ABORT_UNLESS(status.ok());
271277
}
272278
return true;
273-
});
279+
}));
274280
}
275281

276282
std::shared_ptr<arrow::RecordBatch> TArrowBatchBuilder::FlushBatch(bool reinitialize) {

ydb/core/formats/arrow/arrow_batch_builder.h

+7-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "arrow_helpers.h"
33
#include <ydb/core/formats/factory.h>
44
#include <ydb/core/scheme/scheme_tablecell.h>
5+
#include <ydb/library/conclusion/status.h>
56

67
namespace NKikimr::NArrow {
78

@@ -155,8 +156,11 @@ class TArrowBatchBuilder : public NKikimr::IBlockBuilder {
155156
ui64 maxRowsInBlock, ui64 maxBytesInBlock, TString& err) override {
156157
Y_UNUSED(maxRowsInBlock);
157158
Y_UNUSED(maxBytesInBlock);
158-
Y_UNUSED(err);
159-
return Start(columns);
159+
const auto result = Start(columns);
160+
if (!result.ok()) {
161+
err = result.ToString();
162+
}
163+
return result.ok();
160164
}
161165

162166
void AddRow(const NKikimr::TDbTupleRef& key, const NKikimr::TDbTupleRef& value) override;
@@ -175,7 +179,7 @@ class TArrowBatchBuilder : public NKikimr::IBlockBuilder {
175179
return NumBytes;
176180
}
177181

178-
bool Start(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& columns);
182+
arrow::Status Start(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& columns);
179183
std::shared_ptr<arrow::RecordBatch> FlushBatch(bool reinitialize);
180184
std::shared_ptr<arrow::RecordBatch> GetBatch() const { return Batch; }
181185

ydb/core/formats/arrow/arrow_filter.cpp

+13-17
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
#include "arrow_filter.h"
22
#include "switch_type.h"
3+
#include "common/container.h"
4+
#include "common/adapter.h"
5+
36
#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h>
47
#include <contrib/libs/apache/arrow/cpp/src/arrow/chunked_array.h>
58
#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api_vector.h>
@@ -307,7 +310,7 @@ NKikimr::NArrow::TColumnFilter TColumnFilter::MakePredicateFilter(const arrow::D
307310
return NArrow::TColumnFilter(std::move(bits));
308311
}
309312

310-
template <arrow::Datum::Kind kindExpected, class TData>
313+
template <class TData>
311314
bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) {
312315
if (!batch || !batch->num_rows()) {
313316
return false;
@@ -322,33 +325,26 @@ bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const
322325
}
323326
}
324327
if (filter.IsTotalDenyFilter()) {
325-
batch = batch->Slice(0, 0);
328+
batch = NAdapter::TDataBuilderPolicy<TData>::GetEmptySame(batch);
326329
return true;
327330
}
328331
if (filter.IsTotalAllowFilter()) {
329332
return true;
330333
}
331-
auto res = arrow::compute::Filter(batch, filter.BuildArrowFilter(batch->num_rows(), startPos, count));
332-
Y_VERIFY_S(res.ok(), res.status().message());
333-
Y_ABORT_UNLESS((*res).kind() == kindExpected);
334-
if constexpr (kindExpected == arrow::Datum::TABLE) {
335-
batch = (*res).table();
336-
return batch->num_rows();
337-
}
338-
if constexpr (kindExpected == arrow::Datum::RECORD_BATCH) {
339-
batch = (*res).record_batch();
340-
return batch->num_rows();
341-
}
342-
AFL_VERIFY(false);
343-
return false;
334+
batch = NAdapter::TDataBuilderPolicy<TData>::ApplyArrowFilter(batch, filter.BuildArrowFilter(batch->num_rows(), startPos, count));
335+
return batch->num_rows();
336+
}
337+
338+
bool TColumnFilter::Apply(std::shared_ptr<TGeneralContainer>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const {
339+
return ApplyImpl(*this, batch, startPos, count);
344340
}
345341

346342
bool TColumnFilter::Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const {
347-
return ApplyImpl<arrow::Datum::TABLE>(*this, batch, startPos, count);
343+
return ApplyImpl(*this, batch, startPos, count);
348344
}
349345

350346
bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const {
351-
return ApplyImpl<arrow::Datum::RECORD_BATCH>(*this, batch, startPos, count);
347+
return ApplyImpl(*this, batch, startPos, count);
352348
}
353349

354350
void TColumnFilter::Apply(const ui32 expectedRecordsCount, std::vector<arrow::Datum*>& datums) const {

ydb/core/formats/arrow/arrow_filter.h

+7
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
namespace NKikimr::NArrow {
1010

11+
class TGeneralContainer;
12+
1113
enum class ECompareType {
1214
LESS = 1,
1315
LESS_OR_EQUAL,
@@ -62,6 +64,10 @@ class TColumnFilter {
6264
return Filter.capacity() * sizeof(ui32) + Count * sizeof(bool);
6365
}
6466

67+
static ui64 GetPredictedMemorySize(const ui32 recordsCount) {
68+
return 2 /* capacity */ * recordsCount * (sizeof(ui32) + sizeof(bool));
69+
}
70+
6571
class TIterator {
6672
private:
6773
i64 InternalPosition = 0;
@@ -172,6 +178,7 @@ class TColumnFilter {
172178
// It makes a filter using composite predicate
173179
static TColumnFilter MakePredicateFilter(const arrow::Datum& datum, const arrow::Datum& border, ECompareType compareType);
174180

181+
bool Apply(std::shared_ptr<TGeneralContainer>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
175182
bool Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
176183
bool Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
177184
void Apply(const ui32 expectedRecordsCount, std::vector<arrow::Datum*>& datums) const;

0 commit comments

Comments
 (0)