Do not use DataCoord context when DataNode is handling import task (#19732)

So that when the DataCoord-side call is done (and its context is cancelled), DataNode can still proceed with the import task.
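For illustration, here is a minimal, self-contained sketch of the pattern this commit applies in `DataNode.Import()`: the long-running work derives its context from `context.TODO()` with its own timeout instead of from the incoming RPC context, so cancellation by the caller (DataCoord) no longer aborts the task. `handleImport` and the durations below are illustrative stand-ins, not the actual Milvus code.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// handleImport is an illustrative stand-in for a long-running handler.
// It deliberately does not derive its working context from the caller's
// context, so the caller cancelling callerCtx does not abort the work.
func handleImport(callerCtx context.Context) error {
	_ = callerCtx // received, but intentionally not used for the work below

	// Spawn a new context instead of deriving one from callerCtx
	// (the commit uses ImportCallTimeout = 30 * time.Second here).
	workCtx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
	defer cancel()

	// Simulated work that only observes workCtx.
	select {
	case <-workCtx.Done():
		return workCtx.Err()
	case <-time.After(100 * time.Millisecond):
		return nil
	}
}

func main() {
	callerCtx, cancel := context.WithCancel(context.Background())
	cancel() // the caller (think: DataCoord's RPC context) is already done

	// The task still completes because it does not derive from callerCtx.
	fmt.Println("import finished, err =", handleImport(callerCtx))
}
```

In the real `Import()` the detached context is bounded by the new `ImportCallTimeout` (30s) for the RootCoord calls, as the diff below shows.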

/kind bug

issue: #19730
Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
soothing-rain authored Oct 14, 2022
1 parent cb51d41 commit 8b8df0a
Showing 4 changed files with 53 additions and 29 deletions.
59 changes: 38 additions & 21 deletions internal/datanode/data_node.go
@@ -78,6 +78,9 @@ const (

// ConnectEtcdMaxRetryTime is used to limit the max retry time for connection etcd
ConnectEtcdMaxRetryTime = 100

// ImportCallTimeout is the timeout used in Import() method calls.
ImportCallTimeout = 30 * time.Second
)

var getFlowGraphServiceAttempts = uint(50)
@@ -95,14 +98,15 @@ var rateCol *rateCollector
// services in datanode package.
//
// DataNode implements `types.Component`, `types.DataNode` interfaces.
// `etcdCli` is a connection of etcd
// `rootCoord` is a grpc client of root coordinator.
// `dataCoord` is a grpc client of data service.
// `NodeID` is unique to each datanode.
// `State` is current statement of this data node, indicating whether it's healthy.
//
// `clearSignal` is a signal channel for releasing the flowgraph resources.
// `segmentCache` stores all flushing and flushed segments.
// `etcdCli` is a connection of etcd
// `rootCoord` is a grpc client of root coordinator.
// `dataCoord` is a grpc client of data service.
// `NodeID` is unique to each datanode.
// `State` is current statement of this data node, indicating whether it's healthy.
//
// `clearSignal` is a signal channel for releasing the flowgraph resources.
// `segmentCache` stores all flushing and flushed segments.
type DataNode struct {
ctx context.Context
cancel context.CancelFunc
@@ -570,10 +574,11 @@ func (node *DataNode) ReadyToFlush() error {
}

// FlushSegments packs flush messages into flowGraph through flushChan.
// If DataNode receives a valid segment to flush, new flush message for the segment should be ignored.
// So if receiving calls to flush segment A, DataNode should guarantee the segment to be flushed.
//
// One precondition: The segmentID in req is in ascending order.
// If DataNode receives a valid segment to flush, new flush message for the segment should be ignored.
// So if receiving calls to flush segment A, DataNode should guarantee the segment to be flushed.
//
// One precondition: The segmentID in req is in ascending order.
func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) {
metrics.DataNodeFlushReqCounter.WithLabelValues(
fmt.Sprint(Params.DataNodeCfg.GetNodeID()),
@@ -719,7 +724,7 @@ func (node *DataNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin
}, nil
}

//ShowConfigurations returns the configurations of DataNode matching req.Pattern
// ShowConfigurations returns the configurations of DataNode matching req.Pattern
func (node *DataNode) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
log.Debug("DataNode.ShowConfigurations", zap.String("pattern", req.Pattern))
if !node.isHealthy() {
@@ -962,11 +967,15 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
AutoIds: make([]int64, 0),
RowCount: 0,
}
// func to report import state to rootcoord

// Spawn a new context to ignore cancellation from parental context.
newCtx, cancel := context.WithTimeout(context.TODO(), ImportCallTimeout)
defer cancel()
// func to report import state to RootCoord.
reportFunc := func(res *rootcoordpb.ImportResult) error {
status, err := node.rootCoord.ReportImport(ctx, res)
status, err := node.rootCoord.ReportImport(newCtx, res)
if err != nil {
log.Error("fail to report import state to root coord", zap.Error(err))
log.Error("fail to report import state to RootCoord", zap.Error(err))
return err
}
if status != nil && status.ErrorCode != commonpb.ErrorCode_Success {
@@ -979,7 +988,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
log.Warn("DataNode import failed",
zap.Int64("collection ID", req.GetImportTask().GetCollectionId()),
zap.Int64("partition ID", req.GetImportTask().GetPartitionId()),
zap.Int64("taskID", req.GetImportTask().GetTaskId()),
zap.Int64("task ID", req.GetImportTask().GetTaskId()),
zap.Error(errDataNodeIsUnhealthy(Params.DataNodeCfg.GetNodeID())))

return &commonpb.Status{
@@ -989,7 +998,8 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
}

// get a timestamp for all the rows
rep, err := node.rootCoord.AllocTimestamp(ctx, &rootcoordpb.AllocTimestampRequest{
// Ignore cancellation from parent context.
rep, err := node.rootCoord.AllocTimestamp(newCtx, &rootcoordpb.AllocTimestampRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_RequestTSO,
MsgID: 0,
@@ -1005,7 +1015,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
importResult.State = commonpb.ImportState_ImportFailed
importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: "failed_reason", Value: msg})
if reportErr := reportFunc(importResult); reportErr != nil {
log.Warn("fail to report import state to root coord", zap.Error(reportErr))
log.Warn("fail to report import state to RootCoord", zap.Error(reportErr))
}
if err != nil {
return &commonpb.Status{
@@ -1019,13 +1029,17 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)

// get collection schema and shard number
metaService := newMetaService(node.rootCoord, req.GetImportTask().GetCollectionId())
colInfo, err := metaService.getCollectionInfo(ctx, req.GetImportTask().GetCollectionId(), 0)
colInfo, err := metaService.getCollectionInfo(newCtx, req.GetImportTask().GetCollectionId(), 0)
if err != nil {
log.Warn("failed to get collection info for collection ID",
zap.Int64("task ID", req.GetImportTask().GetTaskId()),
zap.Int64("collection ID", req.GetImportTask().GetCollectionId()),
zap.Error(err))
importResult.State = commonpb.ImportState_ImportFailed
importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: "failed_reason", Value: err.Error()})
reportErr := reportFunc(importResult)
if reportErr != nil {
log.Warn("fail to report import state to root coord", zap.Error(err))
log.Warn("fail to report import state to RootCoord", zap.Error(err))
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@@ -1035,15 +1049,18 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)

// parse files and generate segments
segmentSize := int64(Params.DataCoordCfg.SegmentMaxSize) * 1024 * 1024
importWrapper := importutil.NewImportWrapper(ctx, colInfo.GetSchema(), colInfo.GetShardsNum(), segmentSize, node.rowIDAllocator, node.chunkManager,
importWrapper := importutil.NewImportWrapper(newCtx, colInfo.GetSchema(), colInfo.GetShardsNum(), segmentSize, node.rowIDAllocator, node.chunkManager,
importFlushReqFunc(node, req, importResult, colInfo.GetSchema(), ts), importResult, reportFunc)
err = importWrapper.Import(req.GetImportTask().GetFiles(), req.GetImportTask().GetRowBased(), false)
if err != nil {
log.Warn("import wrapper failed to parse import request",
zap.Int64("task ID", req.GetImportTask().GetTaskId()),
zap.Error(err))
importResult.State = commonpb.ImportState_ImportFailed
importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: "failed_reason", Value: err.Error()})
reportErr := reportFunc(importResult)
if reportErr != nil {
log.Warn("fail to report import state to root coord", zap.Error(err))
log.Warn("fail to report import state to RootCoord", zap.Error(err))
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
12 changes: 8 additions & 4 deletions internal/datanode/data_node_test.go
@@ -43,6 +43,7 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/importutil"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/stretchr/testify/assert"
@@ -76,6 +77,8 @@ func TestMain(t *testing.M) {
}

func TestDataNode(t *testing.T) {
importutil.ReportImportAttempts = 1

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

@@ -465,7 +468,7 @@
},
}
node.rootCoord.(*RootCoordFactory).ReportImportErr = true
_, err = node.Import(context.WithValue(ctx, ctxKey{}, ""), req)
_, err = node.Import(node.ctx, req)
assert.NoError(t, err)
node.rootCoord.(*RootCoordFactory).ReportImportErr = false

@@ -548,8 +551,9 @@

t.Run("Test Import report import error", func(t *testing.T) {
node.rootCoord = &RootCoordFactory{
collectionID: 100,
pkType: schemapb.DataType_Int64,
collectionID: 100,
pkType: schemapb.DataType_Int64,
ReportImportErr: true,
}
content := []byte(`{
"rows":[
@@ -573,7 +577,7 @@
RowBased: true,
},
}
stat, err := node.Import(context.WithValue(node.ctx, ctxKey{}, returnError), req)
stat, err := node.Import(node.ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, stat.GetErrorCode())
})
4 changes: 2 additions & 2 deletions internal/datanode/mock_test.go
@@ -279,7 +279,7 @@ func (ds *DataCoordFactory) BroadcastAlteredCollection(ctx context.Context, req

func (ds *DataCoordFactory) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
if ds.GetSegmentInfosError {
return nil, errors.New("mock error")
return nil, errors.New("mock get segment info error")
}
if ds.GetSegmentInfosNotSuccess {
return &datapb.GetSegmentInfoResponse{
@@ -1027,7 +1027,7 @@ func (m *RootCoordFactory) ReportImport(ctx context.Context, req *rootcoordpb.Im
if m.ReportImportErr {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, fmt.Errorf("mock error")
}, fmt.Errorf("mock report import error")
}
if m.ReportImportNotSuccess {
return &commonpb.Status{
7 changes: 5 additions & 2 deletions internal/util/importutil/import_wrapper.go
@@ -61,6 +61,9 @@ const (
MaxTotalSizeInMemory = 2 * 1024 * 1024 * 1024 // 2GB
)

// ReportImportAttempts is the maximum # of attempts to retry when import fails.
var ReportImportAttempts uint = 10

type ImportWrapper struct {
ctx context.Context // for canceling parse process
cancel context.CancelFunc // for canceling parse process
@@ -342,9 +345,9 @@ func (p *ImportWrapper) reportPersisted() error {
// persist state task is valuable, retry more times in case fail this task only because of network error
reportErr := retry.Do(p.ctx, func() error {
return p.reportFunc(p.importResult)
}, retry.Attempts(10))
}, retry.Attempts(ReportImportAttempts))
if reportErr != nil {
log.Warn("import wrapper: fail to report import state to root coord", zap.Error(reportErr))
log.Warn("import wrapper: fail to report import state to RootCoord", zap.Error(reportErr))
return reportErr
}
return nil
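A note on this last hunk: replacing the hard-coded `retry.Attempts(10)` with the exported `ReportImportAttempts` variable is what lets the test above set `importutil.ReportImportAttempts = 1` and avoid a long retry loop on the mocked failure. Below is a rough, standard-library-only sketch of that shape; `reportWithRetry` and the mock error are illustrative, while the real code uses Milvus's internal `retry.Do`.

```go
package main

import (
	"errors"
	"fmt"
)

// ReportImportAttempts mirrors the idea in the diff: a package-level
// variable (rather than a constant) so tests can shrink the retry budget.
var ReportImportAttempts uint = 10

// reportWithRetry calls fn up to ReportImportAttempts times and returns
// the last error if every attempt fails.
func reportWithRetry(fn func() error) error {
	var err error
	for i := uint(0); i < ReportImportAttempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
	}
	return fmt.Errorf("report failed after %d attempts: %w", ReportImportAttempts, err)
}

func main() {
	ReportImportAttempts = 1 // as TestDataNode does via importutil.ReportImportAttempts = 1
	err := reportWithRetry(func() error { return errors.New("mock report import error") })
	fmt.Println(err)
}
```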
