Skip to content

Commit

Permalink
fix: Use pk from binlog during import (#32118) (#32194)
Browse files Browse the repository at this point in the history
During binlog import, even if the primary key's autoID is set to true,
the primary key from the binlog should be used instead of being
reassigned.

issue: #31943,
#28521

pr: #32118

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
  • Loading branch information
bigsheeper authored Apr 15, 2024
1 parent 68b7469 commit b68af20
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 3 deletions.
11 changes: 11 additions & 0 deletions internal/datanode/importv2/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -161,6 +162,11 @@ func NewPreImportTask(req *datapb.PreImportRequest) Task {
}
})
ctx, cancel := context.WithCancel(context.Background())
// During binlog import, even if the primary key's autoID is set to true,
// the primary key from the binlog should be used instead of being reassigned.
if importutilv2.IsBackup(req.GetOptions()) {
UnsetAutoID(req.GetSchema())
}
return &PreImportTask{
PreImportTask: &datapb.PreImportTask{
JobID: req.GetJobID(),
Expand Down Expand Up @@ -230,6 +236,11 @@ type ImportTask struct {

func NewImportTask(req *datapb.ImportRequest) Task {
ctx, cancel := context.WithCancel(context.Background())
// During binlog import, even if the primary key's autoID is set to true,
// the primary key from the binlog should be used instead of being reassigned.
if importutilv2.IsBackup(req.GetOptions()) {
UnsetAutoID(req.GetSchema())
}
task := &ImportTask{
ImportTaskV2: &datapb.ImportTaskV2{
JobID: req.GetJobID(),
Expand Down
9 changes: 9 additions & 0 deletions internal/datanode/importv2/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,12 @@ func LogStats(manager TaskManager) {
tasks = manager.GetBy(WithType(ImportTaskType))
logFunc(tasks, ImportTaskType)
}

func UnsetAutoID(schema *schemapb.CollectionSchema) {
for _, field := range schema.GetFields() {
if field.GetIsPrimaryKey() && field.GetAutoID() {
field.AutoID = false
return
}
}
}
24 changes: 24 additions & 0 deletions internal/datanode/importv2/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,27 @@ func Test_AppendSystemFieldsData(t *testing.T) {
assert.Equal(t, count, insertData.Data[common.RowIDField].RowNum())
assert.Equal(t, count, insertData.Data[common.TimeStampField].RowNum())
}

func Test_UnsetAutoID(t *testing.T) {
pkField := &schemapb.FieldSchema{
FieldID: 100,
Name: "pk",
DataType: schemapb.DataType_Int64,
IsPrimaryKey: true,
AutoID: true,
}
vecField := &schemapb.FieldSchema{
FieldID: 101,
Name: "vec",
DataType: schemapb.DataType_FloatVector,
}

schema := &schemapb.CollectionSchema{}
schema.Fields = []*schemapb.FieldSchema{pkField, vecField}
UnsetAutoID(schema)
for _, field := range schema.GetFields() {
if field.GetIsPrimaryKey() {
assert.False(t, schema.GetFields()[0].GetAutoID())
}
}
}
17 changes: 14 additions & 3 deletions tests/integration/import/binlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
Expand All @@ -37,7 +38,7 @@ import (
"github.com/milvus-io/milvus/tests/integration"
)

func (s *BulkInsertSuite) PrepareCollectionA() (int64, int64) {
func (s *BulkInsertSuite) PrepareCollectionA() (int64, int64, *schemapb.IDs) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := s.Cluster
Expand Down Expand Up @@ -86,6 +87,7 @@ func (s *BulkInsertSuite) PrepareCollectionA() (int64, int64) {
})
s.NoError(err)
s.Equal(int32(0), insertResult.GetStatus().GetCode())
insertedIDs := insertResult.GetIDs()

// flush
flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
Expand Down Expand Up @@ -148,7 +150,7 @@ func (s *BulkInsertSuite) PrepareCollectionA() (int64, int64) {
// get collectionID and partitionID
collectionID := showCollectionsResp.GetCollectionIds()[0]
partitionID := showPartitionsResp.GetPartitionIDs()[0]
return collectionID, partitionID
return collectionID, partitionID, insertedIDs
}

func (s *BulkInsertSuite) TestBinlogImport() {
Expand All @@ -157,7 +159,7 @@ func (s *BulkInsertSuite) TestBinlogImport() {
endTs = "548373346338803234"
)

collectionID, partitionID := s.PrepareCollectionA()
collectionID, partitionID, insertedIDs := s.PrepareCollectionA()

c := s.Cluster
ctx, cancel := context.WithTimeout(c.GetContext(), 60*time.Second)
Expand Down Expand Up @@ -252,4 +254,13 @@ func (s *BulkInsertSuite) TestBinlogImport() {
err = merr.CheckRPCCall(searchResult, err)
s.NoError(err)
s.Equal(nq*topk, len(searchResult.GetResults().GetScores()))
// check ids from collectionA, because during binlog import, even if the primary key's autoID is set to true,
// the primary key from the binlog should be used instead of being reassigned.
insertedIDsMap := lo.SliceToMap(insertedIDs.GetIntId().GetData(), func(id int64) (int64, struct{}) {
return id, struct{}{}
})
for _, id := range searchResult.GetResults().GetIds().GetIntId().GetData() {
_, ok := insertedIDsMap[id]
s.True(ok)
}
}

0 comments on commit b68af20

Please sign in to comment.