Skip to content

Commit

Permalink
Delete vertices with get neighbor (vesoft-inc#1860)
Browse files Browse the repository at this point in the history
* delete vertices with get neighbor

* remove get edge keys
  • Loading branch information
yaphet authored Mar 6, 2020
1 parent c0aa061 commit eebfde4
Show file tree
Hide file tree
Showing 16 changed files with 196 additions and 366 deletions.
111 changes: 98 additions & 13 deletions src/graph/DeleteVerticesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "dataman/RowReader.h"
#include "graph/DeleteVerticesExecutor.h"
#include "storage/client/StorageClient.h"

Expand Down Expand Up @@ -52,8 +53,44 @@ void DeleteVerticesExecutor::execute() {
vids_.emplace_back(vid);
}

// TODO(zlcook) Get edgeKeys of a vertex by Go
auto future = ectx()->getStorageClient()->getEdgeKeys(space_, vids_);
auto edgeAllStatus = ectx()->schemaManager()->getAllEdge(space_);
if (!edgeAllStatus.ok()) {
doError(edgeAllStatus.status());
return;
}

std::vector<EdgeType> edgeTypes;
std::vector<std::string> columns = {_DST, _RANK};
std::vector<nebula::storage::cpp2::PropDef> props;
for (auto &e : edgeAllStatus.value()) {
auto edgeStatus = ectx()->schemaManager()->toEdgeType(space_, e);
if (!edgeStatus.ok()) {
LOG(ERROR) << "Can't get all of the edge types";
doError(edgeStatus.status());
return;
}

auto type = edgeStatus.value();
VLOG(3) << "Get Edge Type: " << type << " and " << (-1 * type);
edgeTypes.emplace_back(type);
edgeTypes.emplace_back(-1 * type);

for (auto& column : columns) {
storage::cpp2::PropDef def;
def.owner = storage::cpp2::PropOwner::EDGE;
def.name = column;
def.id.set_edge_type(type);
props.emplace_back(std::move(def));

storage::cpp2::PropDef reverseDef;
reverseDef.owner = storage::cpp2::PropOwner::EDGE;
reverseDef.name = column;
reverseDef.id.set_edge_type(-1 * type);
props.emplace_back(std::move(reverseDef));
}
}

auto future = ectx()->getStorageClient()->getNeighbors(space_, vids_, edgeTypes, "", props);
auto *runner = ectx()->rctx()->runner();

auto cb = [this] (auto &&result) {
Expand All @@ -72,17 +109,65 @@ void DeleteVerticesExecutor::execute() {
auto rpcResp = std::move(result).responses();
std::vector<storage::cpp2::EdgeKey> allEdges;
for (auto& response : rpcResp) {
auto keys = response.get_edge_keys();
for (auto iter = keys->begin(); iter != keys->end(); iter++) {
for (auto& edge : iter->second) {
storage::cpp2::EdgeKey reverseEdge;
reverseEdge.set_src(edge.get_dst());
reverseEdge.set_edge_type(-(edge.get_edge_type()));
reverseEdge.set_ranking(edge.get_ranking());
reverseEdge.set_dst(edge.get_src());

allEdges.emplace_back(std::move(edge));
allEdges.emplace_back(std::move(reverseEdge));
std::unordered_map<EdgeType, std::shared_ptr<ResultSchemaProvider>> edgeSchema;
auto *eschema = response.get_edge_schema();
if (eschema != nullptr) {
std::transform(eschema->cbegin(), eschema->cend(),
std::inserter(edgeSchema, edgeSchema.begin()), [](auto &schema) {
return std::make_pair(
schema.first,
std::make_shared<ResultSchemaProvider>(schema.second));
});
}

if (edgeSchema.empty()) {
LOG(ERROR) << "Can't find edge's schema";
doError(Status::Error("Can't find edge's schema"));
return;
}

for (auto &vdata : response.vertices) {
auto src = vdata.get_vertex_id();
for (auto &edata : vdata.get_edge_data()) {
auto edgeType = edata.get_type();
auto it = edgeSchema.find(edgeType);
if (it == edgeSchema.end()) {
LOG(ERROR) << "Can't find " << edgeType;
doError(Status::Error("Can't find edge type %d", edgeType));
return;
}

for (auto& edge : edata.get_edges()) {
auto dst = edge.get_dst();
auto reader = RowReader::getRowReader(edge.get_props(), it->second);
if (reader == nullptr) {
LOG(ERROR) << "Can't get row reader";
doError(Status::Error("Can't get reader! edge type %d", edgeType));
return;
}

auto rankRes = RowReader::getPropByName(reader.get(), _RANK);
if (!ok(rankRes)) {
LOG(ERROR) << "Can't get rank " << edgeType;
doError(Status::Error("Can't get rank! edge type %d", edgeType));
return;
}

auto rank = boost::get<int64_t>(value(rankRes));
storage::cpp2::EdgeKey edgeKey;
edgeKey.set_src(src);
edgeKey.set_edge_type(edgeType);
edgeKey.set_ranking(rank);
edgeKey.set_dst(dst);
allEdges.emplace_back(std::move(edgeKey));

storage::cpp2::EdgeKey reverseEdgeKey;
reverseEdgeKey.set_src(dst);
reverseEdgeKey.set_edge_type(-1 * edgeType);
reverseEdgeKey.set_ranking(rank);
reverseEdgeKey.set_dst(src);
allEdges.emplace_back(std::move(reverseEdgeKey));
}
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/graph/GoExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,10 +471,10 @@ void GoExecutor::stepOut() {
VLOG(1) << "edge type size: " << edgeTypes_.size()
<< " return cols: " << returns.size();
auto future = ectx()->getStorageClient()->getNeighbors(spaceId,
starts_,
edgeTypes_,
filterPushdown,
std::move(returns));
starts_,
edgeTypes_,
filterPushdown,
std::move(returns));
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&result) {
auto completeness = result.completeness();
Expand Down
10 changes: 5 additions & 5 deletions src/graph/TraverseExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@ Status Collector::collectWithoutSchema(VariantType &var, RowWriter *writer) {
}

OptVariantType Collector::getProp(const meta::SchemaProviderIf *schema,
const std::string &prop,
const RowReader *reader) {
const std::string &prop,
const RowReader *reader) {
DCHECK(reader != nullptr);
DCHECK(schema != nullptr);
using nebula::cpp2::SupportedType;
Expand Down Expand Up @@ -280,9 +280,9 @@ OptVariantType Collector::getProp(const meta::SchemaProviderIf *schema,
}

Status Collector::getSchema(const std::vector<VariantType> &vals,
const std::vector<std::string> &colNames,
const std::vector<nebula::cpp2::SupportedType> &colTypes,
SchemaWriter *outputSchema) {
const std::vector<std::string> &colNames,
const std::vector<nebula::cpp2::SupportedType> &colTypes,
SchemaWriter *outputSchema) {
DCHECK(outputSchema != nullptr);
DCHECK_EQ(vals.size(), colNames.size());
DCHECK_EQ(vals.size(), colTypes.size());
Expand Down
74 changes: 63 additions & 11 deletions src/graph/test/DeleteVerticesTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ TEST_F(DeleteVerticesTest, Base) {
};
ASSERT_TRUE(verifyResult(resp, expected));
}
{
cpp2::ExecutionResponse resp;
auto *fmt = "GO FROM %ld OVER like REVERSELY";
auto query = folly::stringPrintf(fmt, players_["Tony Parker"].vid());
auto code = client_->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
std::vector<std::tuple<int64_t>> expected = {
{players_["Tim Duncan"].vid()},
{players_["Marco Belinelli"].vid()},
{players_["LaMarcus Aldridge"].vid()},
{players_["Boris Diaw"].vid()},
{players_["Dejounte Murray"].vid()},
};
ASSERT_TRUE(verifyResult(resp, expected));
}
{
cpp2::ExecutionResponse resp;
auto &player = players_["Tony Parker"];
Expand Down Expand Up @@ -75,17 +90,6 @@ TEST_F(DeleteVerticesTest, Base) {
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
}
// Check again
{
cpp2::ExecutionResponse resp;
auto *fmt = "GO FROM %ld OVER like";
auto query = folly::stringPrintf(fmt, players_["Boris Diaw"].vid());
auto code = client_->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
std::vector<std::tuple<int64_t>> expected = {
{players_["Tim Duncan"].vid()},
};
ASSERT_TRUE(verifyResult(resp, expected));
}
{
cpp2::ExecutionResponse resp;
auto &player = players_["Tony Parker"];
Expand All @@ -111,6 +115,33 @@ TEST_F(DeleteVerticesTest, Base) {
};
ASSERT_TRUE(verifyResult(resp, expected));
}
{
LOG(INFO) << "Boris Diaw ID " << players_["Boris Diaw"].vid();
cpp2::ExecutionResponse resp;
auto *fmt = "GO FROM %ld OVER like";
auto query = folly::stringPrintf(fmt, players_["Boris Diaw"].vid());
auto code = client_->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
std::vector<std::tuple<int64_t>> expected = {
{players_["Tim Duncan"].vid()},
};
ASSERT_TRUE(verifyResult(resp, expected));
}
{
LOG(INFO) << "Tim Duncan ID " << players_["Tim Duncan"].vid();
LOG(INFO) << "Marco Belinelli ID " << players_["Marco Belinelli"].vid();
LOG(INFO) << "LaMarcus Aldridge ID " << players_["LaMarcus Aldridge"].vid();
LOG(INFO) << "Boris Diaw ID " << players_["Boris Diaw"].vid();
LOG(INFO) << "Dejounte Murray ID " << players_["Dejounte Murray"].vid();
cpp2::ExecutionResponse resp;
auto *fmt = "GO FROM %ld OVER like REVERSELY";
auto query = folly::stringPrintf(fmt, players_["Tony Parker"].vid());
auto code = client_->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
std::vector<std::tuple<int64_t>> expected = {
};
ASSERT_TRUE(verifyResult(resp, expected));
}
}

TEST_F(DeleteVerticesTest, DeleteMultiVertices) {
Expand Down Expand Up @@ -167,6 +198,17 @@ TEST_F(DeleteVerticesTest, DeleteWithHash) {
};
ASSERT_TRUE(verifyResult(resp, expected));
}
{
cpp2::ExecutionResponse resp;
auto *fmt = "GO FROM %ld OVER like REVERSELY";
auto query = folly::stringPrintf(fmt, players_["Grant Hill"].vid());
auto code = client_->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
std::vector<std::tuple<int64_t>> expected = {
{players_["Tracy McGrady"].vid()},
};
ASSERT_TRUE(verifyResult(resp, expected));
}
{
cpp2::ExecutionResponse resp;
auto &player = players_["Grant Hill"];
Expand Down Expand Up @@ -215,6 +257,16 @@ TEST_F(DeleteVerticesTest, DeleteWithHash) {
};
ASSERT_TRUE(verifyResult(resp, expected));
}
{
cpp2::ExecutionResponse resp;
auto *fmt = "GO FROM %ld OVER like REVERSELY";
auto query = folly::stringPrintf(fmt, players_["Grant Hill"].vid());
auto code = client_->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
std::vector<std::tuple<int64_t>> expected = {
};
ASSERT_TRUE(verifyResult(resp, expected));
}
{
cpp2::ExecutionResponse resp;
auto &player = players_["Grant Hill"];
Expand Down
11 changes: 0 additions & 11 deletions src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -216,16 +216,6 @@ struct AddEdgesRequest {
3: bool overwritable,
}

struct EdgeKeysRequest {
1: common.GraphSpaceID space_id,
2: map<common.PartitionID, list<common.VertexID>>(cpp.template = "std::unordered_map") parts,
}

struct EdgeKeysResponse {
1: required ResponseCommon result,
2: optional map<common.VertexID, list<EdgeKey>>(cpp.template = "std::unordered_map") edge_keys,
}

struct DeleteVerticesRequest {
1: common.GraphSpaceID space_id,
2: map<common.PartitionID, list<common.VertexID>>(cpp.template = "std::unordered_map") parts,
Expand Down Expand Up @@ -478,7 +468,6 @@ service StorageService {
ExecResponse addVertices(1: AddVerticesRequest req);
ExecResponse addEdges(1: AddEdgesRequest req);

EdgeKeysResponse getEdgeKeys(1: EdgeKeysRequest req);
ExecResponse deleteEdges(1: DeleteEdgesRequest req);
ExecResponse deleteVertices(1: DeleteVerticesRequest req);

Expand Down
1 change: 0 additions & 1 deletion src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ nebula_add_library(
query/QueryVertexPropsProcessor.cpp
query/QueryEdgePropsProcessor.cpp
query/QueryStatsProcessor.cpp
query/QueryEdgeKeysProcessor.cpp
query/ScanEdgeProcessor.cpp
query/ScanVertexProcessor.cpp
mutate/AddVerticesProcessor.cpp
Expand Down
7 changes: 0 additions & 7 deletions src/storage/StorageServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include "storage/query/QueryEdgePropsProcessor.h"
#include "storage/query/QueryStatsProcessor.h"
#include "storage/query/GetUUIDProcessor.h"
#include "storage/query/QueryEdgeKeysProcessor.h"
#include "storage/query/ScanEdgeProcessor.h"
#include "storage/query/ScanVertexProcessor.h"
#include "storage/mutate/AddVerticesProcessor.h"
Expand Down Expand Up @@ -99,12 +98,6 @@ StorageServiceHandler::future_addEdges(const cpp2::AddEdgesRequest& req) {
RETURN_FUTURE(processor);
}

folly::Future<cpp2::EdgeKeysResponse>
StorageServiceHandler::future_getEdgeKeys(const cpp2::EdgeKeysRequest& req) {
auto* processor = QueryEdgeKeysProcessor::instance(kvstore_, schemaMan_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResponse>
StorageServiceHandler::future_deleteVertices(const cpp2::DeleteVerticesRequest& req) {
auto* processor = DeleteVerticesProcessor::instance(kvstore_,
Expand Down
3 changes: 0 additions & 3 deletions src/storage/StorageServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ class StorageServiceHandler final : public cpp2::StorageServiceSvIf {
folly::Future<cpp2::ExecResponse>
future_addEdges(const cpp2::AddEdgesRequest& req) override;

folly::Future<cpp2::EdgeKeysResponse>
future_getEdgeKeys(const cpp2::EdgeKeysRequest& req) override;

folly::Future<cpp2::ExecResponse>
future_deleteEdges(const cpp2::DeleteEdgesRequest& req) override;

Expand Down
30 changes: 0 additions & 30 deletions src/storage/client/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,36 +248,6 @@ folly::SemiFuture<StorageRpcResponse<cpp2::EdgePropResponse>> StorageClient::get
});
}

folly::SemiFuture<StorageRpcResponse<storage::cpp2::EdgeKeysResponse>> StorageClient::getEdgeKeys(
GraphSpaceID space,
std::vector<VertexID> vids,
folly::EventBase* evb) {
auto status = clusterIdsToHosts(space, vids, [] (const VertexID v) { return v; });

if (!status.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::EdgeKeysResponse>>(
std::runtime_error(status.status().toString()));
}

auto& clusters = status.value();
std::unordered_map<HostAddr, cpp2::EdgeKeysRequest> requests;
for (auto& c : clusters) {
auto& host = c.first;
auto& req = requests[host];
req.set_space_id(space);
req.set_parts(std::move(c.second));
}

return collectResponse(
evb, std::move(requests),
[](cpp2::StorageServiceAsyncClient* client,
const cpp2::EdgeKeysRequest& r) {
return client->future_getEdgeKeys(r);},
[](const std::pair<const PartitionID, std::vector<VertexID>>& p) {
return p.first;
});
}

folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> StorageClient::deleteEdges(
GraphSpaceID space,
std::vector<storage::cpp2::EdgeKey> edges,
Expand Down
5 changes: 0 additions & 5 deletions src/storage/client/StorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,6 @@ class StorageClient {
std::vector<storage::cpp2::PropDef> returnCols,
folly::EventBase* evb = nullptr);

folly::SemiFuture<StorageRpcResponse<storage::cpp2::EdgeKeysResponse>> getEdgeKeys(
GraphSpaceID space,
std::vector<VertexID> vids,
folly::EventBase* evb = nullptr);

folly::SemiFuture<StorageRpcResponse<storage::cpp2::ExecResponse>> deleteEdges(
GraphSpaceID space,
std::vector<storage::cpp2::EdgeKey> edges,
Expand Down
Loading

0 comments on commit eebfde4

Please sign in to comment.