Skip to content

Commit

Permalink
enhance: Move SyncCreatedPartition to proxy
Browse files Browse the repository at this point in the history
Related to milvus-io#38275

This PR move sync created partition step to proxy to avoid potential
logic deadlock when create partition happens with target segment change.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia committed Dec 9, 2024
1 parent 7a5aea1 commit c052847
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 34 deletions.
1 change: 1 addition & 0 deletions internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1324,6 +1324,7 @@ func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.Create
Condition: NewTaskCondition(ctx),
CreatePartitionRequest: request,
rootCoord: node.rootCoord,
queryCoord: node.queryCoord,
result: nil,
}

Expand Down
25 changes: 22 additions & 3 deletions internal/proxy/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1069,9 +1069,10 @@ type createPartitionTask struct {
baseTask
Condition
*milvuspb.CreatePartitionRequest
ctx context.Context
rootCoord types.RootCoordClient
result *commonpb.Status
ctx context.Context
rootCoord types.RootCoordClient
queryCoord types.QueryCoordClient
result *commonpb.Status
}

func (t *createPartitionTask) TraceCtx() context.Context {
Expand Down Expand Up @@ -1139,6 +1140,24 @@ func (t *createPartitionTask) PreExecute(ctx context.Context) error {

func (t *createPartitionTask) Execute(ctx context.Context) (err error) {
t.result, err = t.rootCoord.CreatePartition(ctx, t.CreatePartitionRequest)
if err := merr.CheckRPCCall(t.result, err); err != nil {
return err
}
collectionID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), t.GetCollectionName())
if err != nil {
t.result = merr.Status(err)
return err
}
partitionID, err := globalMetaCache.GetPartitionID(ctx, t.GetDbName(), t.GetCollectionName(), t.GetPartitionName())
if err != nil {
t.result = merr.Status(err)
return err
}
t.result, err = t.queryCoord.SyncNewCreatedPartition(ctx, &querypb.SyncNewCreatedPartitionRequest{
Base: commonpbutil.NewMsgBase(commonpbutil.WithMsgType(commonpb.MsgType_ReleasePartitions)),
CollectionID: collectionID,
PartitionID: partitionID,
})
return merr.CheckRPCCall(t.result, err)
}

Expand Down
29 changes: 23 additions & 6 deletions internal/proxy/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1380,9 +1380,19 @@ func TestDescribeCollectionTask_ShardsNum2(t *testing.T) {
}

func TestCreatePartitionTask(t *testing.T) {
rc := NewRootCoordMock()
rc := mocks.NewMockRootCoordClient(t)
qc := mocks.NewMockQueryCoordClient(t)

mockCache := NewMockCache(t)
mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(newSchemaInfo(&schemapb.CollectionSchema{
EnableDynamicField: false,
Fields: []*schemapb.FieldSchema{
{FieldID: 100, Name: "ID", DataType: schemapb.DataType_Int64},
{FieldID: 101, Name: "Vector", DataType: schemapb.DataType_FloatVector},
},
}), nil)
globalMetaCache = mockCache

defer rc.Close()
ctx := context.Background()
prefix := "TestCreatePartitionTask"
dbName := ""
Expand All @@ -1401,9 +1411,10 @@ func TestCreatePartitionTask(t *testing.T) {
CollectionName: collectionName,
PartitionName: partitionName,
},
ctx: ctx,
rootCoord: rc,
result: nil,
ctx: ctx,
rootCoord: rc,
queryCoord: qc,
result: nil,
}
task.OnEnqueue()
task.PreExecute(ctx)
Expand All @@ -1413,8 +1424,14 @@ func TestCreatePartitionTask(t *testing.T) {
assert.Equal(t, Timestamp(100), task.BeginTs())
assert.Equal(t, Timestamp(100), task.EndTs())
assert.Equal(t, paramtable.GetNodeID(), task.GetBase().GetSourceID())

// setup global meta cache
mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(100, nil).Once()
mockCache.EXPECT().GetPartitionID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(1000, nil).Once()
rc.EXPECT().CreatePartition(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
qc.EXPECT().SyncNewCreatedPartition(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
err := task.Execute(ctx)
assert.Error(t, err)
assert.NoError(t, err)

task.CollectionName = "#0xc0de"
err = task.PreExecute(ctx)
Expand Down
6 changes: 0 additions & 6 deletions internal/rootcoord/create_partition_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,6 @@ func (t *createPartitionTask) Execute(ctx context.Context) error {
partitionIDs: []int64{partID},
})

undoTask.AddStep(&syncNewCreatedPartitionStep{
baseStep: baseStep{core: t.core},
collectionID: t.collMeta.CollectionID,
partitionID: partID,
}, &nullStep{})

undoTask.AddStep(&changePartitionStateStep{
baseStep: baseStep{core: t.core},
collectionID: t.collMeta.CollectionID,
Expand Down
19 changes: 0 additions & 19 deletions internal/rootcoord/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,25 +327,6 @@ func (s *releasePartitionsStep) Weight() stepPriority {
return stepPriorityUrgent
}

type syncNewCreatedPartitionStep struct {
baseStep
collectionID UniqueID
partitionID UniqueID
}

func (s *syncNewCreatedPartitionStep) Execute(ctx context.Context) ([]nestedStep, error) {
err := s.core.broker.SyncNewCreatedPartition(ctx, s.collectionID, s.partitionID)
return nil, err
}

func (s *syncNewCreatedPartitionStep) Desc() string {
return fmt.Sprintf("sync new partition, collectionID=%d, partitionID=%d", s.partitionID, s.partitionID)
}

func (s *syncNewCreatedPartitionStep) Weight() stepPriority {
return stepPriorityUrgent
}

type dropIndexStep struct {
baseStep
collID UniqueID
Expand Down

0 comments on commit c052847

Please sign in to comment.