Skip to content

Commit 23d20bc

Browse files
feat(data_integrity_trails): generate traceId in grpc (#9297)
1 parent 2bb49d7 commit 23d20bc

File tree

9 files changed

+110
-24
lines changed

9 files changed

+110
-24
lines changed

ydb/core/grpc_services/base/base.h

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <ydb/core/grpc_streaming/grpc_streaming.h>
2727
#include <ydb/core/tx/scheme_board/events.h>
2828
#include <ydb/core/base/events.h>
29+
#include <ydb/core/util/ulid.h>
2930

3031
#include <ydb/library/actors/wilson/wilson_span.h>
3132

@@ -751,7 +752,12 @@ class TGRpcRequestBiStreamWrapper
751752
TGRpcRequestBiStreamWrapper(TIntrusivePtr<IStreamCtx> ctx, bool rlAllowed = true)
752753
: Ctx_(ctx)
753754
, RlAllowed_(rlAllowed)
754-
{ }
755+
, TraceId(GetPeerMetaValues(NYdb::YDB_TRACE_ID_HEADER))
756+
{
757+
if (!TraceId) {
758+
TraceId = UlidGen.Next().ToString();
759+
}
760+
}
755761

756762
bool IsClientLost() const override {
757763
// TODO: Implement for BiDirectional streaming
@@ -852,7 +858,7 @@ class TGRpcRequestBiStreamWrapper
852858
}
853859

854860
TMaybe<TString> GetTraceId() const override {
855-
return GetPeerMetaValues(NYdb::YDB_TRACE_ID_HEADER);
861+
return TraceId;
856862
}
857863

858864
NWilson::TTraceId GetWilsonTraceId() const override {
@@ -930,6 +936,8 @@ class TGRpcRequestBiStreamWrapper
930936
IGRpcProxyCounters::TPtr Counters_;
931937
NWilson::TSpan Span_;
932938
bool IsTracingDecided_ = false;
939+
TULIDGenerator UlidGen;
940+
TMaybe<TString> TraceId;
933941
};
934942

935943
template <typename TDerived>
@@ -1037,7 +1045,12 @@ class TGRpcRequestWrapperImpl
10371045

10381046
TGRpcRequestWrapperImpl(NYdbGrpc::IRequestContextBase* ctx)
10391047
: Ctx_(ctx)
1040-
{ }
1048+
, TraceId(GetPeerMetaValues(NYdb::YDB_TRACE_ID_HEADER))
1049+
{
1050+
if (!TraceId) {
1051+
TraceId = UlidGen.Next().ToString();
1052+
}
1053+
}
10411054

10421055
const TMaybe<TString> GetYdbToken() const override {
10431056
return ExtractYdbToken(Ctx_->GetPeerMetaValues(NYdb::YDB_AUTH_TICKET_HEADER));
@@ -1169,7 +1182,7 @@ class TGRpcRequestWrapperImpl
11691182
}
11701183

11711184
TMaybe<TString> GetTraceId() const override {
1172-
return GetPeerMetaValues(NYdb::YDB_TRACE_ID_HEADER);
1185+
return TraceId;
11731186
}
11741187

11751188
NWilson::TTraceId GetWilsonTraceId() const override {
@@ -1372,6 +1385,8 @@ class TGRpcRequestWrapperImpl
13721385
TAuditLogHook AuditLogHook;
13731386
bool RequestFinished = false;
13741387
bool IsTracingDecided_ = false;
1388+
TULIDGenerator UlidGen;
1389+
TMaybe<TString> TraceId;
13751390
};
13761391

13771392
template <ui32 TRpcId, typename TReq, typename TResp, bool IsOperation, typename TDerived>

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
276276

277277
KqpHost = CreateKqpHost(Gateway, QueryId.Cluster, QueryId.Database, Config, ModuleResolverState->ModuleResolver,
278278
FederatedQuerySetup, UserToken, GUCSettings, QueryServiceConfig, ApplicationName, AppData(ctx)->FunctionRegistry,
279-
false, false, std::move(TempTablesState), nullptr, SplitCtx);
279+
false, false, std::move(TempTablesState), nullptr, SplitCtx, UserRequestContext);
280280

281281
IKqpHost::TPrepareSettings prepareSettings;
282282
prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted;

ydb/core/kqp/gateway/kqp_gateway.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ class IKqpGateway : public NYql::IKikimrGateway {
206206

207207
virtual NThreading::TFuture<TQueryResult> ExecDataQueryAst(const TString& cluster, const TString& query,
208208
TQueryData::TPtr params, const TAstQuerySettings& settings,
209-
const Ydb::Table::TransactionSettings& txSettings) = 0;
209+
const Ydb::Table::TransactionSettings& txSettings, const TMaybe<TString>& traceId) = 0;
210210

211211
virtual NThreading::TFuture<TQueryResult> ExplainScanQueryAst(const TString& cluster, const TString& query) = 0;
212212

@@ -215,21 +215,21 @@ class IKqpGateway : public NYql::IKikimrGateway {
215215

216216
virtual NThreading::TFuture<TQueryResult> StreamExecDataQueryAst(const TString& cluster, const TString& query,
217217
TQueryData::TPtr, const TAstQuerySettings& settings,
218-
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target) = 0;
218+
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target, const TMaybe<TString>& traceId) = 0;
219219

220220
virtual NThreading::TFuture<TQueryResult> StreamExecScanQueryAst(const TString& cluster, const TString& query,
221221
TQueryData::TPtr, const TAstQuerySettings& settings, const NActors::TActorId& target,
222222
std::shared_ptr<NGRpcService::IRequestCtxMtSafe> rpcCtx) = 0;
223223

224224
virtual NThreading::TFuture<TQueryResult> ExecGenericQuery(const TString& cluster, const TString& query,
225225
TQueryData::TPtr params, const TAstQuerySettings& settings,
226-
const Ydb::Table::TransactionSettings& txSettings) = 0;
226+
const Ydb::Table::TransactionSettings& txSettings, const TMaybe<TString>& traceId) = 0;
227227

228228
virtual NThreading::TFuture<TQueryResult> ExplainGenericQuery(const TString& cluster, const TString& query) = 0;
229229

230230
virtual NThreading::TFuture<TQueryResult> StreamExecGenericQuery(const TString& cluster, const TString& query,
231231
TQueryData::TPtr params, const TAstQuerySettings& settings,
232-
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target) = 0;
232+
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target, const TMaybe<TString>& traceId) = 0;
233233
};
234234

235235
TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database,

ydb/core/kqp/gateway/kqp_ic_gateway.cpp

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1874,7 +1874,8 @@ class TKikimrIcGateway : public IKqpGateway {
18741874

18751875
TFuture<TQueryResult> StreamExecDataQueryAst(const TString& cluster, const TString& query,
18761876
TQueryData::TPtr params, const TAstQuerySettings& settings,
1877-
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target) override
1877+
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target,
1878+
const TMaybe<TString>& traceId) override
18781879
{
18791880
YQL_ENSURE(cluster == Cluster);
18801881

@@ -1886,6 +1887,10 @@ class TKikimrIcGateway : public IKqpGateway {
18861887
ev->Record.SetUserToken(UserToken->GetSerializedToken());
18871888
}
18881889

1890+
if (traceId) {
1891+
ev->Record.SetTraceId(*traceId);
1892+
}
1893+
18891894
ev->Record.MutableRequest()->SetDatabase(Database);
18901895
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
18911896
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_AST_DML);
@@ -1973,7 +1978,8 @@ class TKikimrIcGateway : public IKqpGateway {
19731978
}
19741979

19751980
TFuture<TQueryResult> ExecDataQueryAst(const TString& cluster, const TString& query, TQueryData::TPtr params,
1976-
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings) override
1981+
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
1982+
const TMaybe<TString>& traceId) override
19771983
{
19781984
YQL_ENSURE(cluster == Cluster);
19791985

@@ -1985,6 +1991,10 @@ class TKikimrIcGateway : public IKqpGateway {
19851991
ev->Record.SetUserToken(UserToken->GetSerializedToken());
19861992
}
19871993

1994+
if (traceId) {
1995+
ev->Record.SetTraceId(*traceId);
1996+
}
1997+
19881998
ev->Record.MutableRequest()->SetDatabase(Database);
19891999
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
19902000
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_AST_DML);
@@ -2048,12 +2058,17 @@ class TKikimrIcGateway : public IKqpGateway {
20482058
}
20492059

20502060
TFuture<TQueryResult> ExecGenericQuery(const TString& cluster, const TString& query, TQueryData::TPtr params,
2051-
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings) override
2061+
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
2062+
const TMaybe<TString>& traceId) override
20522063
{
20532064
YQL_ENSURE(cluster == Cluster);
20542065

20552066
auto ev = MakeHolder<TEvKqp::TEvQueryRequest>();
20562067

2068+
if (traceId) {
2069+
ev->Record.SetTraceId(*traceId);
2070+
}
2071+
20572072
auto& request = *ev->Record.MutableRequest();
20582073
request.SetCollectStats(settings.CollectStats);
20592074

@@ -2074,7 +2089,8 @@ class TKikimrIcGateway : public IKqpGateway {
20742089

20752090
TFuture<TQueryResult> StreamExecGenericQuery(const TString& cluster, const TString& query,
20762091
TQueryData::TPtr params, const TAstQuerySettings& settings,
2077-
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target) override
2092+
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target,
2093+
const TMaybe<TString>& traceId) override
20782094
{
20792095
YQL_ENSURE(cluster == Cluster);
20802096

@@ -2086,6 +2102,10 @@ class TKikimrIcGateway : public IKqpGateway {
20862102
ev->Record.SetUserToken(UserToken->GetSerializedToken());
20872103
}
20882104

2105+
if (traceId) {
2106+
ev->Record.SetTraceId(*traceId);
2107+
}
2108+
20892109
ev->Record.MutableRequest()->SetDatabase(Database);
20902110
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
20912111
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY);

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,7 @@ class TKqpQueryExecutor : public IKikimrQueryExecutor {
811811
TKiDataQueryBlocks dataQueryBlocks(query);
812812

813813
auto queryAstStr = SerializeExpr(ctx, *query);
814+
TMaybe<TString> traceId = SessionCtx->GetUserRequestContext() ? SessionCtx->GetUserRequestContext()->TraceId : TMaybe<TString>{};
814815

815816
bool useGenericQuery = ShouldUseGenericQuery(dataQueryBlocks);
816817
bool useScanQuery = ShouldUseScanQuery(dataQueryBlocks, settings);
@@ -828,7 +829,7 @@ class TKqpQueryExecutor : public IKikimrQueryExecutor {
828829
future = Gateway->ExplainGenericQuery(Cluster, SessionCtx->Query().PreparingQuery->GetText());
829830
} else {
830831
future = Gateway->ExecGenericQuery(Cluster, SessionCtx->Query().PreparingQuery->GetText(), CollectParameters(query),
831-
querySettings, txSettings);
832+
querySettings, txSettings, traceId);
832833
}
833834
} else if (useScanQuery) {
834835
ui64 rowsLimit = 0;
@@ -850,7 +851,7 @@ class TKqpQueryExecutor : public IKikimrQueryExecutor {
850851
future = Gateway->ExplainDataQueryAst(Cluster, queryAstStr);
851852
} else {
852853
future = Gateway->ExecDataQueryAst(Cluster, queryAstStr, CollectParameters(query),
853-
querySettings, txSettings);
854+
querySettings, txSettings, traceId);
854855
}
855856
}
856857
break;
@@ -860,7 +861,7 @@ class TKqpQueryExecutor : public IKikimrQueryExecutor {
860861
txSettings.mutable_serializable_read_write();
861862

862863
future = Gateway->StreamExecGenericQuery(Cluster, SessionCtx->Query().PreparingQuery->GetText(), CollectParameters(query),
863-
querySettings, txSettings, SessionCtx->Query().ReplyTarget);
864+
querySettings, txSettings, SessionCtx->Query().ReplyTarget, traceId);
864865
} else if (useScanQuery) {
865866
future = Gateway->StreamExecScanQueryAst(Cluster, queryAstStr, CollectParameters(query),
866867
querySettings, SessionCtx->Query().ReplyTarget, SessionCtx->Query().RpcCtx);
@@ -869,7 +870,7 @@ class TKqpQueryExecutor : public IKikimrQueryExecutor {
869870
txSettings.mutable_serializable_read_write();
870871

871872
future = Gateway->StreamExecDataQueryAst(Cluster, queryAstStr, CollectParameters(query),
872-
querySettings, txSettings, SessionCtx->Query().ReplyTarget);
873+
querySettings, txSettings, SessionCtx->Query().ReplyTarget, traceId);
873874
}
874875
break;
875876

@@ -1033,7 +1034,8 @@ class TKqpHost : public IKqpHost {
10331034
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
10341035
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall,
10351036
TKqpTempTablesState::TConstPtr tempTablesState = nullptr, NActors::TActorSystem* actorSystem = nullptr,
1036-
NYql::TExprContext* ctx = nullptr, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig = NKikimrConfig::TQueryServiceConfig())
1037+
NYql::TExprContext* ctx = nullptr, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig = NKikimrConfig::TQueryServiceConfig(),
1038+
const TIntrusivePtr<TUserRequestContext>& userRequestContext = nullptr)
10371039
: Gateway(gateway)
10381040
, Cluster(cluster)
10391041
, GUCSettings(gUCSettings)
@@ -1044,7 +1046,7 @@ class TKqpHost : public IKqpHost {
10441046
, KeepConfigChanges(keepConfigChanges)
10451047
, IsInternalCall(isInternalCall)
10461048
, FederatedQuerySetup(federatedQuerySetup)
1047-
, SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider, userToken))
1049+
, SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider, userToken, nullptr, userRequestContext))
10481050
, Config(config)
10491051
, TypesCtx(MakeIntrusive<TTypeAnnotationContext>())
10501052
, PlanBuilder(CreatePlanBuilder(*TypesCtx))
@@ -1958,10 +1960,11 @@ TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway, const
19581960
const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver,
19591961
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TGUCSettings::TPtr& gUCSettings,
19601962
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TMaybe<TString>& applicationName, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges,
1961-
bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem, NYql::TExprContext* ctx)
1963+
bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem, NYql::TExprContext* ctx,
1964+
const TIntrusivePtr<TUserRequestContext>& userRequestContext)
19621965
{
19631966
return MakeIntrusive<TKqpHost>(gateway, cluster, database, gUCSettings, applicationName, config, moduleResolver, federatedQuerySetup, userToken, funcRegistry,
1964-
keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem, ctx, queryServiceConfig);
1967+
keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem, ctx, queryServiceConfig, userRequestContext);
19651968
}
19661969

19671970
} // namespace NKqp

ydb/core/kqp/host/kqp_host.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway,
123123
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TMaybe<TString>& applicationName = Nothing(), const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr,
124124
bool keepConfigChanges = false, bool isInternalCall = false, TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
125125
NActors::TActorSystem* actorSystem = nullptr /*take from TLS by default*/,
126-
NYql::TExprContext* ctx = nullptr);
126+
NYql::TExprContext* ctx = nullptr, const TIntrusivePtr<TUserRequestContext>& userRequestContext = nullptr);
127127

128128
} // namespace NKqp
129129
} // namespace NKikimr

ydb/core/kqp/provider/yql_kikimr_provider.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
#include <ydb/core/base/path.h>
77
#include <ydb/core/external_sources/external_source_factory.h>
8+
#include <ydb/core/kqp/common/kqp_user_request_context.h>
89
#include <ydb/core/kqp/common/simple/temp_tables.h>
910
#include <ydb/core/kqp/query_data/kqp_query_data.h>
1011
#include <ydb/library/yql/ast/yql_gc_nodes.h>
@@ -460,12 +461,14 @@ class TKikimrSessionContext : public TThrRefBase {
460461
TIntrusivePtr<ITimeProvider> timeProvider,
461462
TIntrusivePtr<IRandomProvider> randomProvider,
462463
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
463-
TIntrusivePtr<TKikimrTransactionContextBase> txCtx = nullptr)
464+
TIntrusivePtr<TKikimrTransactionContextBase> txCtx = nullptr,
465+
const TIntrusivePtr<NKikimr::NKqp::TUserRequestContext>& userRequestContext = nullptr)
464466
: Configuration(config)
465467
, TablesData(MakeIntrusive<TKikimrTablesData>())
466468
, QueryCtx(MakeIntrusive<TKikimrQueryContext>(functionRegistry, timeProvider, randomProvider))
467469
, TxCtx(txCtx)
468470
, UserToken(userToken)
471+
, UserRequestContext(userRequestContext)
469472
{}
470473

471474
TKikimrSessionContext(const TKikimrSessionContext&) = delete;
@@ -547,6 +550,10 @@ class TKikimrSessionContext : public TThrRefBase {
547550
return UserToken;
548551
}
549552

553+
const TIntrusivePtr<NKikimr::NKqp::TUserRequestContext>& GetUserRequestContext() const {
554+
return UserRequestContext;
555+
}
556+
550557
private:
551558
TString UserName;
552559
TString Cluster;
@@ -558,6 +565,7 @@ class TKikimrSessionContext : public TThrRefBase {
558565
TIntrusivePtr<TKikimrTransactionContextBase> TxCtx;
559566
NKikimr::NKqp::TKqpTempTablesState::TConstPtr TempTablesState;
560567
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
568+
TIntrusivePtr<NKikimr::NKqp::TUserRequestContext> UserRequestContext;
561569
};
562570

563571
TIntrusivePtr<IDataProvider> CreateKikimrDataSource(

ydb/core/kqp/session_actor/kqp_worker_actor.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,8 @@ class TKqpWorkerActor : public TActorBootstrapped<TKqpWorkerActor> {
188188
Config->FeatureFlags = AppData(ctx)->FeatureFlags;
189189

190190
KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver, FederatedQuerySetup,
191-
QueryState->RequestEv->GetUserToken(), GUCSettings, QueryServiceConfig, Settings.ApplicationName, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false, nullptr, nullptr, nullptr);
191+
QueryState->RequestEv->GetUserToken(), GUCSettings, QueryServiceConfig, Settings.ApplicationName, AppData(ctx)->FunctionRegistry,
192+
!Settings.LongSession, false, nullptr, nullptr, nullptr, QueryState->RequestEv->GetUserRequestContext());
192193

193194
auto& queryRequest = QueryState->RequestEv;
194195
QueryState->ProxyRequestId = proxyRequestId;

ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,45 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
105105
// check datashard logs (should be empty, because DataShard only logs modification operations)
106106
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 0);
107107
}
108+
109+
Y_UNIT_TEST_TWIN(UpsertViaLegacyScripting, Streaming) {
110+
TKikimrSettings serverSettings;
111+
TStringStream ss;
112+
serverSettings.LogStream = &ss;
113+
TKikimrRunner kikimr(serverSettings);
114+
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);
115+
NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());
116+
117+
118+
const auto query = R"(
119+
--!syntax_v1
120+
121+
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES
122+
(3u, "Value3"),
123+
(101u, "Value101"),
124+
(201u, "Value201");
125+
)";
126+
127+
if (Streaming) {
128+
auto result = client.StreamExecuteYqlScript(query).GetValueSync();
129+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
130+
CollectStreamResult(result);
131+
} else {
132+
auto result = client.ExecuteYqlScript(query).GetValueSync();
133+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
134+
}
135+
136+
// check executer logs
137+
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 1);
138+
// check session actor logs (should contain double logs because this query was executed via worker actor)
139+
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 4);
140+
// check grpc logs
141+
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2);
142+
// check datashard logs
143+
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 2);
144+
145+
Cout << ss.Str() << Endl;
146+
}
108147
}
109148

110149
} // namespace NKqp

0 commit comments

Comments
 (0)