Skip to content

Commit 8e46b3b

Browse files
Merge 01b6fc7 into eb870f2
2 parents eb870f2 + 01b6fc7 commit 8e46b3b

File tree

6 files changed

+93
-17
lines changed

6 files changed

+93
-17
lines changed

ydb/core/grpc_services/base/base.h

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <ydb/core/grpc_streaming/grpc_streaming.h>
2727
#include <ydb/core/tx/scheme_board/events.h>
2828
#include <ydb/core/base/events.h>
29+
#include <ydb/core/util/ulid.h>
2930

3031
#include <ydb/library/actors/wilson/wilson_span.h>
3132

@@ -751,7 +752,12 @@ class TGRpcRequestBiStreamWrapper
751752
TGRpcRequestBiStreamWrapper(TIntrusivePtr<IStreamCtx> ctx, bool rlAllowed = true)
752753
: Ctx_(ctx)
753754
, RlAllowed_(rlAllowed)
754-
{ }
755+
, TraceId(GetPeerMetaValues(NYdb::YDB_TRACE_ID_HEADER))
756+
{
757+
if (!TraceId) {
758+
TraceId = UlidGen.Next().ToString();
759+
}
760+
}
755761

756762
bool IsClientLost() const override {
757763
// TODO: Implement for BiDirectional streaming
@@ -852,7 +858,7 @@ class TGRpcRequestBiStreamWrapper
852858
}
853859

854860
TMaybe<TString> GetTraceId() const override {
855-
return GetPeerMetaValues(NYdb::YDB_TRACE_ID_HEADER);
861+
return TraceId;
856862
}
857863

858864
NWilson::TTraceId GetWilsonTraceId() const override {
@@ -930,6 +936,8 @@ class TGRpcRequestBiStreamWrapper
930936
IGRpcProxyCounters::TPtr Counters_;
931937
NWilson::TSpan Span_;
932938
bool IsTracingDecided_ = false;
939+
TULIDGenerator UlidGen;
940+
TMaybe<TString> TraceId;
933941
};
934942

935943
template <typename TDerived>
@@ -1037,7 +1045,12 @@ class TGRpcRequestWrapperImpl
10371045

10381046
TGRpcRequestWrapperImpl(NYdbGrpc::IRequestContextBase* ctx)
10391047
: Ctx_(ctx)
1040-
{ }
1048+
, TraceId(GetPeerMetaValues(NYdb::YDB_TRACE_ID_HEADER))
1049+
{
1050+
if (!TraceId) {
1051+
TraceId = UlidGen.Next().ToString();
1052+
}
1053+
}
10411054

10421055
const TMaybe<TString> GetYdbToken() const override {
10431056
return ExtractYdbToken(Ctx_->GetPeerMetaValues(NYdb::YDB_AUTH_TICKET_HEADER));
@@ -1169,7 +1182,7 @@ class TGRpcRequestWrapperImpl
11691182
}
11701183

11711184
TMaybe<TString> GetTraceId() const override {
1172-
return GetPeerMetaValues(NYdb::YDB_TRACE_ID_HEADER);
1185+
return TraceId;
11731186
}
11741187

11751188
NWilson::TTraceId GetWilsonTraceId() const override {
@@ -1372,6 +1385,8 @@ class TGRpcRequestWrapperImpl
13721385
TAuditLogHook AuditLogHook;
13731386
bool RequestFinished = false;
13741387
bool IsTracingDecided_ = false;
1388+
TULIDGenerator UlidGen;
1389+
TMaybe<TString> TraceId;
13751390
};
13761391

13771392
template <ui32 TRpcId, typename TReq, typename TResp, bool IsOperation, typename TDerived>

ydb/core/kqp/gateway/kqp_gateway.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ class IKqpGateway : public NYql::IKikimrGateway {
206206

207207
virtual NThreading::TFuture<TQueryResult> ExecDataQueryAst(const TString& cluster, const TString& query,
208208
TQueryData::TPtr params, const TAstQuerySettings& settings,
209-
const Ydb::Table::TransactionSettings& txSettings) = 0;
209+
const Ydb::Table::TransactionSettings& txSettings, const TMaybe<TString>& traceId) = 0;
210210

211211
virtual NThreading::TFuture<TQueryResult> ExplainScanQueryAst(const TString& cluster, const TString& query) = 0;
212212

@@ -215,21 +215,21 @@ class IKqpGateway : public NYql::IKikimrGateway {
215215

216216
virtual NThreading::TFuture<TQueryResult> StreamExecDataQueryAst(const TString& cluster, const TString& query,
217217
TQueryData::TPtr, const TAstQuerySettings& settings,
218-
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target) = 0;
218+
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target, const TMaybe<TString>& traceId) = 0;
219219

220220
virtual NThreading::TFuture<TQueryResult> StreamExecScanQueryAst(const TString& cluster, const TString& query,
221221
TQueryData::TPtr, const TAstQuerySettings& settings, const NActors::TActorId& target,
222222
std::shared_ptr<NGRpcService::IRequestCtxMtSafe> rpcCtx) = 0;
223223

224224
virtual NThreading::TFuture<TQueryResult> ExecGenericQuery(const TString& cluster, const TString& query,
225225
TQueryData::TPtr params, const TAstQuerySettings& settings,
226-
const Ydb::Table::TransactionSettings& txSettings) = 0;
226+
const Ydb::Table::TransactionSettings& txSettings, const TMaybe<TString>& traceId) = 0;
227227

228228
virtual NThreading::TFuture<TQueryResult> ExplainGenericQuery(const TString& cluster, const TString& query) = 0;
229229

230230
virtual NThreading::TFuture<TQueryResult> StreamExecGenericQuery(const TString& cluster, const TString& query,
231231
TQueryData::TPtr params, const TAstQuerySettings& settings,
232-
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target) = 0;
232+
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target, const TMaybe<TString>& traceId) = 0;
233233
};
234234

235235
TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database,

ydb/core/kqp/gateway/kqp_ic_gateway.cpp

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1847,7 +1847,8 @@ class TKikimrIcGateway : public IKqpGateway {
18471847

18481848
TFuture<TQueryResult> StreamExecDataQueryAst(const TString& cluster, const TString& query,
18491849
TQueryData::TPtr params, const TAstQuerySettings& settings,
1850-
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target) override
1850+
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target,
1851+
const TMaybe<TString>& traceId) override
18511852
{
18521853
YQL_ENSURE(cluster == Cluster);
18531854

@@ -1859,6 +1860,10 @@ class TKikimrIcGateway : public IKqpGateway {
18591860
ev->Record.SetUserToken(UserToken->GetSerializedToken());
18601861
}
18611862

1863+
if (traceId) {
1864+
ev->Record.SetTraceId(*traceId);
1865+
}
1866+
18621867
ev->Record.MutableRequest()->SetDatabase(Database);
18631868
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
18641869
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_AST_DML);
@@ -1946,7 +1951,8 @@ class TKikimrIcGateway : public IKqpGateway {
19461951
}
19471952

19481953
TFuture<TQueryResult> ExecDataQueryAst(const TString& cluster, const TString& query, TQueryData::TPtr params,
1949-
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings) override
1954+
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
1955+
const TMaybe<TString>& traceId) override
19501956
{
19511957
YQL_ENSURE(cluster == Cluster);
19521958

@@ -1958,6 +1964,10 @@ class TKikimrIcGateway : public IKqpGateway {
19581964
ev->Record.SetUserToken(UserToken->GetSerializedToken());
19591965
}
19601966

1967+
if (traceId) {
1968+
ev->Record.SetTraceId(*traceId);
1969+
}
1970+
19611971
ev->Record.MutableRequest()->SetDatabase(Database);
19621972
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
19631973
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_AST_DML);
@@ -2021,12 +2031,17 @@ class TKikimrIcGateway : public IKqpGateway {
20212031
}
20222032

20232033
TFuture<TQueryResult> ExecGenericQuery(const TString& cluster, const TString& query, TQueryData::TPtr params,
2024-
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings) override
2034+
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
2035+
const TMaybe<TString>& traceId) override
20252036
{
20262037
YQL_ENSURE(cluster == Cluster);
20272038

20282039
auto ev = MakeHolder<TEvKqp::TEvQueryRequest>();
20292040

2041+
if (traceId) {
2042+
ev->Record.SetTraceId(*traceId);
2043+
}
2044+
20302045
auto& request = *ev->Record.MutableRequest();
20312046
request.SetCollectStats(settings.CollectStats);
20322047

@@ -2047,7 +2062,8 @@ class TKikimrIcGateway : public IKqpGateway {
20472062

20482063
TFuture<TQueryResult> StreamExecGenericQuery(const TString& cluster, const TString& query,
20492064
TQueryData::TPtr params, const TAstQuerySettings& settings,
2050-
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target) override
2065+
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target,
2066+
const TMaybe<TString>& traceId) override
20512067
{
20522068
YQL_ENSURE(cluster == Cluster);
20532069

@@ -2059,6 +2075,10 @@ class TKikimrIcGateway : public IKqpGateway {
20592075
ev->Record.SetUserToken(UserToken->GetSerializedToken());
20602076
}
20612077

2078+
if (traceId) {
2079+
ev->Record.SetTraceId(*traceId);
2080+
}
2081+
20622082
ev->Record.MutableRequest()->SetDatabase(Database);
20632083
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
20642084
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY);

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,7 @@ class TKqpQueryExecutor : public IKikimrQueryExecutor {
811811
TKiDataQueryBlocks dataQueryBlocks(query);
812812

813813
auto queryAstStr = SerializeExpr(ctx, *query);
814+
TMaybe<TString> traceId = SessionCtx->GetUserRequestContext() ? SessionCtx->GetUserRequestContext()->TraceId : TMaybe<TString>{};
814815

815816
bool useGenericQuery = ShouldUseGenericQuery(dataQueryBlocks);
816817
bool useScanQuery = ShouldUseScanQuery(dataQueryBlocks, settings);
@@ -828,7 +829,7 @@ class TKqpQueryExecutor : public IKikimrQueryExecutor {
828829
future = Gateway->ExplainGenericQuery(Cluster, SessionCtx->Query().PreparingQuery->GetText());
829830
} else {
830831
future = Gateway->ExecGenericQuery(Cluster, SessionCtx->Query().PreparingQuery->GetText(), CollectParameters(query),
831-
querySettings, txSettings);
832+
querySettings, txSettings, traceId);
832833
}
833834
} else if (useScanQuery) {
834835
ui64 rowsLimit = 0;
@@ -850,7 +851,7 @@ class TKqpQueryExecutor : public IKikimrQueryExecutor {
850851
future = Gateway->ExplainDataQueryAst(Cluster, queryAstStr);
851852
} else {
852853
future = Gateway->ExecDataQueryAst(Cluster, queryAstStr, CollectParameters(query),
853-
querySettings, txSettings);
854+
querySettings, txSettings, traceId);
854855
}
855856
}
856857
break;
@@ -860,7 +861,7 @@ class TKqpQueryExecutor : public IKikimrQueryExecutor {
860861
txSettings.mutable_serializable_read_write();
861862

862863
future = Gateway->StreamExecGenericQuery(Cluster, SessionCtx->Query().PreparingQuery->GetText(), CollectParameters(query),
863-
querySettings, txSettings, SessionCtx->Query().ReplyTarget);
864+
querySettings, txSettings, SessionCtx->Query().ReplyTarget, traceId);
864865
} else if (useScanQuery) {
865866
future = Gateway->StreamExecScanQueryAst(Cluster, queryAstStr, CollectParameters(query),
866867
querySettings, SessionCtx->Query().ReplyTarget, SessionCtx->Query().RpcCtx);
@@ -869,7 +870,7 @@ class TKqpQueryExecutor : public IKikimrQueryExecutor {
869870
txSettings.mutable_serializable_read_write();
870871

871872
future = Gateway->StreamExecDataQueryAst(Cluster, queryAstStr, CollectParameters(query),
872-
querySettings, txSettings, SessionCtx->Query().ReplyTarget);
873+
querySettings, txSettings, SessionCtx->Query().ReplyTarget, traceId);
873874
}
874875
break;
875876

ydb/core/kqp/session_actor/kqp_worker_actor.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,8 @@ class TKqpWorkerActor : public TActorBootstrapped<TKqpWorkerActor> {
188188
Config->FeatureFlags = AppData(ctx)->FeatureFlags;
189189

190190
KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver, FederatedQuerySetup,
191-
QueryState->RequestEv->GetUserToken(), GUCSettings, Settings.ApplicationName, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false);
191+
QueryState->RequestEv->GetUserToken(), GUCSettings, Settings.ApplicationName, AppData(ctx)->FunctionRegistry,
192+
!Settings.LongSession, false, nullptr, nullptr, nullptr, QueryState->RequestEv->GetUserRequestContext());
192193

193194
auto& queryRequest = QueryState->RequestEv;
194195
QueryState->ProxyRequestId = proxyRequestId;

ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,45 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
105105
// check datashard logs (should be empty, because DataShard only logs modification operations)
106106
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 0);
107107
}
108+
109+
Y_UNIT_TEST_TWIN(UpsertViaLegacyScripting, Streaming) {
110+
TKikimrSettings serverSettings;
111+
TStringStream ss;
112+
serverSettings.LogStream = &ss;
113+
TKikimrRunner kikimr(serverSettings);
114+
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);
115+
NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());
116+
117+
118+
const auto query = R"(
119+
--!syntax_v1
120+
121+
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES
122+
(3u, "Value3"),
123+
(101u, "Value101"),
124+
(201u, "Value201");
125+
)";
126+
127+
if (Streaming) {
128+
auto result = client.StreamExecuteYqlScript(query).GetValueSync();
129+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
130+
CollectStreamResult(result);
131+
} else {
132+
auto result = client.ExecuteYqlScript(query).GetValueSync();
133+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
134+
}
135+
136+
// check executer logs
137+
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 1);
138+
// check session actor logs (should contain double logs because this query was executed via worker actor)
139+
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 4);
140+
// check grpc logs
141+
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2);
142+
// check datashard logs
143+
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 2);
144+
145+
Cout << ss.Str() << Endl;
146+
}
108147
}
109148

110149
} // namespace NKqp

0 commit comments

Comments
 (0)