fix: [2.4] Fix consume blocked due to too many consumers #38916

Open · wants to merge 3 commits into base: 2.4
Changes from 1 commit
fix: Fix consume blocked due to too many consumers
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
bigsheeper committed Jan 1, 2025
commit 5d42822906cc285a98ba50d5096c08b4a669ea6e
3 changes: 3 additions & 0 deletions configs/milvus.yaml
@@ -170,6 +170,9 @@ mq:
mergeCheckInterval: 1 # the interval time(in seconds) for dispatcher to check whether to merge
targetBufSize: 16 # the length of the channel buffer for the target
maxTolerantLag: 3 # Default value: "3", the timeout (in seconds) for the target to send a msgPack
maxDispatcherNumPerPchannel: 10 # The maximum number of dispatchers per physical channel, primarily to limit the number of consumers and prevent performance issues (e.g., during recovery when a large number of channels are watched).
retrySleep: 5 # register retry sleep time in seconds
retryTimeout: 300 # register retry timeout in seconds

# Related configuration of pulsar, used to manage Milvus logs of recent mutation operations, output streaming log, and provide log publish-subscribe services.
pulsar:
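The three new keys live under mq.dispatcher in milvus.yaml. Purely as an illustration (the values below are hypothetical tuning choices, not defaults or recommendations from this patch), a deployment that watches a large number of channels during recovery might override them like so:

    mq:
      dispatcher:
        maxDispatcherNumPerPchannel: 20  # hypothetical: allow more concurrent consumers per physical channel
        retrySleep: 2                    # hypothetical: retry dispatcher registration every 2 seconds
        retryTimeout: 600                # hypothetical: keep retrying registration for up to 10 minutes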
5 changes: 5 additions & 0 deletions internal/datacoord/channel_manager.go
@@ -33,7 +33,12 @@ import (
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// ChannelManagerImpl manages the allocation and the balance between channels and data nodes.
39 changes: 33 additions & 6 deletions internal/datanode/flow_graph_dmstream_input_node.go
@@ -21,6 +21,7 @@ import (
"fmt"
"time"

"github.com/cockroachdb/errors"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
@@ -31,6 +32,7 @@ import (
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -41,24 +43,49 @@ import (
// flowgraph ddNode.
func newDmInputNode(initCtx context.Context, dispatcherClient msgdispatcher.Client, seekPos *msgpb.MsgPosition, dmNodeConfig *nodeConfig) (*flowgraph.InputNode, error) {
log := log.With(zap.Int64("nodeID", paramtable.GetNodeID()),
zap.Int64("collectionID", dmNodeConfig.collectionID),
zap.String("vchannel", dmNodeConfig.vChannelName))
var err error
var input <-chan *msgstream.MsgPack

var (
input <-chan *msgstream.MsgPack
err error
start = time.Now()
)

if seekPos != nil && len(seekPos.MsgID) != 0 {
input, err = dispatcherClient.Register(initCtx, dmNodeConfig.vChannelName, seekPos, common.SubscriptionPositionUnknown)
err = retry.Handle(initCtx, func() (bool, error) {
input, err = dispatcherClient.Register(initCtx, dmNodeConfig.vChannelName, seekPos, common.SubscriptionPositionUnknown)
if err != nil {
log.Warn("datanode consume failed", zap.Error(err))
return errors.Is(err, msgdispatcher.ErrTooManyConsumers), err
}
return false, nil
}, retry.Sleep(paramtable.Get().MQCfg.RetrySleep.GetAsDuration(time.Second)), // 5 seconds
retry.MaxSleepTime(paramtable.Get().MQCfg.RetryTimeout.GetAsDuration(time.Second))) // 5 minutes
if err != nil {
log.Warn("datanode consume failed after retried", zap.Error(err))
return nil, err
}

log.Info("datanode seek successfully when register to msgDispatcher",
zap.ByteString("msgID", seekPos.GetMsgID()),
zap.Time("tsTime", tsoutil.PhysicalTime(seekPos.GetTimestamp())),
zap.Duration("tsLag", time.Since(tsoutil.PhysicalTime(seekPos.GetTimestamp()))))
zap.Duration("tsLag", time.Since(tsoutil.PhysicalTime(seekPos.GetTimestamp()))),
zap.Duration("dur", time.Since(start)))
} else {
input, err = dispatcherClient.Register(initCtx, dmNodeConfig.vChannelName, nil, common.SubscriptionPositionEarliest)
err = retry.Handle(initCtx, func() (bool, error) {
input, err = dispatcherClient.Register(initCtx, dmNodeConfig.vChannelName, nil, common.SubscriptionPositionEarliest)
if err != nil {
log.Warn("datanode consume failed", zap.Error(err))
return errors.Is(err, msgdispatcher.ErrTooManyConsumers), err
}
return false, nil
}, retry.Sleep(paramtable.Get().MQCfg.RetrySleep.GetAsDuration(time.Second)), // 5 seconds
retry.MaxSleepTime(paramtable.Get().MQCfg.RetryTimeout.GetAsDuration(time.Second))) // 5 minutes
if err != nil {
log.Warn("datanode consume failed after retried", zap.Error(err))
return nil, err
}

log.Info("datanode consume successfully when register to msgDispatcher")
}

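Both branches above apply the same pattern: treat msgdispatcher.ErrTooManyConsumers as the only retriable failure and keep re-registering, sleeping retrySleep between attempts and giving up after retryTimeout. The sketch below condenses that pattern into one hypothetical helper; it only reuses calls visible in this patch (retry.Handle, the dispatcher Register call, and the new MQCfg params) and is not itself part of the change:

    package example // hypothetical package, for illustration only

    import (
        "context"
        "time"

        "github.com/cockroachdb/errors"

        "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
        "github.com/milvus-io/milvus/pkg/mq/common"
        "github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
        "github.com/milvus-io/milvus/pkg/mq/msgstream"
        "github.com/milvus-io/milvus/pkg/util/paramtable"
        "github.com/milvus-io/milvus/pkg/util/retry"
    )

    // registerWithRetry registers a vchannel with the dispatcher and retries only
    // while the per-pchannel consumer cap is hit.
    func registerWithRetry(ctx context.Context, client msgdispatcher.Client, vchannel string,
        pos *msgpb.MsgPosition) (<-chan *msgstream.MsgPack, error) {
        var input <-chan *msgstream.MsgPack
        err := retry.Handle(ctx, func() (bool, error) {
            var err error
            input, err = client.Register(ctx, vchannel, pos, common.SubscriptionPositionUnknown)
            if err != nil {
                // Retry only when the dispatcher reports too many consumers; fail fast otherwise.
                return errors.Is(err, msgdispatcher.ErrTooManyConsumers), err
            }
            return false, nil
        },
            retry.Sleep(paramtable.Get().MQCfg.RetrySleep.GetAsDuration(time.Second)),         // default 5s
            retry.MaxSleepTime(paramtable.Get().MQCfg.RetryTimeout.GetAsDuration(time.Second))) // default 300s
        if err != nil {
            return nil, err
        }
        return input, nil
    }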
26 changes: 24 additions & 2 deletions internal/util/pipeline/stream_pipeline.go
@@ -21,13 +21,26 @@ import (
"sync"
"time"

"github.com/cockroachdb/errors"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/common"
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

@@ -71,11 +84,20 @@ func (p *streamPipeline) ConsumeMsgStream(ctx context.Context, position *msgpb.M
}

start := time.Now()
p.input, err = p.dispatcher.Register(ctx, p.vChannel, position, common.SubscriptionPositionUnknown)
err = retry.Handle(ctx, func() (bool, error) {
p.input, err = p.dispatcher.Register(ctx, p.vChannel, position, common.SubscriptionPositionUnknown)
if err != nil {
log.Warn("dispatcher register failed", zap.String("channel", position.ChannelName), zap.Error(err))
return errors.Is(err, msgdispatcher.ErrTooManyConsumers), err
}
return false, nil
}, retry.Sleep(paramtable.Get().MQCfg.RetrySleep.GetAsDuration(time.Second)), // 5 seconds
retry.MaxSleepTime(paramtable.Get().MQCfg.RetryTimeout.GetAsDuration(time.Second))) // 5 minutes
if err != nil {
log.Error("dispatcher register failed", zap.String("channel", position.ChannelName))
log.Error("dispatcher register failed after retried", zap.String("channel", position.ChannelName), zap.Error(err))
return WrapErrRegDispather(err)
}

ts, _ := tsoutil.ParseTS(position.GetTimestamp())
log.Info("stream pipeline seeks from position with msgDispatcher",
zap.String("pchannel", position.ChannelName),
2 changes: 2 additions & 0 deletions internal/util/pipeline/stream_pipeline_test.go
@@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/pkg/mq/common"
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

type StreamPipelineSuite struct {
@@ -43,6 +44,7 @@ type StreamPipelineSuite struct {
}

func (suite *StreamPipelineSuite) SetupTest() {
paramtable.Init()
suite.channel = "test-channel"
suite.inChannel = make(chan *msgstream.MsgPack, 1)
suite.outChannel = make(chan msgstream.Timestamp)
18 changes: 15 additions & 3 deletions pkg/mq/msgdispatcher/client.go
@@ -18,7 +18,9 @@ package msgdispatcher

import (
"context"
"time"

"github.com/cockroachdb/errors"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
@@ -27,9 +29,12 @@ import (
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var ErrTooManyConsumers = errors.New("consumer number limit exceeded")

type (
Pos = msgpb.MsgPosition
MsgPack = msgstream.MsgPack
@@ -66,6 +71,7 @@ func (c *client) Register(ctx context.Context, vchannel string, pos *Pos, subPos
log := log.With(zap.String("role", c.role),
zap.Int64("nodeID", c.nodeID), zap.String("vchannel", vchannel))
pchannel := funcutil.ToPhysicalChannel(vchannel)
start := time.Now()
c.managerMut.Lock(pchannel)
defer c.managerMut.Unlock(pchannel)
var manager DispatcherManager
@@ -75,6 +81,11 @@ func (c *client) Register(ctx context.Context, vchannel string, pos *Pos, subPos
c.managers.Insert(pchannel, manager)
go manager.Run()
}
// Check if the consumer number limit has been reached.
if manager.Num() >= paramtable.Get().MQCfg.MaxDispatcherNumPerPchannel.GetAsInt() {
return nil, ErrTooManyConsumers
}
// Begin to register
ch, err := manager.Add(ctx, vchannel, pos, subPos)
if err != nil {
if manager.Num() == 0 {
@@ -84,12 +95,13 @@ func (c *client) Register(ctx context.Context, vchannel string, pos *Pos, subPos
log.Error("register failed", zap.Error(err))
return nil, err
}
log.Info("register done")
log.Info("register done", zap.Duration("dur", time.Since(start)))
return ch, nil
}

func (c *client) Deregister(vchannel string) {
pchannel := funcutil.ToPhysicalChannel(vchannel)
start := time.Now()
c.managerMut.Lock(pchannel)
defer c.managerMut.Unlock(pchannel)
if manager, ok := c.managers.Get(pchannel); ok {
@@ -98,8 +110,8 @@ func (c *client) Deregister(vchannel string) {
manager.Close()
c.managers.Remove(pchannel)
}
log.Info("deregister done", zap.String("role", c.role),
zap.Int64("nodeID", c.nodeID), zap.String("vchannel", vchannel))
log.Info("deregister done", zap.String("role", c.role), zap.Int64("nodeID", c.nodeID),
zap.String("vchannel", vchannel), zap.Duration("dur", time.Since(start)))
}
}

36 changes: 33 additions & 3 deletions pkg/util/paramtable/service_param.go
@@ -509,9 +509,12 @@ type MQConfig struct {
IgnoreBadPosition ParamItem `refreshable:"true"`

// msgdispatcher
MergeCheckInterval ParamItem `refreshable:"false"`
TargetBufSize ParamItem `refreshable:"false"`
MaxTolerantLag ParamItem `refreshable:"true"`
MergeCheckInterval ParamItem `refreshable:"false"`
TargetBufSize ParamItem `refreshable:"false"`
MaxTolerantLag ParamItem `refreshable:"true"`
MaxDispatcherNumPerPchannel ParamItem `refreshable:"true"`
RetrySleep ParamItem `refreshable:"true"`
RetryTimeout ParamItem `refreshable:"true"`
}

// Init initializes the MQConfig object with a BaseTable.
@@ -535,6 +538,33 @@ Valid values: [default, pulsar, kafka, rocksmq, natsmq]`,
}
p.MaxTolerantLag.Init(base.mgr)

p.MaxDispatcherNumPerPchannel = ParamItem{
Key: "mq.dispatcher.maxDispatcherNumPerPchannel",
Version: "2.4.19",
DefaultValue: "10",
Doc: `The maximum number of dispatchers per physical channel, primarily to limit the number of consumers and prevent performance issues (e.g., during recovery when a large number of channels are watched).`,
Export: true,
}
p.MaxDispatcherNumPerPchannel.Init(base.mgr)

p.RetrySleep = ParamItem{
Key: "mq.dispatcher.retrySleep",
Version: "2.4.19",
DefaultValue: "5",
Doc: `register retry sleep time in seconds`,
Export: true,
}
p.RetrySleep.Init(base.mgr)

p.RetryTimeout = ParamItem{
Key: "mq.dispatcher.retryTimeout",
Version: "2.4.19",
DefaultValue: "300",
Doc: `register retry timeout in seconds`,
Export: true,
}
p.RetryTimeout.Init(base.mgr)

p.TargetBufSize = ParamItem{
Key: "mq.dispatcher.targetBufSize",
Version: "2.4.4",
11 changes: 11 additions & 0 deletions pkg/util/paramtable/service_param_test.go
@@ -25,6 +25,17 @@ func TestServiceParam(t *testing.T) {
var SParams ServiceParam
bt := NewBaseTable(SkipRemote(true))
SParams.init(bt)

t.Run("test MQConfig", func(t *testing.T) {
Params := &SParams.MQCfg
assert.Equal(t, 1*time.Second, Params.MergeCheckInterval.GetAsDuration(time.Second))
assert.Equal(t, 16, Params.TargetBufSize.GetAsInt())
assert.Equal(t, 3*time.Second, Params.MaxTolerantLag.GetAsDuration(time.Second))
assert.Equal(t, 10, Params.MaxDispatcherNumPerPchannel.GetAsInt())
assert.Equal(t, 5*time.Second, Params.RetrySleep.GetAsDuration(time.Second))
assert.Equal(t, 300*time.Second, Params.RetryTimeout.GetAsDuration(time.Second))
})

t.Run("test etcdConfig", func(t *testing.T) {
Params := &SParams.EtcdCfg
