Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,6 @@ class TKqpExecuterBase : public TActor<TDerived> {
size_t Size() const {
return Proto.GetChannelData().GetData().GetRaw().size() + Payload.size();
}

ui32 RowCount() const {
return Proto.GetChannelData().GetData().GetRows();
}
};

void HandleChannelData(NYql::NDq::TEvDqCompute::TEvChannelData::TPtr& ev) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/runtime/kqp_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ void TKqpProtoBuilder::BuildYdbResultSet(
}
NDq::TDqDataSerializer dataSerializer(*TypeEnv, *HolderFactory, transportVersion);
for (auto& part : data) {
if (part.RowCount()) {
if (part.ChunkCount()) {
TUnboxedValueBatch rows(mkqlSrcRowType);
dataSerializer.Deserialize(std::move(part), mkqlSrcRowType, rows);
rows.ForEachRow([&](const NUdf::TUnboxedValue& value) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_kqp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const
NDq::TDqSerializedBatch outputData;
auto fetchStatus = FetchOutput(taskRunner.GetOutputChannel(channel.GetId()).Get(), outputData);
MKQL_ENSURE_S(fetchStatus == NUdf::EFetchStatus::Finish);
MKQL_ENSURE_S(outputData.Proto.GetRows() == 0);
MKQL_ENSURE_S(outputData.Proto.GetChunks() == 0);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelData::TPtr& ev)
LOG_T("Received input for channelId: " << channelId
<< ", seqNo: " << record.GetSeqNo()
<< ", size: " << channelData.Proto.GetData().GetRaw().size()
<< ", rows: " << channelData.Proto.GetData().GetRows()
<< ", chunks: " << channelData.Proto.GetData().GetChunks()
<< ", watermark: " << channelData.Proto.HasWatermark()
<< ", checkpoint: " << channelData.Proto.HasCheckpoint()
<< ", finished: " << channelData.Proto.GetFinished()
Expand Down Expand Up @@ -177,7 +177,7 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelDataAck::TPtr&
if (record.GetFinish()) {
auto it = outputChannel.InFlight.begin();
while (it != outputChannel.InFlight.end()) {
outputChannel.PeerState.RemoveInFlight(it->second.Data.PayloadSize(), it->second.Data.RowCount());
outputChannel.PeerState.RemoveInFlight(it->second.Data.PayloadSize(), it->second.Data.ChunkCount());
it = outputChannel.InFlight.erase(it);
}
outputChannel.RetryState.reset();
Expand All @@ -190,7 +190,7 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelDataAck::TPtr&
// remove all messages with seqNo <= ackSeqNo
auto it = outputChannel.InFlight.begin();
while (it != outputChannel.InFlight.end() && it->first <= record.GetSeqNo()) {
outputChannel.PeerState.RemoveInFlight(it->second.Data.PayloadSize(), it->second.Data.RowCount());
outputChannel.PeerState.RemoveInFlight(it->second.Data.PayloadSize(), it->second.Data.ChunkCount());
it = outputChannel.InFlight.erase(it);
}

Expand Down Expand Up @@ -549,14 +549,14 @@ void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData, con
YQL_ENSURE(!outputChannel.RetryState);

const ui64 seqNo = ++outputChannel.LastSentSeqNo;
const ui32 chunkBytes = channelData.PayloadSize();
const ui32 chunkRows = channelData.RowCount();
const ui32 dataBytes = channelData.PayloadSize();
const ui32 dataChunks = channelData.ChunkCount();
const bool finished = channelData.Proto.GetFinished();

LOG_T("SendChannelData, channelId: " << channelData.Proto.GetChannelId()
<< ", peer: " << *outputChannel.Peer
<< ", rows: " << chunkRows
<< ", bytes: " << chunkBytes
<< ", chunks: " << dataChunks
<< ", bytes: " << dataBytes
<< ", watermark: " << channelData.Proto.HasWatermark()
<< ", checkpoint: " << channelData.Proto.HasCheckpoint()
<< ", seqNo: " << seqNo
Expand Down Expand Up @@ -588,7 +588,7 @@ void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData, con
dataEv->Record.SetNoAck(!needAck);
Send(*outputChannel.Peer, dataEv.Release(), flags, /* cookie */ outputChannel.ChannelId);

outputChannel.PeerState.AddInFlight(chunkBytes, chunkRows);
outputChannel.PeerState.AddInFlight(dataBytes, dataChunks);
}

bool TDqComputeActorChannels::PollChannel(ui64 channelId, i64 freeSpace) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
}
}

bool DoHandleChannelsAfterFinishImpl() override final{
bool DoHandleChannelsAfterFinishImpl() override final{
Y_ABORT_UNLESS(this->Checkpoints);

if (this->Checkpoints->HasPendingCheckpoint() && !this->Checkpoints->ComputeActorStateSaved() && ReadyToCheckpoint()) {
Expand All @@ -83,7 +83,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo

auto channel = inputChannel->Channel;

if (channelData.RowCount()) {
if (channelData.ChunkCount()) {
TDqSerializedBatch batch;
batch.Proto = std::move(*channelData.Proto.MutableData());
batch.Payload = std::move(channelData.Payload);
Expand Down Expand Up @@ -211,7 +211,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
if (!limits.OutputChunkMaxSize) {
limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
}

if (this->Task.GetEnableSpilling()) {
TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback(), execCtx.GetErrorCallback()));
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/dq/actors/dq.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ struct TChannelDataOOB {
return Proto.GetData().GetRaw().size() + Payload.Size();
}

ui32 RowCount() const {
return Proto.GetData().GetRows();
ui32 ChunkCount() const {
return Proto.GetData().GetChunks();
}
};

Expand Down
10 changes: 5 additions & 5 deletions ydb/library/yql/dq/common/dq_serialized_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ void TDqSerializedBatch::ConvertToNoOOB() {
TChunkedBuffer SaveForSpilling(TDqSerializedBatch&& batch) {
TChunkedBuffer result;

ui32 transportversion = batch.Proto.GetTransportVersion();
ui32 rowCount = batch.Proto.GetRows();
ui32 transportVersion = batch.Proto.GetTransportVersion();
ui32 chunkCount = batch.Proto.GetChunks();

TChunkedBuffer protoPayload(std::move(*batch.Proto.MutableRaw()));

AppendNumber(result, transportversion);
AppendNumber(result, rowCount);
AppendNumber(result, transportVersion);
AppendNumber(result, chunkCount);
AppendNumber(result, protoPayload.Size());
result.Append(std::move(protoPayload));
AppendNumber(result, batch.Payload.Size());
Expand All @@ -84,7 +84,7 @@ TDqSerializedBatch LoadSpilled(TBuffer&& blob) {
TStringBuf source(sharedBuf->Data(), sharedBuf->Size());
TDqSerializedBatch result;
result.Proto.SetTransportVersion(ReadNumber<ui32>(source));
result.Proto.SetRows(ReadNumber<ui32>(source));
result.Proto.SetChunks(ReadNumber<ui32>(source));

size_t protoSize = ReadNumber<size_t>(source);
YQL_ENSURE(source.size() >= protoSize, "Premature end of spilled data");
Expand Down
6 changes: 5 additions & 1 deletion ydb/library/yql/dq/common/dq_serialized_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ struct TDqSerializedBatch {
return Proto.GetRaw().size() + Payload.Size();
}

ui32 ChunkCount() const {
return Proto.GetChunks();
}

ui32 RowCount() const {
return Proto.GetRows();
return Proto.GetChunks(); // FIXME with Rows
}

void Clear() {
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/proto/dq_transport.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ enum EDataTransportVersion {
message TData {
uint32 TransportVersion = 1;
bytes Raw = 2;
uint32 Rows = 3;
uint32 Chunks = 3;
optional uint32 PayloadId = 4;
}
7 changes: 4 additions & 3 deletions ydb/library/yql/dq/runtime/dq_input_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class TDqInputChannel : public IDqInputChannel {

void PushImpl(TDqSerializedBatch&& data) {
const i64 space = data.Size();
const size_t rowCount = data.RowCount();
const size_t chunkCount = data.ChunkCount();
auto inputType = Impl.GetInputType();
NKikimr::NMiniKQL::TUnboxedValueBatch batch(inputType);
if (Y_UNLIKELY(PushStats.CollectProfile())) {
Expand All @@ -58,7 +58,8 @@ class TDqInputChannel : public IDqInputChannel {
DataSerializer.Deserialize(std::move(data), inputType, batch);
}

YQL_ENSURE(batch.RowCount() == rowCount);
// a single batch row is a chunk and may be an Arrow block
YQL_ENSURE(batch.RowCount() == chunkCount);
Impl.AddBatch(std::move(batch), space);
}

Expand Down Expand Up @@ -123,7 +124,7 @@ class TDqInputChannel : public IDqInputChannel {

void Push(TDqSerializedBatch&& data) override {
YQL_ENSURE(!Impl.IsFinished(), "input channel " << PushStats.ChannelId << " already finished");
if (Y_UNLIKELY(data.Proto.GetRows() == 0)) {
if (Y_UNLIKELY(data.Proto.GetChunks() == 0)) {
return;
}
StoredSerializedBytes += data.Size();
Expand Down
62 changes: 31 additions & 31 deletions ydb/library/yql/dq/runtime/dq_output_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class TDqOutputChannel : public IDqOutputChannel {
}

ui64 GetValuesCount() const override {
return SpilledRowCount + PackedRowCount + ChunkRowCount;
return SpilledChunkCount + PackedChunkCount + PackerCurrentChunkCount;
}

const TDqOutputStats& GetPushStats() const override {
Expand Down Expand Up @@ -110,7 +110,7 @@ class TDqOutputChannel : public IDqOutputChannel {
values[i] = {};
}

ChunkRowCount++;
PackerCurrentChunkCount++;

size_t packerSize = Packer.PackedSizeEstimate();
if (packerSize >= MaxChunkBytes) {
Expand All @@ -120,9 +120,9 @@ class TDqOutputChannel : public IDqOutputChannel {
PushStats.Bytes += Data.back().Buffer.Size();
}
PackedDataSize += Data.back().Buffer.Size();
PackedRowCount += ChunkRowCount;
Data.back().RowCount = ChunkRowCount;
ChunkRowCount = 0;
PackedChunkCount += PackerCurrentChunkCount;
Data.back().ChunkCount = PackerCurrentChunkCount;
PackerCurrentChunkCount = 0;
packerSize = 0;
}

Expand All @@ -133,23 +133,23 @@ class TDqOutputChannel : public IDqOutputChannel {

TDqSerializedBatch data;
data.Proto.SetTransportVersion(TransportVersion);
data.Proto.SetRows(head.RowCount);
data.Proto.SetChunks(head.ChunkCount);
data.SetPayload(std::move(head.Buffer));
Storage->Put(NextStoredId++, SaveForSpilling(std::move(data)));

PackedDataSize -= bufSize;
PackedRowCount -= head.RowCount;
PackedChunkCount -= head.ChunkCount;

SpilledRowCount += head.RowCount;
SpilledChunkCount += head.ChunkCount;

if (PopStats.CollectFull()) {
PopStats.SpilledRows += head.RowCount;
PopStats.SpilledBytes += bufSize + sizeof(head.RowCount);
PopStats.SpilledRows += head.ChunkCount; // FIXME with RowCount
PopStats.SpilledBytes += bufSize + sizeof(head.ChunkCount);
PopStats.SpilledBlobs++;
}

Data.pop_front();
LOG("Data spilled. Total rows spilled: " << SpilledRowCount << ", bytesInMemory: " << (PackedDataSize + packerSize));
LOG("Data spilled. Total rows spilled: " << SpilledChunkCount << ", bytesInMemory: " << (PackedDataSize + packerSize)); // FIXME with RowCount
}

if (IsFull() || FirstStoredId < NextStoredId) {
Expand All @@ -158,7 +158,7 @@ class TDqOutputChannel : public IDqOutputChannel {

if (PopStats.CollectFull()) {
PopStats.MaxMemoryUsage = std::max(PopStats.MaxMemoryUsage, PackedDataSize + packerSize);
PopStats.MaxRowsInMemory = std::max(PopStats.MaxRowsInMemory, PackedRowCount);
PopStats.MaxRowsInMemory = std::max(PopStats.MaxRowsInMemory, PackedChunkCount);
}
}

Expand Down Expand Up @@ -195,18 +195,18 @@ class TDqOutputChannel : public IDqOutputChannel {
}
++FirstStoredId;
data = LoadSpilled(std::move(blob));
SpilledRowCount -= data.RowCount();
SpilledChunkCount -= data.ChunkCount();
} else if (!Data.empty()) {
auto& packed = Data.front();
PackedRowCount -= packed.RowCount;
PackedChunkCount -= packed.ChunkCount;
PackedDataSize -= packed.Buffer.Size();
data.Proto.SetRows(packed.RowCount);
data.Proto.SetChunks(packed.ChunkCount);
data.SetPayload(std::move(packed.Buffer));
Data.pop_front();
} else {
data.Proto.SetRows(ChunkRowCount);
data.Proto.SetChunks(PackerCurrentChunkCount);
data.SetPayload(FinishPackAndCheckSize());
ChunkRowCount = 0;
PackerCurrentChunkCount = 0;
}

DLOG("Took " << data.RowCount() << " rows");
Expand Down Expand Up @@ -255,21 +255,21 @@ class TDqOutputChannel : public IDqOutputChannel {

data.Clear();
data.Proto.SetTransportVersion(TransportVersion);
if (SpilledRowCount == 0 && PackedRowCount == 0) {
data.Proto.SetRows(ChunkRowCount);
if (SpilledChunkCount == 0 && PackedChunkCount == 0) {
data.Proto.SetChunks(PackerCurrentChunkCount);
data.SetPayload(FinishPackAndCheckSize());
ChunkRowCount = 0;
PackerCurrentChunkCount = 0;
return true;
}

// Repack all - that's why PopAll should never be used
if (ChunkRowCount) {
if (PackerCurrentChunkCount) {
Data.emplace_back();
Data.back().Buffer = FinishPackAndCheckSize();
PackedDataSize += Data.back().Buffer.Size();
PackedRowCount += ChunkRowCount;
Data.back().RowCount = ChunkRowCount;
ChunkRowCount = 0;
PackedChunkCount += PackerCurrentChunkCount;
Data.back().ChunkCount = PackerCurrentChunkCount;
PackerCurrentChunkCount = 0;
}

NKikimr::NMiniKQL::TUnboxedValueBatch rows(OutputType);
Expand All @@ -291,7 +291,7 @@ class TDqOutputChannel : public IDqOutputChannel {
});
}

data.Proto.SetRows(rows.RowCount());
data.Proto.SetChunks(rows.RowCount()); // 1 UVB "row" is Chunk
data.SetPayload(FinishPackAndCheckSize());
if (PopStats.CollectBasic()) {
PopStats.Bytes += data.Size();
Expand Down Expand Up @@ -332,7 +332,7 @@ class TDqOutputChannel : public IDqOutputChannel {
ui64 rows = GetValuesCount();
Data.clear();
Packer.Clear();
SpilledRowCount = PackedDataSize = PackedRowCount = ChunkRowCount = 0;
SpilledChunkCount = PackedDataSize = PackedChunkCount = PackerCurrentChunkCount = 0;
FirstStoredId = NextStoredId;
return rows;
}
Expand All @@ -358,18 +358,18 @@ class TDqOutputChannel : public IDqOutputChannel {

struct TSerializedBatch {
TChunkedBuffer Buffer;
ui64 RowCount = 0;
ui64 ChunkCount = 0;
};
std::deque<TSerializedBatch> Data;

size_t SpilledRowCount = 0;
size_t SpilledChunkCount = 0;
ui64 FirstStoredId = 0;
ui64 NextStoredId = 0;

size_t PackedDataSize = 0;
size_t PackedRowCount = 0;
size_t ChunkRowCount = 0;
size_t PackedChunkCount = 0;

size_t PackerCurrentChunkCount = 0;

bool Finished = false;

Expand Down
6 changes: 3 additions & 3 deletions ydb/library/yql/dq/runtime/dq_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ TDqSerializedBatch SerializeValue(NDqProto::EDataTransportVersion version, const

TDqSerializedBatch result;
result.Proto.SetTransportVersion(version);
result.Proto.SetRows(1);
result.Proto.SetChunks(1);
result.SetPayload(std::move(packResult));
return result;
}
Expand Down Expand Up @@ -87,7 +87,7 @@ TDqSerializedBatch SerializeBuffer(NDqProto::EDataTransportVersion version, cons

TDqSerializedBatch result;
result.Proto.SetTransportVersion(version);
result.Proto.SetRows(buffer.RowCount());
result.Proto.SetChunks(buffer.RowCount());
result.SetPayload(std::move(packResult));
return result;
}
Expand Down Expand Up @@ -176,7 +176,7 @@ NDqProto::TData TDqDataSerializer::SerializeParamValue(const TType* type, const
NDqProto::TData data;
data.SetTransportVersion(NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
data.SetRaw(packResult.data(), packResult.size());
data.SetRows(1);
data.SetChunks(1);

return data;
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/runtime/dq_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class TDqDataSerializer : private TNonCopyable {
}
TDqSerializedBatch result;
result.Proto.SetTransportVersion(TransportVersion);
result.Proto.SetRows(count);
result.Proto.SetChunks(count);
result.SetPayload(packer.Finish());
return result;
}
Expand Down
Loading