1
1
#include " dq_pq_meta_extractor.h"
2
2
3
- #include < optional>
4
-
5
3
#include < ydb/library/yql/minikql/mkql_string_util.h>
6
4
#include < ydb/library/yql/providers/pq/common/pq_meta_fields.h>
7
- #include < ydb/library/yql/public/udf/udf_data_type.h>
8
- #include < ydb/library/yql/public/udf/udf_value.h>
9
-
10
- #include < ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
11
-
12
- #include < util/generic/string.h>
13
5
14
6
namespace {
15
7
const std::unordered_map<TString, NYql::NDq::TPqMetaExtractor::TPqMetaExtractorLambda> ExtractorsMap = {
16
8
{
17
- " _yql_sys_create_time" , [](const NYdb::NPersQueue ::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
9
+ " _yql_sys_create_time" , [](const NYdb::NTopic ::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
18
10
using TDataType = NYql::NUdf::TDataType<NYql::NUdf::TTimestamp>;
19
11
return std::make_pair (
20
12
NYql::NUdf::TUnboxedValuePod (static_cast <TDataType::TLayout>(message.GetCreateTime ().MicroSeconds ())),
@@ -23,7 +15,7 @@ namespace {
23
15
}
24
16
},
25
17
{
26
- " _yql_sys_tsp_write_time" , [](const NYdb::NPersQueue ::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
18
+ " _yql_sys_tsp_write_time" , [](const NYdb::NTopic ::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
27
19
using TDataType = NYql::NUdf::TDataType<NYql::NUdf::TTimestamp>;
28
20
return std::make_pair (
29
21
NYql::NUdf::TUnboxedValuePod (static_cast <TDataType::TLayout>(message.GetWriteTime ().MicroSeconds ())),
@@ -32,24 +24,24 @@ namespace {
32
24
}
33
25
},
34
26
{
35
- " _yql_sys_partition_id" , [](const NYdb::NPersQueue ::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
27
+ " _yql_sys_partition_id" , [](const NYdb::NTopic ::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
36
28
using TDataType = NYql::NUdf::TDataType<ui64>;
37
29
return std::make_pair (
38
- NYql::NUdf::TUnboxedValuePod (message.GetPartitionStream ()->GetPartitionId ()),
30
+ NYql::NUdf::TUnboxedValuePod (message.GetPartitionSession ()->GetPartitionId ()),
39
31
NYql::NUdf::GetDataTypeInfo (TDataType::Slot).FixedSize
40
32
);
41
33
}
42
34
},
43
35
{
44
- " _yql_sys_offset" , [](const NYdb::NPersQueue ::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
36
+ " _yql_sys_offset" , [](const NYdb::NTopic ::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
45
37
using TDataType = NYql::NUdf::TDataType<ui64>;
46
38
return std::make_pair (
47
39
NYql::NUdf::TUnboxedValuePod (message.GetOffset ()),
48
40
NYql::NUdf::GetDataTypeInfo (TDataType::Slot).FixedSize );
49
41
}
50
42
},
51
43
{
52
- " _yql_sys_message_group_id" , [](const NYdb::NPersQueue ::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
44
+ " _yql_sys_message_group_id" , [](const NYdb::NTopic ::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
53
45
const auto & data = message.GetMessageGroupId ();
54
46
return std::make_pair (
55
47
NKikimr::NMiniKQL::MakeString (NYql::NUdf::TStringRef (data.Data (), data.Size ())),
@@ -58,7 +50,7 @@ namespace {
58
50
}
59
51
},
60
52
{
61
- " _yql_sys_seq_no" , [](const NYdb::NPersQueue ::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
53
+ " _yql_sys_seq_no" , [](const NYdb::NTopic ::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
62
54
using TDataType = NYql::NUdf::TDataType<ui64>;
63
55
return std::make_pair (
64
56
NYql::NUdf::TUnboxedValuePod (message.GetSeqNo ()),
@@ -72,14 +64,14 @@ namespace {
72
64
namespace NYql ::NDq {
73
65
74
66
TPqMetaExtractor::TPqMetaExtractor () {
75
- for (auto key : AllowedPqMetaSysColumns ()) {
67
+ for (const auto & key : AllowedPqMetaSysColumns ()) {
76
68
Y_ENSURE (
77
69
ExtractorsMap.contains (key),
78
70
" Pq metadata field " << key << " hasn't valid runtime extractor. You should add it." );
79
71
}
80
72
}
81
73
82
- TPqMetaExtractor::TPqMetaExtractorLambda TPqMetaExtractor::FindExtractorLambda (TString sysColumn) const {
74
+ TPqMetaExtractor::TPqMetaExtractorLambda TPqMetaExtractor::FindExtractorLambda (const TString& sysColumn) const {
83
75
auto iter = ExtractorsMap.find (sysColumn);
84
76
Y_ENSURE (iter != ExtractorsMap.end (), sysColumn);
85
77
0 commit comments