Skip to content

YQ-3722 RD added messages accumulating #10380

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ydb/core/fq/libs/config/protos/row_dispatcher.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,18 @@ message TRowDispatcherCoordinatorConfig {
string CoordinationNodePath = 2;
bool LocalMode = 3; // Use only local row_dispatcher.
}

// JSON parser settings; embedded into TRowDispatcherConfig as the JsonParser field.
message TJsonParserConfig {
// Batch size in bytes (per the field name).
// NOTE(review): presumably the accumulated-data threshold at which a batch is parsed — confirm against the code that reads this config.
uint64 BatchSizeBytes = 1;
// Batch creation timeout in milliseconds (per the field name).
// NOTE(review): presumably the maximum time a batch may keep accumulating before it is parsed — confirm against the code that reads this config.
uint64 BatchCreationTimeoutMs = 2;
}

// Row dispatcher configuration: feature flag, timing/memory settings and nested
// parser/coordinator configs.
message TRowDispatcherConfig {
bool Enabled = 1;
// Expressed in seconds (per the "Sec" suffix).
uint64 TimeoutBeforeStartSessionSec = 2;
// Expressed in seconds (per the "Sec" suffix).
uint64 SendStatusPeriodSec = 3;
// NOTE(review): the unit is not encoded in the name — presumably bytes; confirm.
uint64 MaxSessionUsedMemory = 4;
bool WithoutConsumer = 5;
// Field numbers are not in declaration order here: JsonParser uses tag 7,
// while Coordinator (declared after it) uses tag 6.
TJsonParserConfig JsonParser = 7;
TRowDispatcherCoordinatorConfig Coordinator = 6;
}
18 changes: 9 additions & 9 deletions ydb/core/fq/libs/row_dispatcher/json_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase {
TVector<NYT::TNode> Schemas;
};

class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<ui64, const TVector<TVector<std::string_view>>&>> {
class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>> {
public:
TFilterInputConsumer(
const TFilterInputSpec& spec,
Expand Down Expand Up @@ -92,7 +92,7 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<ui64, c
}
}

void OnObject(std::pair<ui64, const TVector<TVector<std::string_view>>&> values) override {
void OnObject(std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&> values) override {
Y_ENSURE(FieldsPositions.size() == values.second.size());

NKikimr::NMiniKQL::TThrowingBindTerminator bind;
Expand All @@ -108,7 +108,7 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<ui64, c
static_cast<ui32>(values.second.size() + 1),
items);

items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(values.first++);
items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(values.first[rowId]);

size_t fieldId = 0;
for (const auto& column : values.second) {
Expand Down Expand Up @@ -200,7 +200,7 @@ struct NYql::NPureCalc::TInputSpecTraits<TFilterInputSpec> {
static constexpr bool IsPartial = false;
static constexpr bool SupportPushStreamMode = true;

using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<ui64, const TVector<TVector<std::string_view>>&>>>;
using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>>>;

static TConsumerType MakeConsumer(
const TFilterInputSpec& spec,
Expand Down Expand Up @@ -242,9 +242,9 @@ class TJsonFilter::TImpl {
LOG_ROW_DISPATCHER_DEBUG("Program created");
}

void Push(ui64 offset, const TVector<TVector<std::string_view>>& values) {
void Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values) {
Y_ENSURE(values, "Expected non empty schema");
InputConsumer->OnObject(std::make_pair(offset, values));
InputConsumer->OnObject(std::make_pair(offsets, values));
}

TString GetSql() const {
Expand Down Expand Up @@ -277,7 +277,7 @@ class TJsonFilter::TImpl {

private:
THolder<NYql::NPureCalc::TPushStreamProgram<TFilterInputSpec, TFilterOutputSpec>> Program;
THolder<NYql::NPureCalc::IConsumer<std::pair<ui64, const TVector<TVector<std::string_view>>&>>> InputConsumer;
THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>>> InputConsumer;
const TString Sql;
};

Expand All @@ -292,8 +292,8 @@ TJsonFilter::TJsonFilter(
// Out-of-line destructor; performs nothing beyond ordinary member destruction.
TJsonFilter::~TJsonFilter() = default;

void TJsonFilter::Push(ui64 offset, const TVector<TVector<std::string_view>>& values) {
Impl->Push(offset, values);
void TJsonFilter::Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values) {
Impl->Push(offsets, values);
}

TString TJsonFilter::GetSql() {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/row_dispatcher/json_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class TJsonFilter {

~TJsonFilter();

void Push(ui64 offset, const TVector<TVector<std::string_view>>& values);
void Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values);
TString GetSql();

private:
Expand Down
184 changes: 126 additions & 58 deletions ydb/core/fq/libs/row_dispatcher/json_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,49 +10,72 @@ namespace {

TString LogPrefix = "JsonParser: ";

} // anonymous namespace
struct TJsonParserBuffer {
size_t NumberValues = 0;
bool Finished = false;
TInstant CreationStartTime = TInstant::Now();
TVector<ui64> Offsets = {};

// A buffer is ready when it has not been finished yet and holds at least one value.
bool IsReady() const {
    if (Finished) {
        return false;
    }
    return NumberValues != 0;
}

namespace NFq {
// Returns the current length of the accumulated Values text.
size_t GetSize() const {
return Values.size();
}

//// TParserBuffer
// Pre-allocates storage for the buffer.
//   size         - expected amount of text; Values gets room for twice this
//                  amount plus twice simdjson::SIMDJSON_PADDING.
//   numberValues - expected number of stored offsets.
void Reserve(size_t size, size_t numberValues) {
    const size_t paddedSize = size + simdjson::SIMDJSON_PADDING;
    Offsets.reserve(numberValues);
    Values.reserve(2 * paddedSize);
}

TJsonParserBuffer::TJsonParserBuffer()
: NumberValues(0)
, Finished(false)
{}
void AddMessages(const TVector<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>& messages) {
Y_ENSURE(!Finished, "Cannot add messages into finished buffer");

void TJsonParserBuffer::Reserve(size_t size) {
Y_ENSURE(!Finished, "Cannot reserve finished buffer");
Values.reserve(2 * (size + simdjson::SIMDJSON_PADDING));
}
size_t messagesSize = 0;
for (const auto& message : messages) {
messagesSize += message.GetData().size();
}

void TJsonParserBuffer::AddValue(const TString& value) {
Y_ENSURE(!Finished, "Cannot add value into finished buffer");
NumberValues++;
Values << value;
}
NumberValues += messages.size();
Reserve(Values.size() + messagesSize, NumberValues);
for (const auto& message : messages) {
Values << message.GetData();
Offsets.emplace_back(message.GetOffset());
}
}

std::string_view TJsonParserBuffer::AddHolder(std::string_view value) {
Y_ENSURE(Values.size() + value.size() <= Values.capacity(), "Requested too large holders");
const size_t startPos = Values.size();
Values << value;
return std::string_view(Values).substr(startPos, value.length());
}
// Appends `value` to the end of Values and returns a view of the stored copy.
// Throws (via Y_ENSURE) if the append would not fit into the current capacity
// of Values.
// NOTE(review): presumably the capacity check exists so that the append never
// reallocates and previously returned views stay valid — confirm.
std::string_view AddHolder(std::string_view value) {
    Y_ENSURE(Values.size() + value.size() <= Values.capacity(), "Requested too large holders");
    const size_t holderBegin = Values.size();
    Values << value;
    const std::string_view stored(Values);
    return stored.substr(holderBegin, value.size());
}

std::pair<const char*, size_t> TJsonParserBuffer::Finish() {
Y_ENSURE(!Finished, "Cannot finish buffer twice");
Finished = true;
Values << TString(simdjson::SIMDJSON_PADDING, ' ');
Values.reserve(2 * Values.size());
return {Values.data(), Values.size()};
}
// Seals the buffer and returns a pointer to its text together with its size.
// Throws (via Y_ENSURE) when the buffer has already been finished.
// The returned size is taken after the padding is appended, so it includes
// the SIMDJSON_PADDING trailing spaces.
std::pair<const char*, size_t> Finish() {
Y_ENSURE(!Finished, "Cannot finish buffer twice");
Finished = true;
// Append SIMDJSON_PADDING space characters after the accumulated text.
Values << TString(simdjson::SIMDJSON_PADDING, ' ');
// Grow capacity to twice the current size before taking data(); AddHolder
// later requires appended holders to fit into the existing capacity.
Values.reserve(2 * Values.size());
return {Values.data(), Values.size()};
}

void TJsonParserBuffer::Clear() {
Y_ENSURE(Finished, "Cannot clear not finished buffer");
NumberValues = 0;
Finished = false;
Values.clear();
}
// Resets a finished buffer to the empty state so it can accumulate a new batch.
// Throws (via Y_ENSURE) when the buffer has not been finished.
// The batch creation start time is restarted from the current moment.
void Clear() {
    Y_ENSURE(Finished, "Cannot clear not finished buffer");
    Values.clear();
    Offsets.clear();
    NumberValues = 0;
    CreationStartTime = TInstant::Now();
    Finished = false;
}

private:
TStringBuilder Values = {};
};

} // anonymous namespace

namespace NFq {

//// TJsonParser

Expand All @@ -63,10 +86,13 @@ class TJsonParser::TImpl {
};

public:
TImpl(const TVector<TString>& columns, const TVector<TString>& types)
: ParsedValues(columns.size())
TImpl(const TVector<TString>& columns, const TVector<TString>& types, ui64 batchSize, TDuration batchCreationTimeout)
: BatchSize(batchSize)
, BatchCreationTimeout(batchCreationTimeout)
, ParsedValues(columns.size())
{
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal");
LOG_ROW_DISPATCHER_INFO("Simdjson active implementation " << simdjson::get_active_implementation()->name());

Columns.reserve(columns.size());
for (size_t i = 0; i < columns.size(); i++) {
Expand All @@ -80,22 +106,51 @@ class TJsonParser::TImpl {
for (size_t i = 0; i < columns.size(); i++) {
ColumnsIndex.emplace(std::string_view(Columns[i].Name), i);
}

Buffer.Reserve(BatchSize, 1);
Parser.threaded = false;
}

// A batch is ready for parsing when the buffer holds unparsed values and either
// the accumulated text reached BatchSize or BatchCreationTimeout has elapsed
// since Buffer.CreationStartTime.
bool IsReady() const {
    if (!Buffer.IsReady()) {
        return false;
    }
    if (Buffer.GetSize() >= BatchSize) {
        return true;
    }
    return TInstant::Now() - Buffer.CreationStartTime >= BatchCreationTimeout;
}

// Moment at which the current batch times out (creation start plus
// BatchCreationTimeout), or TInstant::Zero() when the buffer has nothing to parse.
TInstant GetCreationDeadline() const {
    if (!Buffer.IsReady()) {
        return TInstant::Zero();
    }
    return Buffer.CreationStartTime + BatchCreationTimeout;
}

// Number of values waiting in the buffer; 0 when the buffer is empty or
// already finished.
size_t GetNumberValues() const {
    if (Buffer.IsReady()) {
        return Buffer.NumberValues;
    }
    return 0;
}

// Offsets of the messages accumulated in the buffer, in the order they were added.
// The returned reference points at the buffer's own vector, which is emptied
// when the buffer is cleared.
// Const-qualified to match the other read-only accessors of this class and the
// const TJsonParser::GetOffsets() wrapper; the accessor does not modify state.
const TVector<ui64>& GetOffsets() const {
    return Buffer.Offsets;
}

// Appends the given topic messages to the current batch.
// An empty input is a no-op. If the previous batch has already been finished
// (parsed), the buffer is cleared first so a new batch starts from scratch.
void AddMessages(const TVector<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>& messages) {
    if (!messages.empty()) {
        if (Buffer.Finished) {
            Buffer.Clear();
        }
        Buffer.AddMessages(messages);
    }
}

const TVector<TVector<std::string_view>>& Parse() {
Y_ENSURE(Buffer.IsReady(), "Nothing to parse");

const auto [values, size] = Buffer.Finish();
LOG_ROW_DISPATCHER_TRACE("Parse values:\n" << values);

for (auto& parsedColumn : ParsedValues) {
parsedColumn.clear();
parsedColumn.reserve(Buffer.GetNumberValues());
parsedColumn.reserve(Buffer.NumberValues);
}

simdjson::ondemand::parser parser;
parser.threaded = false;

size_t rowId = 0;
simdjson::ondemand::document_stream documents = parser.iterate_many(values, size, simdjson::dom::DEFAULT_BATCH_SIZE);
simdjson::ondemand::document_stream documents = Parser.iterate_many(values, size, simdjson::dom::DEFAULT_BATCH_SIZE);
for (auto document : documents) {
for (auto item : document.get_object()) {
const auto it = ColumnsIndex.find(item.escaped_key().value());
Expand Down Expand Up @@ -126,27 +181,20 @@ class TJsonParser::TImpl {
}
rowId++;
}
Y_ENSURE(rowId == Buffer.GetNumberValues(), "Unexpected number of json documents");
Y_ENSURE(rowId == Buffer.NumberValues, "Unexpected number of json documents");

for (auto& parsedColumn : ParsedValues) {
parsedColumn.resize(Buffer.GetNumberValues());
parsedColumn.resize(Buffer.NumberValues);
}
return ParsedValues;
}

TJsonParserBuffer& GetBuffer() {
if (Buffer.GetFinished()) {
Buffer.Clear();
}
return Buffer;
}

TString GetDescription() const {
TStringBuilder description = TStringBuilder() << "Columns: ";
for (const auto& column : Columns) {
description << "'" << column.Name << "':" << column.Type << " ";
}
description << "\nBuffer size: " << Buffer.GetNumberValues() << ", finished: " << Buffer.GetFinished();
description << "\nNumber values in buffer: " << Buffer.NumberValues << ", buffer size: " << Buffer.GetSize() << ", finished: " << Buffer.Finished;
return description;
}

Expand Down Expand Up @@ -182,22 +230,42 @@ class TJsonParser::TImpl {
}

private:
const ui64 BatchSize;
const TDuration BatchCreationTimeout;
TVector<TColumnDescription> Columns;
absl::flat_hash_map<std::string_view, size_t> ColumnsIndex;

TJsonParserBuffer Buffer;
simdjson::ondemand::parser Parser;

TVector<TVector<std::string_view>> ParsedValues;
};

TJsonParser::TJsonParser(const TVector<TString>& columns, const TVector<TString>& types)
: Impl(std::make_unique<TJsonParser::TImpl>(columns, types))
TJsonParser::TJsonParser(const TVector<TString>& columns, const TVector<TString>& types, ui64 batchSize, TDuration batchCreationTimeout)
: Impl(std::make_unique<TJsonParser::TImpl>(columns, types, batchSize, batchCreationTimeout))
{}

// Out-of-line destructor; performs nothing beyond ordinary member destruction.
TJsonParser::~TJsonParser() = default;

TJsonParserBuffer& TJsonParser::GetBuffer() {
return Impl->GetBuffer();
void TJsonParser::AddMessages(const TVector<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>& messages) {
Impl->AddMessages(messages);
}

// Forwards to TImpl::IsReady(): true when the buffered batch holds values and has
// either reached the configured batch size or exceeded the batch creation timeout.
bool TJsonParser::IsReady() const {
return Impl->IsReady();
}

// Forwards to TImpl::GetCreationDeadline(): the batch timeout moment, or
// TInstant::Zero() when the buffer has nothing to parse.
TInstant TJsonParser::GetCreationDeadline() const {
return Impl->GetCreationDeadline();
}

// Forwards to TImpl::GetNumberValues(): number of buffered values, or 0 when the
// buffer is empty or already finished.
size_t TJsonParser::GetNumberValues() const {
return Impl->GetNumberValues();
}

// Forwards to TImpl::GetOffsets(): reference to the offsets of the buffered messages.
const TVector<ui64>& TJsonParser::GetOffsets() const {
return Impl->GetOffsets();
}

const TVector<TVector<std::string_view>>& TJsonParser::Parse() {
Expand All @@ -212,8 +280,8 @@ TString TJsonParser::GetDebugString(const TVector<TVector<std::string_view>>& pa
return Impl->GetDebugString(parsedValues);
}

std::unique_ptr<TJsonParser> NewJsonParser(const TVector<TString>& columns, const TVector<TString>& types) {
return std::unique_ptr<TJsonParser>(new TJsonParser(columns, types));
std::unique_ptr<TJsonParser> NewJsonParser(const TVector<TString>& columns, const TVector<TString>& types, ui64 batchSize, TDuration batchCreationTimeout) {
return std::unique_ptr<TJsonParser>(new TJsonParser(columns, types, batchSize, batchCreationTimeout));
}

} // namespace NFq
Loading
Loading