fix: [2.4] Fix consume blocked due to too many consumers #38916

Open · wants to merge 3 commits into base: 2.4
Changes from all commits
3 changes: 3 additions & 0 deletions configs/milvus.yaml
@@ -170,6 +170,9 @@ mq:
mergeCheckInterval: 1 # the interval time(in seconds) for dispatcher to check whether to merge
targetBufSize: 16 # the length of the channel buffer for the target
maxTolerantLag: 3 # Default value: "3", the timeout (in seconds) for the target to send a msgPack
maxDispatcherNumPerPchannel: 10 # The maximum number of dispatchers per physical channel, primarily to limit the number of consumers and prevent performance issues (e.g., during recovery when a large number of channels are watched).
retrySleep: 5 # register retry sleep time in seconds
retryTimeout: 300 # register retry timeout in seconds

# Related configuration of pulsar, used to manage Milvus logs of recent mutation operations, output streaming log, and provide log publish-subscribe services.
pulsar:
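For reference, a minimal sketch (not part of this PR; the standalone program is illustrative) of how these new dispatcher settings are read through the paramtable accessors added later in this diff:

// Sketch only: reads the new mq.dispatcher settings via paramtable.
package main

import (
	"fmt"
	"time"

	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func main() {
	paramtable.Init()
	cfg := &paramtable.Get().MQCfg
	fmt.Println("max dispatchers per pchannel:", cfg.MaxDispatcherNumPerPchannel.GetAsInt())    // default 10
	fmt.Println("register retry sleep:", cfg.RetrySleep.GetAsDuration(time.Second))             // default 5s
	fmt.Println("register retry timeout:", cfg.RetryTimeout.GetAsDuration(time.Second))         // default 300s
}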
39 changes: 33 additions & 6 deletions internal/datanode/flow_graph_dmstream_input_node.go
@@ -21,6 +21,7 @@ import (
"fmt"
"time"

"github.com/cockroachdb/errors"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
@@ -31,6 +32,7 @@ import (
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -41,24 +43,49 @@ import (
// flowgraph ddNode.
func newDmInputNode(initCtx context.Context, dispatcherClient msgdispatcher.Client, seekPos *msgpb.MsgPosition, dmNodeConfig *nodeConfig) (*flowgraph.InputNode, error) {
log := log.With(zap.Int64("nodeID", paramtable.GetNodeID()),
zap.Int64("collectionID", dmNodeConfig.collectionID),
zap.String("vchannel", dmNodeConfig.vChannelName))
var err error
var input <-chan *msgstream.MsgPack

var (
input <-chan *msgstream.MsgPack
err error
start = time.Now()
)

if seekPos != nil && len(seekPos.MsgID) != 0 {
input, err = dispatcherClient.Register(initCtx, dmNodeConfig.vChannelName, seekPos, common.SubscriptionPositionUnknown)
err := retry.Handle(initCtx, func() (bool, error) {
input, err = dispatcherClient.Register(initCtx, dmNodeConfig.vChannelName, seekPos, common.SubscriptionPositionUnknown)
if err != nil {
log.Warn("datanode consume failed", zap.Error(err))
return errors.Is(err, msgdispatcher.ErrTooManyConsumers), err
}
return false, nil
}, retry.Sleep(paramtable.Get().MQCfg.RetrySleep.GetAsDuration(time.Second)), // 5 seconds
retry.MaxSleepTime(paramtable.Get().MQCfg.RetryTimeout.GetAsDuration(time.Second))) // 5 minutes
if err != nil {
log.Warn("datanode consume failed after retried", zap.Error(err))
return nil, err
}

log.Info("datanode seek successfully when register to msgDispatcher",
zap.ByteString("msgID", seekPos.GetMsgID()),
zap.Time("tsTime", tsoutil.PhysicalTime(seekPos.GetTimestamp())),
zap.Duration("tsLag", time.Since(tsoutil.PhysicalTime(seekPos.GetTimestamp()))))
zap.Duration("tsLag", time.Since(tsoutil.PhysicalTime(seekPos.GetTimestamp()))),
zap.Duration("dur", time.Since(start)))
} else {
input, err = dispatcherClient.Register(initCtx, dmNodeConfig.vChannelName, nil, common.SubscriptionPositionEarliest)
err = retry.Handle(initCtx, func() (bool, error) {
input, err = dispatcherClient.Register(initCtx, dmNodeConfig.vChannelName, nil, common.SubscriptionPositionEarliest)
if err != nil {
log.Warn("datanode consume failed", zap.Error(err))
return errors.Is(err, msgdispatcher.ErrTooManyConsumers), err
}
return false, nil
}, retry.Sleep(paramtable.Get().MQCfg.RetrySleep.GetAsDuration(time.Second)), // 5 seconds
retry.MaxSleepTime(paramtable.Get().MQCfg.RetryTimeout.GetAsDuration(time.Second))) // 5 minutes
if err != nil {
log.Warn("datanode consume failed after retried", zap.Error(err))
return nil, err
}

log.Info("datanode consume successfully when register to msgDispatcher")
}

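As a standalone illustration of the register-with-retry pattern above: retry.Handle keeps invoking the closure while its boolean result marks the error as retryable (here, only msgdispatcher.ErrTooManyConsumers) and gives up once the configured sleep/timeout options are exhausted. A minimal sketch under those assumptions; the registerWithRetry helper and the package name are illustrative, not part of this PR:

package example // illustrative package name

import (
	"context"
	"time"

	"github.com/cockroachdb/errors"

	"github.com/milvus-io/milvus/pkg/mq/common"
	"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
	"github.com/milvus-io/milvus/pkg/mq/msgstream"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
	"github.com/milvus-io/milvus/pkg/util/retry"
)

// registerWithRetry wraps dispatcherClient.Register so that a transient
// "too many consumers" rejection is retried instead of failing the caller.
func registerWithRetry(ctx context.Context, client msgdispatcher.Client, vchannel string) (<-chan *msgstream.MsgPack, error) {
	var input <-chan *msgstream.MsgPack
	err := retry.Handle(ctx, func() (bool, error) {
		var err error
		input, err = client.Register(ctx, vchannel, nil, common.SubscriptionPositionEarliest)
		if err != nil {
			// Retry only when the per-pchannel dispatcher limit is hit.
			return errors.Is(err, msgdispatcher.ErrTooManyConsumers), err
		}
		return false, nil
	},
		retry.Sleep(paramtable.Get().MQCfg.RetrySleep.GetAsDuration(time.Second)),           // default 5s
		retry.MaxSleepTime(paramtable.Get().MQCfg.RetryTimeout.GetAsDuration(time.Second)))  // default 300s
	if err != nil {
		return nil, err
	}
	return input, nil
}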
26 changes: 24 additions & 2 deletions internal/util/pipeline/stream_pipeline.go
@@ -21,13 +21,26 @@
"sync"
"time"

<<<<<<< HEAD

[GitHub Actions check failure on line 24 in internal/util/pipeline/stream_pipeline.go (Code Checker MacOS 13): missing import path]
=======
"github.com/cockroachdb/errors"
"go.uber.org/atomic"
>>>>>>> 1bcf3d9ee7 (fix: Fix consume blocked due to too many consumers)
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/common"
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
<<<<<<< HEAD
=======
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
>>>>>>> 1bcf3d9ee7 (fix: Fix consume blocked due to too many consumers)
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

@@ -71,11 +84,20 @@
}

start := time.Now()
p.input, err = p.dispatcher.Register(ctx, p.vChannel, position, common.SubscriptionPositionUnknown)
err = retry.Handle(ctx, func() (bool, error) {
p.input, err = p.dispatcher.Register(ctx, p.vChannel, position, common.SubscriptionPositionUnknown)
if err != nil {
log.Warn("dispatcher register failed", zap.String("channel", position.ChannelName), zap.Error(err))
return errors.Is(err, msgdispatcher.ErrTooManyConsumers), err
}
return false, nil
}, retry.Sleep(paramtable.Get().MQCfg.RetrySleep.GetAsDuration(time.Second)), // 5 seconds
retry.MaxSleepTime(paramtable.Get().MQCfg.RetryTimeout.GetAsDuration(time.Second))) // 5 minutes
if err != nil {
log.Error("dispatcher register failed", zap.String("channel", position.ChannelName))
log.Error("dispatcher register failed after retried", zap.String("channel", position.ChannelName), zap.Error(err))
return WrapErrRegDispather(err)
}

ts, _ := tsoutil.ParseTS(position.GetTimestamp())
log.Info("stream pipeline seeks from position with msgDispatcher",
zap.String("pchannel", position.ChannelName),
2 changes: 2 additions & 0 deletions internal/util/pipeline/stream_pipeline_test.go
@@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/pkg/mq/common"
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

type StreamPipelineSuite struct {
@@ -43,6 +44,7 @@ type StreamPipelineSuite struct {
}

func (suite *StreamPipelineSuite) SetupTest() {
paramtable.Init()
suite.channel = "test-channel"
suite.inChannel = make(chan *msgstream.MsgPack, 1)
suite.outChannel = make(chan msgstream.Timestamp)
64 changes: 40 additions & 24 deletions pkg/mq/msgdispatcher/client.go
@@ -18,17 +18,23 @@ package msgdispatcher

import (
"context"
"sync"
"time"

"github.com/cockroachdb/errors"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var ErrTooManyConsumers = errors.New("consumer number limit exceeded")

type (
Pos = msgpb.MsgPosition
MsgPack = msgstream.MsgPack
@@ -46,70 +52,80 @@ var _ Client = (*client)(nil)
type client struct {
role string
nodeID int64
managers map[string]DispatcherManager
managerMut sync.Mutex
managers *typeutil.ConcurrentMap[string, DispatcherManager]
managerMut *lock.KeyLock[string]
factory msgstream.Factory
}

func NewClient(factory msgstream.Factory, role string, nodeID int64) Client {
return &client{
role: role,
nodeID: nodeID,
factory: factory,
managers: make(map[string]DispatcherManager),
role: role,
nodeID: nodeID,
factory: factory,
managers: typeutil.NewConcurrentMap[string, DispatcherManager](),
managerMut: lock.NewKeyLock[string](),
}
}

func (c *client) Register(ctx context.Context, vchannel string, pos *Pos, subPos SubPos) (<-chan *MsgPack, error) {
log := log.With(zap.String("role", c.role),
zap.Int64("nodeID", c.nodeID), zap.String("vchannel", vchannel))
pchannel := funcutil.ToPhysicalChannel(vchannel)
c.managerMut.Lock()
defer c.managerMut.Unlock()
start := time.Now()
c.managerMut.Lock(pchannel)
defer c.managerMut.Unlock(pchannel)
var manager DispatcherManager
manager, ok := c.managers[pchannel]
manager, ok := c.managers.Get(pchannel)
if !ok {
manager = NewDispatcherManager(pchannel, c.role, c.nodeID, c.factory)
c.managers[pchannel] = manager
c.managers.Insert(pchannel, manager)
go manager.Run()
}
// Check if the consumer number limit has been reached.
if manager.Num() >= paramtable.Get().MQCfg.MaxDispatcherNumPerPchannel.GetAsInt() {
return nil, ErrTooManyConsumers
}
// Begin to register
ch, err := manager.Add(ctx, vchannel, pos, subPos)
if err != nil {
if manager.Num() == 0 {
manager.Close()
delete(c.managers, pchannel)
c.managers.Remove(pchannel)
}
log.Error("register failed", zap.Error(err))
return nil, err
}
log.Info("register done")
log.Info("register done", zap.Duration("dur", time.Since(start)))
return ch, nil
}

func (c *client) Deregister(vchannel string) {
pchannel := funcutil.ToPhysicalChannel(vchannel)
c.managerMut.Lock()
defer c.managerMut.Unlock()
if manager, ok := c.managers[pchannel]; ok {
start := time.Now()
c.managerMut.Lock(pchannel)
defer c.managerMut.Unlock(pchannel)
if manager, ok := c.managers.Get(pchannel); ok {
manager.Remove(vchannel)
if manager.Num() == 0 {
manager.Close()
delete(c.managers, pchannel)
c.managers.Remove(pchannel)
}
log.Info("deregister done", zap.String("role", c.role),
zap.Int64("nodeID", c.nodeID), zap.String("vchannel", vchannel))
log.Info("deregister done", zap.String("role", c.role), zap.Int64("nodeID", c.nodeID),
zap.String("vchannel", vchannel), zap.Duration("dur", time.Since(start)))
}
}

func (c *client) Close() {
log := log.With(zap.String("role", c.role),
zap.Int64("nodeID", c.nodeID))
c.managerMut.Lock()
defer c.managerMut.Unlock()
for pchannel, manager := range c.managers {

c.managers.Range(func(pchannel string, manager DispatcherManager) bool {
c.managerMut.Lock(pchannel)
defer c.managerMut.Unlock(pchannel)
log.Info("close manager", zap.String("channel", pchannel))
delete(c.managers, pchannel)
c.managers.Remove(pchannel)
manager.Close()
}
return true
})
log.Info("dispatcher client closed")
}
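The client.go change above replaces the single global mutex and plain map with a per-pchannel lock.KeyLock plus a typeutil.ConcurrentMap, so registration on one physical channel no longer blocks other channels, and it rejects new registrations once maxDispatcherNumPerPchannel is reached. Below is a minimal standalone sketch of that per-key locking pattern; the registry type, counter, and channel name are illustrative, not the PR's DispatcherManager:

package main

import (
	"fmt"

	"github.com/cockroachdb/errors"

	"github.com/milvus-io/milvus/pkg/util/lock"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var errTooMany = errors.New("consumer number limit exceeded")

// registry serializes work per key instead of globally, mirroring the
// per-pchannel locking introduced in msgdispatcher/client.go above.
type registry struct {
	mu     *lock.KeyLock[string]
	counts *typeutil.ConcurrentMap[string, int]
	limit  int
}

func (r *registry) add(pchannel string) error {
	r.mu.Lock(pchannel) // only this pchannel is locked; other channels proceed concurrently
	defer r.mu.Unlock(pchannel)
	n, _ := r.counts.Get(pchannel)
	if n >= r.limit {
		return errTooMany // caller retries, as newDmInputNode does above
	}
	r.counts.Insert(pchannel, n+1)
	return nil
}

func main() {
	r := &registry{
		mu:     lock.NewKeyLock[string](),
		counts: typeutil.NewConcurrentMap[string, int](),
		limit:  10, // mirrors mq.dispatcher.maxDispatcherNumPerPchannel
	}
	fmt.Println(r.add("by-dev-rootcoord-dml_0")) // prints <nil> on success
}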
4 changes: 1 addition & 3 deletions pkg/mq/msgdispatcher/client_test.go
@@ -79,8 +79,6 @@ func TestClient_Concurrency(t *testing.T) {
expected := int(total - deregisterCount.Load())

c := client1.(*client)
c.managerMut.Lock()
n := len(c.managers)
c.managerMut.Unlock()
n := c.managers.Len()
assert.Equal(t, expected, n)
}
36 changes: 33 additions & 3 deletions pkg/util/paramtable/service_param.go
@@ -509,9 +509,12 @@ type MQConfig struct {
IgnoreBadPosition ParamItem `refreshable:"true"`

// msgdispatcher
MergeCheckInterval ParamItem `refreshable:"false"`
TargetBufSize ParamItem `refreshable:"false"`
MaxTolerantLag ParamItem `refreshable:"true"`
MergeCheckInterval ParamItem `refreshable:"false"`
TargetBufSize ParamItem `refreshable:"false"`
MaxTolerantLag ParamItem `refreshable:"true"`
MaxDispatcherNumPerPchannel ParamItem `refreshable:"true"`
RetrySleep ParamItem `refreshable:"true"`
RetryTimeout ParamItem `refreshable:"true"`
}

// Init initializes the MQConfig object with a BaseTable.
@@ -535,6 +538,33 @@ Valid values: [default, pulsar, kafka, rocksmq, natsmq]`,
}
p.MaxTolerantLag.Init(base.mgr)

p.MaxDispatcherNumPerPchannel = ParamItem{
Key: "mq.dispatcher.maxDispatcherNumPerPchannel",
Version: "2.4.19",
DefaultValue: "10",
Doc: `The maximum number of dispatchers per physical channel, primarily to limit the number of consumers and prevent performance issues (e.g., during recovery when a large number of channels are watched).`,
Export: true,
}
p.MaxDispatcherNumPerPchannel.Init(base.mgr)

p.RetrySleep = ParamItem{
Key: "mq.dispatcher.retrySleep",
Version: "2.4.19",
DefaultValue: "5",
Doc: `register retry sleep time in seconds`,
Export: true,
}
p.RetrySleep.Init(base.mgr)

p.RetryTimeout = ParamItem{
Key: "mq.dispatcher.retryTimeout",
Version: "2.4.19",
DefaultValue: "300",
Doc: `register retry timeout in seconds`,
Export: true,
}
p.RetryTimeout.Init(base.mgr)

p.TargetBufSize = ParamItem{
Key: "mq.dispatcher.targetBufSize",
Version: "2.4.4",
11 changes: 11 additions & 0 deletions pkg/util/paramtable/service_param_test.go
@@ -25,6 +25,17 @@ func TestServiceParam(t *testing.T) {
var SParams ServiceParam
bt := NewBaseTable(SkipRemote(true))
SParams.init(bt)

t.Run("test MQConfig", func(t *testing.T) {
Params := &SParams.MQCfg
assert.Equal(t, 1*time.Second, Params.MergeCheckInterval.GetAsDuration(time.Second))
assert.Equal(t, 16, Params.TargetBufSize.GetAsInt())
assert.Equal(t, 3*time.Second, Params.MaxTolerantLag.GetAsDuration(time.Second))
assert.Equal(t, 10, Params.MaxDispatcherNumPerPchannel.GetAsInt())
assert.Equal(t, 5*time.Second, Params.RetrySleep.GetAsDuration(time.Second))
assert.Equal(t, 300*time.Second, Params.RetryTimeout.GetAsDuration(time.Second))
})

t.Run("test etcdConfig", func(t *testing.T) {
Params := &SParams.EtcdCfg
