Skip to content

YQ-3975 RD: fixed failure when handling parsing errors for clients without a filter #12755

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,7 @@ class TTopicFilters : public ITopicFilters {
continue;
}

if (filterHandler.GetPurecalcFilter()) {
PushToFilter(filterHandler, offsets, columnIndex, values, numberRows);
continue;
}

// Clients without filters
LOG_ROW_DISPATCHER_TRACE("Add " << numberRows << " rows to client " << consumer->GetFilterId() << " without filtering");
for (ui64 rowId = 0; rowId < numberRows; ++rowId) {
consumer->OnFilteredData(rowId);
}
PushToFilter(filterHandler, offsets, columnIndex, values, numberRows);
}
Stats.AddFilterLatency(TInstant::Now() - startFilter);
}
Expand Down Expand Up @@ -193,7 +184,9 @@ class TTopicFilters : public ITopicFilters {
LOG_ROW_DISPATCHER_TRACE("Create filter with id " << filter->GetFilterId());

IPurecalcFilter::TPtr purecalcFilter;
if (filter->GetWhereFilter()) {
if (const auto& predicate = filter->GetWhereFilter()) {
LOG_ROW_DISPATCHER_TRACE("Create purecalc filter for predicate '" << predicate << "' (filter id: " << filter->GetFilterId() << ")");

auto filterStatus = CreatePurecalcFilter(filter);
if (filterStatus.IsFail()) {
return filterStatus;
Expand Down Expand Up @@ -225,9 +218,6 @@ class TTopicFilters : public ITopicFilters {

private:
void PushToFilter(const TFilterHandler& filterHandler, const TVector<ui64>& offsets, const TVector<ui64>& columnIndex, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) {
const auto filter = filterHandler.GetPurecalcFilter();
Y_ENSURE(filter, "Expected initialized filter");

const auto consumer = filterHandler.GetConsumer();
const auto& columnIds = consumer->GetColumnIds();

Expand All @@ -246,8 +236,13 @@ class TTopicFilters : public ITopicFilters {
}
}

LOG_ROW_DISPATCHER_TRACE("Pass " << numberRows << " rows to purecalc filter (client id: " << consumer->GetFilterId() << ")");
filter->FilterData(result, numberRows);
if (const auto filter = filterHandler.GetPurecalcFilter()) {
LOG_ROW_DISPATCHER_TRACE("Pass " << numberRows << " rows to purecalc filter (filter id: " << consumer->GetFilterId() << ")");
filter->FilterData(result, numberRows);
} else if (numberRows) {
LOG_ROW_DISPATCHER_TRACE("Add " << numberRows << " rows to client " << consumer->GetFilterId() << " without filtering");
consumer->OnFilteredBatch(0, numberRows - 1);
}
}

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class IFilteredDataConsumer : public IPurecalcFilterConsumer {
virtual const TVector<ui64>& GetColumnIds() const = 0;
virtual TMaybe<ui64> GetNextMessageOffset() const = 0;

virtual void OnFilteredBatch(ui64 firstRow, ui64 lastRow) = 0; // inclusive interval [firstRow, lastRow]

virtual void OnFilterStarted() = 0;
virtual void OnFilteringError(TStatus status) = 0;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,13 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
Client->StartClientSession();
}

// Delivers a contiguous, inclusive range of filtered rows [firstRow, lastRow]
// by forwarding each row to the per-row handler OnFilteredData().
// Used for clients without a purecalc filter, where the whole batch passes
// through unfiltered (the caller guards against empty batches, so
// firstRow <= lastRow holds here).
void OnFilteredBatch(ui64 firstRow, ui64 lastRow) override {
LOG_ROW_DISPATCHER_TRACE("OnFilteredBatch, rows [" << firstRow << ", " << lastRow << "]");
for (ui64 rowId = firstRow; rowId <= lastRow; ++rowId) {
OnFilteredData(rowId);
}
}

void OnFilteredData(ui64 rowId) override {
const ui64 offset = Self.Offsets->at(rowId);
if (const auto nextOffset = Client->GetNextMessageOffset(); nextOffset && offset < *nextOffset) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,23 +414,23 @@ class TJsonParser : public TTopicParserBase {

simdjson::ondemand::document_stream documents;
CHECK_JSON_ERROR(Parser.iterate_many(values, size, simdjson::ondemand::DEFAULT_BATCH_SIZE).get(documents)) {
return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse message batch from offset " << Buffer.Offsets.front() << ", json documents was corrupted: " << simdjson::error_message(error));
return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse message batch from offset " << Buffer.Offsets.front() << ", json documents was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size)));
}

size_t rowId = 0;
for (auto document : documents) {
if (Y_UNLIKELY(rowId >= Buffer.NumberValues)) {
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId + 1);
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId + 1 << " (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: " << TruncateString(std::string_view(values, size)));
}

const ui64 offset = Buffer.Offsets[rowId];
CHECK_JSON_ERROR(document.error()) {
return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json document was corrupted: " << simdjson::error_message(error));
return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json document was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size)));
}

for (auto item : document.get_object()) {
CHECK_JSON_ERROR(item.error()) {
return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json item was corrupted: " << simdjson::error_message(error));
return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json item was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size)));
}

const auto it = ColumnsIndex.find(item.escaped_key().value());
Expand All @@ -445,7 +445,7 @@ class TJsonParser : public TTopicParserBase {
}

if (Y_UNLIKELY(rowId != Buffer.NumberValues)) {
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId);
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId << " (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: " << TruncateString(std::string_view(values, size)));
}

const ui64 firstOffset = Buffer.Offsets.front();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class TOptionalCell : public TBaseFixture::ICell {

void Validate(const NYql::NUdf::TUnboxedValue& parsedValue) const override {
if (!parsedValue) {
UNIT_FAIL("Unexpected NULL value for optional cell");
return;
}
Value->Validate(parsedValue.GetOptionalValue());
Expand Down Expand Up @@ -166,7 +167,7 @@ void TBaseFixture::SetUp(NUnitTest::TTestContext&) {
TAutoPtr<NKikimr::TAppPrepare> app = new NKikimr::TAppPrepare();
Runtime.SetLogBackend(NActors::CreateStderrBackend());
Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NActors::NLog::PRI_TRACE);
Runtime.SetDispatchTimeout(TDuration::Seconds(5));
Runtime.SetDispatchTimeout(WAIT_TIMEOUT);
Runtime.Initialize(app->Unwrap());

// Init tls context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

namespace NFq::NRowDispatcher::NTests {

// Upper bound for waiting on asynchronous test events; also used as the actor
// runtime dispatch timeout and as the reference bound in static_asserts in
// dependent test files. `inline` (C++17) gives a single definition shared by
// every translation unit that includes this header, instead of one
// internal-linkage copy per TU as `static` did.
inline constexpr TDuration WAIT_TIMEOUT = TDuration::Seconds(20);

class TBaseFixture : public NUnitTest::TBaseFixture, public TTypeParser {
public:
// Helper classes for checking serialized rows in multi type format
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class TFormatHadlerFixture : public TBaseFixture {
FormatHandler = CreateTestFormatHandler(config, settings);
}

TStatus MakeClient(const TVector<TSchemaColumn>& columns, const TString& whereFilter, TCallback callback, ui64 expectedFilteredRows = 1) {
[[nodiscard]] TStatus MakeClient(const TVector<TSchemaColumn>& columns, const TString& whereFilter, TCallback callback, ui64 expectedFilteredRows = 1) {
ClientIds.emplace_back(ClientIds.size(), 0, 0, 0);

auto client = MakeIntrusive<TClientDataConsumer>(ClientIds.back(), columns, whereFilter, callback, expectedFilteredRows);
Expand Down Expand Up @@ -202,6 +202,30 @@ class TFormatHadlerFixture : public TBaseFixture {
FormatHandler->RemoveClient(clientId);
}

public:
// Returns a consumer callback for clients that are expected to receive no
// data (used at call sites with expectedFilteredRows = 0, e.g. "WHERE FALSE"
// filters). The batch queue argument is deliberately ignored.
static TCallback EmptyCheck() {
// Empty capture list: this is a static method and the lambda uses nothing
// from the enclosing scope — the previous `[&]` default capture captured
// nothing and was misleading.
return [](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {};
}

// Wraps `callback` into a TCallback that asserts exactly one batch with a
// non-empty offset list arrived, then hands the batch's messages and offsets
// to `callback` by move.
static TCallback OneBatchCheck(std::function<void(TRope&& messages, TVector<ui64>&& offsets)> callback) {
return [callback = std::move(callback)](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {
UNIT_ASSERT_VALUES_EQUAL(data.size(), 1);
// Bind the front element by reference: the queue is owned by this call,
// so its contents can be moved out directly instead of first copying the
// whole TRope and offsets vector (as `auto [..] = data.front()` did).
auto& [messages, offsets] = data.front();

UNIT_ASSERT(!offsets.empty());
callback(std::move(messages), std::move(offsets));
};
}

// Returns a consumer callback asserting that exactly one row arrived, at
// exactly `offset`, and that its serialized content matches `row`.
// `row` is captured by copy on purpose: the returned lambda outlives this
// call. `this` is captured for CheckMessageBatch — the fixture outlives the
// callback's use within the test.
TCallback OneRowCheck(ui64 offset, const TRow& row) const {
return OneBatchCheck([this, offset, row](TRope&& messages, TVector<ui64>&& offsets) {
UNIT_ASSERT_VALUES_EQUAL(offsets.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(offsets.front(), offset);

CheckMessageBatch(messages, TBatch().AddRow(row));
});
}

private:
void ExtractClientsData() {
for (auto& client : Clients) {
Expand Down Expand Up @@ -233,33 +257,13 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
CheckSuccess(MakeClient(
{commonColumn, {"col_first", "[DataType; String]"}},
"WHERE col_first = \"str_first__large__\"",
[&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {
UNIT_ASSERT_VALUES_EQUAL(data.size(), 1);

auto [messages, offsets] = data.front();
UNIT_ASSERT_VALUES_EQUAL(offsets.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(offsets.front(), firstOffset + 1);

CheckMessageBatch(messages, TBatch().AddRow(
TRow().AddString("event2").AddString("str_first__large__")
));
}
OneRowCheck(firstOffset + 1, TRow().AddString("event2").AddString("str_first__large__"))
));

CheckSuccess(MakeClient(
{commonColumn, {"col_second", "[DataType; String]"}},
"WHERE col_second = \"str_second\"",
[&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {
UNIT_ASSERT_VALUES_EQUAL(data.size(), 1);

auto [messages, offsets] = data.front();
UNIT_ASSERT_VALUES_EQUAL(offsets.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(offsets.front(), firstOffset);

CheckMessageBatch(messages, TBatch().AddRow(
TRow().AddString("event1").AddString("str_second")
));
}
OneRowCheck(firstOffset, TRow().AddString("event1").AddString("str_second"))
));

ParseMessages({
Expand Down Expand Up @@ -288,14 +292,10 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
R"({"col_a": false, "col_b": {"X": "Y"}})"
};

CheckSuccess(MakeClient(schema, "WHERE FALSE", [&](TQueue<std::pair<TRope, TVector<ui64>>>&&) {}, 0));

auto trueChacker = [&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {
UNIT_ASSERT_VALUES_EQUAL(data.size(), 1);
auto [messages, offsets] = data.front();
CheckSuccess(MakeClient(schema, "WHERE FALSE", EmptyCheck(), 0));

const auto trueChacker = OneBatchCheck([&](TRope&& messages, TVector<ui64>&& offsets) {
TBatch expectedBatch;
UNIT_ASSERT(!offsets.empty());
for (ui64 offset : offsets) {
UNIT_ASSERT(offset - firstOffset < testData.size());
expectedBatch.AddRow(
Expand All @@ -304,7 +304,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
}

CheckMessageBatch(messages, expectedBatch);
};
});
CheckSuccess(MakeClient(schema, "WHERE TRUE", trueChacker, 3));
CheckSuccess(MakeClient(schema, "", trueChacker, 2));

Expand All @@ -323,7 +323,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
Y_UNIT_TEST_F(ClientValidation, TFormatHadlerFixture) {
const TVector<TSchemaColumn> schema = {{"data", "[DataType; String]"}};
const TString filter = "WHERE FALSE";
const auto callback = [&](TQueue<std::pair<TRope, TVector<ui64>>>&&) {};
const auto callback = EmptyCheck();
CheckSuccess(MakeClient(schema, filter, callback, 0));

CheckError(
Expand All @@ -349,27 +349,12 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
const ui64 firstOffset = 42;
const TSchemaColumn commonColumn = {"com_col", "[DataType; String]"};

CheckSuccess(MakeClient(
{commonColumn, {"col_first", "[OptionalType; [DataType; Uint8]]"}},
"WHERE TRUE",
[&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {},
0
));
CheckSuccess(MakeClient({commonColumn, {"col_first", "[OptionalType; [DataType; Uint8]]"}}, "WHERE TRUE", EmptyCheck(), 0));

CheckSuccess(MakeClient(
{commonColumn, {"col_second", "[DataType; String]"}},
"WHERE col_second = \"str_second\"",
[&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {
UNIT_ASSERT_VALUES_EQUAL(data.size(), 1);

auto [messages, offsets] = data.front();
UNIT_ASSERT_VALUES_EQUAL(offsets.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(offsets.front(), firstOffset);

CheckMessageBatch(messages, TBatch().AddRow(
TRow().AddString("event1").AddString("str_second")
));
}
OneRowCheck(firstOffset, TRow().AddString("event1").AddString("str_second"))
));

CheckClientError(
Expand All @@ -379,6 +364,26 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
TStringBuilder() << "Failed to parse json string at offset " << firstOffset << ", got parsing error for column 'col_first' with type [OptionalType; [DataType; Uint8]]"
);
}

// Regression test for parsing-error delivery to clients WITHOUT a filter:
// a client created with an empty `whereFilter` must still receive a parsing
// error when its column is missing from the incoming message (previously,
// filterless clients bypassed the filter path entirely).
Y_UNIT_TEST_F(ClientErrorWithEmptyFilter, TFormatHadlerFixture) {
const ui64 firstOffset = 42;
const TSchemaColumn commonColumn = {"com_col", "[DataType; String]"};

// Filterless client requiring non-optional 'col_first'; expects no rows.
CheckSuccess(MakeClient({commonColumn, {"col_first", "[DataType; String]"}}, "", EmptyCheck(), 0));

// Second client with a real predicate; should receive the single row.
CheckSuccess(MakeClient(
{commonColumn, {"col_second", "[DataType; String]"}},
"WHERE col_second = \"str_second\"",
OneRowCheck(firstOffset, TRow().AddString("event1").AddString("str_second"))
));

// The message lacks 'col_first', so the first (filterless) client must get
// a PRECONDITION_FAILED parsing error rather than being silently skipped.
CheckClientError(
{GetMessage(firstOffset, R"({"com_col": "event1", "col_second": "str_second"})")},
ClientIds[0],
EStatusId::PRECONDITION_FAILED,
TStringBuilder() << "Failed to parse json messages, found 1 missing values from offset " << firstOffset << " in non optional column 'col_first' with type [DataType; String]"
);
}
}

} // namespace NFq::NRowDispatcher::NTests
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ class TFiterFixture : public TBaseFixture {
}
}

// Test consumer implementation of the batch interface: fails the test if
// data arrives before the filter was started, otherwise replays the
// inclusive row range [firstRow, lastRow] through the per-row Callback.
void OnFilteredBatch(ui64 firstRow, ui64 lastRow) override {
UNIT_ASSERT_C(Started, "Unexpected data for not started filter");
for (ui64 rowId = firstRow; rowId <= lastRow; ++rowId) {
Callback(rowId);
}
}

void OnFilteredData(ui64 rowId) override {
UNIT_ASSERT_C(Started, "Unexpected data for not started filter");
Callback(rowId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,9 @@ Y_UNIT_TEST_SUITE(TestJsonParser) {
Y_UNIT_TEST_F(JsonStructureValidation, TJsonParserFixture) {
CheckSuccess(MakeParser({{"a1", "[OptionalType; [DataType; String]]"}}));
CheckColumnError(R"({"a1": Yelse})", 0, EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET << ", got parsing error for column 'a1' with type [OptionalType; [DataType; String]] subissue: { <main>: Error: Failed to determine json value type, current token: 'Yelse', error: TAPE_ERROR: The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc. }");
CheckBatchError(R"({"a1": "st""r"})", EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << FIRST_OFFSET + 1 << ", json item was corrupted: TAPE_ERROR: The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc.");
CheckBatchError(R"({"a1": "x"} {"a1": "y"})", EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected 1 json rows from offset " << FIRST_OFFSET + 2 << " but got 2");
CheckBatchError(R"({)", EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected 1 json rows from offset " << FIRST_OFFSET + 3 << " but got 0");
CheckBatchError(R"({"a1": "st""r"})", EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << FIRST_OFFSET + 1 << ", json item was corrupted: TAPE_ERROR: The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc. Current data batch: {\"a1\": \"st\"\"r\"}");
CheckBatchError(R"({"a1": "x"} {"a1": "y"})", EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected 1 json rows from offset " << FIRST_OFFSET + 2 << " but got 2 (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: {\"a1\": \"x\"} {\"a1\": \"y\"}");
CheckBatchError(R"({)", EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected 1 json rows from offset " << FIRST_OFFSET + 3 << " but got 0 (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: {");
}
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ void TTopicSession::StartClientSession(TClientsInfo& info) {

void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
const auto& source = ev->Get()->Record.GetSource();
LOG_ROW_DISPATCHER_INFO("New client: read actor id " << ev->Sender.ToString() << ", predicate: " << source.GetPredicate() << ", offset: " << ev->Get()->Record.GetOffset());
LOG_ROW_DISPATCHER_INFO("New client: read actor id " << ev->Sender.ToString() << ", predicate: '" << source.GetPredicate() << "', offset: " << ev->Get()->Record.GetOffset());

if (!CheckNewClient(ev)) {
return;
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ namespace {
using namespace NKikimr;
using namespace NYql::NDq;

const ui64 TimeoutBeforeStartSessionSec = 3;
const ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec;
constexpr ui64 TimeoutBeforeStartSessionSec = 3;
constexpr ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec;
static_assert(GrabTimeoutSec <= WAIT_TIMEOUT.Seconds());

class TFixture : public NTests::TBaseFixture {
public:
Expand Down
Loading