@@ -30,78 +30,77 @@ void AddVerticesProcessor::process(const cpp2::AddVerticesRequest& req) {
30
30
}
31
31
32
32
CHECK_NOTNULL (kvstore_);
33
- if (indexes_.empty ()) {
34
- std::for_each (partVertices.begin (), partVertices.end (), [&](auto & pv) {
35
- auto partId = pv.first ;
36
- const auto & vertices = pv.second ;
37
- std::vector<kvstore::KV> data;
38
- std::for_each (vertices.begin (), vertices.end (), [&](auto & v) {
39
- const auto & tags = v.get_tags ();
40
- std::for_each (tags.begin (), tags.end (), [&](auto & tag) {
41
- VLOG (3 ) << " PartitionID: " << partId << " , VertexID: " << v.get_id ()
42
- << " , TagID: " << tag.get_tag_id () << " , TagVersion: " << version;
43
- auto key = NebulaKeyUtils::vertexKey (partId, v.get_id (),
44
- tag.get_tag_id (), version);
45
- data.emplace_back (std::move (key), std::move (tag.get_props ()));
46
- if (FLAGS_enable_vertex_cache && vertexCache_ != nullptr ) {
47
- vertexCache_->evict (std::make_pair (v.get_id (), tag.get_tag_id ()));
48
- VLOG (3 ) << " Evict cache for vId " << v.get_id ()
49
- << " , tagId " << tag.get_tag_id ();
50
- }
51
- });
33
+ std::unordered_set<std::pair<VertexID, TagID>> uniqueIDs;
34
+ uniqueIDs.reserve (128 );
35
+
36
+ std::for_each (partVertices.begin (), partVertices.end (), [&](auto & pv) {
37
+ std::vector<kvstore::KV> data;
38
+ data.reserve (128 );
39
+ std::vector<std::tuple<VertexID, TagID, std::string>> cacheData;
40
+ if (FLAGS_enable_vertex_cache && vertexCache_ != nullptr ) {
41
+ cacheData.reserve (128 );
42
+ }
43
+ auto partId = pv.first ;
44
+ const auto & vertices = pv.second ;
45
+
46
+ uniqueIDs.clear ();
47
+ std::for_each (vertices.rbegin (), vertices.rend (), [&](auto & v) {
48
+ const auto & tags = v.get_tags ();
49
+ std::for_each (tags.begin (), tags.end (), [&](auto & tag) {
50
+ auto uniqueKey = std::make_pair (v.get_id (), tag.get_tag_id ());
51
+ if (uniqueIDs.find (uniqueKey) != uniqueIDs.end ()) {
52
+ return ;
53
+ }
54
+
55
+ VLOG (3 ) << " PartitionID: " << partId << " , VertexID: " << v.get_id ()
56
+ << " , TagID: " << tag.get_tag_id () << " , TagVersion: " << version;
57
+ auto key = NebulaKeyUtils::vertexKey (partId, v.get_id (),
58
+ tag.get_tag_id (), version);
59
+ if (FLAGS_enable_vertex_cache && vertexCache_ != nullptr ) {
60
+ cacheData.emplace_back (v.get_id (), tag.get_tag_id (), tag.get_props ());
61
+ }
62
+ data.emplace_back (std::move (key), std::move (tag.get_props ()));
63
+ uniqueIDs.emplace (uniqueKey);
52
64
});
53
- doPut (spaceId_, partId, std::move (data));
54
- });
55
- } else {
56
- std::for_each (partVertices.begin (), partVertices.end (), [&](auto &pv) {
57
- auto partId = pv.first ;
58
- auto atomic = [version, partId, vertices = std::move (pv.second ), this ]()
59
- -> folly::Optional<std::string> {
60
- return addVertices (version, partId, vertices);
61
- };
62
- auto callback = [partId, this ](kvstore::ResultCode code) {
63
- handleAsync (spaceId_, partId, code);
64
- };
65
- this ->kvstore_ ->asyncAtomicOp (spaceId_, partId, atomic, callback);
66
65
});
67
- }
68
- }
69
66
70
- std::string AddVerticesProcessor::addVertices (int64_t version, PartitionID partId,
71
- const std::vector<cpp2::Vertex>& vertices) {
72
- std::unique_ptr<kvstore::BatchHolder> batchHolder = std::make_unique<kvstore::BatchHolder>();
73
- /*
74
- * Define the map newIndexes to avoid inserting duplicate vertex.
75
- * This map means :
76
- * map<vertex_unique_key, prop_value> ,
77
- * -- vertex_unique_key is only used as the unique key , for example:
78
- * insert below vertices in the same request:
79
- * kv(part1_vid1_tag1 , v1)
80
- * kv(part1_vid1_tag1 , v2)
81
- * kv(part1_vid1_tag1 , v3)
82
- * kv(part1_vid1_tag1 , v4)
83
- *
84
- * Ultimately, kv(part1_vid1_tag1 , v4) . It's just what I need.
85
- */
86
- std::map<std::string, std::string> newVertices;
87
- std::for_each (vertices.begin (), vertices.end (), [&](auto & v) {
88
- auto vId = v.get_id ();
89
- const auto & tags = v.get_tags ();
90
- std::for_each (tags.begin (), tags.end (), [&](auto & tag) {
91
- auto tagId = tag.get_tag_id ();
92
- auto prop = tag.get_props ();
93
- VLOG (3 ) << " PartitionID: " << partId << " , VertexID: " << vId
94
- << " , TagID: " << tagId << " , TagVersion: " << version;
95
- auto key = NebulaKeyUtils::vertexKey (partId, vId, tagId, version);
96
- newVertices[key] = std::move (prop);
97
- if (FLAGS_enable_vertex_cache && this ->vertexCache_ != nullptr ) {
98
- this ->vertexCache_ ->evict (std::make_pair (vId, tagId));
99
- VLOG (3 ) << " Evict cache for vId " << vId << " , tagId " << tagId;
67
+ auto callback = [partId,
68
+ this ,
69
+ cacheData = std::move (cacheData)] (kvstore::ResultCode code) mutable {
70
+ if (FLAGS_enable_vertex_cache
71
+ && vertexCache_ != nullptr
72
+ && code == kvstore::ResultCode::SUCCEEDED) {
73
+ for (auto && tup : cacheData) {
74
+ vertexCache_->insert (std::make_pair (std::get<0 >(tup),
75
+ std::get<1 >(tup)),
76
+ std::move (std::get<2 >(tup)));
77
+ }
100
78
}
101
- });
79
+ handleAsync (spaceId_, partId, code);
80
+ };
81
+ if (indexes_.empty ()) {
82
+ this ->kvstore_ ->asyncMultiPut (spaceId_,
83
+ partId,
84
+ std::move (data),
85
+ std::move (callback));
86
+ } else {
87
+ auto atomicOp = [partId, data = std::move (data), this ]() mutable
88
+ -> folly::Optional<std::string> {
89
+ return addVerticesWithIndex (partId, std::move (data));
90
+ };
91
+
92
+ this ->kvstore_ ->asyncAtomicOp (spaceId_,
93
+ partId,
94
+ std::move (atomicOp),
95
+ std::move (callback));
96
+ }
102
97
});
98
+ }
103
99
104
- for (auto & v : newVertices) {
100
+ std::string AddVerticesProcessor::addVerticesWithIndex (PartitionID partId,
101
+ std::vector<kvstore::KV>&& data) {
102
+ std::unique_ptr<kvstore::BatchHolder> batchHolder = std::make_unique<kvstore::BatchHolder>();
103
+ for (auto & v : data) {
105
104
std::string val;
106
105
RowReader nReader = RowReader::getEmptyRowReader ();
107
106
auto tagId = NebulaKeyUtils::getTagId (v.first );
@@ -150,9 +149,7 @@ std::string AddVerticesProcessor::addVertices(int64_t version, PartitionID partI
150
149
/*
151
150
* step 3 , Insert new vertex data
152
151
*/
153
- auto key = v.first ;
154
- auto prop = v.second ;
155
- batchHolder->put (std::move (key), std::move (prop));
152
+ batchHolder->put (std::move (v.first ), std::move (v.second ));
156
153
}
157
154
return encodeBatchValue (batchHolder->getBatch ());
158
155
}
0 commit comments