Skip to content

Commit

Permalink
Added test on broker.subscribe & broker.unsubscribe function (#418)
Browse files Browse the repository at this point in the history
* Fixed typings

* Added test on broker.[un]subscribe

* Fixed race condition

* programmatically subscribe topic in server side

* Fixed custom function in broker.unsubscribe test
  • Loading branch information
gnought authored Feb 11, 2020
1 parent c3026bc commit 5110824
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 8 deletions.
4 changes: 2 additions & 2 deletions aedes.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ declare namespace aedes {
): void
subscribe (
topic: string,
deliverfunc: (packet: ISubscribePacket, callback: () => void) => void,
deliverfunc: (packet: IPublishPacket, callback: () => void) => void,
callback: () => void
): void
unsubscribe (
topic: string,
deliverfunc: (packet: IUnsubscribePacket, callback: () => void) => void,
deliverfunc: (packet: IPublishPacket, callback: () => void) => void,
callback: () => void
): void
close (callback?: () => void): void
Expand Down
122 changes: 121 additions & 1 deletion test/client-pub-sub.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const { test } = require('tap')
const { setup, connect } = require('./helper')
const { setup, connect, subscribe, noError } = require('./helper')
const aedes = require('../')

test('publish direct to a single client QoS 0', function (t) {
Expand Down Expand Up @@ -604,3 +604,123 @@ test('should not receive a message on negated subscription', function (t) {
t.fail('Packet should not be received')
})
})

test('programmatically add custom subscribe', function (t) {
t.plan(6)

const broker = aedes()
t.tearDown(broker.close.bind(broker))

const s = connect(setup(broker))
const expected = {
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
qos: 0,
retain: false,
length: 12,
dup: false
}
var deliverP = {
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
qos: 0,
retain: false
}
subscribe(t, s, 'hello', 0, function () {
broker.subscribe('hello', deliver, function () {
t.pass('subscribed')
})
s.outStream.on('data', function (packet) {
t.deepEqual(packet, expected, 'packet matches')
})
s.inStream.write({
cmd: 'publish',
topic: 'hello',
payload: 'world',
qos: 0,
messageId: 42
})
})
function deliver (packet, cb) {
deliverP.brokerId = s.broker.id
deliverP.brokerCounter = s.broker.counter
t.deepEqual(packet, deliverP, 'packet matches')
cb()
}
})

test('custom function in broker.subscribe', function (t) {
t.plan(4)

const broker = aedes()
t.tearDown(broker.close.bind(broker))

const s = setup(broker)
var expected = {
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
qos: 1,
retain: false,
messageId: undefined
}
connect(s, {}, function () {
broker.subscribe('hello', deliver, function () {
t.pass('subscribed')
})
s.inStream.write({
cmd: 'publish',
topic: 'hello',
payload: 'world',
qos: 1,
messageId: 42
})
})
broker.on('publish', function (packet, client) {
if (client) {
t.equal(packet.topic, 'hello')
t.equal(packet.messageId, 42)
}
})
function deliver (packet, cb) {
expected.brokerId = s.broker.id
expected.brokerCounter = s.broker.counter
t.deepEqual(packet, expected, 'packet matches')
cb()
}
})

test('custom function in broker.unsubscribe', function (t) {
t.plan(3)

const broker = aedes()
t.tearDown(broker.close.bind(broker))

const s = noError(setup(broker))
connect(s, {}, function () {
broker.subscribe('hello', deliver, function () {
t.pass('subscribed')
broker.unsubscribe('hello', deliver, function () {
t.pass('unsubscribe')
s.inStream.write({
cmd: 'publish',
topic: 'hello',
payload: 'word',
qos: 1,
messageId: 42
})
})
})
})
broker.on('publish', function (packet, client) {
if (client) {
t.pass('publish')
}
})
function deliver (packet, cb) {
t.fail('shoudl not be called')
cb()
}
})
10 changes: 5 additions & 5 deletions test/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
/* eslint no-undef: 0 */

import { Server, Client, AuthenticateError } from '../../aedes'
import { IPublishPacket, ISubscribePacket, ISubscription, IUnsubscribePacket } from 'mqtt-packet'
import { IPublishPacket, ISubscription } from 'mqtt-packet'
import { createServer } from 'net'

const broker = Server({
Expand Down Expand Up @@ -122,17 +122,17 @@ broker.on('unsubscribe', (subscriptions, client) => {
console.log(`client: ${client.id} subsribe`)
})

broker.subscribe('aaaa', (packet: ISubscribePacket, cb) => {
broker.subscribe('aaaa', (packet: IPublishPacket, cb) => {
console.log('cmd')
console.log(packet.subscriptions)
console.log(packet.cmd)
cb()
}, () => {
console.log('done subscribing')
})

broker.unsubscribe('aaaa', (packet: IUnsubscribePacket, cb) => {
broker.unsubscribe('aaaa', (packet: IPublishPacket, cb) => {
console.log('cmd')
console.log(packet.unsubscriptions)
console.log(packet.cmd)
cb()
}, () => {
console.log('done unsubscribing')
Expand Down

0 comments on commit 5110824

Please sign in to comment.