Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

WIP: feat/pubsub \o/ #610

Merged
merged 10 commits into from
Dec 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
"form-data": "^2.1.2",
"fs-pull-blob-store": "^0.4.1",
"gulp": "^3.9.1",
"interface-ipfs-core": "^0.22.0",
"interface-ipfs-core": "git+https://github.com/ipfs/interface-ipfs-core.git#5c7df414a8f627f8adb50a52ef8d2b629381285f",
"left-pad": "^1.1.3",
"lodash": "^4.17.2",
"ncp": "^2.0.0",
Expand All @@ -80,7 +80,7 @@
"hapi": "^16.0.0",
"hapi-set-header": "^1.0.2",
"idb-pull-blob-store": "^0.5.1",
"ipfs-api": "^12.0.0",
"ipfs-api": "git+https://github.com/ipfs/js-ipfs-api.git#01044a1f59fb866e4e08b06aae4e74d968615931",
"ipfs-bitswap": "^0.8.1",
"ipfs-block": "^0.5.0",
"ipfs-block-service": "^0.7.0",
Expand All @@ -91,8 +91,9 @@
"ipld-resolver": "^0.3.0",
"isstream": "^0.1.2",
"joi": "^10.0.1",
"libp2p-ipfs-nodejs": "^0.16.1",
"libp2p-floodsub": "0.3.1",
"libp2p-ipfs-browser": "^0.17.0",
"libp2p-ipfs-nodejs": "^0.16.1",
"lodash.flatmap": "^4.5.0",
"lodash.get": "^4.4.2",
"lodash.has": "^4.5.2",
Expand All @@ -102,6 +103,7 @@
"mafmt": "^2.1.2",
"multiaddr": "^2.1.1",
"multihashes": "^0.3.0",
"ndjson": "1.5.0",
"path-exists": "^3.0.0",
"peer-book": "^0.3.0",
"peer-id": "^0.8.0",
Expand Down Expand Up @@ -149,4 +151,4 @@
"nginnever <ginneversource@gmail.com>",
"npmcdn-to-unpkg-bot <npmcdn-to-unpkg-bot@users.noreply.github.com>"
]
}
}
17 changes: 17 additions & 0 deletions src/cli/commands/pubsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
'use strict'

// The command count bump from 56 to 60 depends on:
// ipfs/interface-ipfs-core.git#5c7df414a8f627f8adb50a52ef8d2b629381285f
// ipfs/js-ipfs-api.git#01044a1f59fb866e4e08b06aae4e74d968615931
module.exports = {
command: 'pubsub',

description: 'pubsub commands',

builder (yargs) {
return yargs
.commandDir('pubsub')
},

handler (argv) {}
}
30 changes: 30 additions & 0 deletions src/cli/commands/pubsub/ls.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
'use strict'

const utils = require('../../utils')
const debug = require('debug')
const log = debug('cli:pubsub')
log.error = debug('cli:pubsub:error')

module.exports = {
command: 'ls',

describe: 'Get your list of subscriptions',

builder: {},

handler (argv) {
utils.getIPFS((err, ipfs) => {
if (err) {
throw err
}

ipfs.pubsub.ls((err, subscriptions) => {
if (err) {
throw err
}

console.log(JSON.stringify(subscriptions, null, 2))
})
})
}
}
30 changes: 30 additions & 0 deletions src/cli/commands/pubsub/peers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
'use strict'

const utils = require('../../utils')
const debug = require('debug')
const log = debug('cli:pubsub')
log.error = debug('cli:pubsub:error')

module.exports = {
command: 'peers <topic>',

describe: 'Get all peers subscribed to a topic',

builder: {},

handler (argv) {
utils.getIPFS((err, ipfs) => {
if (err) {
throw err
}

ipfs.pubsub.peers(argv.topic, (err, peers) => {
if (err) {
throw err
}

console.log(JSON.stringify(peers, null, 2))
})
})
}
}
28 changes: 28 additions & 0 deletions src/cli/commands/pubsub/publish.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
'use strict'

const utils = require('../../utils')
const debug = require('debug')
const log = debug('cli:pubsub')
log.error = debug('cli:pubsub:error')

module.exports = {
command: 'publish <topic> <data>',

describe: 'Publish data to a topic',

builder: {},

handler (argv) {
utils.getIPFS((err, ipfs) => {
if (err) {
throw err
}

ipfs.pubsub.publish(argv.topic, argv.data, (err) => {
if (err) {
throw err
}
})
})
}
}
32 changes: 32 additions & 0 deletions src/cli/commands/pubsub/subscribe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
'use strict'

const utils = require('../../utils')
const debug = require('debug')
const log = debug('cli:pubsub')
log.error = debug('cli:pubsub:error')

module.exports = {
command: 'subscribe <topic>',

alias: 'sub',

describe: 'Subscribe to a topic',

builder: {},

handler (argv) {
utils.getIPFS((err, ipfs) => {
if (err) {
throw err
}

ipfs.pubsub.subscribe(argv.topic, (err, stream) => {
if (err) {
throw err
}

console.log(stream.toString())
})
})
}
}
6 changes: 6 additions & 0 deletions src/core/components/go-online.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const series = require('async/series')
const Bitswap = require('ipfs-bitswap')
const FloodSub = require('libp2p-floodsub')

module.exports = function goOnline (self) {
return (cb) => {
Expand All @@ -21,6 +22,11 @@ module.exports = function goOnline (self) {
)
self._bitswap.start()
self._blockService.goOnline(self._bitswap)

//
self._pubsub = new FloodSub(self._libp2pNode)
//

cb()
})
}
Expand Down
131 changes: 131 additions & 0 deletions src/core/components/pubsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
'use strict'

const promisify = require('promisify-es6')
const Readable = require('stream').Readable
const _values = require('lodash.values')

const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR

let subscriptions = {}

const addSubscription = (topic, request, stream) => {
subscriptions[topic] = { request: request, stream: stream }
}

const removeSubscription = promisify((topic, callback) => {
if (!subscriptions[topic]) {
return callback(new Error(`Not subscribed to ${topic}`))
}

subscriptions[topic].stream.emit('end')
delete subscriptions[topic]

if (callback) {
callback(null)
}
})

module.exports = function pubsub (self) {
return {
subscribe: promisify((topic, options, callback) => {
if (!self.isOnline()) {
throw OFFLINE_ERROR
}

if (typeof options === 'function') {
callback = options
options = {}
}

if (subscriptions[topic]) {
return callback(`Error: Already subscribed to '${topic}'`)
}

const stream = new Readable({ objectMode: true })

stream._read = () => {}

// There is no explicit unsubscribe; subscriptions have a cancel event
stream.cancel = promisify((cb) => {
self._pubsub.unsubscribe(topic)
removeSubscription(topic, cb)
})

self._pubsub.on(topic, (data) => {
stream.emit('data', {
data: data.toString(),
topicIDs: [topic]
})
})

try {
self._pubsub.subscribe(topic)
} catch (err) {
return callback(err)
}

// Add the request to the active subscriptions and return the stream
addSubscription(topic, null, stream)
callback(null, stream)
}),

publish: promisify((topic, data, callback) => {
if (!self.isOnline()) {
throw OFFLINE_ERROR
}

const buf = Buffer.isBuffer(data) ? data : new Buffer(data)

try {
self._pubsub.publish(topic, buf)
} catch (err) {
return callback(err)
}

callback(null)
}),

ls: promisify((callback) => {
if (!self.isOnline()) {
throw OFFLINE_ERROR
}

let subscriptions = []

try {
subscriptions = self._pubsub.getSubscriptions()
} catch (err) {
return callback(err)
}

callback(null, subscriptions)
}),

peers: promisify((topic, callback) => {
if (!self.isOnline()) {
throw OFFLINE_ERROR
}

if (!subscriptions[topic]) {
return callback(`Error: Not subscribed to '${topic}'`)
}

let peers = []

try {
const peerSet = self._pubsub.getPeerSet()
_values(peerSet).forEach((peer) => {
const idB58Str = peer.peerInfo.id.toB58String()
const index = peer.topics.indexOf(topic)
if (index > -1) {
peers.push(idB58Str)
}
})
} catch (err) {
return callback(err)
}

callback(null, peers)
})
}
}
3 changes: 3 additions & 0 deletions src/core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const swarm = require('./components/swarm')
const ping = require('./components/ping')
const files = require('./components/files')
const bitswap = require('./components/bitswap')
const pubsub = require('./components/pubsub')

exports = module.exports = IPFS

Expand All @@ -44,6 +45,7 @@ function IPFS (repoInstance) {
this._bitswap = null
this._blockService = new BlockService(this._repo)
this._ipldResolver = new IPLDResolver(this._blockService)
this._pubsub = null

// IPFS Core exposed components

Expand All @@ -67,4 +69,5 @@ function IPFS (repoInstance) {
this.files = files(this)
this.bitswap = bitswap(this)
this.ping = ping(this)
this.pubsub = pubsub(this)
}
1 change: 1 addition & 0 deletions src/http-api/resources/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ exports.block = require('./block')
exports.swarm = require('./swarm')
exports.bitswap = require('./bitswap')
exports.files = require('./files')
exports.pubsub = require('./pubsub')
Loading