Skip to content

Commit

Permalink
Fix get wrong data when multi tags have same prop name (vesoft-inc#640)
Browse files Browse the repository at this point in the history
* Fix the same prop name of diff tags

* Rebase upstream

* Rebase upstream
  • Loading branch information
laura-ding authored and dutor committed Aug 2, 2019
1 parent 0e3150c commit 9ca22fd
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/dataman/RowReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ int32_t RowReader::getSchemaVer(folly::StringPiece row) {
}

// The first three bits indicate the number of bytes for the
// schena version. If the number is zero, no schema version
// schema version. If the number is zero, no schema version
// presents
size_t verBytes = *(it++) >> 5;
int32_t ver = 0;
Expand Down
63 changes: 61 additions & 2 deletions src/dataman/RowReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class RowReader {
folly::StringPiece row,
std::shared_ptr<const meta::SchemaProviderIf> schema);

static ErrorOr<ResultType, VariantType> getProp(const RowReader* reader,
static ErrorOr<ResultType, VariantType> getPropByName(const RowReader* reader,
const std::string& prop) {
auto& vType = reader->getSchema()->getFieldType(prop);
switch (vType.type) {
Expand Down Expand Up @@ -143,7 +143,66 @@ class RowReader {
}
default:
LOG(FATAL) << "Unknown type: " << static_cast<int32_t>(vType.type);
return "";
return ResultType::E_DATA_INVALID;
}
}


static ErrorOr<ResultType, VariantType> getPropByIndex(const RowReader *reader,
int64_t index) {
auto& vType = reader->getSchema()->getFieldType(index);
switch (vType.get_type()) {
case nebula::cpp2::SupportedType::BOOL: {
bool v;
auto ret = reader->getBool(index, v);
if (ret != ResultType::SUCCEEDED) {
return ret;
}
return v;
}
case nebula::cpp2::SupportedType::INT: {
int64_t v;
auto ret = reader->getInt(index, v);
if (ret != ResultType::SUCCEEDED) {
return ret;
}
return v;
}
case nebula::cpp2::SupportedType::VID: {
VertexID v;
auto ret = reader->getVid(index, v);
if (ret != ResultType::SUCCEEDED) {
return ret;
}
return v;
}
case nebula::cpp2::SupportedType::FLOAT: {
float v;
auto ret = reader->getFloat(index, v);
if (ret != ResultType::SUCCEEDED) {
return ret;
}
return static_cast<double>(v);
}
case nebula::cpp2::SupportedType::DOUBLE: {
double v;
auto ret = reader->getDouble(index, v);
if (ret != ResultType::SUCCEEDED) {
return ret;
}
return v;
}
case nebula::cpp2::SupportedType::STRING: {
folly::StringPiece v;
auto ret = reader->getString(index, v);
if (ret != ResultType::SUCCEEDED) {
return ret;
}
return v.toString();
}
default:
LOG(FATAL) << "Unknown type: " << static_cast<int32_t>(vType.get_type());
return ResultType::E_DATA_INVALID;
}
}

Expand Down
96 changes: 68 additions & 28 deletions src/graph/GoExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
namespace nebula {
namespace graph {

using SchemaProps = std::unordered_map<std::string, std::vector<std::string>>;
using nebula::cpp2::SupportedType;

GoExecutor::GoExecutor(Sentence *sentence, ExecutionContext *ectx) : TraverseExecutor(ectx) {
// The RTTI is guaranteed by Sentence::Kind,
// so we use `static_cast' instead of `dynamic_cast' for the sake of efficiency.
Expand Down Expand Up @@ -393,7 +396,7 @@ void GoExecutor::finishExecution(RpcResponse &&rpcResp) {
}


StatusOr<std::vector<storage::cpp2::PropDef>> GoExecutor::getStepOutProps() const {
StatusOr<std::vector<storage::cpp2::PropDef>> GoExecutor::getStepOutProps() {
std::vector<storage::cpp2::PropDef> props;
{
storage::cpp2::PropDef pd;
Expand All @@ -407,18 +410,29 @@ StatusOr<std::vector<storage::cpp2::PropDef>> GoExecutor::getStepOutProps() cons
}

auto spaceId = ectx()->rctx()->session()->space();
SchemaProps tagProps;
for (auto &tagProp : expCtx_->srcTagProps()) {
storage::cpp2::PropDef pd;
pd.owner = storage::cpp2::PropOwner::SOURCE;
pd.name = tagProp.second;
auto status = ectx()->schemaManager()->toTagID(spaceId, tagProp.first);
tagProps[tagProp.first].emplace_back(tagProp.second);
}

int64_t index = -1;
for (auto &tagIt : tagProps) {
auto status = ectx()->schemaManager()->toTagID(spaceId, tagIt.first);
if (!status.ok()) {
return Status::Error("No schema found for '%s'", tagProp.first);
return Status::Error("No schema found for '%s'", tagIt.first);
}
auto tagId = status.value();
pd.set_tag_id(tagId);
props.emplace_back(std::move(pd));
for (auto &prop : tagIt.second) {
index++;
storage::cpp2::PropDef pd;
pd.owner = storage::cpp2::PropOwner::DEST;
pd.name = prop;
pd.set_tag_id(tagId);
props.emplace_back(std::move(pd));
srcTagProps_.emplace(std::make_pair(tagIt.first, prop), index);
}
}

for (auto &prop : expCtx_->edgeProps()) {
storage::cpp2::PropDef pd;
pd.owner = storage::cpp2::PropOwner::EDGE;
Expand All @@ -430,20 +444,30 @@ StatusOr<std::vector<storage::cpp2::PropDef>> GoExecutor::getStepOutProps() cons
}


StatusOr<std::vector<storage::cpp2::PropDef>> GoExecutor::getDstProps() const {
std::vector<storage::cpp2::PropDef> props;
StatusOr<std::vector<storage::cpp2::PropDef>> GoExecutor::getDstProps() {
auto spaceId = ectx()->rctx()->session()->space();
SchemaProps tagProps;
for (auto &tagProp : expCtx_->dstTagProps()) {
storage::cpp2::PropDef pd;
pd.owner = storage::cpp2::PropOwner::DEST;
pd.name = tagProp.second;
auto status = ectx()->schemaManager()->toTagID(spaceId, tagProp.first);
tagProps[tagProp.first].emplace_back(tagProp.second);
}

std::vector<storage::cpp2::PropDef> props;
int64_t index = -1;
for (auto &tagIt : tagProps) {
auto status = ectx()->schemaManager()->toTagID(spaceId, tagIt.first);
if (!status.ok()) {
return Status::Error("No schema found for '%s'", tagProp.first);
return Status::Error("No schema found for '%s'", tagIt.first);
}
auto tagId = status.value();
pd.set_tag_id(tagId);
props.emplace_back(std::move(pd));
for (auto &prop : tagIt.second) {
index++;
storage::cpp2::PropDef pd;
pd.owner = storage::cpp2::PropOwner::DEST;
pd.name = prop;
pd.set_tag_id(tagId);
props.emplace_back(std::move(pd));
dstTagProps_.emplace(std::make_pair(tagIt.first, prop), index);
}
}
return props;
}
Expand Down Expand Up @@ -509,7 +533,6 @@ std::unique_ptr<InterimResult> GoExecutor::setupInterimResult(RpcResponse &&rpcR
std::shared_ptr<SchemaWriter> schema;
std::unique_ptr<RowSetWriter> rsWriter;
auto uniqResult = std::make_unique<std::unordered_set<std::string>>();
using nebula::cpp2::SupportedType;
auto cb = [&] (std::vector<VariantType> record) {
if (schema == nullptr) {
schema = std::make_shared<SchemaWriter>();
Expand Down Expand Up @@ -576,6 +599,7 @@ std::unique_ptr<InterimResult> GoExecutor::setupInterimResult(RpcResponse &&rpcR
return std::make_unique<InterimResult>(std::move(rsWriter));
}


void GoExecutor::onEmptyInputs() {
if (onResult_) {
onResult_(nullptr);
Expand Down Expand Up @@ -614,19 +638,36 @@ void GoExecutor::processFinalResult(RpcResponse &rpcResp, Callback cb) const {
while (iter) {
auto &getters = expCtx_->getters();
getters.getEdgeProp = [&] (const std::string &prop) -> VariantType {
auto res = RowReader::getProp(&*iter, prop);
auto res = RowReader::getPropByName(&*iter, prop);
CHECK(ok(res));
return value(std::move(res));
};
getters.getSrcTagProp = [&] (const std::string&, const std::string &prop) {
auto res = RowReader::getProp(vreader.get(), prop);
getters.getSrcTagProp = [&] (const std::string &tagName, const std::string &prop) {
auto tagIter = this->srcTagProps_.find(std::make_pair(tagName, prop));
if (tagIter == this->srcTagProps_.end()) {
LOG(ERROR) << "Src tagName : " << tagName
<< ", propName : " << prop << " is not exist";
}
auto index = tagIter->second;
const nebula::cpp2::ValueType& type = vschema->getFieldType(index);
if (type == CommonConstants::kInvalidValueType()) {
LOG(ERROR) << "Tag : " << tagName <<" no schema for the index " << index;
}
auto res = RowReader::getPropByIndex(vreader.get(), index);
CHECK(ok(res));
return value(std::move(res));
};
getters.getDstTagProp = [&] (const std::string&, const std::string &prop) {
auto dst = RowReader::getProp(&*iter, "_dst");
CHECK(ok(dst));
return vertexHolder_->get(boost::get<int64_t>(value(std::move(dst))), prop);
getters.getDstTagProp = [&] (const std::string &tagName, const std::string &prop) {
auto res = RowReader::getPropByName(&*iter, "_dst");
CHECK(ok(res));
auto dst = value(std::move(res));
auto tagIter = this->dstTagProps_.find(std::make_pair(tagName, prop));
if (tagIter == this->dstTagProps_.end()) {
LOG(ERROR) << "Dst tagName : " << tagName
<< ", propName : " << prop << " is not exist";
}
auto index = tagIter->second;
return vertexHolder_->get(boost::get<int64_t>(dst), index);
};
// Evaluate filter
if (filter_ != nullptr) {
Expand All @@ -652,16 +693,15 @@ void GoExecutor::processFinalResult(RpcResponse &rpcResp, Callback cb) const {
}


VariantType GoExecutor::VertexHolder::get(VertexID id, const std::string &prop) const {
DCHECK(schema_ != nullptr);
VariantType GoExecutor::VertexHolder::get(VertexID id, int64_t index) const {
auto iter = data_.find(id);

// TODO(dutor) We need a type to represent NULL or non-existing prop
CHECK(iter != data_.end());

auto reader = RowReader::getRowReader(iter->second, schema_);

auto res = RowReader::getProp(reader.get(), prop);
auto res = RowReader::getPropByIndex(reader.get(), index);
CHECK(ok(res));
return value(std::move(res));
}
Expand Down
14 changes: 11 additions & 3 deletions src/graph/GoExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ class GoExecutor final : public TraverseExecutor {
*/
void onVertexProps(RpcResponse &&rpcResp);

StatusOr<std::vector<storage::cpp2::PropDef>> getStepOutProps() const;
StatusOr<std::vector<storage::cpp2::PropDef>> getDstProps() const;
StatusOr<std::vector<storage::cpp2::PropDef>> getStepOutProps();
StatusOr<std::vector<storage::cpp2::PropDef>> getDstProps();

void fetchVertexProps(std::vector<VertexID> ids, RpcResponse &&rpcResp);

Expand Down Expand Up @@ -154,13 +154,17 @@ class GoExecutor final : public TraverseExecutor {
*/
class VertexHolder final {
public:
VariantType get(VertexID id, const std::string &prop) const;
VariantType get(VertexID id, int64_t index) const;
void add(const storage::cpp2::QueryResponse &resp);
const auto* schema() const {
return schema_.get();
}

private:
// The schema include multi vertexes, and multi tags of one vertex
// eg: get 3 vertexex, vertex A has tag1.prop1, vertex B has tag2.prop2,
// vertex C has tag3.prop3,
// and the schema is {[tag1.prop1, type], [tag2.prop2, type], [tag3.prop3, type]}
std::shared_ptr<ResultSchemaProvider> schema_;
std::unordered_map<VertexID, std::string> data_;
};
Expand All @@ -183,6 +187,10 @@ class GoExecutor final : public TraverseExecutor {
std::vector<VertexID> starts_;
std::unique_ptr<VertexHolder> vertexHolder_;
std::unique_ptr<cpp2::ExecutionResponse> resp_;
// The name of Tag or Edge, index of prop in data
using SchemaPropIndex = std::unordered_map<std::pair<std::string, std::string>, int64_t>;
SchemaPropIndex srcTagProps_;
SchemaPropIndex dstTagProps_;
};

} // namespace graph
Expand Down
76 changes: 76 additions & 0 deletions src/graph/test/DataTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,26 @@ AssertionResult DataTest::prepareSchema() {
<< " failed, error code "<< static_cast<int32_t>(code);
}
}
// Test same propName diff tyep in diff tags
{
cpp2::ExecutionResponse resp;
std::string cmd = "CREATE TAG employee(name int)";
auto code = client_->execute(cmd, resp);
if (cpp2::ErrorCode::SUCCEEDED != code) {
return TestError() << "Do cmd:" << cmd
<< " failed, error code "<< static_cast<int32_t>(code);
}
}
// Test same propName same type in diff tags
{
cpp2::ExecutionResponse resp;
std::string cmd = "CREATE TAG interest(name string)";
auto code = client_->execute(cmd, resp);
if (cpp2::ErrorCode::SUCCEEDED != code) {
return TestError() << "Do cmd:" << cmd
<< " failed, error code "<< static_cast<int32_t>(code);
}
}
sleep(FLAGS_load_data_interval_secs + 3);
return TestOK();
}
Expand Down Expand Up @@ -269,6 +289,62 @@ TEST_F(DataTest, InsertVertex) {
};
ASSERT_TRUE(verifyResult(resp, expected));
}
// Test same prop name diff type in diff tags
{
cpp2::ExecutionResponse resp;
std::string cmd = "INSERT VERTEX person(name, age),employee(name) "
"VALUES hash(\"Joy\"):(\"Joy\", 18, 123),"
"hash(\"Petter\"):(\"Petter\", 19, 456)";
auto code = client_->execute(cmd, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
}
{
cpp2::ExecutionResponse resp;
std::string cmd = "INSERT EDGE schoolmate(likeness) VALUES "
"hash(\"Joy\")->hash(\"Petter\"):(90)";
auto code = client_->execute(cmd, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
}
{
cpp2::ExecutionResponse resp;
std::string cmd = "GO FROM hash(\"Joy\") OVER schoolmate YIELD $^.person.name,"
"schoolmate.likeness, $$.person.name, $$.person.age,$$.employee.name";
auto code = client_->execute(cmd, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
std::vector<std::tuple<std::string, int64_t, std::string, int64_t, int64_t>> expected = {
{"Joy", 90, "Petter", 19, 456},
};
ASSERT_TRUE(verifyResult(resp, expected));
}
// Test same prop name same type in diff tags
{
cpp2::ExecutionResponse resp;
std::string cmd = "INSERT VERTEX person(name, age),interest(name) "
"VALUES hash(\"Bob\"):(\"Bob\", 19, \"basketball\")";
auto code = client_->execute(cmd, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
}
{
cpp2::ExecutionResponse resp;
std::string cmd = "INSERT EDGE schoolmate(likeness) VALUES "
"hash(\"Petter\")->hash(\"Bob\"):(90)";
auto code = client_->execute(cmd, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
}
{
cpp2::ExecutionResponse resp;
std::string cmd = "GO FROM hash(\"Petter\") OVER schoolmate "
"YIELD $^.person.name, $^.employee.name, "
"schoolmate.likeness, $$.person.name,"
"$$.interest.name, $$.person.age";
auto code = client_->execute(cmd, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
using type = std::tuple<std::string, int64_t, int64_t, std::string, std::string, int64_t>;
std::vector<type> expected = {
{"Petter", 456, 90, "Bob", "basketball", 19},
};
ASSERT_TRUE(verifyResult(resp, expected));
}
// Insert wrong type
{
cpp2::ExecutionResponse resp;
Expand Down
Loading

0 comments on commit 9ca22fd

Please sign in to comment.