Skip to content

Commit ef04324

Browse files
authored
Merge 20938fc into b1eb036
2 parents b1eb036 + 20938fc commit ef04324

File tree

2 files changed

+21
-4
lines changed

2 files changed

+21
-4
lines changed

ydb/core/change_exchange/change_sender_common_ops.cpp

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ void TBaseChangeSender::EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueReco
9595
RequestRecords();
9696
}
9797

98-
bool TBaseChangeSender::RequestRecords() {
98+
bool TBaseChangeSender::RequestRecords(bool forceAtLeastOne) {
9999
if (!Enqueued) {
100100
return false;
101101
}
@@ -105,7 +105,11 @@ bool TBaseChangeSender::RequestRecords() {
105105

106106
while (it != Enqueued.end()) {
107107
if (MemUsage && (MemUsage + it->BodySize) > MemLimit) {
108-
break;
108+
if (!forceAtLeastOne) {
109+
break;
110+
}
111+
112+
forceAtLeastOne = false;
109113
}
110114

111115
MemUsage += it->BodySize;
@@ -161,7 +165,20 @@ void TBaseChangeSender::SendRecords() {
161165
THashSet<ui64> registrations;
162166
bool needToResolve = false;
163167

168+
// used to avoid deadlock between RequestRecords & SendRecords
169+
bool processedAtLeastOne = false;
170+
164171
while (it != PendingSent.end()) {
172+
if (Enqueued && Enqueued.begin()->Order <= it->first) {
173+
break;
174+
}
175+
176+
processedAtLeastOne = true;
177+
178+
if (PendingBody && PendingBody.begin()->Order <= it->first) {
179+
break;
180+
}
181+
165182
if (!it->second->IsBroadcast()) {
166183
const ui64 partitionId = Resolver->GetPartitionId(it->second);
167184
if (!Senders.contains(partitionId)) {
@@ -215,7 +232,7 @@ void TBaseChangeSender::SendRecords() {
215232
Resolver->Resolve();
216233
}
217234

218-
RequestRecords();
235+
RequestRecords(!processedAtLeastOne);
219236
}
220237

221238
void TBaseChangeSender::ForgetRecords(TVector<ui64>&& records) {

ydb/core/change_exchange/change_sender_common_ops.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ class TBaseChangeSender: public IChangeSender {
9999
void CreateMissingSenders(const TVector<ui64>& partitionIds);
100100
void RecreateSenders(const TVector<ui64>& partitionIds);
101101

102-
bool RequestRecords();
102+
bool RequestRecords(bool forceAtLeastOne = false);
103103
void SendRecords();
104104

105105
void SendPreparedRecords(ui64 partitionId);

0 commit comments

Comments
 (0)