Skip to content

Commit

Permalink
fix: sync task still running after DataNode has stopped
Browse files Browse the repository at this point in the history
Signed-off-by: jaime <yun.zhang@zilliz.com>
  • Loading branch information
jaime0815 committed Dec 13, 2024
1 parent 1da4ac4 commit 927cbb3
Show file tree
Hide file tree
Showing 13 changed files with 153 additions and 27 deletions.
7 changes: 7 additions & 0 deletions internal/datanode/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,13 @@ func (node *DataNode) Stop() error {
node.writeBufferManager.Stop()
}

if node.syncMgr != nil {
err := node.syncMgr.Close()
if err != nil {
log.Error("sync manager close failed", zap.Error(err))
}
}

if node.allocator != nil {
log.Info("close id allocator", zap.String("role", typeutil.DataNodeRole))
node.allocator.Close()
Expand Down
12 changes: 6 additions & 6 deletions internal/datanode/importv2/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,11 @@ func (s *SchedulerSuite) TestScheduler_Start_Import() {
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
s.cm = cm

s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, nil
})
return future
return future, nil
})
importReq := &datapb.ImportRequest{
JobID: 10,
Expand Down Expand Up @@ -307,11 +307,11 @@ func (s *SchedulerSuite) TestScheduler_Start_Import_Failed() {
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
s.cm = cm

s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, errors.New("mock err")
})
return future
return future, nil
})
importReq := &datapb.ImportRequest{
JobID: 10,
Expand Down Expand Up @@ -384,11 +384,11 @@ func (s *SchedulerSuite) TestScheduler_ReadFileStat() {
}

func (s *SchedulerSuite) TestScheduler_ImportFile() {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, nil
})
return future
return future, nil
})
var once sync.Once
data, err := testutil.CreateInsertData(s.schema, s.numRows)
Expand Down
6 changes: 5 additions & 1 deletion internal/datanode/importv2/task_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,11 @@ func (t *ImportTask) sync(hashedData HashedData) ([]*conc.Future[struct{}], []sy
if err != nil {
return nil, nil, err
}
future := t.syncMgr.SyncData(t.ctx, syncTask)
future, err := t.syncMgr.SyncData(t.ctx, syncTask)
if err != nil {
log.Warn("sync data failed", WrapLogFields(t, zap.Error(err))...)
continue
}
futures = append(futures, future)
syncTasks = append(syncTasks, syncTask)
}
Expand Down
6 changes: 5 additions & 1 deletion internal/datanode/importv2/task_l0_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,11 @@ func (t *L0ImportTask) syncDelete(delData []*storage.DeleteData) ([]*conc.Future
if err != nil {
return nil, nil, err
}
future := t.syncMgr.SyncData(t.ctx, syncTask)
future, err := t.syncMgr.SyncData(t.ctx, syncTask)
if err != nil {
log.Warn("failed to sync l0 delete data", WrapLogFields(t, zap.Error(err))...)
continue
}
futures = append(futures, future)
syncTasks = append(syncTasks, syncTask)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/datanode/importv2/task_l0_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (s *L0ImportSuite) TestL0PreImport() {

func (s *L0ImportSuite) TestL0Import() {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything, mock.Anything).
RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
alloc := allocator.NewMockAllocator(s.T())
alloc.EXPECT().Alloc(mock.Anything).Return(1, int64(s.delCnt)+1, nil)
task.(*syncmgr.SyncTask).WithAllocator(alloc)
Expand All @@ -147,7 +147,7 @@ func (s *L0ImportSuite) TestL0Import() {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, nil
})
return future
return future, nil
})

req := &datapb.ImportRequest{
Expand Down
65 changes: 60 additions & 5 deletions internal/datanode/syncmgr/mock_sync_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 12 additions & 3 deletions internal/datanode/syncmgr/sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type SyncMeta struct {
//go:generate mockery --name=SyncManager --structname=MockSyncManager --output=./ --filename=mock_sync_manager.go --with-expecter --inpackage
type SyncManager interface {
// SyncData is the method to submit sync task.
SyncData(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}]
SyncData(ctx context.Context, task Task, callbacks ...func(error) error) (*conc.Future[struct{}], error)
}

type syncManager struct {
Expand Down Expand Up @@ -97,15 +97,19 @@ func (mgr *syncManager) resizeHandler(evt *config.Event) {
}
}

func (mgr *syncManager) SyncData(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}] {
func (mgr *syncManager) SyncData(ctx context.Context, task Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
if mgr.workerPool.IsClosed() {
return nil, fmt.Errorf("sync manager is closed")
}

switch t := task.(type) {
case *SyncTask:
t.WithAllocator(mgr.allocator).WithChunkManager(mgr.chunkManager)
case *SyncTaskV2:
t.WithAllocator(mgr.allocator)
}

return mgr.safeSubmitTask(task, callbacks...)
return mgr.safeSubmitTask(task, callbacks...), nil
}

// safeSubmitTask submits task to SyncManager
Expand Down Expand Up @@ -147,3 +151,8 @@ func (mgr *syncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPo
})
return segmentID, cp
}

func (mgr *syncManager) Close() error {
timeout := paramtable.Get().CommonCfg.SyncTaskPoolReleaseWaitTimeout.GetAsDuration(time.Second)

Check failure on line 156 in internal/datanode/syncmgr/sync_manager.go

View workflow job for this annotation

GitHub Actions / Code Checker MacOS 13

undefined: time
return mgr.workerPool.ReleaseTimeout(timeout)
}
22 changes: 18 additions & 4 deletions internal/datanode/syncmgr/sync_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,26 @@ func (s *SyncManagerSuite) TestSubmit() {
Timestamp: 100,
})

f := manager.SyncData(context.Background(), task)
f, err := manager.SyncData(context.Background(), task)
s.NoError(err)
s.NotNil(f)

_, err = f.Await()
s.NoError(err)
}

// TestClose checks the shutdown contract of the sync manager: Close must
// succeed, and any SyncData call made afterwards must return an error together
// with a nil future.
func (s *SyncManagerSuite) TestClose() {
	mgr, err := NewSyncManager(s.chunkManager, s.allocator)
	s.NoError(err)

	// Shut the manager down before anything has been submitted.
	s.NoError(mgr.Close())

	// A nil task is passed on purpose: the submission is expected to be
	// rejected by the closed-pool check before the task is ever inspected.
	future, err := mgr.SyncData(context.Background(), nil)
	s.Error(err)
	s.Nil(future)
}

func (s *SyncManagerSuite) TestCompacted() {
var segmentID atomic.Int64
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Run(func(_ context.Context, req *datapb.SaveBinlogPathsRequest) {
Expand All @@ -200,7 +213,8 @@ func (s *SyncManagerSuite) TestCompacted() {
Timestamp: 100,
})

f := manager.SyncData(context.Background(), task)
f, err := manager.SyncData(context.Background(), task)
s.NoError(err)
s.NotNil(f)

_, err = f.Await()
Expand Down Expand Up @@ -271,7 +285,7 @@ func (s *SyncManagerSuite) TestUnexpectedError() {
task.EXPECT().Run().Return(merr.WrapErrServiceInternal("mocked")).Once()
task.EXPECT().HandleError(mock.Anything)

f := manager.SyncData(context.Background(), task)
f, _ := manager.SyncData(context.Background(), task)
_, err = f.Await()
s.Error(err)
}
Expand All @@ -286,7 +300,7 @@ func (s *SyncManagerSuite) TestTargetUpdateSameID() {
task.EXPECT().Run().Return(errors.New("mock err")).Once()
task.EXPECT().HandleError(mock.Anything)

f := manager.SyncData(context.Background(), task)
f, _ := manager.SyncData(context.Background(), task)
_, err = f.Await()
s.Error(err)
}
Expand Down
15 changes: 12 additions & 3 deletions internal/datanode/writebuffer/write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64)
}
}

result = append(result, wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
future, err := wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
if err != nil {
return err
}
Expand All @@ -345,7 +345,12 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64)
wb.syncCheckpoint.Remove(syncTask.SegmentID(), syncTask.StartPosition().GetTimestamp())
}
return nil
}))
})
if err != nil {
log.Warn("failed to sync data", zap.Int64("segmentID", segmentID), zap.Error(err))
continue
}
result = append(result, future)
}
return result
}
Expand Down Expand Up @@ -652,7 +657,7 @@ func (wb *writeBufferBase) Close(drop bool) {
t.WithDrop()
}

f := wb.syncMgr.SyncData(context.Background(), syncTask, func(err error) error {
f, err := wb.syncMgr.SyncData(context.Background(), syncTask, func(err error) error {
if err != nil {
return err
}
Expand All @@ -661,6 +666,10 @@ func (wb *writeBufferBase) Close(drop bool) {
}
return nil
})
if err != nil {
log.Warn("failed to sync segment", zap.Int64("segmentID", id), zap.Error(err))
continue
}
futures = append(futures, f)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/datanode/writebuffer/write_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (s *WriteBufferSuite) TestEvictBuffer() {
serializer.EXPECT().EncodeBuffer(mock.Anything, mock.Anything).Return(syncmgr.NewSyncTask(), nil)
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything, mock.Anything).Return(conc.Go[struct{}](func() (struct{}, error) {
return struct{}{}, nil
}))
}), nil)
defer func() {
s.wb.mut.Lock()
defer s.wb.mut.Unlock()
Expand Down
11 changes: 10 additions & 1 deletion pkg/util/conc/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import (
"fmt"
"strconv"
"sync"
"time"

ants "github.com/panjf2000/ants/v2"
"github.com/panjf2000/ants/v2"

"github.com/milvus-io/milvus/pkg/util/generic"
"github.com/milvus-io/milvus/pkg/util/hardware"
Expand Down Expand Up @@ -107,10 +108,18 @@ func (pool *Pool[T]) Free() int {
return pool.inner.Free()
}

// IsClosed reports whether the pool has been closed, as answered by the
// wrapped inner pool's IsClosed method.
func (pool *Pool[T]) IsClosed() bool {
return pool.inner.IsClosed()
}

// Release releases the pool by delegating to the wrapped inner pool's Release.
// It takes no timeout and returns nothing; use ReleaseTimeout when the caller
// needs a bounded wait and an error result.
func (pool *Pool[T]) Release() {
pool.inner.Release()
}

// ReleaseTimeout releases the pool by delegating to the wrapped inner pool's
// ReleaseTimeout with the given timeout, and returns its error unchanged.
// NOTE(review): presumably the inner pool waits for running workers to exit
// and reports an error if the timeout elapses first — confirm against the
// ants documentation.
func (pool *Pool[T]) ReleaseTimeout(timeout time.Duration) error {
return pool.inner.ReleaseTimeout(timeout)
}

func (pool *Pool[T]) Resize(size int) error {
if pool.opt.preAlloc {
return merr.WrapErrServiceInternal("cannot resize pre-alloc pool")
Expand Down
11 changes: 11 additions & 0 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ type commonConfig struct {

// Local RPC enabled for milvus internal communication when mix or standalone mode.
LocalRPCEnabled ParamItem `refreshable:"false"`

SyncTaskPoolReleaseWaitTimeout ParamItem `refreshable:"true"`
}

func (p *commonConfig) init(base *BaseTable) {
Expand Down Expand Up @@ -946,6 +948,15 @@ This helps Milvus-CDC synchronize incremental data`,
Export: true,
}
p.LocalRPCEnabled.Init(base.mgr)

p.SyncTaskPoolReleaseWaitTimeout = ParamItem{
Key: "flush.sync.taskPoolReleaseWaitTimeout",
DefaultValue: "60s",
Version: "2.4.19",
Doc: "The maximum time to wait for the task to finish and release resources in the pool",
Export: true,
}
p.SyncTaskPoolReleaseWaitTimeout.Init(base.mgr)
}

type gpuConfig struct {
Expand Down
Loading

0 comments on commit 927cbb3

Please sign in to comment.