Skip to content

Commit 084d477

Browse files
committed
Fix review issues
1 parent f563a18 commit 084d477

File tree

2 files changed

+13
-4
lines changed

2 files changed

+13
-4
lines changed

ydb/core/change_exchange/change_sender_common_ops.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ void TBaseChangeSender::EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueReco
9595
RequestRecords();
9696
}
9797

98-
bool TBaseChangeSender::RequestRecords() {
98+
bool TBaseChangeSender::RequestRecords(bool forceAtLeastOne) {
9999
if (!Enqueued) {
100100
return false;
101101
}
@@ -105,7 +105,11 @@ bool TBaseChangeSender::RequestRecords() {
105105

106106
while (it != Enqueued.end()) {
107107
if (MemUsage && (MemUsage + it->BodySize) > MemLimit) {
108-
break;
108+
if (!forceAtLeastOne) {
109+
break;
110+
}
111+
112+
forceAtLeastOne = false;
109113
}
110114

111115
MemUsage += it->BodySize;
@@ -161,6 +165,9 @@ void TBaseChangeSender::SendRecords() {
161165
THashSet<ui64> registrations;
162166
bool needToResolve = false;
163167

168+
// used to avoid deadlock between RequestRecords & SendRecords
169+
bool processedAtLeastOne = false;
170+
164171
while (it != PendingSent.end()) {
165172
if (Enqueued && Enqueued.begin()->Order <= it->first) {
166173
break;
@@ -170,6 +177,8 @@ void TBaseChangeSender::SendRecords() {
170177
break;
171178
}
172179

180+
processedAtLeastOne = true;
181+
173182
if (!it->second->IsBroadcast()) {
174183
const ui64 partitionId = Resolver->GetPartitionId(it->second);
175184
if (!Senders.contains(partitionId)) {
@@ -223,7 +232,7 @@ void TBaseChangeSender::SendRecords() {
223232
Resolver->Resolve();
224233
}
225234

226-
RequestRecords();
235+
RequestRecords(!processedAtLeastOne);
227236
}
228237

229238
void TBaseChangeSender::ForgetRecords(TVector<ui64>&& records) {

ydb/core/change_exchange/change_sender_common_ops.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ class TBaseChangeSender: public IChangeSender {
108108
void CreateMissingSenders(const TVector<ui64>& partitionIds);
109109
void RecreateSenders(const TVector<ui64>& partitionIds);
110110

111-
bool RequestRecords();
111+
bool RequestRecords(bool forceAtLeastOne = false);
112112
void SendRecords();
113113

114114
void SendPreparedRecords(ui64 partitionId);

0 commit comments

Comments
 (0)