Skip to content

Commit

Permalink
Check if inputs has duplicate column. (vesoft-inc#1841)
Browse files Browse the repository at this point in the history
* Delete DCHECK in SchemaWriter::appendCol and refactor the part of col_types genaration..

* Return error when get rows failed from interim result.

* Refactored the feedResult part.

* Check if inputs has duplicate column.

Co-authored-by: dutor <440396+dutor@users.noreply.github.com>
  • Loading branch information
CPWstatic and dutor authored Mar 3, 2020
1 parent f048b76 commit 6929e91
Show file tree
Hide file tree
Showing 29 changed files with 423 additions and 235 deletions.
1 change: 0 additions & 1 deletion src/dataman/SchemaWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ SchemaWriter& SchemaWriter::appendCol(folly::StringPiece name,
ValueType&& type) noexcept {
using folly::hash::SpookyHashV2;
uint64_t hash = SpookyHashV2::Hash64(name.data(), name.size(), 0);
DCHECK(nameIndex_.find(hash) == nameIndex_.end());

ColumnDef col;
col.set_name(name.toString());
Expand Down
4 changes: 4 additions & 0 deletions src/graph/FetchEdgesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ Status FetchEdgesExecutor::setupEdgeKeysFromRef() {
return Status::OK();
}

auto status = checkIfDuplicateColumn();
if (!status.ok()) {
return status;
}
auto ret = inputs->getVIDs(*srcid_);
if (!ret.ok()) {
return ret.status();
Expand Down
19 changes: 5 additions & 14 deletions src/graph/FetchExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,10 @@ Status FetchExecutor::prepareYield() {
returnColNames_.emplace_back(*col->alias());
}

// such as YIELD 1+1, it has not type in schema, the type from the eval()
colTypes_.emplace_back(nebula::cpp2::SupportedType::UNKNOWN);
if (col->expr()->isAliasExpression()) {
auto prop = *static_cast<AliasPropertyExpression*>(col->expr())->prop();
auto type = labelSchema_->getFieldType(prop);
if (type != CommonConstants::kInvalidValueType()) {
colTypes_.back() = type.get_type();
}
} else if (col->expr()->isTypeCastingExpression()) {
// type cast
auto exprPtr = dynamic_cast<TypeCastingExpression*>(col->expr());
colTypes_.back() = SchemaHelper::columnTypeToSupportedType(exprPtr->getType());
}
auto type = calculateExprType(col->expr());
colTypes_.emplace_back(type);

VLOG(1) << "type: " << static_cast<int64_t>(colTypes_.back());
}

if (expCtx_->hasSrcTagProp() || expCtx_->hasDstTagProp()) {
Expand Down Expand Up @@ -122,7 +113,7 @@ Status FetchExecutor::getOutputSchema(
};
getters.getEdgeDstId = [schema,
reader] (const std::string&) -> OptVariantType {
return Collector::getProp(schema, "_dst", reader);
return Collector::getProp(schema, _DST, reader);
};
std::vector<VariantType> record;
for (auto *column : yields_) {
Expand Down
5 changes: 0 additions & 5 deletions src/graph/FetchExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ class FetchExecutor : public TraverseExecutor {
explicit FetchExecutor(ExecutionContext *ectx,
const std::string &statsName) : TraverseExecutor(ectx, statsName) {}

void feedResult(std::unique_ptr<InterimResult> result) override {
inputs_ = std::move(result);
}

void setupResponse(cpp2::ExecutionResponse &resp) override;

protected:
Expand Down Expand Up @@ -49,7 +45,6 @@ class FetchExecutor : public TraverseExecutor {
std::vector<YieldColumn*> yields_;
std::unique_ptr<YieldColumns> yieldColsHolder_;
bool distinct_{false};
std::unique_ptr<InterimResult> inputs_;
std::vector<std::string> resultColNames_;
std::vector<std::string> returnColNames_;
std::unique_ptr<cpp2::ExecutionResponse> resp_;
Expand Down
4 changes: 4 additions & 0 deletions src/graph/FetchVerticesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,10 @@ Status FetchVerticesExecutor::setupVidsFromRef() {
return Status::OK();
}

auto status = checkIfDuplicateColumn();
if (!status.ok()) {
return status;
}
StatusOr<std::vector<VertexID>> result;
if (distinct_) {
result = inputs->getDistinctVIDs(*colname_);
Expand Down
4 changes: 4 additions & 0 deletions src/graph/FindPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,10 @@ Status FindPathExecutor::setupVidsFromRef(Clause::Vertices &vertices) {
}
}

auto status = checkIfDuplicateColumn();
if (!status.ok()) {
return status;
}
auto result = inputs->getDistinctVIDs(*(vertices.colname_));
if (!result.ok()) {
return std::move(result).status();
Expand Down
5 changes: 0 additions & 5 deletions src/graph/FindPathExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ class FindPathExecutor final : public TraverseExecutor {

void execute() override;

void feedResult(std::unique_ptr<InterimResult> result) override {
inputs_ = std::move(result);
}

void setupResponse(cpp2::ExecutionResponse &resp) override;

static std::string buildPathString(const Path &path);
Expand Down Expand Up @@ -111,7 +107,6 @@ class FindPathExecutor final : public TraverseExecutor {
Clause::Step step_;
Clause::Where where_;
bool shortest_{false};
std::unique_ptr<InterimResult> inputs_;
using SchemaPropIndex = std::unordered_map<std::pair<std::string, std::string>, int64_t>;
SchemaPropIndex srcTagProps_;
SchemaPropIndex dstTagProps_;
Expand Down
84 changes: 4 additions & 80 deletions src/graph/GoExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,6 @@ void GoExecutor::execute() {
}


void GoExecutor::feedResult(std::unique_ptr<InterimResult> result) {
inputs_ = std::move(result);
}


Status GoExecutor::prepareStep() {
auto *clause = sentence_->stepClause();
if (clause != nullptr) {
Expand Down Expand Up @@ -431,6 +426,10 @@ Status GoExecutor::setupStarts() {
return Status::OK();
}

auto status = checkIfDuplicateColumn();
if (!status.ok()) {
return status;
}
auto result = inputs->getVIDs(*colname_);
if (!result.ok()) {
LOG(ERROR) << "Get vid fail: " << *colname_;
Expand Down Expand Up @@ -1261,80 +1260,5 @@ OptVariantType GoExecutor::getPropFromInterim(VertexID id, const std::string &pr
DCHECK(index_ != nullptr);
return index_->getColumnWithVID(rootId, prop);
}


SupportedType GoExecutor::getPropTypeFromInterim(const std::string &prop) const {
DCHECK(index_ != nullptr);
return index_->getColumnType(prop);
}

nebula::cpp2::SupportedType GoExecutor::calculateExprType(Expression* exp) const {
auto spaceId = ectx()->rctx()->session()->space();
switch (exp->kind()) {
case Expression::kPrimary:
case Expression::kFunctionCall:
case Expression::kUnary:
case Expression::kArithmetic: {
return nebula::cpp2::SupportedType::UNKNOWN;
}
case Expression::kTypeCasting: {
auto exprPtr = static_cast<const TypeCastingExpression *>(exp);
return SchemaHelper::columnTypeToSupportedType(
exprPtr->getType());
}
case Expression::kRelational:
case Expression::kLogical: {
return nebula::cpp2::SupportedType::BOOL;
}
case Expression::kDestProp:
case Expression::kSourceProp: {
auto* tagPropExp = static_cast<const AliasPropertyExpression*>(exp);
const auto* tagName = tagPropExp->alias();
const auto* propName = tagPropExp->prop();
auto tagIdRet = ectx()->schemaManager()->toTagID(spaceId, *tagName);
if (tagIdRet.ok()) {
auto ts = ectx()->schemaManager()->getTagSchema(spaceId, tagIdRet.value());
if (ts != nullptr) {
return ts->getFieldType(*propName).type;
}
}
return nebula::cpp2::SupportedType::UNKNOWN;
}
case Expression::kEdgeDstId:
case Expression::kEdgeSrcId: {
return nebula::cpp2::SupportedType::VID;
}
case Expression::kEdgeRank:
case Expression::kEdgeType: {
return nebula::cpp2::SupportedType::INT;
}
case Expression::kAliasProp: {
auto* edgeExp = static_cast<const AliasPropertyExpression*>(exp);
const auto* propName = edgeExp->prop();
auto edgeStatus = ectx()->schemaManager()->toEdgeType(spaceId, *edgeExp->alias());
if (edgeStatus.ok()) {
auto edgeType = edgeStatus.value();
auto schema = ectx()->schemaManager()->getEdgeSchema(spaceId, edgeType);
if (schema != nullptr) {
return schema->getFieldType(*propName).type;
}
}
return nebula::cpp2::SupportedType::UNKNOWN;
}
case Expression::kVariableProp:
case Expression::kInputProp: {
auto* propExp = static_cast<const AliasPropertyExpression*>(exp);
const auto* propName = propExp->prop();
return getPropTypeFromInterim(*propName);
}
default: {
VLOG(1) << "Unsupport expression type! kind = "
<< std::to_string(static_cast<uint8_t>(exp->kind()));
return nebula::cpp2::SupportedType::UNKNOWN;
}
}
}


} // namespace graph
} // namespace nebula
7 changes: 0 additions & 7 deletions src/graph/GoExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ class GoExecutor final : public TraverseExecutor {

void execute() override;

void feedResult(std::unique_ptr<InterimResult> result) override;

void setupResponse(cpp2::ExecutionResponse &resp) override;

private:
Expand Down Expand Up @@ -198,10 +196,6 @@ class GoExecutor final : public TraverseExecutor {

OptVariantType getPropFromInterim(VertexID id, const std::string &prop) const;

nebula::cpp2::SupportedType getPropTypeFromInterim(const std::string &prop) const;

nebula::cpp2::SupportedType calculateExprType(Expression* exp) const;

enum FromType {
kInstantExpr,
kVariable,
Expand All @@ -223,7 +217,6 @@ class GoExecutor final : public TraverseExecutor {
std::unique_ptr<YieldClauseWrapper> yieldClauseWrapper_;
bool distinct_{false};
bool distinctPushDown_{false};
std::unique_ptr<InterimResult> inputs_;
using InterimIndex = InterimResult::InterimResultIndex;
std::unique_ptr<InterimIndex> index_;
std::unique_ptr<ExpressionContext> expCtx_;
Expand Down
46 changes: 20 additions & 26 deletions src/graph/GroupByExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,26 @@ Status GroupByExecutor::checkAll() {
void GroupByExecutor::execute() {
FLOG_INFO("Executing Group by: %s", sentence_->toString().c_str());

if (rows_.empty()) {
if (inputs_ == nullptr || !inputs_->hasData()) {
onEmptyInputs();
return;
}

auto status = checkAll();
auto status = checkIfDuplicateColumn();
if (!status.ok()) {
doError(std::move(status));
return;
}
auto ret = inputs_->getRows();
if (!ret.ok()) {
LOG(ERROR) << "Get rows failed: " << ret.status();
doError(std::move(ret).status());
return;
}
rows_ = std::move(ret).value();
schema_ = inputs_->schema();

status = checkAll();
if (!status.ok()) {
doError(std::move(status));
return;
Expand All @@ -205,12 +219,12 @@ void GroupByExecutor::execute() {
}

if (onResult_) {
auto ret = setupInterimResult();
if (!ret.ok()) {
doError(std::move(ret).status());
auto result = setupInterimResult();
if (!result.ok()) {
doError(std::move(result).status());
return;
}
onResult_(std::move(ret).value());
onResult_(std::move(result).value());
}

doFinish(Executor::ProcessControl::kNext);
Expand Down Expand Up @@ -329,26 +343,6 @@ std::vector<std::string> GroupByExecutor::getResultColumnNames() const {
}


void GroupByExecutor::feedResult(std::unique_ptr<InterimResult> result) {
if (result == nullptr) {
LOG(ERROR) << "result is nullptr";
return;
}

if (!result->hasData()) {
return;
}

auto ret = result->getRows();
if (!ret.ok()) {
LOG(ERROR) << "Get rows failed: " << ret.status();
return;
}
rows_ = std::move(ret).value();
schema_ = result->schema();
}


Status GroupByExecutor::generateOutputSchema() {
using nebula::cpp2::SupportedType;
if (resultSchema_ == nullptr) {
Expand Down
2 changes: 0 additions & 2 deletions src/graph/GroupByExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ class GroupByExecutor final : public TraverseExecutor {

void execute() override;

void feedResult(std::unique_ptr<InterimResult> result) override;

void setupResponse(cpp2::ExecutionResponse &resp) override;

private:
Expand Down
Loading

0 comments on commit 6929e91

Please sign in to comment.