Skip to content

Commit aaf632b

Browse files
committed
WIP
1 parent 93bd594 commit aaf632b

File tree

17 files changed

+1384
-116
lines changed

17 files changed

+1384
-116
lines changed

ydb/core/protos/out/out.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
#include <ydb/public/api/protos/ydb_table.pb.h>
2-
31
#include <ydb/core/protos/blobstorage.pb.h>
42
#include <ydb/core/protos/blobstorage_vdisk_internal.pb.h>
53
#include <ydb/core/protos/blobstorage_vdisk_config.pb.h>
@@ -254,6 +252,10 @@ Y_DECLARE_OUT_SPEC(, NKikimrStat::TEvStatisticsResponse::EStatus, stream, value)
254252
stream << NKikimrStat::TEvStatisticsResponse::EStatus_Name(value);
255253
}
256254

257-
Y_DECLARE_OUT_SPEC(, Ydb::Table::IndexBuildState_State, stream, value) {
258-
stream << IndexBuildState_State_Name(value);
255+
Y_DECLARE_OUT_SPEC(, NKikimrIndexBuilder::EBuildStatus, stream, value) {
256+
stream << NKikimrIndexBuilder::EBuildStatus_Name(value);
257+
}
258+
259+
Y_DECLARE_OUT_SPEC(, NKikimrTxDataShard::TEvLocalKMeansRequest_EState, stream, value) {
260+
stream << NKikimrTxDataShard::TEvLocalKMeansRequest_EState_Name(value);
259261
}

ydb/core/protos/tx_datashard.proto

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import "ydb/core/protos/subdomains.proto";
1717
import "ydb/core/protos/query_stats.proto";
1818
import "ydb/public/api/protos/ydb_issue_message.proto";
1919
import "ydb/public/api/protos/ydb_status_codes.proto";
20+
import "ydb/public/api/protos/ydb_table.proto";
2021
import "ydb/library/yql/dq/actors/protos/dq_events.proto";
2122
import "ydb/library/yql/dq/actors/protos/dq_stats.proto";
2223
import "ydb/library/yql/dq/proto/dq_tasks.proto";
@@ -1486,6 +1487,71 @@ message TEvSampleKResponse {
14861487
repeated bytes Rows = 11;
14871488
}
14881489

1490+
message TEvLocalKMeansRequest {
1491+
optional uint64 Id = 1;
1492+
1493+
optional uint64 TabletId = 2;
1494+
optional NKikimrProto.TPathID PathId = 3;
1495+
1496+
optional uint64 SnapshotTxId = 4;
1497+
optional uint64 SnapshotStep = 5;
1498+
1499+
optional uint64 SeqNoGeneration = 6;
1500+
optional uint64 SeqNoRound = 7;
1501+
1502+
optional Ydb.Table.VectorIndexSettings Settings = 8;
1503+
1504+
optional uint64 Seed = 9;
1505+
optional uint32 K = 10;
1506+
1507+
enum EState {
1508+
SAMPLE = 1;
1509+
KMEANS = 2;
1510+
UPLOAD_MAIN_TO_TMP = 3;
1511+
UPLOAD_MAIN_TO_POSTING = 4;
1512+
UPLOAD_TMP_TO_TMP = 5;
1513+
UPLOAD_TMP_TO_POSTING = 6;
1514+
DONE = 7;
1515+
};
1516+
optional EState Upload = 11;
1517+
// State != DONE
1518+
optional EState State = 12;
1519+
// State != KMEANS || DoneRounds < NeedsRounds
1520+
optional uint32 DoneRounds = 13;
1521+
optional uint32 NeedsRounds = 14;
1522+
1523+
// id of parent cluster
1524+
optional uint32 Parent = 15;
1525+
// [Child ... Child + K] ids reserved for our clusters
1526+
optional uint32 Child = 16;
1527+
1528+
optional string LevelName = 17;
1529+
optional string PostingName = 18;
1530+
1531+
optional string EmbeddingColumn = 19;
1532+
repeated string DataColumns = 20;
1533+
}
1534+
1535+
message TEvLocalKMeansProgressResponse {
1536+
optional uint64 Id = 1;
1537+
1538+
optional uint64 TabletId = 2;
1539+
optional NKikimrProto.TPathID PathId = 3;
1540+
1541+
optional uint64 RequestSeqNoGeneration = 4;
1542+
optional uint64 RequestSeqNoRound = 5;
1543+
1544+
optional NKikimrIndexBuilder.EBuildStatus Status = 6;
1545+
repeated Ydb.Issue.IssueMessage Issues = 7;
1546+
1547+
// TODO(mbkkt) implement slow-path (reliable-path)
1548+
// optional uint64 RowsDelta = 8;
1549+
// optional uint64 BytesDelta = 9;
1550+
1551+
// optional TEvLocalKMeansRequest.EState State = 10;
1552+
// optional uint32 DoneRounds = 11;
1553+
}
1554+
14891555
message TEvCdcStreamScanRequest {
14901556
message TLimits {
14911557
optional uint32 BatchMaxBytes = 1 [default = 512000];

ydb/core/tablet_flat/flat_scan_lead.h

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,14 @@ namespace NTable {
99

1010
struct TLead {
1111
void To(TTagsRef tags, TArrayRef<const TCell> key, ESeek seek)
12+
{
13+
To(key, seek);
14+
SetTags(tags);
15+
}
16+
17+
void To(TArrayRef<const TCell> key, ESeek seek)
1218
{
1319
Valid = true;
14-
Tags.assign(tags.begin(), tags.end());
1520
Relation = seek;
1621
Key = TSerializedCellVec(key);
1722
StopKey = { };
@@ -24,6 +29,10 @@ namespace NTable {
2429
StopKeyInclusive = inclusive;
2530
}
2631

32+
void SetTags(TTagsRef tags) {
33+
Tags.assign(tags.begin(), tags.end());
34+
}
35+
2736
explicit operator bool() const noexcept
2837
{
2938
return Valid;
@@ -34,12 +43,12 @@ namespace NTable {
3443
Valid = false;
3544
}
3645

37-
bool Valid = false;
3846
ESeek Relation = ESeek::Exact;
47+
bool Valid = false;
48+
bool StopKeyInclusive = true;
3949
TVector<ui32> Tags;
4050
TSerializedCellVec Key;
4151
TSerializedCellVec StopKey;
42-
bool StopKeyInclusive = true;
4352
};
4453

4554
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
#include "ydb/core/scheme/scheme_tablecell.h"
2+
#include "ydb/core/tx/datashard/upload_stats.h"
3+
#include "ydb/core/tx/tx_proxy/upload_rows.h"
4+
5+
namespace NKikimr::NDataShard {
6+
7+
using TTypes = NTxProxy::TUploadTypes;
8+
using TRows = NTxProxy::TUploadRows;
9+
10+
class TBufferData: public IStatHolder, public TNonCopyable {
11+
public:
12+
TBufferData()
13+
: Rows{std::make_shared<TRows>()}
14+
{
15+
}
16+
17+
ui64 GetRows() const override final {
18+
return Rows->size();
19+
}
20+
21+
std::shared_ptr<TRows> GetRowsData() const {
22+
return Rows;
23+
}
24+
25+
ui64 GetBytes() const override final {
26+
return ByteSize;
27+
}
28+
29+
void FlushTo(TBufferData& other) {
30+
Y_ABORT_UNLESS(this != &other);
31+
Y_ABORT_UNLESS(other.IsEmpty());
32+
other.Rows.swap(Rows);
33+
other.ByteSize = std::exchange(ByteSize, 0);
34+
other.LastKey = std::exchange(LastKey, {});
35+
}
36+
37+
void Clear() {
38+
Rows->clear();
39+
ByteSize = 0;
40+
LastKey = {};
41+
}
42+
43+
void AddRow(TSerializedCellVec&& key, TSerializedCellVec&& targetPk, TString&& targetValue) {
44+
Rows->emplace_back(std::move(targetPk), std::move(targetValue));
45+
ByteSize += Rows->back().first.GetBuffer().size() + Rows->back().second.size();
46+
LastKey = std::move(key);
47+
}
48+
49+
bool IsEmpty() const {
50+
return Rows->empty();
51+
}
52+
53+
size_t Size() const {
54+
return Rows->size();
55+
}
56+
57+
bool IsReachLimits(const TUploadLimits& Limits) {
58+
// TODO(mbkkt) why [0..BatchRowsLimit) but [0..BatchBytesLimit]
59+
return Rows->size() >= Limits.BatchRowsLimit || ByteSize > Limits.BatchBytesLimit;
60+
}
61+
62+
auto&& ExtractLastKey() {
63+
return std::move(LastKey);
64+
}
65+
66+
const auto& GetLastKey() const {
67+
return LastKey;
68+
}
69+
70+
private:
71+
std::shared_ptr<TRows> Rows;
72+
ui64 ByteSize = 0;
73+
TSerializedCellVec LastKey;
74+
};
75+
76+
}

ydb/core/tx/datashard/build_index.cpp

Lines changed: 3 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "range_ops.h"
33
#include "scan_common.h"
44
#include "upload_stats.h"
5+
#include "buffer_data.h"
56

67
#include <ydb/core/base/appdata.h>
78
#include <ydb/core/base/counters.h>
@@ -27,31 +28,6 @@ namespace NKikimr::NDataShard {
2728
#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, stream)
2829
#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, stream)
2930

30-
using TColumnsTypes = THashMap<TString, NScheme::TTypeInfo>;
31-
using TTypes = NTxProxy::TUploadTypes;
32-
using TRows = NTxProxy::TUploadRows;
33-
34-
static TColumnsTypes GetAllTypes(const TUserTable& tableInfo) {
35-
TColumnsTypes result;
36-
37-
for (const auto& it : tableInfo.Columns) {
38-
result[it.second.Name] = it.second.Type;
39-
}
40-
41-
return result;
42-
}
43-
44-
static void ProtoYdbTypeFromTypeInfo(Ydb::Type* type, const NScheme::TTypeInfo typeInfo) {
45-
if (typeInfo.GetTypeId() == NScheme::NTypeIds::Pg) {
46-
auto* typeDesc = typeInfo.GetTypeDesc();
47-
auto* pg = type->mutable_pg_type();
48-
pg->set_type_name(NPg::PgTypeNameFromTypeDesc(typeDesc));
49-
pg->set_oid(NPg::PgTypeIdFromTypeDesc(typeDesc));
50-
} else {
51-
type->set_type_id((Ydb::Type::PrimitiveTypeId)typeInfo.GetTypeId());
52-
}
53-
}
54-
5531
static std::shared_ptr<TTypes> BuildTypes(const TUserTable& tableInfo, const NKikimrIndexBuilder::TColumnBuildSettings& buildSettings) {
5632
auto types = GetAllTypes(tableInfo);
5733

@@ -119,74 +95,6 @@ bool BuildExtraColumns(TVector<TCell>& cells, const NKikimrIndexBuilder::TColumn
11995
return true;
12096
}
12197

122-
class TBufferData: public IStatHolder, public TNonCopyable {
123-
public:
124-
TBufferData()
125-
: Rows(new TRows)
126-
{
127-
}
128-
129-
ui64 GetRows() const override final {
130-
return Rows->size();
131-
}
132-
133-
std::shared_ptr<TRows> GetRowsData() const {
134-
return Rows;
135-
}
136-
137-
ui64 GetBytes() const override final {
138-
return ByteSize;
139-
}
140-
141-
void FlushTo(TBufferData& other) {
142-
if (this == &other) {
143-
return;
144-
}
145-
146-
Y_ABORT_UNLESS(other.Rows);
147-
Y_ABORT_UNLESS(other.IsEmpty());
148-
149-
other.Rows.swap(Rows);
150-
other.ByteSize = ByteSize;
151-
other.LastKey = std::move(LastKey);
152-
153-
Clear();
154-
}
155-
156-
void Clear() {
157-
Rows->clear();
158-
ByteSize = 0;
159-
LastKey = {};
160-
}
161-
162-
void AddRow(TSerializedCellVec&& key, TSerializedCellVec&& targetPk, TString&& targetValue) {
163-
Rows->emplace_back(std::move(targetPk), std::move(targetValue));
164-
ByteSize += Rows->back().first.GetBuffer().size() + Rows->back().second.size();
165-
LastKey = std::move(key);
166-
}
167-
168-
bool IsEmpty() const {
169-
return Rows->empty();
170-
}
171-
172-
bool IsReachLimits(const TUploadLimits& Limits) {
173-
return Rows->size() >= Limits.BatchRowsLimit || ByteSize > Limits.BatchBytesLimit;
174-
}
175-
176-
void ExtractLastKey(TSerializedCellVec& out) {
177-
out = std::move(LastKey);
178-
}
179-
180-
const TSerializedCellVec& GetLastKey() const {
181-
return LastKey;
182-
}
183-
184-
private:
185-
std::shared_ptr<TRows> Rows;
186-
ui64 ByteSize = 0;
187-
TSerializedCellVec LastKey;
188-
};
189-
19098
template <NKikimrServices::TActivity::EType Activity>
19199
class TBuildScanUpload: public TActor<TBuildScanUpload<Activity>>, public NTable::IScan {
192100
using TThis = TBuildScanUpload<Activity>;
@@ -382,11 +290,7 @@ class TBuildScanUpload: public TActor<TBuildScanUpload<Activity>>, public NTable
382290
<< " WriteBuf empty: " << WriteBuf.IsEmpty()
383291
<< " " << Debug());
384292

385-
if (ReadBuf.IsEmpty()) {
386-
return EScan::Feed;
387-
}
388-
389-
if (WriteBuf.IsEmpty()) {
293+
if (!ReadBuf.IsEmpty() && WriteBuf.IsEmpty()) {
390294
ReadBuf.FlushTo(WriteBuf);
391295
Upload();
392296
}
@@ -433,7 +337,7 @@ class TBuildScanUpload: public TActor<TBuildScanUpload<Activity>>, public NTable
433337

434338
if (UploadStatus.IsSuccess()) {
435339
Stats.Aggr(&WriteBuf);
436-
WriteBuf.ExtractLastKey(LastUploadedKey);
340+
LastUploadedKey = WriteBuf.ExtractLastKey();
437341

438342
//send progress
439343
TAutoPtr<TEvDataShard::TEvBuildIndexProgressResponse> progress = new TEvDataShard::TEvBuildIndexProgressResponse;

ydb/core/tx/datashard/datashard.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,9 @@ struct TEvDataShard {
332332
EvSampleKRequest,
333333
EvSampleKResponse,
334334

335+
EvLocalKMeansRequest,
336+
EvLocalKMeansProgressResponse,
337+
335338
EvEnd
336339
};
337340

@@ -1454,6 +1457,18 @@ struct TEvDataShard {
14541457
TEvDataShard::EvSampleKResponse> {
14551458
};
14561459

1460+
struct TEvLocalKMeansRequest
1461+
: public TEventPB<TEvLocalKMeansRequest,
1462+
NKikimrTxDataShard::TEvLocalKMeansRequest,
1463+
TEvDataShard::EvLocalKMeansRequest> {
1464+
};
1465+
1466+
struct TEvLocalKMeansProgressResponse
1467+
: public TEventPB<TEvLocalKMeansProgressResponse,
1468+
NKikimrTxDataShard::TEvLocalKMeansProgressResponse,
1469+
TEvDataShard::EvLocalKMeansProgressResponse> {
1470+
};
1471+
14571472
struct TEvKqpScan
14581473
: public TEventPB<TEvKqpScan,
14591474
NKikimrTxDataShard::TEvKqpScan,

0 commit comments

Comments
 (0)