@@ -6,6 +6,22 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
66import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
77import { streamResponse } from '../../utils/stream-response.js'
88import pushable from 'it-pushable'
9+ import { base64url } from 'multiformats/bases/base64'
10+
11+ const preDecodeTopicFromHttpRpc = {
12+ assign : 'topic' ,
13+ /**
14+ * @param {import('../../types').Request } request
15+ * @param {import('@hapi/hapi').ResponseToolkit } _h
16+ */
17+ method : async ( request , _h ) => {
18+ try {
19+ return uint8ArrayToString ( base64url . decode ( request . query . topic ) )
20+ } catch ( /** @type {any } */ err ) {
21+ throw Boom . boomify ( err , { message : `Failed to decode topic from HTTP RPC form ${ request . query . topic } ` } )
22+ }
23+ }
24+ }
925
1026export const subscribeResource = {
1127 options : {
@@ -24,7 +40,8 @@ export const subscribeResource = {
2440 override : true ,
2541 ignoreUndefined : true
2642 } )
27- }
43+ } ,
44+ pre : [ preDecodeTopicFromHttpRpc ]
2845 } ,
2946 /**
3047 * @param {import('../../types').Request } request
@@ -40,8 +57,8 @@ export const subscribeResource = {
4057 ipfs
4158 }
4259 } ,
43- query : {
44- topic
60+ pre : {
61+ topic // decoded version created by preDecodeTopicFromHttpRpc
4562 }
4663 } = request
4764
@@ -56,13 +73,11 @@ export const subscribeResource = {
5673 * @type {import('ipfs-core-types/src/pubsub').MessageHandlerFn }
5774 */
5875 const handler = ( msg ) => {
59- // TODO: data, seqno and topicIDs in multibase
60- // TODO: from should use canonical toString from peerid libp2p lib
6176 output . push ( {
62- from : uint8ArrayToString ( uint8ArrayFromString ( msg . from , 'base58btc' ) , 'base64pad' ) ,
63- data : uint8ArrayToString ( msg . data , 'base64pad' ) ,
64- seqno : uint8ArrayToString ( msg . seqno , 'base64pad' ) ,
65- topicIDs : msg . topicIDs
77+ from : msg . from , // TODO: switch to PeerId.parse(msg.from).toString() when go-ipfs defaults to CIDv1
78+ data : base64url . encode ( msg . data ) ,
79+ seqno : base64url . encode ( msg . seqno ) ,
80+ topicIDs : msg . topicIDs . map ( t => base64url . encode ( uint8ArrayFromString ( t ) ) )
6681 } )
6782 }
6883
@@ -92,7 +107,7 @@ export const publishResource = {
92107 parse : false ,
93108 output : 'stream'
94109 } ,
95- pre : [ {
110+ pre : [ preDecodeTopicFromHttpRpc , {
96111 assign : 'data' ,
97112 /**
98113 * @param {import('../../types').Request } request
@@ -149,16 +164,15 @@ export const publishResource = {
149164 }
150165 } ,
151166 pre : {
167+ topic,
152168 data
153169 } ,
154170 query : {
155- topic,
156171 timeout
157172 }
158173 } = request
159174
160175 try {
161- // TODO: unwrap topic from multibase?
162176 await ipfs . pubsub . publish ( topic , data , {
163177 signal,
164178 timeout
@@ -212,8 +226,7 @@ export const lsResource = {
212226 throw Boom . boomify ( err , { message : 'Failed to list subscriptions' } )
213227 }
214228
215- // TODO: multibase topic names in Strings array
216- return h . response ( { Strings : subscriptions } )
229+ return h . response ( { Strings : subscriptions . map ( s => base64url . encode ( uint8ArrayFromString ( s ) ) ) } )
217230 }
218231}
219232
@@ -232,7 +245,8 @@ export const peersResource = {
232245 override : true ,
233246 ignoreUndefined : true
234247 } )
235- }
248+ } ,
249+ pre : [ preDecodeTopicFromHttpRpc ]
236250 } ,
237251 /**
238252 * @param {import('../../types').Request } request
@@ -248,15 +262,16 @@ export const peersResource = {
248262 ipfs
249263 }
250264 } ,
265+ pre : {
266+ topic
267+ } ,
251268 query : {
252- topic,
253269 timeout
254270 }
255271 } = request
256272
257273 let peers
258274 try {
259- // TODO: unwrap topic from multibase
260275 peers = await ipfs . pubsub . peers ( topic , {
261276 signal,
262277 timeout
0 commit comments