Skip to content

Commit 9a0cec9

Browse files
authored
Merge 9704813 into 18ebd43
2 parents 18ebd43 + 9704813 commit 9a0cec9

File tree

2 files changed

+14
-11
lines changed

2 files changed

+14
-11
lines changed

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

+1
Original file line numberDiff line numberDiff line change
@@ -1260,6 +1260,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
12601260
.InputIndex = inputIndex,
12611261
.StatsLevel = collectStatsLevel,
12621262
.TxId = TxId,
1263+
.TaskId = Task.GetId(),
12631264
.SecureParams = secureParams,
12641265
.TaskParams = taskParams,
12651266
.ReadRanges = readRanges,

ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp

+13-11
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ struct TEvPrivate {
9393
class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqComputeActorAsyncInput {
9494
public:
9595
using TPartitionKey = std::pair<TString, ui64>; // Cluster, partition id.
96-
using TDebugOffsets = TMaybe<std::pair<ui64, ui64>>;
96+
using TDebugOffsets = THashMap<ui64, std::pair<ui64, ui64>>;
9797

9898
TDqPqReadActor(
9999
ui64 inputIndex,
@@ -173,7 +173,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
173173

174174
DeferredCommits.emplace(checkpoint.GetId(), std::make_pair(std::move(CurrentDeferredCommit), CurrentDeferredCommitOffset));
175175
CurrentDeferredCommit = NYdb::NTopic::TDeferredCommit();
176-
CurrentDeferredCommitOffset.Clear();
176+
CurrentDeferredCommitOffset.clear();
177177
}
178178

179179
void LoadState(const NDqProto::TSourceState& state) override {
@@ -216,13 +216,13 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
216216

217217
void CommitState(const NDqProto::TCheckpoint& checkpoint) override {
218218
const auto checkpointId = checkpoint.GetId();
219+
SRC_LOG_D("Commit state, checkpoint " << checkpointId);
219220
while (!DeferredCommits.empty() && DeferredCommits.front().first <= checkpointId) {
221+
SRC_LOG_D("Commit checkpoint " << DeferredCommits.front().first);
220222
auto& valuePair = DeferredCommits.front().second;
221-
const auto& offsets = valuePair.second;
222-
if (offsets.Empty()) {
223-
SRC_LOG_D("Commit offset: [ empty ]");
224-
} else {
225-
SRC_LOG_D("Commit offset: [" << offsets->first << ", " << offsets->second << "]");
223+
const auto& offsetMap = valuePair.second;
224+
for (const auto& [partitionId, offsets]: offsetMap) {
225+
SRC_LOG_D("Commit offset, partition id " << partitionId << " [" << offsets.first << ", " << offsets.second << ")");
226226
}
227227
valuePair.first.Commit();
228228
DeferredCommits.pop();
@@ -434,11 +434,13 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
434434
for (const auto& [PartitionSession, ranges] : readyBatch.OffsetRanges) {
435435
for (const auto& [start, end] : ranges) {
436436
CurrentDeferredCommit.Add(PartitionSession, start, end);
437-
if (!CurrentDeferredCommitOffset) {
438-
CurrentDeferredCommitOffset = std::make_pair(start, end);
437+
auto offsetsIt = CurrentDeferredCommitOffset.find(PartitionSession->GetPartitionId());
438+
if (offsetsIt == CurrentDeferredCommitOffset.end()) {
439+
CurrentDeferredCommitOffset[PartitionSession->GetPartitionId()] = std::make_pair(start, end);
439440
} else {
440-
CurrentDeferredCommitOffset->first = std::min(CurrentDeferredCommitOffset->first, start);
441-
CurrentDeferredCommitOffset->second = std::max(CurrentDeferredCommitOffset->second, end);
441+
auto& offsets = offsetsIt->second;
442+
offsets.first = std::min(offsets.first, start);
443+
offsets.second = std::max(offsets.second, end);
442444
}
443445
}
444446
PartitionToOffset[MakePartitionKey(PartitionSession)] = ranges.back().second;

0 commit comments

Comments
 (0)