Skip to content

Commit 0025e00

Browse files
authored
Fix sink empty batch (#10463)
1 parent 6b4d585 commit 0025e00

File tree

1 file changed

+8
-3
lines changed

1 file changed

+8
-3
lines changed

ydb/core/kqp/runtime/kqp_write_table.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -889,14 +889,17 @@ class TShardsInfo {
889889

890890
void MakeNextBatches(i64 maxDataSize, ui64 maxCount) {
891891
YQL_ENSURE(BatchesInFlight == 0);
892+
YQL_ENSURE(!IsEmpty());
892893
i64 dataSize = 0;
894+
// For columnshard batch can be slightly larger than the limit.
893895
while (BatchesInFlight < maxCount
894896
&& BatchesInFlight < Batches.size()
895-
&& dataSize + GetBatch(BatchesInFlight)->GetMemory() <= maxDataSize) {
897+
&& (dataSize + GetBatch(BatchesInFlight)->GetMemory() <= maxDataSize || BatchesInFlight == 0)) {
896898
dataSize += GetBatch(BatchesInFlight)->GetMemory();
897899
++BatchesInFlight;
898900
}
899-
YQL_ENSURE(BatchesInFlight == Batches.size() || GetBatch(BatchesInFlight)->GetMemory() <= maxDataSize);
901+
YQL_ENSURE(BatchesInFlight != 0);
902+
YQL_ENSURE(BatchesInFlight == maxCount || BatchesInFlight == Batches.size() || dataSize + GetBatch(BatchesInFlight)->GetMemory() >= maxDataSize);
900903
}
901904

902905
const IPayloadSerializer::IBatchPtr& GetBatch(size_t index) const {
@@ -1200,7 +1203,9 @@ class TShardedWriteController : public IShardedWriteController {
12001203
if (force) {
12011204
for (auto& [shardId, batches] : Serializer->FlushBatchesForce()) {
12021205
for (auto& batch : batches) {
1203-
ShardsInfo.GetShard(shardId).PushBatch(std::move(batch));
1206+
if (batch && !batch->IsEmpty()) {
1207+
ShardsInfo.GetShard(shardId).PushBatch(std::move(batch));
1208+
}
12041209
}
12051210
}
12061211
} else {

0 commit comments

Comments
 (0)