-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Feature] Enable Nats-server as a optional MQ for Milvus #24445
Conversation
@chyezh ut workflow job failed, comment |
@chyezh E2e jenkins job failed, comment |
bc5370b
to
bf349a9
Compare
@chyezh ut workflow job failed, comment |
bf349a9
to
f543ff9
Compare
@chyezh ut workflow job failed, comment |
Codecov Report
@@ Coverage Diff @@
## master #24445 +/- ##
==========================================
+ Coverage 82.18% 82.63% +0.45%
==========================================
Files 777 776 -1
Lines 103129 102013 -1116
==========================================
- Hits 84756 84301 -455
+ Misses 15358 14678 -680
- Partials 3015 3034 +19
|
f543ff9
to
31eebdf
Compare
@chyezh ut workflow job failed, comment |
31eebdf
to
49577f8
Compare
@chyezh E2e jenkins job failed, comment |
/run-cpu-e2e |
// Wait for server to be ready for connections | ||
if !Nmq.ReadyForConnections(initializeTimeout) { | ||
finalErr = fmt.Errorf("nmq is not ready with timeout %s", initializeTimeout) | ||
log.Warn("nmq is not ready", zap.Duration("timeout", initializeTimeout), zap.Reflect("options", opts)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might cause some exception behaviors if nmq server is not ready and skip it within a singleton.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log.Fatal is likely better here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new commit changed:
log.Fatal with error field is added into MustInitNatsMQ
|
||
f.msgStreamFactory = f.initMQRemoteService(params) | ||
func (f *DefaultFactory) initMQ(standalone bool, params *paramtable.ComponentParam) error { | ||
mqType := mustSelectMQType(standalone, params.MQCfg.Type.GetValue(), mqEnable{params.RocksmqEnable(), params.NatsmqEnable(), params.PulsarEnable(), params.KafkaEnable()}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just invoke validateMQType to check parameters, then create msgstream factory, it is more simple.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some rule is applied if mqType = default
, validateMQType
is not enough.
return errors.Newf("mq.type %s is invalid", mqType) | ||
} | ||
if !standalone && (mqType == mqTypeRocksmq || mqType == mqTypeNatsmq) { | ||
return errors.Newf("mq %s is only valid in standalone mode") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mq type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mq.type is the field of milvus.yaml.
pkg/util/paramtable/service_param.go
Outdated
func (p *MQConfig) Init(base *BaseTable) { | ||
p.Type = ParamItem{ | ||
Key: "mq.type", | ||
Version: "2.2.0", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be 2.3.0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new commit changed:
all field of this commit in yaml is changed to 2.3.0
} | ||
|
||
// Init sets up a new NatsmqConfig instance using the provided BaseTable | ||
func (r *NatsmqConfig) Init(base *BaseTable) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Set version to 2.3.0 for all new parameters
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OOC, where do we read this "version" field?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It just indicates this parameter bringing into Milvus with this version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In that case, we probably don't need to have it in every single param. A global param is good enough.
For this pr, we can have a local/global variable instead of specifying "2.2.0"/"2.3.0" per param.
pkg/util/paramtable/service_param.go
Outdated
r.Server.MaxMemoryStore = ParamItem{ | ||
Key: "natsmq.server.maxMemoryStore", | ||
Version: "2.2.0", | ||
DefaultValue: "536870912", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
256 MB?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new commit changed:
memory storage is removed, it's not used in natsmq.
pkg/util/paramtable/service_param.go
Outdated
r.Server.MaxFileStore = ParamItem{ | ||
Key: "natsmq.server.maxFileStore", | ||
Version: "2.2.0", | ||
DefaultValue: "68719476736", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it too large?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new commit changed:
16G.
commit changed:
|
8431f07
to
f26e1b8
Compare
@chyezh E2e jenkins job failed, comment |
/run-cpu-e2e |
f26e1b8
to
c6b4e89
Compare
Thanks, is it ready for review now? |
/assign @czs007 |
Signed-off-by: yiwangdr <yiwangdr@gmail.com> bug fixup, configurable natsmq, add unittest, pass e2e. Signed-off-by: chyezh <ye.zhen@zilliz.com> move natsmq to pkg project Signed-off-by: chyezh <ye.zhen@zilliz.com>
c6b4e89
to
e9f22af
Compare
@chyezh E2e jenkins job failed, comment |
/run-cpu-e2e |
@chyezh E2e jenkins job failed, comment |
/run-cpu-e2e |
/approve |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: chyezh, czs007, yiwangdr The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
Enable Nats-server as a optional MQ for Milvus.
prototype and unit test by @yiwangdr #23606
more unit test, bug fix and pass e2e.
Related issue: #23611