Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions python/zvec/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,9 @@ class _Collection:
def Open(arg0: str, arg1: param.CollectionOption) -> _Collection: ...
def AddColumn(
self,
arg0: str,
arg1: schema._FieldSchema,
arg2: str,
arg3: param.AddColumnOption,
arg0: schema._FieldSchema,
arg1: str,
arg2: param.AddColumnOption,
) -> None: ...
def AlterColumn(
self,
Expand Down
4 changes: 1 addition & 3 deletions python/zvec/model/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,7 @@ def add_column(
option (Optional[AddColumnOption], optional): Options for the operation.
Defaults to ``AddColumnOption()``.
"""
self._obj.AddColumn(
field_schema.name, field_schema._get_object(), expression, option
)
self._obj.AddColumn(field_schema._get_object(), expression, option)
self._schema = CollectionSchema._from_core(self._obj.Schema())

def drop_column(self, field_name: str) -> None:
Expand Down
2 changes: 1 addition & 1 deletion python/zvec/tool/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def require_module(module: str, mitigation: Optional[str] = None) -> Any:
package = mitigation or module
msg = f"Required package '{package}' is not installed. "
if "." in module:
top_level = module.split(".")[0]
top_level = module.split(".", maxsplit=1)[0]
msg += f"Module '{module}' is part of '{top_level}', "
if mitigation:
msg += f"please pip install '{mitigation}'."
Expand Down
5 changes: 2 additions & 3 deletions src/binding/python/model/python_collection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,10 @@ void ZVecPyCollection::bind_ddl_methods(

// binding column ddl methods
col.def("AddColumn",
[](Collection &self, const std::string &column_name,
const FieldSchema::Ptr &column_schema,
[](Collection &self, const FieldSchema::Ptr &column_schema,
const std::string &expression, const AddColumnOptions &options) {
const auto status =
self.AddColumn(column_name, column_schema, expression, options);
self.AddColumn(column_schema, expression, options);
throw_if_error(status);
})
.def("DropColumn",
Expand Down
20 changes: 6 additions & 14 deletions src/db/collection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ class CollectionImpl : public Collection {

Status Optimize(const OptimizeOptions &options) override;

Status AddColumn(const std::string &column_name,
const FieldSchema::Ptr &column_schema,
Status AddColumn(const FieldSchema::Ptr &column_schema,
const std::string &expression,
const AddColumnOptions &options) override;

Expand Down Expand Up @@ -1046,16 +1045,10 @@ Status CollectionImpl::validate(const std::string &column,
return Status::InvalidArgument("Column schema is null");
}

if (column.empty()) {
if (schema->name().empty()) {
return Status::InvalidArgument("Column name is empty");
}

if (schema->name() != column) {
return Status::InvalidArgument(
"Column name and schema name are not matched");
}

if (schema_->has_field(column)) {
if (schema_->has_field(schema->name())) {
return Status::InvalidArgument("column already exists");
}

Expand Down Expand Up @@ -1143,8 +1136,7 @@ Status CollectionImpl::validate(const std::string &column,
return Status::OK();
}

Status CollectionImpl::AddColumn(const std::string &column_name,
const FieldSchema::Ptr &column_schema,
Status CollectionImpl::AddColumn(const FieldSchema::Ptr &column_schema,
const std::string &expression,
const AddColumnOptions &options) {
CHECK_COLLECTION_READONLY_RETURN_STATUS;
Expand All @@ -1154,7 +1146,7 @@ Status CollectionImpl::AddColumn(const std::string &column_name,
CHECK_DESTROY_RETURN_STATUS(destroyed_, false);

// validate
auto s = validate(column_name, column_schema, expression, "", ColumnOp::ADD);
auto s = validate("", column_schema, expression, "", ColumnOp::ADD);
CHECK_RETURN_STATUS(s);

// forbidden writing until index is ready
Expand All @@ -1172,7 +1164,7 @@ Status CollectionImpl::AddColumn(const std::string &column_name,
Version new_version = version_manager_->get_current_version();

// add column on segment manager
s = segment_manager_->add_column(column_name, column_schema, expression,
s = segment_manager_->add_column(column_schema, expression,
options.concurrency_);
CHECK_RETURN_STATUS(s);

Expand Down
18 changes: 8 additions & 10 deletions src/db/index/segment/segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,7 @@ class SegmentImpl : public Segment,
RecordBatchReaderPtr scan(
const std::vector<std::string> &columns) const override;

Status add_column(const std::string &column_name,
FieldSchema::Ptr column_schema,
Status add_column(FieldSchema::Ptr column_schema,
const std::string &expression,
const AddColumnOptions &options) override;

Expand Down Expand Up @@ -2939,8 +2938,7 @@ Status SegmentImpl::reopen_invert_indexer(bool read_only) {
return Status::OK();
}

Status SegmentImpl::add_column(const std::string &column_name,
FieldSchema::Ptr column_schema,
Status SegmentImpl::add_column(FieldSchema::Ptr column_schema,
const std::string &expression,
const AddColumnOptions & /*options*/) {
if (memory_store_) {
Expand Down Expand Up @@ -3003,8 +3001,8 @@ Status SegmentImpl::add_column(const std::string &column_name,
return Status::InternalError(result.status().message());
}
auto dataset = std::move(result).ValueOrDie();
auto eval_result = EvaluateExpressionWithDataset(dataset, column_name, expr,
expected_type);
auto eval_result = EvaluateExpressionWithDataset(
dataset, column_schema->name(), expr, expected_type);
if (!eval_result.ok()) {
return Status::InternalError("evaluate expression failed:",
eval_result.status().message());
Expand All @@ -3028,9 +3026,9 @@ Status SegmentImpl::add_column(const std::string &column_name,

std::vector<BlockMeta> new_blocks;
status = WriteColumnInBlocks(
column_name, new_column, filter_column_blocks, path_, segment_meta_->id(),
[this]() { return allocate_block_id(); }, !options_.enable_mmap_,
&new_blocks);
column_schema->name(), new_column, filter_column_blocks, path_,
segment_meta_->id(), [this]() { return allocate_block_id(); },
!options_.enable_mmap_, &new_blocks);
if (!status.ok()) {
return Status::InternalError(status.message());
}
Expand Down Expand Up @@ -3077,7 +3075,7 @@ Status SegmentImpl::add_column(const std::string &column_name,
segment_meta_->add_persisted_block(block);
}

auto column_indexer = (*invert_indexers_)[column_name];
auto column_indexer = (*invert_indexers_)[column_schema->name()];
auto s = insert_array_to_invert_indexer(column_schema, new_column,
&column_indexer);
CHECK_RETURN_STATUS(s);
Expand Down
3 changes: 1 addition & 2 deletions src/db/index/segment/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ class Segment {
virtual uint64_t doc_count(const IndexFilter::Ptr filter = nullptr) = 0;

// for collection
virtual Status add_column(const std::string &column_name,
FieldSchema::Ptr column_schema,
virtual Status add_column(FieldSchema::Ptr column_schema,
const std::string &expression,
const AddColumnOptions &options) = 0;

Expand Down
5 changes: 2 additions & 3 deletions src/db/index/segment/segment_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ std::vector<SegmentMeta::Ptr> SegmentManager::get_segments_meta() const {
return segments_meta;
}

Status SegmentManager::add_column(const std::string &column_name,
const FieldSchema::Ptr &column_schema,
Status SegmentManager::add_column(const FieldSchema::Ptr &column_schema,
const std::string &expression,
int concurrency) {
if (concurrency <= 0) {
Expand All @@ -98,7 +97,7 @@ Status SegmentManager::add_column(const std::string &column_name,
for (size_t j = i; j < end; ++j) {
auto &segment = segments[j].second;
futures.emplace_back(std::async(std::launch::async, [&]() -> Status {
return segment->add_column(column_name, column_schema, expression,
return segment->add_column(column_schema, expression,
AddColumnOptions{concurrency});
}));
}
Expand Down
3 changes: 1 addition & 2 deletions src/db/index/segment/segment_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ class SegmentManager {

std::vector<SegmentMeta::Ptr> get_segments_meta() const;

Status add_column(const std::string &column_name,
const FieldSchema::Ptr &column_schema,
Status add_column(const FieldSchema::Ptr &column_schema,
const std::string &expression, int concurrency);

Status alter_column(const std::string &column_name,
Expand Down
3 changes: 1 addition & 2 deletions src/include/zvec/db/collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ class Collection {
virtual Status Optimize(const OptimizeOptions &options = OptimizeOptions{
0}) = 0;

virtual Status AddColumn(const std::string &column_name,
const FieldSchema::Ptr &column_schema,
virtual Status AddColumn(const FieldSchema::Ptr &column_schema,
const std::string &expression,
const AddColumnOptions &options = AddColumnOptions{
0}) = 0;
Expand Down
35 changes: 11 additions & 24 deletions tests/db/collection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ TEST_F(CollectionTest, Feature_CreateAndOpen_General) {
ASSERT_FALSE(col->GroupByQuery({}).has_value());
ASSERT_FALSE(col->CreateIndex("", nullptr).ok());
ASSERT_FALSE(col->DropIndex("").ok());
ASSERT_FALSE(col->AddColumn("", nullptr, "").ok());
ASSERT_FALSE(col->AddColumn(nullptr, "").ok());
ASSERT_FALSE(col->AlterColumn("", "", nullptr).ok());
ASSERT_FALSE(col->DropColumn("").ok());
ASSERT_FALSE(col->CreateIndex("", nullptr).ok());
Expand Down Expand Up @@ -154,7 +154,7 @@ TEST_F(CollectionTest, Feature_CreateAndOpen_General) {
ASSERT_FALSE(col->DeleteByFilter("").ok());
ASSERT_FALSE(col->CreateIndex("", nullptr).ok());
ASSERT_FALSE(col->DropIndex("").ok());
ASSERT_FALSE(col->AddColumn("", nullptr, "").ok());
ASSERT_FALSE(col->AddColumn(nullptr, "").ok());
ASSERT_FALSE(col->AlterColumn("", "", nullptr).ok());
ASSERT_FALSE(col->DropColumn("").ok());
ASSERT_FALSE(col->CreateIndex("", nullptr).ok());
Expand Down Expand Up @@ -3592,8 +3592,7 @@ TEST_F(CollectionTest, Feature_AddColumn_General) {
ASSERT_EQ(stats.doc_count, doc_count);
auto field_schema =
std::make_shared<FieldSchema>("add_int32", DataType::INT32, false);
auto s = collection->AddColumn("add_int32", field_schema, "int32",
AddColumnOptions());
auto s = collection->AddColumn(field_schema, "int32", AddColumnOptions());
if (!s.ok()) {
std::cout << "status: " << s.message() << std::endl;
ASSERT_TRUE(false);
Expand Down Expand Up @@ -3667,24 +3666,15 @@ TEST_F(CollectionTest, Feature_AddColumn_CornerCase) {
ASSERT_TRUE(result.has_value());
auto collection = result.value();

auto s =
collection->AddColumn("int32", nullptr, "int32", AddColumnOptions());
auto s = collection->AddColumn(nullptr, "int32", AddColumnOptions());
ASSERT_FALSE(s.ok());

s = collection->AddColumn("add_int32", nullptr, "", AddColumnOptions());
s = collection->AddColumn(nullptr, "", AddColumnOptions());
ASSERT_FALSE(s.ok());

auto field_schema =
std::make_shared<FieldSchema>("add_int32", DataType::INT32, false);
s = collection->AddColumn("", field_schema, "int32", AddColumnOptions());
ASSERT_FALSE(s.ok());


s = collection->AddColumn("add_int32_invalid", field_schema, "int32",
AddColumnOptions());
ASSERT_FALSE(s.ok());

s = collection->AddColumn("add_int32", field_schema, "non_exist_field",
s = collection->AddColumn(field_schema, "non_exist_field",
AddColumnOptions());
ASSERT_FALSE(s.ok());
}
Expand All @@ -3697,8 +3687,7 @@ TEST_F(CollectionTest, Feature_AddColumn_CornerCase) {

auto field_schema =
std::make_shared<FieldSchema>("add_int32", DataType::INT32, false);
auto s = collection->AddColumn("add_int32", field_schema, "int32",
AddColumnOptions());
auto s = collection->AddColumn(field_schema, "int32", AddColumnOptions());
if (!s.ok()) {
std::cout << "status: " << s.message() << std::endl;
ASSERT_TRUE(false);
Expand Down Expand Up @@ -3754,8 +3743,8 @@ TEST_F(CollectionTest, Feature_AddColumn_CornerCase) {

auto field_schema =
std::make_shared<FieldSchema>("add_int32_dup", DataType::INT32, false);
auto s = collection->AddColumn("add_int32_dup", field_schema, "add_int32",
AddColumnOptions());
auto s =
collection->AddColumn(field_schema, "add_int32", AddColumnOptions());
if (!s.ok()) {
std::cout << "status: " << s.message() << std::endl;
ASSERT_TRUE(false);
Expand Down Expand Up @@ -3956,8 +3945,7 @@ TEST_F(CollectionTest, Feature_Column_MixOperation) {
// add column
auto field_schema =
std::make_shared<FieldSchema>("add_int32", DataType::INT32, false);
s = collection->AddColumn("add_int32", field_schema, "int32",
AddColumnOptions());
s = collection->AddColumn(field_schema, "int32", AddColumnOptions());
if (!s.ok()) {
std::cout << "status: " << s.message() << std::endl;
ASSERT_TRUE(false);
Expand Down Expand Up @@ -4055,8 +4043,7 @@ TEST_F(CollectionTest, Feature_Column_MixOperation_Empty) {
// add column
auto field_schema =
std::make_shared<FieldSchema>("add_int32", DataType::INT32, false);
auto s = collection->AddColumn("add_int32", field_schema, "int32",
AddColumnOptions());
auto s = collection->AddColumn(field_schema, "int32", AddColumnOptions());
ASSERT_TRUE(s.ok());

auto new_schema = collection->Schema().value();
Expand Down
15 changes: 5 additions & 10 deletions tests/db/index/segment/segment_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1521,7 +1521,6 @@ TEST_P(SegmentTest, FetchPerf) {
ASSERT_TRUE(segment != nullptr);

s = segment->add_column(
"add_int32",
std::make_shared<FieldSchema>("add_int32", DataType::INT32, false),
"int32 + 1", AddColumnOptions());
EXPECT_TRUE(s.ok());
Expand Down Expand Up @@ -1575,7 +1574,6 @@ TEST_P(SegmentTest, AddColumn) {
ASSERT_TRUE(segment != nullptr);

auto s = segment->add_column(
"add_int32",
std::make_shared<FieldSchema>("add_int32", DataType::INT32, false),
"int32 + 1", AddColumnOptions());
EXPECT_FALSE(s.ok());
Expand Down Expand Up @@ -1633,13 +1631,11 @@ TEST_P(SegmentTest, AddColumn) {
ASSERT_TRUE(segment != nullptr);

s = segment->add_column(
"add_int32",
std::make_shared<FieldSchema>("add_int32", DataType::INT32, false), "",
AddColumnOptions());
EXPECT_FALSE(s.ok());

s = segment->add_column("add_undefined",
std::make_shared<FieldSchema>(
s = segment->add_column(std::make_shared<FieldSchema>(
"add_undefined", DataType::UNDEFINED, false),
"", AddColumnOptions());
EXPECT_FALSE(s.ok());
Expand All @@ -1655,12 +1651,11 @@ TEST_P(SegmentTest, AddColumn) {
}

int add_column_cnt = 0;
auto func = [&](const std::string &column_name,
const std::shared_ptr<FieldSchema> &field_schema,
auto func = [&](const std::shared_ptr<FieldSchema> &field_schema,
const std::string &expression) {
auto &column_name = field_schema->name();
AddColumnOptions add_options;
status =
segment->add_column(column_name, field_schema, expression, add_options);
status = segment->add_column(field_schema, expression, add_options);
EXPECT_TRUE(status.ok());

// after add column
Expand Down Expand Up @@ -1796,7 +1791,7 @@ TEST_P(SegmentTest, AddColumn) {
field_schema->name(), field_schema->data_type(),
field_schema->nullable(), field_schema->index_params());
new_field_schema->set_name(col_name);
func(col_name, new_field_schema, expression);
func(new_field_schema, expression);
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions tests/db/sqlengine/mock_segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,7 @@ class MockSegment : public Segment {
return 0;
}

Status add_column(const std::string &column_name,
FieldSchema::Ptr column_schema,
Status add_column(FieldSchema::Ptr column_schema,
const std::string &expression,
const AddColumnOptions &options) override {
return Status::InternalError();
Expand Down