Skip to content

Commit 7520303

Browse files
Merge 02ed0b5 into 1143523
2 parents 1143523 + 02ed0b5 commit 7520303

File tree

5 files changed

+32
-45
lines changed

5 files changed

+32
-45
lines changed

ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -726,7 +726,7 @@ class TReadSessionActor : public TActorBootstrapped<TReadSessionActor> {
726726
static ui32 NormalizeMaxReadSize(ui32 sourceValue);
727727
static ui32 NormalizeMaxReadPartitionsCount(ui32 sourceValue);
728728

729-
static bool RemoveEmptyMessages(NPersQueue::TReadResponse::TBatchedData& data); // returns true if there are nonempty messages
729+
static bool HasMessages(const NPersQueue::TReadResponse::TBatchedData& data); // returns true if there are any messages
730730

731731
private:
732732
IReadSessionHandlerRef Handler;

ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1527,7 +1527,7 @@ bool TReadSessionActor::ProcessAnswer(const TActorContext& ctx, TFormedReadRespo
15271527

15281528
Y_ABORT_UNLESS(formedResponse->RequestsInfly == 0);
15291529
i64 diff = formedResponse->Response.ByteSize();
1530-
const bool hasMessages = RemoveEmptyMessages(*formedResponse->Response.MutableBatchedData());
1530+
const bool hasMessages = HasMessages(formedResponse->Response.GetBatchedData());
15311531
if (hasMessages) {
15321532
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " assign read id " << ReadIdToResponse << " to read request " << formedResponse->Guid);
15331533
formedResponse->Response.MutableBatchedData()->SetCookie(ReadIdToResponse);
@@ -1760,26 +1760,15 @@ void TReadSessionActor::HandleWakeup(const TActorContext& ctx) {
17601760
}
17611761
}
17621762

1763-
bool TReadSessionActor::RemoveEmptyMessages(TReadResponse::TBatchedData& data) {
1764-
bool hasNonEmptyMessages = false;
1765-
auto isMessageEmpty = [&](TReadResponse::TBatchedData::TMessageData& message) -> bool {
1766-
if (message.GetData().empty()) {
1767-
return true;
1768-
} else {
1769-
hasNonEmptyMessages = true;
1770-
return false;
1763+
bool TReadSessionActor::HasMessages(const TReadResponse::TBatchedData& data) {
1764+
for (const auto& partData : data.GetPartitionData()) {
1765+
for (const auto& batch : partData.GetBatch()) {
1766+
if (batch.MessageDataSize() > 0) {
1767+
return true;
1768+
}
17711769
}
1772-
};
1773-
auto batchRemover = [&](TReadResponse::TBatchedData::TBatch& batch) -> bool {
1774-
NProtoBuf::RemoveRepeatedFieldItemIf(batch.MutableMessageData(), isMessageEmpty);
1775-
return batch.MessageDataSize() == 0;
1776-
};
1777-
auto partitionDataRemover = [&](TReadResponse::TBatchedData::TPartitionData& partition) -> bool {
1778-
NProtoBuf::RemoveRepeatedFieldItemIf(partition.MutableBatch(), batchRemover);
1779-
return partition.BatchSize() == 0;
1780-
};
1781-
NProtoBuf::RemoveRepeatedFieldItemIf(data.MutablePartitionData(), partitionDataRemover);
1782-
return hasNonEmptyMessages;
1770+
}
1771+
return false;
17831772
}
17841773

17851774

ydb/services/persqueue_v1/actors/helpers.cpp

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,29 +4,27 @@
44

55
namespace NKikimr::NGRpcProxy::V1 {
66

7-
bool RemoveEmptyMessages(PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch& data) {
8-
auto batchRemover = [&](PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::Batch& batch) -> bool {
9-
return batch.message_data_size() == 0;
10-
};
11-
auto partitionDataRemover = [&](PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::PartitionData& partition) -> bool {
12-
NProtoBuf::RemoveRepeatedFieldItemIf(partition.mutable_batches(), batchRemover);
13-
return partition.batches_size() == 0;
14-
};
15-
NProtoBuf::RemoveRepeatedFieldItemIf(data.mutable_partition_data(), partitionDataRemover);
16-
return !data.partition_data().empty();
7+
bool HasMessages(const PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch& data) {
8+
for (const auto& partData : data.partition_data()) {
9+
for (const auto& batch : partData.batches()) {
10+
if (batch.message_data_size() > 0) {
11+
return true;
12+
}
13+
}
14+
}
15+
return false;
1716
}
1817

1918
// TODO: remove after refactor
20-
bool RemoveEmptyMessages(Topic::StreamReadMessage::ReadResponse& data) {
21-
auto batchRemover = [&](Topic::StreamReadMessage::ReadResponse::Batch& batch) -> bool {
22-
return batch.message_data_size() == 0;
23-
};
24-
auto partitionDataRemover = [&](Topic::StreamReadMessage::ReadResponse::PartitionData& partition) -> bool {
25-
NProtoBuf::RemoveRepeatedFieldItemIf(partition.mutable_batches(), batchRemover);
26-
return partition.batches_size() == 0;
27-
};
28-
NProtoBuf::RemoveRepeatedFieldItemIf(data.mutable_partition_data(), partitionDataRemover);
29-
return !data.partition_data().empty();
19+
bool HasMessages(const Topic::StreamReadMessage::ReadResponse& data) {
20+
for (const auto& partData : data.partition_data()) {
21+
for (const auto& batch : partData.batches()) {
22+
if (batch.message_data_size() > 0) {
23+
return true;
24+
}
25+
}
26+
}
27+
return false;
3028
}
3129

3230
}

ydb/services/persqueue_v1/actors/helpers.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ static constexpr ui64 READ_BLOCK_SIZE = 8_KB; // metering
1313

1414
using namespace Ydb;
1515

16-
bool RemoveEmptyMessages(PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch& data);
16+
bool HasMessages(const PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch& data);
1717

18-
bool RemoveEmptyMessages(Topic::StreamReadMessage::ReadResponse& data);
18+
bool HasMessages(const Topic::StreamReadMessage::ReadResponse& data);
1919

2020
}

ydb/services/persqueue_v1/actors/read_session_actor.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1947,9 +1947,9 @@ ui64 TReadSessionActor<UseMigrationProtocol>::PrepareResponse(typename TFormedRe
19471947
formedResponse->ByteSizeBeforeFiltering = formedResponse->Response.ByteSize();
19481948

19491949
if constexpr (UseMigrationProtocol) {
1950-
formedResponse->HasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_data_batch());
1950+
formedResponse->HasMessages = HasMessages(formedResponse->Response.data_batch());
19511951
} else {
1952-
formedResponse->HasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_read_response());
1952+
formedResponse->HasMessages = HasMessages(formedResponse->Response.read_response());
19531953
}
19541954

19551955
return formedResponse->HasMessages ? formedResponse->Response.ByteSize() : 0;

0 commit comments

Comments
 (0)