Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 110 additions & 2 deletions src/services/consensus-api/cl.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
import { ConsensusApiService, makeConsensusApi } from './service.js'
import { LoggerService, RequestService, makeRequest } from '../../lib/index.js'
import nock from 'nock'
import {
ConsensusApiService,
makeConsensusApi,
FAR_FUTURE_EPOCH,
} from './service.js'
import {
LoggerService,
RequestService,
makeRequest,
makeLogger,
retry,
logger as loggerMiddleware,
abort,
} from '../../lib/index.js'
import {
exitRequestMock,
genesisMock,
Expand Down Expand Up @@ -79,4 +92,99 @@ describe('makeConsensusApi', () => {
// Since exitRequest doesn't return a value, we simply check that no error is thrown
expect(true).toBe(true)
})

it('should correctly count exiting validators in mixed batch', async () => {
const indices = ['1', '2', '3', '4']
const mockResponse = {
data: [
{ validator: { exit_epoch: '100' } },
{ validator: { exit_epoch: FAR_FUTURE_EPOCH } },
{ validator: { exit_epoch: '200' } },
{ validator: { exit_epoch: FAR_FUTURE_EPOCH } },
],
}
nock(config.CONSENSUS_NODE)
.get('/eth/v1/beacon/states/head/validators?id=1,2,3,4')
.reply(200, mockResponse)

const count = await api.getExitingValidatorsCount(indices, 1000)
expect(count).toBe(2)
})

it('should return 0 when validators not found', async () => {
const indices = ['1', '2', '3', '4']
const mockResponse = {
data: [],
}
nock(config.CONSENSUS_NODE)
.get('/eth/v1/beacon/states/head/validators?id=1,2,3,4')
.reply(200, mockResponse)

const count = await api.getExitingValidatorsCount(indices, 1000)
expect(count).toBe(0)
})

it('should handle batch size correctly with large index array', async () => {
const indices = Array.from({ length: 1500 }, (_, i) => (i + 1).toString())
const mockBatch1 = {
data: Array(1000).fill({ validator: { exit_epoch: '100' } }),
}
const mockBatch2 = {
data: Array(500).fill({ validator: { exit_epoch: FAR_FUTURE_EPOCH } }),
}
nock(config.CONSENSUS_NODE)
.get(
'/eth/v1/beacon/states/head/validators?id=' +
indices.slice(0, 1000).join(',')
)
.reply(200, mockBatch1)
nock(config.CONSENSUS_NODE)
.get(
'/eth/v1/beacon/states/head/validators?id=' +
indices.slice(1000).join(',')
)
.reply(200, mockBatch2)

const count = await api.getExitingValidatorsCount(indices, 1000)
expect(count).toBe(1000)
})

it('should throw an error on server error (500)', async () => {
const indices = ['1']
nock(config.CONSENSUS_NODE)
.get('/eth/v1/beacon/states/head/validators?id=1')
.reply(500, { message: 'Internal Server Error' })

await expect(api.getExitingValidatorsCount(indices, 1000)).rejects.toThrow()
})
})

describe('makeConsensusApi e2e', () => {
let api: ConsensusApiService
let logger: LoggerService
let config: ConfigService

beforeEach(() => {
logger = makeLogger({
level: 'error',
format: 'simple',
})

config = mockConfig(logger, {
CONSENSUS_NODE:
process.env.CONSENSUS_NODE ??
'https://ethereum-beacon-api.publicnode.com',
})
api = makeConsensusApi(
makeRequest([retry(3), loggerMiddleware(logger), abort(30_000)]),
logger,
config
)
})

it('should handle batch size correctly with large index array e2e', async () => {
const indices = Array.from({ length: 3000 }, (_, i) => (i + 1).toString())
const count = await api.getExitingValidatorsCount(indices, 1000, 11724253)
expect(count).toStrictEqual(1226)
})
})
26 changes: 25 additions & 1 deletion src/services/consensus-api/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
depositContractDTO,
} from './dto.js'

const FAR_FUTURE_EPOCH = String(2n ** 64n - 1n)
export const FAR_FUTURE_EPOCH = String(2n ** 64n - 1n)

export type ConsensusApiService = ReturnType<typeof makeConsensusApi>

Expand Down Expand Up @@ -148,6 +148,29 @@ export const makeConsensusApi = (
return (await depositContract()).chain_id
}

const getExitingValidatorsCount = async (
indices: string[],
batchSize = 1000,
state: string | number = 'head'
) => {
let totalCount = 0
for (let i = 0; i < indices.length; i += batchSize) {
const batch = indices.slice(i, i + batchSize)
const url = `${normalizedUrl}/eth/v1/beacon/states/${state}/validators?id=${batch.join(
','
)}`
const res = await request(url, { middlewares: [notOkError()] })
const json = await safelyParseJsonResponse(res, logger)
if (!json.data || !Array.isArray(json.data)) {
throw new Error('Invalid response from consensus node')
}
totalCount += json.data.filter(
(v) => v.validator?.exit_epoch !== FAR_FUTURE_EPOCH
).length
}
return totalCount
}

return {
syncing,
checkSync,
Expand All @@ -159,5 +182,6 @@ export const makeConsensusApi = (
spec,
depositContract,
chainId,
getExitingValidatorsCount,
}
}
1 change: 0 additions & 1 deletion src/services/exit-logs/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ export const makeExitLogsService = (
metrics: MetricsService
) => {
const verifier = makeVerifier(logger, el, {
STAKING_MODULE_ID,
ORACLE_ADDRESSES_ALLOWLIST,
})

Expand Down
50 changes: 1 addition & 49 deletions src/services/exit-logs/verifier.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { makeLogger } from '../../lib/index.js'

import { ethers } from 'ethers'

import { funcDTO, txDTO } from './dto.js'
import { txDTO } from './dto.js'
import { ExecutionApiService } from '../../services/execution-api/service.js'

// This is the number of blocks to look back when searching for
Expand All @@ -17,10 +17,8 @@ export const makeVerifier = (
logger: ReturnType<typeof makeLogger>,
el: ExecutionApiService,
{
STAKING_MODULE_ID,
ORACLE_ADDRESSES_ALLOWLIST,
}: {
STAKING_MODULE_ID: string
ORACLE_ADDRESSES_ALLOWLIST: string[]
}
) => {
Expand Down Expand Up @@ -205,53 +203,7 @@ export const makeVerifier = (
}
}

const lastRequestedValidatorIndex = async (operatorId: number) => {
const func = ethers.utils.Fragment.from(
'function getLastRequestedValidatorIndices(uint256 moduleId, uint256[] nodeOpIds) view returns (int256[])'
)
const iface = new ethers.utils.Interface([func])
const sig = iface.encodeFunctionData(func.name, [
STAKING_MODULE_ID,
[operatorId],
])

try {
const json = await el.elRequest({
method: 'POST',
body: JSON.stringify({
jsonrpc: '2.0',
method: 'eth_call',
params: [
{
from: null,
to: el.exitBusAddress,
data: sig,
},
'finalized',
],
id: 1,
}),
})

const { result } = funcDTO(json)

// One last index or -1 if no exit requests have been sent yet, in BigNumber
const decoded = iface.decodeFunctionResult(func.name, result)

logger.debug('Fetched last requested validator exit for NO')

const plainNumber = parseInt(decoded.toString())

return plainNumber
} catch (e) {
const msg = 'Unable to retrieve last requested validator exit for NO'
logger.error(msg, e)
throw new Error(msg)
}
}

return {
verifyEvent,
lastRequestedValidatorIndex,
}
}
28 changes: 16 additions & 12 deletions src/services/job-processor/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,23 @@ export const makeJobProcessor = ({
logger.error(`Unable to process exit for ${event.validatorPubkey}`, e)
metrics.exitActions.inc({ result: 'error' })
}
}

logger.info('Updating exit messages left metrics from contract state')
try {
const lastRequestedValIx =
await exitLogs.verifier.lastRequestedValidatorIndex(
event.nodeOperatorId
)
metrics.updateLeftMessages(messageStorage, lastRequestedValIx)
} catch {
logger.error(
'Unable to update exit messages left metrics from contract state'
)
}
logger.info('Updating exit messages left metrics from validator statuses')

try {
const validatorIndices = messageStorage.messages.map(
(msg) => msg.message.validator_index
)
const exitingCount = await consensusApi.getExitingValidatorsCount(
validatorIndices
)
metrics.updateLeftMessages(messageStorage, exitingCount)
} catch (e) {
logger.error(
'Unable to update exit messages left metrics from validator statuses',
e
)
}

logger.info('Job finished')
Expand Down
6 changes: 2 additions & 4 deletions src/services/prom/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,9 @@ export const makeMetrics = ({

const updateLeftMessages = (
messageStorage: MessageStorage,
lastRequestedIx: number
exitingCount: number
) => {
const numberLeft = messageStorage.messages.filter(
(msg) => parseInt(msg.message.validator_index) > lastRequestedIx
).length
const numberLeft = messageStorage.size - exitingCount
exitMessagesLeftNumber.set(numberLeft)

const percentLeft =
Expand Down
Loading