Reimplementation of offline subscriptions #31
Conversation
- switch all redis operations to _getPipeline
Tests are failing because of this issue.
persistence.js (Outdated)
// this.clientId = clientId
// this.topic = topic
// this.qos = qos
// }
Can you please remove this if it is not needed anymore?
Yes, sure
Do you have a benchmark result for this?
Doing this right now... Will post results.
In my tests a single Aedes process is able to handle 250 connections per second, each with 3 subscriptions (around 700 subs/sec), but anything near or below that rate eats CPU above 90%.
These are impressive numbers. Definitely +1. @GavinDmello what do you think?
persistence.js (Outdated)
multi.exec(cb)
var pipeline = this._getPipeline()
pipeline.rpush(listKey, key)
pipeline.set(key, msgpack.encode(new Packet(packet)), cb)
Can we simply use db.set here? These keys can be sharded in the case of nutcracker; they won't be sharded this way.
I think we may revert all _getPipeline usages to direct _db to support redis cluster/nutcracker
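For example, a minimal sketch of that change, assuming this._db is the plain Redis client already used elsewhere in persistence.js (listKey, key, packet and cb are the same variables as in the diff above):

```js
// Sketch: issue the commands on the client directly instead of a pipeline,
// so proxies like nutcracker (or Redis Cluster) can shard each key on its own.
this._db.rpush(listKey, key)
this._db.set(key, msgpack.encode(new Packet(packet)), cb)
```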
Yep.
Done!
persistence.js (Outdated)
multi.exec(cb)
var pipeline = this._getPipeline()
pipeline.rpush(listKey, key)
I think this can be a Redis set too for the sake of dedupe
Can you explain a scenario where we may need deduping on packet.brokerId + ':' + packet.brokerCounter inside outgoing messages for a client, @GavinDmello?
This is just to be safe. Using a list was a bad idea here, I guess.
With SMEMBERS we can't control the offline message flow to the client.
Let's see what @mcollina thinks about this.
I think the list is the correct behavior, as we need to retrieve the elements in insertion order.
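For context, a small illustration of the trade-off (the key names below are made up, only the Redis semantics matter): a list keeps duplicates but returns elements in insertion order, while a set dedupes but SMEMBERS gives no ordering guarantee.

```js
// Illustrative only: list vs set semantics for queued packet references.
const Redis = require('ioredis')
const db = new Redis()

async function demo () {
  // List: duplicates allowed, LRANGE returns elements in insertion order.
  await db.rpush('demo:outgoing:list', 'broker1:1', 'broker1:2', 'broker1:2')
  console.log(await db.lrange('demo:outgoing:list', 0, -1)) // insertion order, duplicate kept

  // Set: automatic dedupe, but no ordering guarantee.
  await db.sadd('demo:outgoing:set', 'broker1:1', 'broker1:2', 'broker1:2')
  console.log(await db.smembers('demo:outgoing:set')) // deduped, arbitrary order

  await db.quit()
}

demo()
```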
Alright 👍
behrad
left a comment
Good news: _db is performing better than _getPipeline in _setup, where we are doing heavy hgetalls.
persistence.js (Outdated)
multi.exec(cb)
var pipeline = this._getPipeline()
pipeline.rpush(listKey, key)
pipeline.set(key, msgpack.encode(new Packet(packet)), cb)
I think we may revert all _getPipeline usages to direct _db to support redis cluster/nutcracker
Implemented createRetainedStreamCombi here & upgraded to aedes-cached-persistence 4.0.0 @mcollina
@mcollina ping
mcollina
left a comment
Can you remove the pipeline support completely? It is mostly gone now, so we can drop it entirely.
persistence.js (Outdated)
offlines.push(sub.topic)
count++
that._waitFor(client, sub.topic, finish)
// TODO don't wait the client an extra tick
why TODO?
Unlike mosca, aedes publishes sub topics, and even the local process waits for the subscribed handler (inside cached-persistence). My points:
- To call the callback sooner, we may go the mosca way and check broker.id inside the message handler (see the sketch after this list). Or we may even call the callback without this check and let the subscription take effect in the background after the client is subacked!?
- For subscription publication between nodes, cached-persistence is using the broker's pub/sub. Could this be handled at a lower level, so we don't pollute MQTT pub/sub with inter-node messaging!?
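A rough sketch of that first idea, i.e. acknowledging local subscriptions immediately. The handler shape, the payload layout and the addSubscriptions helper are hypothetical here, not the actual cached-persistence internals:

```js
// Hypothetical sketch of the "mosca way": skip the wait when the
// subscription event originated from this very broker.
function onSubscriptionMessage (packet, done) {
  var msg = JSON.parse(packet.payload)
  if (msg.brokerId === broker.id) {
    // local subscription: the in-process state is already up to date,
    // so the suback could be sent without waiting an extra tick
    return done()
  }
  // remote subscription: apply it before acknowledging
  addSubscriptions(msg.subs, done)
}
```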
- This is needed to provide a stable unit testing base. Mosca had so many spurious unit tests, and some of them rely on setTimeout to work. One of the underlying goals of aedes is to make maintenance and evolution simpler, and this goes in that direction.
- No.

I'm ok if you would like to try and remove that (or maybe disable it with an option?), but let's drop the TODO for now.
removed TODO 👍
persistence.js (Outdated)
patterns.map(function (pattern) {
  qlobber.add(pattern, true)
})
You should use a for loop here; it's way faster if this is going to be in a hot path.
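For instance (same call as in the diff, just without the throwaway array and per-element closure):

```js
// plain for loop: no intermediate array, no closure allocation per pattern
for (var i = 0; i < patterns.length; i++) {
  qlobber.add(patterns[i], true)
}
```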
done
}

opts = opts || {}
this.maxSessionDelivery = opts.maxSessionDelivery || 1000
Can you please document this option?
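For instance, the README entry could look something like this; the default value comes from the diff above, while the wording and the factory-style usage shown are only a suggestion:

```js
// maxSessionDelivery: maximum number of offline messages delivered to a
// client when its session comes back online (defaults to 1000)
var persistence = require('aedes-persistence-redis')({
  maxSessionDelivery: 100
})
```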
done!
persistence.js (Outdated)
this._db.srem(subsKey, subs) // TODO matcher.match should be checked
this._db.hdel(clientSubKey, subs)
we should specify callbacks for this, and deal with potential errors.
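A minimal sketch of one way to do that; the cb completion callback and the counting pattern are illustrative, not taken from the PR:

```js
// Sketch: give both deletes a callback and surface the first error.
var pending = 2
var errored = false

function onDel (err) {
  if (errored) { return }
  if (err) {
    errored = true
    return cb(err)
  }
  if (--pending === 0) { cb(null) }
}

this._db.srem(subsKey, subs, onDel)
this._db.hdel(clientSubKey, subs, onDel)
```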
can you check this now @mcollina ?
persistence.js (Outdated)
splitStream.emit('error', err)
} else {
splitStream.write(results)
splitStream.write(clientIds)
You can completely avoid the use of splitStream here; just do the processing and call write on the throughv instance.
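Roughly like this (the names are illustrative, and this assumes throughv exposes the usual through2-style .obj helper): build the transform first, then write the looked-up ids straight into it instead of going through an extra pass-through stream.

```js
// Sketch: drop the intermediate splitStream and write directly
// into the throughv transform that processes each client id.
var throughv = require('throughv')

var stream = throughv.obj(function processClientId (clientId, enc, cb) {
  // ...per-client processing as before...
  cb(null, clientId)
})

this._db.smembers(clientsKey, function (err, clientIds) {
  if (err) {
    stream.emit('error', err)
    return
  }
  for (var i = 0; i < clientIds.length; i++) {
    stream.write(clientIds[i])
  }
  stream.end()
})
```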
done
mcollina
left a comment
LGTM, but I would like @GavinDmello's review as well.
@GavinDmello ping
var pipeline = this._getPipeline()
pipeline.lrem(willKey, 0, key)
pipeline.getBuffer(key, function getClientWill (err, packet) {
var that = this
That is not required here.
Why not? that is used on line 479.
is this ok @GavinDmello ?
I meant we can just use this as it's in the same scope :)
I am using that on line 479 inside a callback. Am I missing something?
You mean the getBuffer callback? This is weird, it shows me it's in the same scope, i.e. line 479 is outside the getClientWill function.
Yes, del should have been inside getBuffer after multi was removed. Fixed it just now @GavinDmello.
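For the record, a rough sketch of that fix; the variable names come from the diff above, while the callback signature and the decode step are assumptions:

```js
// Sketch: read the will, then delete it from inside the getBuffer callback,
// which is why the outer `this` has to be captured as `that`.
var that = this
this._db.lrem(willKey, 0, key)
this._db.getBuffer(key, function getClientWill (err, packet) {
  if (err) { return cb(err) }
  that._db.del(key, function (err) {
    cb(err, packet && msgpack.decode(packet))
  })
})
```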
Minor change I mentioned above. I'm 👍 with this PR.
I'm not sure why node v5 failed!
Let me try running it again
Summary of the changes:
- removed the counter:offline:subscriptions key; offline subs are now the SCARD of the subs SET
- removed the counter:offline:clients key; offline clients are now the SCARD of the clients SET
- removed sub:client; no subscriptions index of clientIds is stored any more, and all subscription list API methods are implemented via the internal Qlobber
- renamed client:sub: to client:
- clients SET added containing all client ids
- subs SET added containing all topics
- removed _getPipeline

Would you please review these @mcollina @GavinDmello ?
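To make the counter change concrete, a quick illustration; the exact key names or prefixes may differ in the code, the point is only that the counts are now derived from set cardinalities rather than dedicated counter keys:

```js
// Offline counters come from SCARD on the sets listed above.
const Redis = require('ioredis')
const db = new Redis()

async function offlineCounts () {
  const offlineSubscriptions = await db.scard('subs')  // all offline subscription topics
  const offlineClients = await db.scard('clients')     // all client ids
  return { offlineSubscriptions, offlineClients }
}

offlineCounts().then(console.log).finally(() => db.quit())
```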