Skip to content

Commit 1fc8cff

Browse files
committed
fix(reku): edge cases
1 parent 1f0286a commit 1fc8cff

File tree

4 files changed

+126
-23
lines changed

4 files changed

+126
-23
lines changed

packages/orap/tests/test-case.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import dotenv from 'dotenv'
2+
import { RekuProviderManager } from '@ora-io/reku'
3+
// import { sleep } from '@ora-io/utils'
4+
import { startDemo } from '../examples/declarativeDemo/app'
5+
6+
dotenv.config({ path: './packages/orap/tests/.env' })
7+
8+
const chain = 'mainnet'
9+
10+
const wsProvider: RekuProviderManager = new RekuProviderManager(
11+
process.env[`${chain.toUpperCase()}_WSS`]!,
12+
{
13+
// heartbeatInterval: 100,
14+
disabledHeartbeat: true,
15+
},
16+
)
17+
const httpProvider: RekuProviderManager = new RekuProviderManager(
18+
process.env[`${chain.toUpperCase()}_HTTP`]!,
19+
{
20+
// heartbeatInterval: 500,
21+
},
22+
)
23+
const storeConfig = { port: parseInt(process.env.REDIS_PORT!), host: process.env.REDIS_HOST }
24+
25+
setTimeout(async () => {
26+
// for (let i = 0; i < 50; i++) {
27+
// wsProvider.reconnect()
28+
// await sleep(1000)
29+
// }
30+
// setInterval(() => {
31+
// httpProvider.reconnect()
32+
// }, 2000)
33+
}, 10000 * 2)
34+
35+
startDemo({ wsProvider, httpProvider }, storeConfig)

packages/reku/src/event/crosschecker/autochecker.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,11 @@ export class AutoCrossChecker extends BaseCrossChecker {
5858

5959
this.cache = new CrossCheckerCacheManager(options?.store, { keyPrefix: options?.storeKeyPrefix, ttl: options?.storeTtl })
6060

61-
let latestBlockNum = await timeoutWithRetry(() => this.provider.provider?.getBlockNumber(), 15 * 1000, 3)
61+
let latestBlockNum = await timeoutWithRetry(() => {
62+
if (!this.provider || !this.provider.provider)
63+
throw new Error('provider not ready')
64+
return this.provider.provider?.getBlockNumber()
65+
}, 15 * 1000, 3)
6266

6367
// resume checkpoint priority: options.fromBlock > cache > latestBlockNum + 1
6468
const defaultInitCheckpoint = await this.cache.getCheckpoint() ?? (latestBlockNum)
@@ -87,7 +91,11 @@ export class AutoCrossChecker extends BaseCrossChecker {
8791
}
8892

8993
const waitNextCrosscheck = async (): Promise<boolean> => {
90-
latestBlockNum = await timeoutWithRetry(() => this.provider.provider?.getBlockNumber(), 15 * 1000, 3)
94+
latestBlockNum = await timeoutWithRetry(() => {
95+
if (!this.provider || !this.provider.provider)
96+
throw new Error('provider not ready')
97+
return this.provider.provider?.getBlockNumber()
98+
}, 15 * 1000, 3)
9199

92100
// If auto-follow is enabled, update toBlock and check block range
93101
if (options.autoFollowLatestBlock) {
@@ -145,7 +153,9 @@ export class AutoCrossChecker extends BaseCrossChecker {
145153
else {
146154
debug('Because the latest block %d is too old, skip this cross check', latestBlockNum)
147155
}
148-
return endingCondition()
156+
const end = endingCondition()
157+
debug('polling ending condition: %s', end)
158+
return end
149159
}, pollingInterval)
150160
}
151161

packages/reku/src/event/crosschecker/basechecker.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,11 @@ export class BaseCrossChecker {
3535

3636
// define from, to
3737
// TODO: use blockNumber for performance
38-
const block = await timeoutWithRetry(() => this.provider.provider.getBlock('latest'), 15 * 1000, 3)
38+
const block = await timeoutWithRetry(() => {
39+
if (!this.provider || !this.provider.provider)
40+
throw new Error('provider not ready')
41+
return this.provider.provider.getBlock('latest')
42+
}, 15 * 1000, 3)
3943
if (!block) {
4044
console.warn('crosscheck failed to get latest block')
4145
return
@@ -58,7 +62,11 @@ export class BaseCrossChecker {
5862
ccfOptions: CrossCheckFromParam,
5963
) {
6064
// TODO: use blockNumber for performance
61-
const block = await timeoutWithRetry(() => this.provider.provider.getBlock('latest'), 15 * 1000, 3)
65+
const block = await timeoutWithRetry(() => {
66+
if (!this.provider || !this.provider.provider)
67+
throw new Error('provider not ready')
68+
return this.provider.provider.getBlock('latest')
69+
}, 15 * 1000, 3)
6270
if (!block) {
6371
console.warn('crosscheck failed to get latest block')
6472
return
@@ -121,7 +129,11 @@ export class BaseCrossChecker {
121129
...(topics && { topics }),
122130
}
123131
if (this.provider.provider) {
124-
const logs = await timeoutWithRetry(() => this.provider.provider.getLogs(params), 15 * 1000, 3)
132+
const logs = await timeoutWithRetry(() => {
133+
if (!this.provider || !this.provider.provider)
134+
throw new Error('provider not ready')
135+
return this.provider.provider.getLogs(params)
136+
}, 15 * 1000, 3)
125137
// get ignoreLogs keys
126138
const ignoreLogs = options.ignoreLogs
127139

packages/reku/src/provider/provider.ts

Lines changed: 63 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import { EventEmitter } from 'node:events'
22
import type { InterfaceAbi } from 'ethers'
33
import { Interface, WebSocketProvider, ethers } from 'ethers'
4-
import type { ErrorEvent, WebSocket } from 'ws'
5-
import type { ContractAddress } from '@ora-io/utils'
4+
import { WebSocket } from 'ws'
5+
import type { ErrorEvent } from 'ws'
6+
import { type ContractAddress, isInstanceof, to } from '@ora-io/utils'
67
import { debug } from '../debug'
78
import { RekuContractManager } from './contract'
89

@@ -39,13 +40,17 @@ export class RekuProviderManager {
3940
}
4041

4142
connect() {
42-
const url = new URL(this.providerUrl)
43-
if (url.protocol === 'ws:' || url.protocol === 'wss:')
43+
if (this.isWebSocketProviderUrl)
4444
this._provider = new ethers.WebSocketProvider(this.providerUrl)
4545
else
4646
this._provider = new ethers.JsonRpcProvider(this.providerUrl)
4747
}
4848

49+
get isWebSocketProviderUrl() {
50+
const url = new URL(this.providerUrl)
51+
return url.protocol === 'ws:' || url.protocol === 'wss:'
52+
}
53+
4954
get provider() {
5055
return this._provider as ethers.JsonRpcProvider | WebSocketProvider
5156
}
@@ -54,6 +59,16 @@ export class RekuProviderManager {
5459
return this._contracts
5560
}
5661

62+
get websocket() {
63+
if (isInstanceof(this._provider, ethers.WebSocketProvider))
64+
return this._provider.websocket
65+
return undefined
66+
}
67+
68+
get destroyed() {
69+
return this._provider?.destroyed
70+
}
71+
5772
addContract(address: ContractAddress, contract: ethers.Contract): RekuContractManager | undefined
5873
addContract(address: ContractAddress, abi: Interface | InterfaceAbi): RekuContractManager | undefined
5974
addContract(address: ContractAddress, abi: Interface | InterfaceAbi | ethers.Contract): RekuContractManager | undefined {
@@ -156,7 +171,18 @@ export class RekuProviderManager {
156171
socket.onerror = null
157172
debug('remove all listeners of websocket provider')
158173
}
159-
this._provider?.destroy()
174+
debug('reconnect destroyed: %s', this._provider?.destroyed)
175+
if (this._provider && !this._provider.destroyed) {
176+
if (isInstanceof(this._provider, ethers.WebSocketProvider)) {
177+
debug('reconnect websocket readyState: %s', this.websocket?.readyState)
178+
if (this.websocket?.readyState !== WebSocket.CONNECTING)
179+
to(Promise.resolve(this._provider.destroy()))
180+
}
181+
else {
182+
to(Promise.resolve(this._provider.destroy()))
183+
}
184+
}
185+
160186
this._provider = undefined
161187

162188
setTimeout(() => {
@@ -186,21 +212,41 @@ export class RekuProviderManager {
186212
if (this._options?.disabledHeartbeat)
187213
return
188214
debug('start heartbeat')
189-
this._heartbeatTimer = setInterval(() => {
190-
debug('heartbeat running...')
191-
debug('heartbeat has provider: %s', !!this._provider)
192-
this._provider?.send('net_version', [])
193-
.then((res) => {
194-
debug('heartbeat response: %s', res)
195-
})
196-
.catch((err) => {
197-
this.reconnect()
198-
this._event?.emit('error', err)
199-
debug('heartbeat error: %s', err)
200-
})
215+
this._heartbeatTimer = setInterval(async () => {
216+
if (!this.destroyed) {
217+
debug('heartbeat running...')
218+
const hasProvider = this._hasProvider()
219+
debug('heartbeat has provider: %s', hasProvider)
220+
this._provider?.send('net_version', [])
221+
.then((res) => {
222+
debug('heartbeat response: %s', res)
223+
})
224+
.catch((err) => {
225+
this.reconnect()
226+
this._event?.emit('error', err)
227+
debug('heartbeat error: %s', err)
228+
})
229+
.finally(() => {
230+
debug('heartbeat finally')
231+
})
232+
}
233+
else {
234+
debug('heartbeat destroyed')
235+
}
201236
}, this._heartbeatInterval)
202237
}
203238

239+
private _hasProvider() {
240+
const hasProvider = !!this._provider && !!this._provider.provider
241+
let isInstance = false
242+
if (this.isWebSocketProviderUrl)
243+
isInstance = isInstanceof(this._provider, ethers.WebSocketProvider) && isInstanceof(this._provider.provider, ethers.WebSocketProvider)
244+
else
245+
isInstance = isInstanceof(this._provider, ethers.JsonRpcProvider) && isInstanceof(this._provider.provider, ethers.JsonRpcProvider)
246+
247+
return hasProvider && isInstance && !this._provider?.destroyed
248+
}
249+
204250
private _clearHeartbeat() {
205251
if (this._heartbeatTimer) {
206252
debug('clear heartbeat')

0 commit comments

Comments
 (0)