-
Notifications
You must be signed in to change notification settings - Fork 9
/
index.ts
301 lines (256 loc) · 8.49 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
import { logger } from '@libp2p/logger'
import drain from 'it-drain'
import PQueue from 'p-queue'
import defer from 'p-defer'
import errCode from 'err-code'
import anySignal from 'any-signal'
import type { AbortOptions } from 'ipfs-core-types/src/utils'
import type { ContentRouting } from '@libp2p/interface-content-routing'
import type { PeerInfo } from '@libp2p/interface-peer-info'
import type { Startable } from '@libp2p/interfaces/startable'
import type { CID } from 'multiformats/cid'
import type { PeerId } from '@libp2p/interface-peer-id'
const log = logger('libp2p:delegated-content-routing')
const DEFAULT_TIMEOUT = 30e3 // 30 second default
const CONCURRENT_HTTP_REQUESTS = 4
const CONCURRENT_HTTP_REFS_REQUESTS = 2
export interface HTTPClientExtraOptions {
headers?: Record<string, string>
searchParams?: URLSearchParams
}
export enum EventTypes {
SENDING_QUERY = 0,
PEER_RESPONSE,
FINAL_PEER,
QUERY_ERROR,
PROVIDER,
VALUE,
ADDING_PEER,
DIALING_PEER
}
/**
* The types of messages set/received during DHT queries
*/
export enum MessageType {
PUT_VALUE = 0,
GET_VALUE,
ADD_PROVIDER,
GET_PROVIDERS,
FIND_NODE,
PING
}
export type MessageName = keyof typeof MessageType
export interface DHTRecord {
key: Uint8Array
value: Uint8Array
timeReceived?: Date
}
export interface SendingQueryEvent {
type: EventTypes.SENDING_QUERY
name: 'SENDING_QUERY'
}
export interface PeerResponseEvent {
from: PeerId
type: EventTypes.PEER_RESPONSE
name: 'PEER_RESPONSE'
messageType: MessageType
messageName: MessageName
providers: PeerInfo[]
closer: PeerInfo[]
record?: DHTRecord
}
export interface FinalPeerEvent {
peer: PeerInfo
type: EventTypes.FINAL_PEER
name: 'FINAL_PEER'
}
export interface QueryErrorEvent {
type: EventTypes.QUERY_ERROR
name: 'QUERY_ERROR'
error: Error
}
export interface ProviderEvent {
type: EventTypes.PROVIDER
name: 'PROVIDER'
providers: PeerInfo[]
}
export interface ValueEvent {
type: EventTypes.VALUE
name: 'VALUE'
value: Uint8Array
}
export interface AddingPeerEvent {
type: EventTypes.ADDING_PEER
name: 'ADDING_PEER'
peer: PeerId
}
export interface DialingPeerEvent {
peer: PeerId
type: EventTypes.DIALING_PEER
name: 'DIALING_PEER'
}
export type QueryEvent = SendingQueryEvent | PeerResponseEvent | FinalPeerEvent | QueryErrorEvent | ProviderEvent | ValueEvent | AddingPeerEvent | DialingPeerEvent
export interface DHTProvideOptions extends AbortOptions {
recursive?: boolean
}
export interface StatResult {
cid: CID
size: number
}
export interface Delegate {
getEndpointConfig: () => { protocol: string, host: string, port: string }
block: {
stat: (cid: CID, options?: AbortOptions) => Promise<StatResult>
}
dht: {
findProvs: (cid: CID, options?: HTTPClientExtraOptions & AbortOptions) => AsyncIterable<QueryEvent>
provide: (cid: CID, options?: HTTPClientExtraOptions & DHTProvideOptions) => AsyncIterable<QueryEvent>
put: (key: string | Uint8Array, value: Uint8Array, options?: HTTPClientExtraOptions & AbortOptions) => AsyncIterable<QueryEvent>
get: (key: string | Uint8Array, options?: HTTPClientExtraOptions & AbortOptions) => AsyncIterable<QueryEvent>
}
}
/**
* An implementation of content routing, using a delegated peer
*/
class DelegatedContentRouting implements ContentRouting, Startable {
private readonly client: Delegate
private readonly httpQueue: PQueue
private readonly httpQueueRefs: PQueue
private started: boolean
private abortController: AbortController
/**
* Create a new DelegatedContentRouting instance
*/
constructor (client: Delegate) {
if (client == null) {
throw new Error('missing ipfs http client')
}
this.client = client
this.started = false
this.abortController = new AbortController()
// limit concurrency to avoid request flood in web browser
// https://github.com/libp2p/js-libp2p-delegated-content-routing/issues/12
this.httpQueue = new PQueue({
concurrency: CONCURRENT_HTTP_REQUESTS
})
// sometimes refs requests take long time, they need separate queue
// to not suffocate regular business
this.httpQueueRefs = new PQueue({
concurrency: CONCURRENT_HTTP_REFS_REQUESTS
})
const {
protocol,
host,
port
} = client.getEndpointConfig()
log(`enabled DelegatedContentRouting via ${protocol}://${host}:${port}`)
}
isStarted () {
return this.started
}
start () {
this.started = true
}
stop () {
this.httpQueue.clear()
this.httpQueueRefs.clear()
this.abortController.abort()
this.abortController = new AbortController()
this.started = false
}
/**
* Search the dht for providers of the given CID.
*
* - call `findProviders` on the delegated node.
*/
async * findProviders (key: CID, options: HTTPClientExtraOptions & AbortOptions = {}) {
log('findProviders starts: %c', key)
options.timeout = options.timeout ?? DEFAULT_TIMEOUT
options.signal = anySignal([this.abortController.signal].concat((options.signal != null) ? [options.signal] : []))
const onStart = defer()
const onFinish = defer()
void this.httpQueue.add(async () => {
onStart.resolve()
return await onFinish.promise
})
try {
await onStart.promise
for await (const event of this.client.dht.findProvs(key, options)) {
if (event.name === 'PROVIDER') {
yield * event.providers.map(prov => {
const peerInfo: PeerInfo = {
id: prov.id,
protocols: [],
multiaddrs: prov.multiaddrs
}
return peerInfo
})
}
}
} catch (err) {
log.error('findProviders errored:', err)
throw err
} finally {
onFinish.resolve()
log('findProviders finished: %c', key)
}
}
/**
* Announce to the network that the delegated node can provide the given key.
*
* Currently this uses the following hack
* - delegate is one of bootstrap nodes, so we are always connected to it
* - call block stat on the delegated node, so it fetches the content
* - call dht provide with the passed cid
*
* N.B. this must be called for every block in the dag you want provided otherwise
* the delegate will only be able to supply the root block of the dag when asked
* for the data by an interested peer.
*/
async provide (key: CID, options: HTTPClientExtraOptions & AbortOptions = {}) {
log('provide starts: %c', key)
options.timeout = options.timeout ?? DEFAULT_TIMEOUT
options.signal = anySignal([this.abortController.signal].concat((options.signal != null) ? [options.signal] : []))
await this.httpQueueRefs.add(async () => {
await this.client.block.stat(key, options)
await drain(this.client.dht.provide(key, options))
})
log('provide finished: %c', key)
}
/**
* Stores a value in the backing key/value store of the delegated content router.
* This may fail if the delegated node's content routing implementation does not
* use a key/value store, or if the delegated operation fails.
*/
async put (key: Uint8Array, value: Uint8Array, options: HTTPClientExtraOptions & AbortOptions = {}) {
log('put value start: %b', key)
options.timeout = options.timeout ?? DEFAULT_TIMEOUT
options.signal = anySignal([this.abortController.signal].concat((options.signal != null) ? [options.signal] : []))
await this.httpQueue.add(async () => {
await drain(this.client.dht.put(key, value, options))
})
log('put value finished: %b', key)
}
/**
* Fetches an value from the backing key/value store of the delegated content router.
* This may fail if the delegated node's content routing implementation does not
* use a key/value store, or if the delegated operation fails.
*/
async get (key: Uint8Array, options: HTTPClientExtraOptions & AbortOptions = {}) {
log('get value start: %b', key)
options.timeout = options.timeout ?? DEFAULT_TIMEOUT
options.signal = anySignal([this.abortController.signal].concat((options.signal != null) ? [options.signal] : []))
return await this.httpQueue.add(async () => {
for await (const event of this.client.dht.get(key, options)) {
if (event.name === 'VALUE') {
log('get value finished: %b', key)
return event.value
}
}
throw errCode(new Error('Not found'), 'ERR_NOT_FOUND')
})
}
}
export function delegatedContentRouting (client: Delegate): (components?: any) => ContentRouting {
return () => new DelegatedContentRouting(client)
}