@@ -73,41 +73,36 @@ void ESListener::pickTagAndEdgeData(BatchLogType type,
73
73
const std::string& key,
74
74
const std::string& value,
75
75
const PickFunc& callback) {
76
+ bool isTag = nebula::NebulaKeyUtils::isTag (vIdLen_, key);
77
+ bool isEdge = nebula::NebulaKeyUtils::isEdge (vIdLen_, key);
78
+ if (!(isTag || isEdge)) {
79
+ return ;
80
+ }
81
+ std::unordered_map<std::string, nebula::meta::cpp2::FTIndex> ftIndexes;
82
+ nebula::RowReaderWrapper reader;
83
+
84
+ std::string vid;
85
+ std::string src;
86
+ std::string dst;
87
+ int rank = 0 ;
76
88
if (nebula::NebulaKeyUtils::isTag (vIdLen_, key)) {
77
89
auto tagId = NebulaKeyUtils::getTagId (vIdLen_, key);
78
90
auto ftIndexRes = schemaMan_->getFTIndex (spaceId_, tagId);
79
91
if (!ftIndexRes.ok ()) {
80
92
LOG (ERROR) << ftIndexRes.status ().message ();
81
93
return ;
82
94
}
83
- auto ftIndex = std::move (ftIndexRes).value ();
84
- if (ftIndex.empty ()) {
85
- return ;
86
- }
87
- auto reader = RowReaderWrapper::getTagPropReader (schemaMan_, spaceId_, tagId, value);
88
- if (reader == nullptr ) {
89
- LOG (ERROR) << " get tag reader failed, tagID " << tagId;
90
- return ;
91
- }
92
- for (auto & index : ftIndex) {
93
- if (index .second .get_fields ().size () > 1 ) {
94
- LOG (ERROR) << " Only one field will create fulltext index" ;
95
- }
96
- auto field = index .second .get_fields ().front ();
97
- auto v = reader->getValueByName (field);
98
- if (v.type () == Value::Type::NULLVALUE) {
99
- continue ;
100
- }
101
- if (v.type () != Value::Type::STRING) {
102
- LOG (ERROR) << " Can't create fulltext index on type " << v.type ();
95
+ ftIndexes = std::move (ftIndexRes).value ();
96
+ if (type == BatchLogType::OP_BATCH_PUT) {
97
+ reader = RowReaderWrapper::getTagPropReader (schemaMan_, spaceId_, tagId, value);
98
+ if (reader == nullptr ) {
99
+ LOG (ERROR) << " get tag reader failed, tagID " << tagId;
100
+ return ;
103
101
}
104
- std::string indexName = index .first ;
105
- std::string vid = NebulaKeyUtils::getVertexId (vIdLen_, key).toString ();
106
- vid = truncateVid (vid);
107
- std::string text = std::move (v).getStr ();
108
- callback (type, indexName, vid, " " , " " , 0 , text);
109
102
}
110
- } else if (nebula::NebulaKeyUtils::isEdge (vIdLen_, key)) {
103
+ vid = NebulaKeyUtils::getVertexId (vIdLen_, key).toString ();
104
+ vid = truncateVid (vid);
105
+ } else {
111
106
auto edgeType = NebulaKeyUtils::getEdgeType (vIdLen_, key);
112
107
if (edgeType < 0 ) {
113
108
return ;
@@ -116,33 +111,44 @@ void ESListener::pickTagAndEdgeData(BatchLogType type,
116
111
if (!ftIndexRes.ok ()) {
117
112
return ;
118
113
}
119
- auto ftIndex = std::move (ftIndexRes).value ();
120
- auto reader = RowReaderWrapper::getEdgePropReader (schemaMan_, spaceId_, edgeType, value);
121
- if (reader == nullptr ) {
122
- LOG (ERROR) << " get edge reader failed, schema ID " << edgeType;
123
- return ;
124
- }
125
- for (auto & index : ftIndex) {
126
- if (index .second .get_fields ().size () > 1 ) {
127
- LOG (ERROR) << " Only one field will create fulltext index" ;
114
+ ftIndexes = std::move (ftIndexRes).value ();
115
+ if (type == BatchLogType::OP_BATCH_PUT) {
116
+ reader = RowReaderWrapper::getEdgePropReader (schemaMan_, spaceId_, edgeType, value);
117
+ if (reader == nullptr ) {
118
+ LOG (ERROR) << " get edge reader failed, schema ID " << edgeType;
119
+ return ;
128
120
}
121
+ }
122
+ src = NebulaKeyUtils::getSrcId (vIdLen_, key).toString ();
123
+ dst = NebulaKeyUtils::getDstId (vIdLen_, key).toString ();
124
+ rank = NebulaKeyUtils::getRank (vIdLen_, key);
125
+
126
+ src = truncateVid (src);
127
+ dst = truncateVid (dst);
128
+ }
129
+ if (ftIndexes.empty ()) {
130
+ return ;
131
+ }
132
+
133
+ for (auto & index : ftIndexes) {
134
+ if (index .second .get_fields ().size () > 1 ) {
135
+ LOG (ERROR) << " Only one field will create fulltext index" ;
136
+ }
137
+ std::string text;
138
+ std::string indexName = index .first ;
139
+ if (type == BatchLogType::OP_BATCH_PUT) {
129
140
auto field = index .second .get_fields ().front ();
130
141
auto v = reader->getValueByName (field);
131
142
if (v.type () == Value::Type::NULLVALUE) {
143
+ callback (BatchLogType::OP_BATCH_REMOVE, indexName, vid, src, dst, 0 , text);
132
144
continue ;
133
145
}
134
146
if (v.type () != Value::Type::STRING) {
135
147
LOG (ERROR) << " Can't create fulltext index on type " << v.type ();
136
148
}
137
- std::string indexName = index .first ;
138
- std::string src = NebulaKeyUtils::getSrcId (vIdLen_, key).toString ();
139
- std::string dst = NebulaKeyUtils::getDstId (vIdLen_, key).toString ();
140
- int64_t rank = NebulaKeyUtils::getRank (vIdLen_, key);
141
- std::string text = std::move (v).getStr ();
142
- src = truncateVid (src);
143
- dst = truncateVid (dst);
144
- callback (type, indexName, " " , src, dst, rank, text);
149
+ text = std::move (v).getStr ();
145
150
}
151
+ callback (type, indexName, vid, src, dst, rank, text);
146
152
}
147
153
}
148
154
0 commit comments