Skip to content

Commit

Permalink
allow multi fulltext index on a tag/edge (vesoft-inc#1976)
Browse files Browse the repository at this point in the history
* allow multi fulltext index on a tag/edge

* resolve conflicts

Co-authored-by: hs.zhang <22708345+cangfengzhs@users.noreply.github.com>
Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com>
  • Loading branch information
3 people authored Dec 26, 2022
1 parent 846bb13 commit 02d5f9e
Show file tree
Hide file tree
Showing 15 changed files with 105 additions and 78 deletions.
27 changes: 24 additions & 3 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4295,8 +4295,8 @@ StatusOr<std::unordered_map<std::string, cpp2::FTIndex>> MetaClient::getFTIndexB
return indexes;
}

StatusOr<std::pair<std::string, cpp2::FTIndex>> MetaClient::getFTIndexBySpaceSchemaFromCache(
GraphSpaceID spaceId, int32_t schemaId) {
StatusOr<std::pair<std::string, cpp2::FTIndex>> MetaClient::getFTIndexFromCache(
GraphSpaceID spaceId, int32_t schemaId, const std::string& field) {
if (!ready_) {
return Status::Error("Not ready!");
}
Expand All @@ -4306,13 +4306,34 @@ StatusOr<std::pair<std::string, cpp2::FTIndex>> MetaClient::getFTIndexBySpaceSch
auto id = it.second.get_depend_schema().getType() == nebula::cpp2::SchemaID::Type::edge_type
? it.second.get_depend_schema().get_edge_type()
: it.second.get_depend_schema().get_tag_id();
if (it.second.get_space_id() == spaceId && id == schemaId) {
// There will only be one field. However, in order to minimize changes, the IDL was not modified
auto f = it.second.fields()->front();
if (it.second.get_space_id() == spaceId && id == schemaId && f == field) {
return std::make_pair(it.first, it.second);
}
}
return Status::IndexNotFound();
}

// Collects every cached fulltext index that depends on the given tag/edge
// schema within the given space.
//
// @param spaceId  space the indexes must belong to
// @param schemaId tag id or edge type the indexes must depend on
// @return map keyed by the cache's index key, holding the matching FTIndex
//         entries; the map is empty when nothing matches. Returns
//         Status::Error("Not ready!") while the client is not ready.
StatusOr<std::unordered_map<std::string, cpp2::FTIndex>> MetaClient::getFTIndexFromCache(
    GraphSpaceID spaceId, int32_t schemaId) {
  if (!ready_) {
    return Status::Error("Not ready!");
  }
  // The guard is held for the whole scan of the loaded metadata snapshot.
  folly::rcu_reader guard;
  const auto& metadata = *metadata_.load();
  std::unordered_map<std::string, cpp2::FTIndex> matched;
  for (const auto& [indexName, ftIndex] : metadata.fulltextIndexMap_) {
    // The dependent schema is a tag or an edge; read whichever id is set.
    const auto& schema = ftIndex.get_depend_schema();
    const bool dependsOnEdge = schema.getType() == nebula::cpp2::SchemaID::Type::edge_type;
    const auto dependId = dependsOnEdge ? schema.get_edge_type() : schema.get_tag_id();
    if (ftIndex.get_space_id() != spaceId || dependId != schemaId) {
      continue;
    }
    matched[indexName] = ftIndex;
  }
  return matched;
}

StatusOr<cpp2::FTIndex> MetaClient::getFTIndexByNameFromCache(GraphSpaceID spaceId,
const std::string& name) {
if (!ready_) {
Expand Down
8 changes: 6 additions & 2 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -630,8 +630,12 @@ class MetaClient : public BaseMetaClient {
StatusOr<std::unordered_map<std::string, cpp2::FTIndex>> getFTIndexBySpaceFromCache(
GraphSpaceID spaceId);

StatusOr<std::pair<std::string, cpp2::FTIndex>> getFTIndexBySpaceSchemaFromCache(
GraphSpaceID spaceId, int32_t schemaId);
StatusOr<std::pair<std::string, cpp2::FTIndex>> getFTIndexFromCache(GraphSpaceID spaceId,
int32_t schemaId,
const std::string& field);

StatusOr<std::unordered_map<std::string, cpp2::FTIndex>> getFTIndexFromCache(GraphSpaceID spaceId,
int32_t schemaId);

StatusOr<cpp2::FTIndex> getFTIndexByNameFromCache(GraphSpaceID spaceId, const std::string& name);

Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/SchemaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class SchemaManager {
StatusOr<std::pair<bool, int32_t>> getSchemaIDByName(GraphSpaceID space,
folly::StringPiece schemaName);

virtual StatusOr<std::pair<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(
virtual StatusOr<std::unordered_map<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(
GraphSpaceID spaceId, int32_t schemaId) = 0;

protected:
Expand Down
6 changes: 3 additions & 3 deletions src/common/meta/ServerBasedSchemaManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ StatusOr<EdgeSchema> ServerBasedSchemaManager::getAllLatestVerEdgeSchema(GraphSp
return metaClient_->getAllLatestVerEdgeSchemaFromCache(space);
}

StatusOr<std::pair<std::string, nebula::meta::cpp2::FTIndex>> ServerBasedSchemaManager::getFTIndex(
GraphSpaceID spaceId, int32_t schemaId) {
auto ret = metaClient_->getFTIndexBySpaceSchemaFromCache(spaceId, schemaId);
StatusOr<std::unordered_map<std::string, nebula::meta::cpp2::FTIndex>>
ServerBasedSchemaManager::getFTIndex(GraphSpaceID spaceId, int32_t schemaId) {
auto ret = metaClient_->getFTIndexFromCache(spaceId, schemaId);
if (!ret.ok()) {
return ret.status();
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/ServerBasedSchemaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class ServerBasedSchemaManager : public SchemaManager {
// get all latest version of all edges
StatusOr<EdgeSchema> getAllLatestVerEdgeSchema(GraphSpaceID space) override;

StatusOr<std::pair<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(
StatusOr<std::unordered_map<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(
GraphSpaceID spaceId, int32_t schemaId) override;

void init(MetaClient *client);
Expand Down
13 changes: 3 additions & 10 deletions src/graph/validator/LookupValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -590,19 +590,12 @@ StatusOr<Expression*> LookupValidator::checkConstExpr(Expression* expr,

// Check that the property referenced in the text search expression has a fulltext index
StatusOr<std::string> LookupValidator::checkTSExpr(Expression* expr) {
auto tsExpr = static_cast<TextSearchExpression*>(expr);
auto prop = tsExpr->arg()->prop();
auto metaClient = qctx_->getMetaClient();
auto tsi = metaClient->getFTIndexBySpaceSchemaFromCache(spaceId(), schemaId());
auto tsi = metaClient->getFTIndexFromCache(spaceId(), schemaId(), prop);
NG_RETURN_IF_ERROR(tsi);
auto tsName = tsi.value().first;

auto ftFields = tsi.value().second.get_fields();
auto tsExpr = static_cast<TextSearchExpression*>(expr);
auto prop = tsExpr->arg()->prop();

auto iter = std::find(ftFields.begin(), ftFields.end(), prop);
if (iter == ftFields.end()) {
return Status::SemanticError("Column %s not found in %s", prop.c_str(), tsName.c_str());
}
return tsName;
}

Expand Down
4 changes: 2 additions & 2 deletions src/graph/validator/test/MockSchemaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ class MockSchemaManager final : public nebula::meta::SchemaManager {
LOG(FATAL) << "Unimplemented.";
}

StatusOr<std::pair<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(GraphSpaceID,
int32_t) override {
StatusOr<std::unordered_map<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(
GraphSpaceID, int32_t) override {
LOG(FATAL) << "Unimplemented";
return Status::Error("Unimplemented");
}
Expand Down
52 changes: 28 additions & 24 deletions src/kvstore/listener/elasticsearch/ESListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,20 @@ void ESListener::pickTagAndEdgeData(BatchLogType type,
LOG(ERROR) << "get tag reader failed, tagID " << tagId;
return;
}
if (ftIndex.second.get_fields().size() > 1) {
LOG(ERROR) << "Only one field will create fulltext index";
}
auto field = ftIndex.second.get_fields().front();
auto v = reader->getValueByName(field);
if (v.type() != Value::Type::STRING) {
LOG(ERROR) << "Can't create fulltext index on type " << v.type();
for (auto& index : ftIndex) {
if (index.second.get_fields().size() > 1) {
LOG(ERROR) << "Only one field will create fulltext index";
}
auto field = index.second.get_fields().front();
auto v = reader->getValueByName(field);
if (v.type() != Value::Type::STRING) {
LOG(ERROR) << "Can't create fulltext index on type " << v.type();
}
std::string indexName = index.first;
std::string vid = NebulaKeyUtils::getVertexId(vIdLen_, key).toString();
std::string text = std::move(v).getStr();
callback(type, indexName, vid, "", "", 0, text);
}
std::string indexName = ftIndex.first;
std::string vid = NebulaKeyUtils::getVertexId(vIdLen_, key).toString();
std::string text = std::move(v).getStr();
callback(type, indexName, vid, "", "", 0, text);
} else if (nebula::NebulaKeyUtils::isEdge(vIdLen_, key)) {
auto edgeType = NebulaKeyUtils::getEdgeType(vIdLen_, key);
auto ftIndexRes = schemaMan_->getFTIndex(spaceId_, edgeType);
Expand All @@ -115,20 +117,22 @@ void ESListener::pickTagAndEdgeData(BatchLogType type,
LOG(ERROR) << "get edge reader failed, schema ID " << edgeType;
return;
}
if (ftIndex.second.get_fields().size() > 1) {
LOG(ERROR) << "Only one field will create fulltext index";
}
auto field = ftIndex.second.get_fields().front();
auto v = reader->getValueByName(field);
if (v.type() != Value::Type::STRING) {
LOG(ERROR) << "Can't create fulltext index on type " << v.type();
for (auto& index : ftIndex) {
if (index.second.get_fields().size() > 1) {
LOG(ERROR) << "Only one field will create fulltext index";
}
auto field = index.second.get_fields().front();
auto v = reader->getValueByName(field);
if (v.type() != Value::Type::STRING) {
LOG(ERROR) << "Can't create fulltext index on type " << v.type();
}
std::string indexName = index.first;
std::string src = NebulaKeyUtils::getSrcId(vIdLen_, key).toString();
std::string dst = NebulaKeyUtils::getDstId(vIdLen_, key).toString();
int64_t rank = NebulaKeyUtils::getRank(vIdLen_, key);
std::string text = std::move(v).getStr();
callback(type, indexName, "", src, dst, rank, text);
}
std::string indexName = ftIndex.first;
std::string src = NebulaKeyUtils::getSrcId(vIdLen_, key).toString();
std::string dst = NebulaKeyUtils::getDstId(vIdLen_, key).toString();
int64_t rank = NebulaKeyUtils::getRank(vIdLen_, key);
std::string text = std::move(v).getStr();
callback(type, indexName, "", src, dst, rank, text);
}
}

Expand Down
20 changes: 12 additions & 8 deletions src/meta/processors/BaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -427,8 +427,8 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<cpp2::IndexItem>> BaseProcessor<RES
}

template <typename RESP>
ErrorOr<nebula::cpp2::ErrorCode, cpp2::FTIndex> BaseProcessor<RESP>::getFTIndex(
GraphSpaceID spaceId, int32_t tagOrEdge) {
ErrorOr<nebula::cpp2::ErrorCode, std::unordered_map<std::string, cpp2::FTIndex>>
BaseProcessor<RESP>::getFTIndex(GraphSpaceID spaceId, int32_t tagOrEdge) {
const auto& indexPrefix = MetaKeyUtils::fulltextIndexPrefix();
auto iterRet = doPrefix(indexPrefix);
if (!nebula::ok(iterRet)) {
Expand All @@ -438,18 +438,18 @@ ErrorOr<nebula::cpp2::ErrorCode, cpp2::FTIndex> BaseProcessor<RESP>::getFTIndex(
return retCode;
}
auto indexIter = nebula::value(iterRet).get();

std::unordered_map<std::string, cpp2::FTIndex> ret;
while (indexIter->valid()) {
auto index = MetaKeyUtils::parsefulltextIndex(indexIter->val());
auto id = index.get_depend_schema().getType() == nebula::cpp2::SchemaID::Type::edge_type
? index.get_depend_schema().get_edge_type()
: index.get_depend_schema().get_tag_id();
if (spaceId == index.get_space_id() && tagOrEdge == id) {
return index;
ret[indexIter->key().toString()] = index;
}
indexIter->next();
}
return nebula::cpp2::ErrorCode::E_INDEX_NOT_FOUND;
return ret;
}

template <typename RESP>
Expand Down Expand Up @@ -480,14 +480,18 @@ nebula::cpp2::ErrorCode BaseProcessor<RESP>::indexCheck(

template <typename RESP>
nebula::cpp2::ErrorCode BaseProcessor<RESP>::ftIndexCheck(
const std::vector<std::string>& cols, const std::vector<cpp2::AlterSchemaItem>& alterItems) {
const std::unordered_map<std::string, cpp2::FTIndex>& ftIndices,
const std::vector<cpp2::AlterSchemaItem>& alterItems) {
std::set<std::string> cols;
for (auto& [indexName, index] : ftIndices) {
cols.insert(index.fields_ref()->front());
}
for (const auto& item : alterItems) {
if (*item.op_ref() == nebula::meta::cpp2::AlterSchemaOp::CHANGE ||
*item.op_ref() == nebula::meta::cpp2::AlterSchemaOp::DROP) {
const auto& itemCols = item.get_schema().get_columns();
for (const auto& iCol : itemCols) {
auto it =
std::find_if(cols.begin(), cols.end(), [&](const auto& c) { return c == iCol.name; });
auto it = cols.find(iCol.name);
if (it != cols.end()) {
LOG(INFO) << "fulltext index conflict";
return nebula::cpp2::ErrorCode::E_RELATED_FULLTEXT_INDEX_EXISTS;
Expand Down
9 changes: 5 additions & 4 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,9 @@ class BaseProcessor {
* @param alterItems
* @return ErrorCode::E_RELATED_FULLTEXT_INDEX_EXISTS if contains
*/
nebula::cpp2::ErrorCode ftIndexCheck(const std::vector<std::string>& cols,
const std::vector<cpp2::AlterSchemaItem>& alterItems);
nebula::cpp2::ErrorCode ftIndexCheck(
const std::unordered_map<std::string, cpp2::FTIndex>& ftIndices,
const std::vector<cpp2::AlterSchemaItem>& alterItems);

/**
* @brief List all tag/edge index for given space and tag/edge id.
Expand All @@ -386,8 +387,8 @@ class BaseProcessor {
* @param tagOrEdge
* @return ErrorOr<nebula::cpp2::ErrorCode, std::unordered_map<std::string, cpp2::FTIndex>>
*/
ErrorOr<nebula::cpp2::ErrorCode, cpp2::FTIndex> getFTIndex(GraphSpaceID spaceId,
int32_t tagOrEdge);
ErrorOr<nebula::cpp2::ErrorCode, std::unordered_map<std::string, cpp2::FTIndex>> getFTIndex(
GraphSpaceID spaceId, int32_t tagOrEdge);

/**
* @brief Check if an index on the given fields already exists.
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/schema/AlterEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ void AlterEdgeProcessor::process(const cpp2::AlterEdgeReq& req) {
auto ftIdxRet = getFTIndex(spaceId, edgeType);
if (nebula::ok(ftIdxRet)) {
auto fti = std::move(nebula::value(ftIdxRet));
auto ftStatus = ftIndexCheck(fti.get_fields(), edgeItems);
auto ftStatus = ftIndexCheck(fti, edgeItems);
if (ftStatus != nebula::cpp2::ErrorCode::SUCCEEDED) {
handleErrorCode(ftStatus);
onFinished();
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/schema/AlterTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ void AlterTagProcessor::process(const cpp2::AlterTagReq& req) {
auto ftIdxRet = getFTIndex(spaceId, tagId);
if (nebula::ok(ftIdxRet)) {
auto fti = std::move(nebula::value(ftIdxRet));
auto ftStatus = ftIndexCheck(fti.get_fields(), tagItems);
auto ftStatus = ftIndexCheck(fti, tagItems);
if (ftStatus != nebula::cpp2::ErrorCode::SUCCEEDED) {
handleErrorCode(ftStatus);
onFinished();
Expand Down
16 changes: 8 additions & 8 deletions src/meta/processors/schema/DropEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ void DropEdgeProcessor::process(const cpp2::DropEdgeReq& req) {

auto ftIdxRet = getFTIndex(spaceId, edgeType);
if (nebula::ok(ftIdxRet)) {
LOG(INFO) << "Drop edge error, fulltext index conflict, "
<< "please delete fulltext index first.";
handleErrorCode(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS);
onFinished();
return;
}

if (nebula::error(ftIdxRet) != nebula::cpp2::ErrorCode::E_INDEX_NOT_FOUND) {
if (!nebula::value(ftIdxRet).empty()) {
LOG(INFO) << "Drop edge error, fulltext index conflict, "
<< "please delete fulltext index first.";
handleErrorCode(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS);
onFinished();
return;
}
} else {
handleErrorCode(nebula::error(ftIdxRet));
onFinished();
return;
Expand Down
16 changes: 8 additions & 8 deletions src/meta/processors/schema/DropTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ void DropTagProcessor::process(const cpp2::DropTagReq& req) {

auto ftIdxRet = getFTIndex(spaceId, tagId);
if (nebula::ok(ftIdxRet)) {
LOG(INFO) << "Drop tag error, fulltext index conflict, "
<< "please delete fulltext index first.";
handleErrorCode(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS);
onFinished();
return;
}

if (nebula::error(ftIdxRet) != nebula::cpp2::ErrorCode::E_INDEX_NOT_FOUND) {
if (!nebula::value(ftIdxRet).empty()) {
LOG(INFO) << "Drop tag error, fulltext index conflict, "
<< "please delete fulltext index first.";
handleErrorCode(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS);
onFinished();
return;
}
} else {
handleErrorCode(nebula::error(ftIdxRet));
onFinished();
return;
Expand Down
4 changes: 2 additions & 2 deletions src/mock/AdHocSchemaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ class AdHocSchemaManager final : public nebula::meta::SchemaManager {
EdgeType edge,
SchemaVer ver);

StatusOr<std::pair<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(GraphSpaceID,
int32_t) override {
StatusOr<std::unordered_map<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(
GraphSpaceID, int32_t) override {
LOG(FATAL) << "Unimplemented";
return Status::Error("Unimplemented");
}
Expand Down

0 comments on commit 02d5f9e

Please sign in to comment.