Commit 93ba92e

Merge remote-tracking branch 'upstream/main' into YQ-3763-Unmute-yds-tests

2 parents dcaab53 + 341b429

75 files changed: +2007 -232 lines changed
ydb/apps/dstool/lib/common.py
Lines changed: 9 additions & 0 deletions

@@ -684,6 +684,15 @@ def build_pdisk_map(base_config):
     return pdisk_map


+def build_donors_per_pdisk_map(base_config):
+    donors_per_vdisk = defaultdict(int)
+    for vslot in base_config.VSlot:
+        for donor in vslot.Donors:
+            pdisk_id = get_pdisk_id(donor.VSlotId)
+            donors_per_vdisk[pdisk_id] += 1
+    return donors_per_vdisk
+
+
 def build_pdisk_static_slots_map(base_config):
     pdisk_static_slots_map = {
         get_pdisk_id(pdisk): pdisk.NumStaticSlots
ydb/apps/dstool/lib/dstool_cmd_cluster_balance.py
Lines changed: 74 additions & 11 deletions

@@ -10,20 +10,53 @@
 def add_options(p):
     p.add_argument('--max-replicating-pdisks', type=int, help='Limit number of maximum replicating PDisks in the cluster')
     p.add_argument('--only-from-overpopulated-pdisks', action='store_true', help='Move vdisks out only from pdisks with over expected slot count')
+    p.add_argument('--sort-by', choices=['slots', 'space_ratio'], default='slots', help='First to reassign disks with the most slots or with the highest space ratio')
+    p.add_argument('--storage-pool', type=str, help='Storage pool to balance')
+    p.add_argument('--max-donors-per-pdisk', type=int, default=0, help='Limit number of donors per pdisk')
     common.add_basic_format_options(p)


+def build_pdisk_statistics(base_config, pdisk_map, vsolts):
+    pdisks_statistics = {
+        pdisk_id: {
+            "PDiskId": pdisk_id,
+            "AvailableSize": pdisk.PDiskMetrics.AvailableSize,
+            "TotalSize": pdisk.PDiskMetrics.TotalSize,
+            "CandidateVSlots": [],
+            "DonorVSlots": [],
+        }
+        for pdisk_id, pdisk in pdisk_map.items()
+        if pdisk.PDiskMetrics.TotalSize > 0  # pdisk works
+    }
+    for vslot in vsolts:
+        pdisk_id = common.get_pdisk_id(vslot.VSlotId)
+        pdisks_statistics[pdisk_id]["CandidateVSlots"].append(vslot)
+    for vslot in base_config.VSlot:
+        for donor in vslot.Donors:
+            pdisk_id = common.get_pdisk_id(donor.VSlotId)
+            pdisks_statistics[pdisk_id]["DonorVSlots"].append(donor)
+    return pdisks_statistics
+
+
 def do(args):
     while True:
         common.flush_cache()

         base_config = common.fetch_base_config()
+        storage_pools = common.fetch_storage_pools()
         node_mon_map = common.fetch_node_mon_map({vslot.VSlotId.NodeId for vslot in base_config.VSlot})
         vslot_map = common.build_vslot_map(base_config)
         pdisk_map = common.build_pdisk_map(base_config)
         pdisk_usage = common.build_pdisk_usage_map(base_config, count_donors=False)
         pdisk_usage_w_donors = common.build_pdisk_usage_map(base_config, count_donors=True)

+        storage_pool_names_map = common.build_storage_pool_names_map(storage_pools)
+        group_id_to_storage_pool_name_map = {
+            group_id: storage_pool_names_map[(group.BoxId, group.StoragePoolId)]
+            for group_id, group in common.build_group_map(base_config).items()
+            if (group.BoxId, group.StoragePoolId) != (0, 0)  # static group
+        }
+
         vdisks_groups_count_map = defaultdict(int)
         for group in base_config.Group:
             num = sum(vslot.Status == 'READY' for vslot in common.vslots_of_group(group, vslot_map)) - len(group.VSlotId)
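Editor's note: build_pdisk_statistics buckets candidate and donor vslots per working PDisk; only PDisks that have reported a non-zero TotalSize are treated as alive. A small sketch of that filtering step, on hypothetical mock metrics:

# Sketch of the liveness filter in build_pdisk_statistics (mock data).
from types import SimpleNamespace

pdisk_map = {
    (1, 1000): SimpleNamespace(PDiskMetrics=SimpleNamespace(AvailableSize=40, TotalSize=100)),
    (2, 1001): SimpleNamespace(PDiskMetrics=SimpleNamespace(AvailableSize=0, TotalSize=0)),  # no metrics yet
}

stats = {
    pdisk_id: {
        "AvailableSize": pdisk.PDiskMetrics.AvailableSize,
        "TotalSize": pdisk.PDiskMetrics.TotalSize,
        "CandidateVSlots": [],
        "DonorVSlots": [],
    }
    for pdisk_id, pdisk in pdisk_map.items()
    if pdisk.PDiskMetrics.TotalSize > 0
}

print(list(stats))  # [(1, 1000)] -- the PDisk without metrics is skipped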
@@ -82,12 +115,26 @@ def do(args):

         candidate_vslots = []
         if healthy_vslots_from_overpopulated_pdisks:
-            common.print_if_not_quiet(args, f'Found {len(healthy_vslots_from_overpopulated_pdisks)} vdisks from overpopulated pdisks', sys.stdout)
+            common.print_if_not_quiet(args, f'Found {len(healthy_vslots_from_overpopulated_pdisks)} vdisks in healthy groups from overpopulated pdisks', sys.stdout)
             candidate_vslots = healthy_vslots_from_overpopulated_pdisks
         elif healthy_vslots and not args.only_from_overpopulated_pdisks:
-            common.print_if_not_quiet(args, f'Found {len(healthy_vslots)} vdisks suitable for relocation', sys.stdout)
+            common.print_if_not_quiet(args, f'Found {len(healthy_vslots)} vdisks in healthy groups', sys.stdout)
             candidate_vslots = healthy_vslots
-        else:  # candidate_vslots is empty
+
+        if args.storage_pool is not None:
+            existing_storage_pools = set(group_id_to_storage_pool_name_map.values())
+            if args.storage_pool not in existing_storage_pools:
+                print(f"Storage pool {args.storage_pool} not found in existing storage pools: {existing_storage_pools}")
+                sys.exit(1)
+            candidate_vslots = [vslot for vslot in candidate_vslots if group_id_to_storage_pool_name_map[vslot.GroupId] == args.storage_pool]
+            common.print_if_not_quiet(args, f'Found {len(candidate_vslots)} vdisks in {args.storage_pool} storage pool', sys.stdout)
+
+        if args.max_donors_per_pdisk > 0:
+            donors_per_pdisk = common.build_donors_per_pdisk_map(base_config)
+            candidate_vslots = [vslot for vslot in candidate_vslots if donors_per_pdisk[common.get_pdisk_id(vslot.VSlotId)] < args.max_donors_per_pdisk]
+            common.print_if_not_quiet(args, f'Found {len(candidate_vslots)} vdisks with donors per pdisk < {args.max_donors_per_pdisk}', sys.stdout)
+
+        if len(candidate_vslots) == 0:
             common.print_if_not_quiet(args, 'No vdisks suitable for relocation found, waiting..', sys.stdout)
             time.sleep(10)
             continue
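Editor's note: the candidate list is now narrowed by two optional, independent filters before any reassignment is attempted. A runnable sketch of that pipeline on hypothetical mock data:

# Sketch of the --storage-pool and --max-donors-per-pdisk filters (mock data).
from collections import defaultdict

candidate_vslots = [
    {"GroupId": 10, "PDiskId": (1, 1000)},
    {"GroupId": 11, "PDiskId": (2, 1001)},
    {"GroupId": 10, "PDiskId": (2, 1001)},
]
group_to_pool = {10: "/Root:ssd", 11: "/Root:hdd"}
donors_per_pdisk = defaultdict(int, {(2, 1001): 1})

storage_pool = "/Root:ssd"    # --storage-pool
max_donors_per_pdisk = 1      # --max-donors-per-pdisk

if storage_pool is not None:
    candidate_vslots = [v for v in candidate_vslots if group_to_pool[v["GroupId"]] == storage_pool]
if max_donors_per_pdisk > 0:
    candidate_vslots = [v for v in candidate_vslots if donors_per_pdisk[v["PDiskId"]] < max_donors_per_pdisk]

print(candidate_vslots)  # only the slot on PDisk (1, 1000) in /Root:ssd survives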
@@ -182,14 +229,30 @@ def add_reassign_cmd(request, vslot):
             return True
         # end of do_reassign()

-        vslots_by_pdisk_slot_usage = defaultdict(list)
-        for vslot in candidate_vslots:
-            pdisk_id = common.get_pdisk_id(vslot.VSlotId)
-            pdisk_slot_usage = pdisk_usage[pdisk_id]
-            vslots_by_pdisk_slot_usage[pdisk_slot_usage].append(vslot)
-
-        # check vslots from pdisks with the highest slot usage first
-        for pdisk_slot_usage, vslots in sorted(vslots_by_pdisk_slot_usage.items(), reverse=True):
+        vslots_ordered_groups_to_reassign = None
+        if args.sort_by == 'slots':
+            vslots_by_pdisk_slot_usage = defaultdict(list)
+            for vslot in candidate_vslots:
+                pdisk_id = common.get_pdisk_id(vslot.VSlotId)
+                pdisk_slot_usage = pdisk_usage[pdisk_id]
+                vslots_by_pdisk_slot_usage[pdisk_slot_usage].append(vslot)
+            vslots_ordered_groups_to_reassign = [vslots for _, vslots in sorted(vslots_by_pdisk_slot_usage.items(), reverse=True)]
+        elif args.sort_by == 'space_ratio':
+            pdisks = {
+                pdisk_id: {
+                    "FreeSpaceRatio": float(pdisk.PDiskMetrics.AvailableSize) / float(pdisk.PDiskMetrics.TotalSize),
+                    "CandidateVSlots": [],
+                }
+                for pdisk_id, pdisk in pdisk_map.items()
+                if pdisk.PDiskMetrics.TotalSize > 0  # pdisk works
+            }
+            for vslot in candidate_vslots:
+                pdisk_id = common.get_pdisk_id(vslot.VSlotId)
+                pdisks[pdisk_id]["CandidateVSlots"].append(vslot)
+            print({pdisk: (len(info["CandidateVSlots"]), info["FreeSpaceRatio"]) for pdisk, info in pdisks.items()})
+            vslots_ordered_groups_to_reassign = [info["CandidateVSlots"] for _, info in sorted(list(pdisks.items()), key=lambda x: x[1]["FreeSpaceRatio"])]
+
+        for vslots in vslots_ordered_groups_to_reassign:
             random.shuffle(vslots)
             for vslot in vslots:
                 if do_reassign(vslot, False):
ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp
Lines changed: 62 additions & 35 deletions

@@ -25,26 +25,49 @@ void TStrategyBase::EvaluateCurrentLayout(TLogContext &logCtx, TBlobState &state
     ui32 lostDisks = 0;
     ui32 unknownDisks = 0;

+    TString parts;
+    TStringOutput s(parts);
+
     const ui32 totalPartCount = info.Type.TotalPartCount();
     for (ui32 diskIdx = 0; diskIdx < state.Disks.size(); ++diskIdx) {
+        if (diskIdx) {
+            s << ' ';
+        }
+
         TBlobState::TDisk &disk = state.Disks[diskIdx];
-        bool isHandoff = (diskIdx >= totalPartCount);
-        ui32 beginPartIdx = (isHandoff ? 0 : diskIdx);
-        ui32 endPartIdx = (isHandoff ? totalPartCount : (diskIdx + 1));
+        const bool isHandoff = diskIdx >= totalPartCount;
+        const ui32 beginPartIdx = isHandoff ? 0 : diskIdx;
+        const ui32 endPartIdx = isHandoff ? totalPartCount : (diskIdx + 1);
         EDiskEvaluation diskEvaluation = ((considerSlowAsError && disk.IsSlow) ? EDE_ERROR : EDE_UNKNOWN);
         for (ui32 partIdx = beginPartIdx; partIdx < endPartIdx; ++partIdx) {
-            TBlobState::ESituation partSituation = disk.DiskParts[partIdx].Situation;
-            if (partSituation == TBlobState::ESituation::Error) {
-                DSP_LOG_DEBUG_SX(logCtx, "BPG41", "Id# " << state.Id.ToString()
-                        << " Restore Disk# " << diskIdx << " Part# " << partIdx << " Error");
-                diskEvaluation = EDE_ERROR;
-            }
-            if (partSituation == TBlobState::ESituation::Lost) {
-                DSP_LOG_DEBUG_SX(logCtx, "BPG65", "Id# " << state.Id.ToString()
-                        << " Restore Disk# " << diskIdx << " Part# " << partIdx << " Lost");
-                if (diskEvaluation != EDE_ERROR) {
-                    diskEvaluation = EDE_LOST;
-                }
+            switch (disk.DiskParts[partIdx].Situation) {
+                case TBlobState::ESituation::Unknown:
+                    s << '?';
+                    break;
+
+                case TBlobState::ESituation::Error:
+                    s << 'E';
+                    diskEvaluation = EDE_ERROR;
+                    break;
+
+                case TBlobState::ESituation::Absent:
+                    s << '-';
+                    break;
+
+                case TBlobState::ESituation::Lost:
+                    s << 'L';
+                    if (diskEvaluation != EDE_ERROR) {
+                        diskEvaluation = EDE_LOST;
+                    }
+                    break;
+
+                case TBlobState::ESituation::Present:
+                    s << '+';
+                    break;
+
+                case TBlobState::ESituation::Sent:
+                    s << 'S';
+                    break;
             }
         }
         if (diskEvaluation == EDE_ERROR) {

@@ -57,23 +80,25 @@ void TStrategyBase::EvaluateCurrentLayout(TLogContext &logCtx, TBlobState &state
             // If there are some error disks at the same moment, the group should be actually disintegrated.
         } else {
             for (ui32 partIdx = beginPartIdx; partIdx < endPartIdx; ++partIdx) {
-                TBlobState::ESituation partSituation = disk.DiskParts[partIdx].Situation;
-                if (partSituation == TBlobState::ESituation::Present) {
-                    DSP_LOG_DEBUG_SX(logCtx, "BPG42", "Request# "
-                            << " Id# " << state.Id.ToString()
-                            << " Disk# " << diskIdx << " Part# " << partIdx << " Present");
-                    presentLayout.AddItem(diskIdx, partIdx, info.Type);
-                    optimisticLayout.AddItem(diskIdx, partIdx, info.Type);
-                    altruisticLayout.AddItem(diskIdx, partIdx, info.Type);
-                    diskEvaluation = EDE_NORMAL;
-                } else if (partSituation == TBlobState::ESituation::Unknown
-                        || partSituation == TBlobState::ESituation::Sent) {
-                    DSP_LOG_DEBUG_SX(logCtx, "BPG43", "Id# " << state.Id.ToString()
-                            << " Disk# " << diskIdx << " Part# " << partIdx << " Unknown");
-                    optimisticLayout.AddItem(diskIdx, partIdx, info.Type);
-                    altruisticLayout.AddItem(diskIdx, partIdx, info.Type);
-                } else if (partSituation == TBlobState::ESituation::Absent) {
-                    diskEvaluation = EDE_NORMAL;
+                switch (disk.DiskParts[partIdx].Situation) {
+                    case TBlobState::ESituation::Present:
+                        presentLayout.AddItem(diskIdx, partIdx, info.Type);
+                        optimisticLayout.AddItem(diskIdx, partIdx, info.Type);
+                        altruisticLayout.AddItem(diskIdx, partIdx, info.Type);
+                        [[fallthrough]];
+                    case TBlobState::ESituation::Absent:
+                        diskEvaluation = EDE_NORMAL;
+                        break;
+
+                    case TBlobState::ESituation::Unknown:
+                    case TBlobState::ESituation::Sent:
+                        optimisticLayout.AddItem(diskIdx, partIdx, info.Type);
+                        altruisticLayout.AddItem(diskIdx, partIdx, info.Type);
+                        break;
+
+                    case TBlobState::ESituation::Error:
+                    case TBlobState::ESituation::Lost:
+                        Y_ABORT("impossible case");
                 }
             }
         }

@@ -100,12 +125,14 @@ void TStrategyBase::EvaluateCurrentLayout(TLogContext &logCtx, TBlobState &state
     *altruisticState = info.BlobState(altruisticReplicas, lostDisks);

     DSP_LOG_DEBUG_SX(logCtx, "BPG44", "Id# " << state.Id.ToString()
+            << " considerSlowAsError# " << considerSlowAsError
+            << " Parts# {" << parts << '}'
             << " pessimisticReplicas# " << pessimisticReplicas
-            << " altruisticState# " << TBlobStorageGroupInfo::BlobStateToString(*altruisticState)
+            << " p.State# " << TBlobStorageGroupInfo::BlobStateToString(*pessimisticState)
             << " optimisticReplicas# " << optimisticReplicas
-            << " optimisticState# " << TBlobStorageGroupInfo::BlobStateToString(*optimisticState)
+            << " o.State# " << TBlobStorageGroupInfo::BlobStateToString(*optimisticState)
             << " altruisticReplicas# " << altruisticReplicas
-            << " pessimisticState# " << TBlobStorageGroupInfo::BlobStateToString(*pessimisticState));
+            << " a.State# " << TBlobStorageGroupInfo::BlobStateToString(*altruisticState));
 }
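Editor's note: the rewritten loop additionally builds a compact per-disk layout string for the BPG44 debug line, one character per part and one space-separated field per disk: '?' Unknown, 'E' Error, '-' Absent, 'L' Lost, '+' Present, 'S' Sent. A small Python sketch of that encoding (input shape is hypothetical):

# Sketch of the Parts# encoding added for the BPG44 debug line.
SITUATION_CHAR = {
    "Unknown": "?",
    "Error": "E",
    "Absent": "-",
    "Lost": "L",
    "Present": "+",
    "Sent": "S",
}

def encode_parts(disks):
    # disks: list of per-disk part-situation lists (stand-in for TBlobState::TDisk)
    return " ".join("".join(SITUATION_CHAR[p] for p in disk) for disk in disks)

print(encode_parts([["Present"], ["Unknown"], ["Error", "Absent"]]))  # "+ ? E-"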

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
Lines changed: 1 addition & 1 deletion

@@ -327,7 +327,7 @@ void TPDisk::Stop() {
         auto& req = JointChunkReads.front();
         Y_VERIFY_DEBUG_S(req->GetType() == ERequestType::RequestChunkReadPiece,
             "Unexpected request type# " << TypeName(*req));
-        TRequestBase::AbortDelete(req.Release(), PCtx->ActorSystem);
+        TRequestBase::AbortDelete(req.Get(), PCtx->ActorSystem);
         JointChunkReads.pop();
     }

ydb/core/blobstorage/vdisk/repl/query_donor.h
Lines changed: 3 additions & 3 deletions

@@ -68,10 +68,9 @@ namespace NKikimr {
            }

            if (action) {
-               const TActorId temp(actorId);
                LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::BS_VDISK_GET, SelfId() << " sending " << query->ToString()
-                   << " to " << temp);
-               Send(actorId, query.release());
+                   << " to " << actorId);
+               Send(actorId, query.release(), IEventHandle::FlagTrackDelivery);
            } else {
                PassAway();
            }

@@ -116,6 +115,7 @@ namespace NKikimr {

        STRICT_STFUNC(StateFunc,
            hFunc(TEvBlobStorage::TEvVGetResult, Handle);
+           cFunc(TEvents::TSystem::Undelivered, Step);
            cFunc(TEvents::TSystem::Poison, PassAway);
        )
    };
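Editor's note: with FlagTrackDelivery set on the send, an Undelivered notification now triggers Step, so a query to a dead donor falls through to the next donor instead of stalling. A simplified, synchronous Python sketch of that fallback idea; all names here are hypothetical stand-ins for the actor-model mechanics:

# Sketch of the donor fallback enabled by FlagTrackDelivery + Undelivered.
class Undelivered(Exception):
    pass

def send_tracked(donor, query):
    # Stand-in for Send(actorId, query, FlagTrackDelivery): fails loudly
    # if the destination actor no longer exists.
    if not donor["alive"]:
        raise Undelivered(donor["id"])
    return f"result of {query} from {donor['id']}"

def query_donors(donors, query):
    for donor in donors:          # mirrors Step() advancing to the next donor
        try:
            return send_tracked(donor, query)
        except Undelivered:       # cFunc(TEvents::TSystem::Undelivered, Step)
            continue              # dead donor: try the next one
    return None                   # exhausted: the real actor calls PassAway()

donors = [{"id": "donor-1", "alive": False}, {"id": "donor-2", "alive": True}]
print(query_donors(donors, "TEvVGet"))  # result of TEvVGet from donor-2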

ydb/core/fq/libs/row_dispatcher/json_filter.cpp
Lines changed: 25 additions & 4 deletions

@@ -16,19 +16,32 @@ using TCallback = NFq::TJsonFilter::TCallback;
 const char* OffsetFieldName = "_offset";
 TString LogPrefix = "JsonFilter: ";

+NYT::TNode CreateTypeNode(const TString& fieldType) {
+    return NYT::TNode::CreateList()
+        .Add("DataType")
+        .Add(fieldType);
+}
+
 void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
     node.Add(
         NYT::TNode::CreateList()
             .Add(fieldName)
-            .Add(NYT::TNode::CreateList().Add("DataType").Add(fieldType))
+            .Add(CreateTypeNode(fieldType))
+    );
+}
+
+void AddOptionalField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
+    node.Add(NYT::TNode::CreateList()
+        .Add(fieldName)
+        .Add(NYT::TNode::CreateList().Add("OptionalType").Add(CreateTypeNode(fieldType)))
     );
 }

 NYT::TNode MakeInputSchema(const TVector<TString>& columns) {
     auto structMembers = NYT::TNode::CreateList();
     AddField(structMembers, OffsetFieldName, "Uint64");
     for (const auto& col : columns) {
-        AddField(structMembers, col, "String");
+        AddOptionalField(structMembers, col, "String");
     }
     return NYT::TNode::CreateList().Add("StructType").Add(std::move(structMembers));
 }

@@ -112,7 +125,9 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const T

             size_t fieldId = 0;
             for (const auto& column : values.second) {
-                items[FieldsPositions[fieldId++]] = NKikimr::NMiniKQL::MakeString(column[rowId]);
+                items[FieldsPositions[fieldId++]] = column[rowId].data() // Check that std::string_view was initialized in json_parser
+                    ? NKikimr::NMiniKQL::MakeString(column[rowId]).MakeOptional()
+                    : NKikimr::NUdf::TUnboxedValuePod();
             }

             Worker->Push(std::move(result));

@@ -264,7 +279,13 @@ class TJsonFilter::TImpl {
             } else if (columnType == "Optional<Json>") {
                 columnType = "Optional<String>";
             }
-            str << "CAST(" << columnNames[i] << " as " << columnType << ") as " << columnNames[i] << ((i != columnNames.size() - 1) ? "," : "");
+
+            if (columnType.StartsWith("Optional")) {
+                str << "IF(" << columnNames[i] << " IS NOT NULL, Unwrap(CAST(" << columnNames[i] << " as " << columnType << ")), NULL)";
+            } else {
+                str << "Unwrap(CAST(" << columnNames[i] << " as " << columnType << "))";
+            }
+            str << " as " << columnNames[i] << ((i != columnNames.size() - 1) ? "," : "");
         }
         str << " FROM Input;\n";
         str << "$filtered = SELECT * FROM $fields " << whereFilter << ";\n";
