Skip to content

Commit

Permalink
replicate fix of retry logic on univ3 and panv3
Browse files Browse the repository at this point in the history
  • Loading branch information
mwamedacen committed Jul 19, 2023
1 parent cd45d44 commit f4ecb8c
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 125 deletions.
134 changes: 71 additions & 63 deletions src/dex/pancakeswap-v3/pancakeswap-v3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,23 +157,30 @@ export class PancakeswapV3
fee: bigint,
blockNumber: number,
): Promise<PancakeSwapV3EventPool | null> {
let pool =
this.eventPools[this.getPoolIdentifier(srcAddress, destAddress, fee)];

if (
pool &&
pool.initFailed &&
pool.initRetryCount % this.config.initRetryFrequency === 0
) {
// if init failed then prefer to early return pool with empty state to fallback to rpc call
return pool;
let pool = this.eventPools[
this.getPoolIdentifier(srcAddress, destAddress, fee)
] as PancakeSwapV3EventPool | null | undefined;

if (pool === null) return null;

if (pool) {
if (!pool.initFailed) {
return pool;
} else {
// if init failed then prefer to early return pool with empty state to fallback to rpc call
if (++pool.initRetryCount % this.config.initRetryFrequency === 0) {
return pool;
}
// else pursue with re-try initialization
}
}

if (pool === undefined) {
const [token0, token1] = this._sortTokens(srcAddress, destAddress);
const [token0, token1] = this._sortTokens(srcAddress, destAddress);

const key = `${token0}_${token1}_${fee}`.toLowerCase();
const key = `${token0}_${token1}_${fee}`.toLowerCase();

// no need to run this logic on retry initialisation scenario
if (!pool) {
const notExistingPoolScore = await this.dexHelper.cache.zscore(
this.notExistingPoolSetKey,
key,
Expand All @@ -196,9 +203,12 @@ export class PancakeswapV3
fee: fee.toString(),
}),
);
}

this.logger.trace(`starting to listen to new pool: ${key}`);
pool = new PancakeSwapV3EventPool(
this.logger.trace(`starting to listen to new pool: ${key}`);
pool =
pool ||
new PancakeSwapV3EventPool(
this.dexHelper,
this.dexKey,
this.stateMultiContract,
Expand All @@ -213,59 +223,57 @@ export class PancakeswapV3
this.config.deployer,
);

try {
await pool.initialize(blockNumber, {
initCallback: (state: DeepReadonly<PoolState>) => {
//really hacky, we need to push poolAddress so that we subscribeToLogs in StatefulEventSubscriber
pool!.addressesSubscribed[0] = state.pool;
pool!.poolAddress = state.pool;
pool!.initFailed = false;
pool!.initRetryCount = 0;
},
});
} catch (e) {
if (e instanceof Error && e.message.endsWith('Pool does not exist')) {
// no need to await we want the set to have the pool key but it's not blocking
this.dexHelper.cache.zadd(
this.notExistingPoolSetKey,
[Date.now(), key],
'NX',
);

// Pool does not exist for this feeCode, so we can set it to null
// to prevent more requests for this pool
pool = null;
this.logger.trace(
`${this.dexHelper}: Pool: srcAddress=${srcAddress}, destAddress=${destAddress}, fee=${fee} not found`,
e,
);
} else {
// on unkown error mark as failed and increase retryCount for retry init strategy
// note: state would be null by default which allows to fallback
this.logger.warn(
`${this.dexKey}: Can not generate pool state for srcAddress=${srcAddress}, destAddress=${destAddress}pool fallback to rpc and retry every ${this.config.initRetryFrequency} times, initRetryCount=${pool.initRetryCount}`,
e,
);
pool.initFailed = true;
pool.initRetryCount++;
}
}
try {
await pool.initialize(blockNumber, {
initCallback: (state: DeepReadonly<PoolState>) => {
//really hacky, we need to push poolAddress so that we subscribeToLogs in StatefulEventSubscriber
pool!.addressesSubscribed[0] = state.pool;
pool!.poolAddress = state.pool;
pool!.initFailed = false;
pool!.initRetryCount = 0;
},
});
} catch (e) {
if (e instanceof Error && e.message.endsWith('Pool does not exist')) {
// no need to await we want the set to have the pool key but it's not blocking
this.dexHelper.cache.zadd(
this.notExistingPoolSetKey,
[Date.now(), key],
'NX',
);

if (pool !== null) {
const allEventPools = Object.values(this.eventPools);
this.logger.info(
`starting to listen to new non-null pool: ${key}. Already following ${allEventPools
// Not that I like this reduce, but since it is done only on initialization, expect this to be ok
.reduce(
(acc, curr) => (curr !== null ? ++acc : acc),
0,
)} non-null pools or ${allEventPools.length} total pools`,
// Pool does not exist for this feeCode, so we can set it to null
// to prevent more requests for this pool
pool = null;
this.logger.trace(
`${this.dexHelper}: Pool: srcAddress=${srcAddress}, destAddress=${destAddress}, fee=${fee} not found`,
e,
);
} else {
// on unkown error mark as failed and increase retryCount for retry init strategy
// note: state would be null by default which allows to fallback
this.logger.warn(
`${this.dexKey}: Can not generate pool state for srcAddress=${srcAddress}, destAddress=${destAddress}, fee=${fee} pool fallback to rpc and retry every ${this.config.initRetryFrequency} times, initRetryCount=${pool.initRetryCount}`,
e,
);
pool.initFailed = true;
}
}

this.eventPools[this.getPoolIdentifier(srcAddress, destAddress, fee)] =
pool;
if (pool !== null) {
const allEventPools = Object.values(this.eventPools);
this.logger.info(
`starting to listen to new non-null pool: ${key}. Already following ${allEventPools
// Not that I like this reduce, but since it is done only on initialization, expect this to be ok
.reduce(
(acc, curr) => (curr !== null ? ++acc : acc),
0,
)} non-null pools or ${allEventPools.length} total pools`,
);
}

this.eventPools[this.getPoolIdentifier(srcAddress, destAddress, fee)] =
pool;
return pool;
}

Expand Down
131 changes: 69 additions & 62 deletions src/dex/uniswap-v3/uniswap-v3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,23 +182,29 @@ export class UniswapV3
fee: bigint,
blockNumber: number,
): Promise<UniswapV3EventPool | null> {
let pool =
this.eventPools[this.getPoolIdentifier(srcAddress, destAddress, fee)];
let pool = this.eventPools[
this.getPoolIdentifier(srcAddress, destAddress, fee)
] as UniswapV3EventPool | null | undefined;

if (
pool &&
pool.initFailed &&
pool.initRetryCount % this.config.initRetryFrequency === 0
) {
// if init failed then prefer to early return pool with empty state to fallback to rpc call
return pool;
if (pool === null) return null;

if (pool) {
if (!pool.initFailed) {
return pool;
} else {
// if init failed then prefer to early return pool with empty state to fallback to rpc call
if (++pool.initRetryCount % this.config.initRetryFrequency === 0) {
return pool;
}
// else pursue with re-try initialization
}
}

if (pool === undefined) {
const [token0, token1] = this._sortTokens(srcAddress, destAddress);
const [token0, token1] = this._sortTokens(srcAddress, destAddress);

const key = `${token0}_${token1}_${fee}`.toLowerCase();
const key = `${token0}_${token1}_${fee}`.toLowerCase();

if (!pool) {
const notExistingPoolScore = await this.dexHelper.cache.zscore(
this.notExistingPoolSetKey,
key,
Expand All @@ -221,9 +227,12 @@ export class UniswapV3
fee: fee.toString(),
}),
);
}

this.logger.trace(`starting to listen to new pool: ${key}`);
pool = new UniswapV3EventPool(
this.logger.trace(`starting to listen to new pool: ${key}`);
pool =
pool ||
new UniswapV3EventPool(
this.dexHelper,
this.dexKey,
this.stateMultiContract,
Expand All @@ -237,59 +246,57 @@ export class UniswapV3
this.config.initHash,
);

try {
await pool.initialize(blockNumber, {
initCallback: (state: DeepReadonly<PoolState>) => {
//really hacky, we need to push poolAddress so that we subscribeToLogs in StatefulEventSubscriber
pool!.addressesSubscribed[0] = state.pool;
pool!.poolAddress = state.pool;
pool!.initFailed = false;
pool!.initRetryCount = 0;
},
});
} catch (e) {
if (e instanceof Error && e.message.endsWith('Pool does not exist')) {
// no need to await we want the set to have the pool key but it's not blocking
this.dexHelper.cache.zadd(
this.notExistingPoolSetKey,
[Date.now(), key],
'NX',
);

// Pool does not exist for this feeCode, so we can set it to null
// to prevent more requests for this pool
pool = null;
this.logger.trace(
`${this.dexHelper}: Pool: srcAddress=${srcAddress}, destAddress=${destAddress}, fee=${fee} not found`,
e,
);
} else {
// on unkown error mark as failed and increase retryCount for retry init strategy
// note: state would be null by default which allows to fallback
this.logger.warn(
`${this.dexKey}: Can not generate pool state for srcAddress=${srcAddress}, destAddress=${destAddress}pool fallback to rpc and retry every ${this.config.initRetryFrequency} times, initRetryCount=${pool.initRetryCount}`,
e,
);
pool.initFailed = true;
pool.initRetryCount++;
}
}
try {
await pool.initialize(blockNumber, {
initCallback: (state: DeepReadonly<PoolState>) => {
//really hacky, we need to push poolAddress so that we subscribeToLogs in StatefulEventSubscriber
pool!.addressesSubscribed[0] = state.pool;
pool!.poolAddress = state.pool;
pool!.initFailed = false;
pool!.initRetryCount = 0;
},
});
} catch (e) {
if (e instanceof Error && e.message.endsWith('Pool does not exist')) {
// no need to await we want the set to have the pool key but it's not blocking
this.dexHelper.cache.zadd(
this.notExistingPoolSetKey,
[Date.now(), key],
'NX',
);

if (pool !== null) {
const allEventPools = Object.values(this.eventPools);
this.logger.info(
`starting to listen to new non-null pool: ${key}. Already following ${allEventPools
// Not that I like this reduce, but since it is done only on initialization, expect this to be ok
.reduce(
(acc, curr) => (curr !== null ? ++acc : acc),
0,
)} non-null pools or ${allEventPools.length} total pools`,
// Pool does not exist for this feeCode, so we can set it to null
// to prevent more requests for this pool
pool = null;
this.logger.trace(
`${this.dexHelper}: Pool: srcAddress=${srcAddress}, destAddress=${destAddress}, fee=${fee} not found`,
e,
);
} else {
// on unkown error mark as failed and increase retryCount for retry init strategy
// note: state would be null by default which allows to fallback
this.logger.warn(
`${this.dexKey}: Can not generate pool state for srcAddress=${srcAddress}, destAddress=${destAddress}, fee=${fee} pool fallback to rpc and retry every ${this.config.initRetryFrequency} times, initRetryCount=${pool.initRetryCount}`,
e,
);
pool.initFailed = true;
}
}

this.eventPools[this.getPoolIdentifier(srcAddress, destAddress, fee)] =
pool;
if (pool !== null) {
const allEventPools = Object.values(this.eventPools);
this.logger.info(
`starting to listen to new non-null pool: ${key}. Already following ${allEventPools
// Not that I like this reduce, but since it is done only on initialization, expect this to be ok
.reduce(
(acc, curr) => (curr !== null ? ++acc : acc),
0,
)} non-null pools or ${allEventPools.length} total pools`,
);
}

this.eventPools[this.getPoolIdentifier(srcAddress, destAddress, fee)] =
pool;
return pool;
}

Expand Down

0 comments on commit f4ecb8c

Please sign in to comment.