Skip to content

Horizon: allocation management #1123

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 59 additions & 38 deletions packages/indexer-agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {
DeploymentManagementMode,
SubgraphStatus,
sequentialTimerMap,
HorizonTransitionValue,
} from '@graphprotocol/indexer-common'

import PQueue from 'p-queue'
Expand All @@ -47,7 +48,11 @@ import mapValues from 'lodash.mapvalues'
import zip from 'lodash.zip'
import { AgentConfigs, NetworkAndOperator } from './types'

type ActionReconciliationContext = [AllocationDecision[], number, bigint]
type ActionReconciliationContext = [
AllocationDecision[],
number,
HorizonTransitionValue,
]

const deploymentInList = (
list: SubgraphDeploymentID[],
Expand Down Expand Up @@ -271,21 +276,22 @@ export class Agent {
},
)

const maxAllocationEpochs: Eventual<NetworkMapped<bigint>> =
sequentialTimerMap(
{ logger, milliseconds: requestIntervalLarge },
() =>
this.multiNetworks.map(({ network }) => {
logger.trace('Fetching max allocation epochs', {
protocolNetwork: network.specification.networkIdentifier,
})
return network.contracts.LegacyStaking.maxAllocationEpochs()
}),
{
onError: error =>
logger.warn(`Failed to fetch max allocation epochs`, { error }),
},
)
const maxAllocationDuration: Eventual<
NetworkMapped<HorizonTransitionValue>
> = sequentialTimerMap(
{ logger, milliseconds: requestIntervalLarge },
() =>
this.multiNetworks.map(({ network }) => {
logger.trace('Fetching max allocation duration', {
protocolNetwork: network.specification.networkIdentifier,
})
return network.networkMonitor.maxAllocationDuration()
}),
{
onError: error =>
logger.warn(`Failed to fetch max allocation duration`, { error }),
},
)

const indexingRules: Eventual<NetworkMapped<IndexingRuleAttributes[]>> =
sequentialTimerMap(
Expand Down Expand Up @@ -653,7 +659,7 @@ export class Agent {
join({
ticker: timer(requestIntervalLarge),
currentEpochNumber,
maxAllocationEpochs,
maxAllocationDuration,
activeDeployments,
targetDeployments,
activeAllocations,
Expand All @@ -663,7 +669,7 @@ export class Agent {
}).pipe(
async ({
currentEpochNumber,
maxAllocationEpochs,
maxAllocationDuration,
activeDeployments,
targetDeployments,
activeAllocations,
Expand Down Expand Up @@ -746,7 +752,7 @@ export class Agent {
await this.reconcileActions(
networkDeploymentAllocationDecisions,
currentEpochNumber,
maxAllocationEpochs,
maxAllocationDuration,
)
} catch (err) {
logger.warn(`Exited early while reconciling actions`, {
Expand Down Expand Up @@ -1008,18 +1014,25 @@ export class Agent {
activeAllocations: Allocation[],
deploymentAllocationDecision: AllocationDecision,
epoch: number,
maxAllocationEpochs: bigint,
maxAllocationDuration: HorizonTransitionValue,
network: Network,
): Promise<Allocation[]> {
const desiredAllocationLifetime = deploymentAllocationDecision.ruleMatch
.rule?.allocationLifetime
? deploymentAllocationDecision.ruleMatch.rule.allocationLifetime
: Math.max(1, Number(maxAllocationEpochs) - 1)

// Identify expiring allocations
let expiredAllocations = activeAllocations.filter(
allocation =>
epoch >= allocation.createdAtEpoch + desiredAllocationLifetime,
async (allocation: Allocation) => {
let desiredAllocationLifetime: number = 0
if (allocation.isLegacy) {
desiredAllocationLifetime = deploymentAllocationDecision.ruleMatch
.rule?.allocationLifetime
? deploymentAllocationDecision.ruleMatch.rule.allocationLifetime
: Math.max(1, maxAllocationDuration.legacy - 1)
} else {
desiredAllocationLifetime = deploymentAllocationDecision.ruleMatch
.rule?.allocationLifetime
? deploymentAllocationDecision.ruleMatch.rule.allocationLifetime
: maxAllocationDuration.horizon
}
return epoch >= allocation.createdAtEpoch + desiredAllocationLifetime
},
)
// The allocations come from the network subgraph; due to short indexing
// latencies, this data may be slightly outdated. Cross-check with the
Expand All @@ -1029,9 +1042,17 @@ export class Agent {
expiredAllocations,
async (allocation: Allocation) => {
try {
const onChainAllocation =
await network.contracts.LegacyStaking.getAllocation(allocation.id)
return onChainAllocation.closedAtEpoch == 0n
if (allocation.isLegacy) {
const onChainAllocation =
await network.contracts.LegacyStaking.getAllocation(allocation.id)
return onChainAllocation.closedAtEpoch == 0n
} else {
const onChainAllocation =
await network.contracts.SubgraphService.getAllocation(
allocation.id,
)
return onChainAllocation.closedAt == 0n
}
} catch (err) {
this.logger.warn(
`Failed to cross-check allocation state with contracts; assuming it needs to be closed`,
Expand All @@ -1052,7 +1073,7 @@ export class Agent {
deploymentAllocationDecision: AllocationDecision,
activeAllocations: Allocation[],
epoch: number,
maxAllocationEpochs: bigint,
maxAllocationDuration: HorizonTransitionValue,
network: Network,
operator: Operator,
): Promise<void> {
Expand Down Expand Up @@ -1128,7 +1149,7 @@ export class Agent {
activeDeploymentAllocations,
deploymentAllocationDecision,
epoch,
maxAllocationEpochs,
maxAllocationDuration,
network,
)
if (expiringAllocations.length > 0) {
Expand All @@ -1147,7 +1168,7 @@ export class Agent {
async reconcileActions(
networkDeploymentAllocationDecisions: NetworkMapped<AllocationDecision[]>,
epoch: NetworkMapped<number>,
maxAllocationEpochs: NetworkMapped<bigint>,
maxAllocationDuration: NetworkMapped<HorizonTransitionValue>,
): Promise<void> {
// --------------------------------------------------------------------------------
// Filter out networks set to `manual` allocation management mode, and ensure the
Expand Down Expand Up @@ -1200,14 +1221,14 @@ export class Agent {
this.multiNetworks.zip3(
validatedAllocationDecisions,
epoch,
maxAllocationEpochs,
maxAllocationDuration,
),
async (
{ network, operator }: NetworkAndOperator,
[
allocationDecisions,
epoch,
maxAllocationEpochs,
maxAllocationDuration,
]: ActionReconciliationContext,
) => {
// Do nothing if there are already approved actions in the queue awaiting execution
Expand All @@ -1232,7 +1253,7 @@ export class Agent {
this.logger.trace(`Reconcile allocation actions`, {
protocolNetwork: network.specification.networkIdentifier,
epoch,
maxAllocationEpochs,
maxAllocationDuration,
targetDeployments: allocationDecisions
.filter(decision => decision.toAllocate)
.map(decision => decision.deployment.ipfsHash),
Expand All @@ -1248,7 +1269,7 @@ export class Agent {
decision,
activeAllocations,
epoch,
maxAllocationEpochs,
maxAllocationDuration,
network,
operator,
),
Expand Down
3 changes: 2 additions & 1 deletion packages/indexer-agent/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ export const start = {
group: 'Ethereum',
})
.option('confirmation-blocks', {
description: 'The number of blocks to wait for a transaction to be confirmed',
description:
'The number of blocks to wait for a transaction to be confirmed',
type: 'number',
default: 3,
group: 'Ethereum',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ Manage indexer configuration
indexer rules delete Remove one or many indexing rules
indexer rules clear (reset) Clear one or more indexing rules
indexer rules Configure indexing rules
indexer provision thaw Thaw stake from the indexer's provision
indexer provision remove Remove thawed stake from the indexer's provision
indexer provision list-thaw List thaw requests for the indexer's provision
indexer provision get List indexer provision details
indexer provision add Add stake to the indexer's provision
indexer provision Manage indexer's provision
indexer disputes get Cross-check POIs submitted in the network
indexer disputes Configure allocation POI monitoring
indexer cost set model Update a cost model
Expand Down
6 changes: 6 additions & 0 deletions packages/indexer-cli/src/__tests__/references/indexer.stdout
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ Manage indexer configuration
indexer rules delete Remove one or many indexing rules
indexer rules clear (reset) Clear one or more indexing rules
indexer rules Configure indexing rules
indexer provision thaw Thaw stake from the indexer's provision
indexer provision remove Remove thawed stake from the indexer's provision
indexer provision list-thaw List thaw requests for the indexer's provision
indexer provision get List indexer provision details
indexer provision add Add stake to the indexer's provision
indexer provision Manage indexer's provision
indexer disputes get Cross-check POIs submitted in the network
indexer disputes Configure allocation POI monitoring
indexer cost set model Update a cost model
Expand Down
21 changes: 19 additions & 2 deletions packages/indexer-cli/src/allocations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export interface IndexerAllocation {
queryFeesCollected: bigint
status: string
protocolNetwork: string
isLegacy: boolean
}

const ALLOCATION_CONVERTERS_FROM_GRAPHQL: Record<
Expand All @@ -53,6 +54,7 @@ const ALLOCATION_CONVERTERS_FROM_GRAPHQL: Record<
queryFeesCollected: nullPassThrough((x: string) => BigInt(x)),
status: x => x,
protocolNetwork: x => x,
isLegacy: x => x,
}

const ALLOCATION_FORMATTERS: Record<
Expand All @@ -76,6 +78,7 @@ const ALLOCATION_FORMATTERS: Record<
queryFeesCollected: x => commify(formatGRT(x)),
status: x => x,
protocolNetwork: resolveChainAlias,
isLegacy: x => (x ? 'Yes' : 'No'),
}

/**
Expand Down Expand Up @@ -218,6 +221,8 @@ export const closeAllocation = async (
client: IndexerManagementClient,
allocationID: string,
poi: string | undefined,
blockNumber: number | undefined,
publicPOI: string | undefined,
force: boolean,
protocolNetwork: string,
): Promise<CloseAllocationResult> => {
Expand All @@ -227,26 +232,31 @@ export const closeAllocation = async (
mutation closeAllocation(
$allocation: String!
$poi: String
$blockNumber: Int
$publicPOI: String
$force: Boolean
$protocolNetwork: String!
) {
closeAllocation(
allocation: $allocation
poi: $poi
blockNumber: $blockNumber
publicPOI: $publicPOI
force: $force
protocolNetwork: $protocolNetwork
) {
allocation
allocatedTokens
indexingRewards
receiptsWorthCollecting
protocolNetwork
}
}
`,
{
allocation: allocationID,
poi,
blockNumber,
publicPOI,
force,
protocolNetwork,
},
Expand All @@ -264,6 +274,8 @@ export const reallocateAllocation = async (
client: IndexerManagementClient,
allocationID: string,
poi: string | undefined,
blockNumber: number | undefined,
publicPOI: string | undefined,
amount: bigint,
force: boolean,
protocolNetwork: string,
Expand All @@ -274,20 +286,23 @@ export const reallocateAllocation = async (
mutation reallocateAllocation(
$allocation: String!
$poi: String
$blockNumber: Int
$publicPOI: String
$amount: String!
$force: Boolean
$protocolNetwork: String!
) {
reallocateAllocation(
allocation: $allocation
poi: $poi
blockNumber: $blockNumber
publicPOI: $publicPOI
amount: $amount
force: $force
protocolNetwork: $protocolNetwork
) {
closedAllocation
indexingRewardsCollected
receiptsWorthCollecting
createdAllocation
createdAllocationStake
protocolNetwork
Expand All @@ -297,6 +312,8 @@ export const reallocateAllocation = async (
{
allocation: allocationID,
poi,
blockNumber,
publicPOI,
amount: amount.toString(),
force,
protocolNetwork,
Expand Down
Loading
Loading