fix: timeout prisma, mempool race conditions & freezing of tx processing upon erroring #1070
base: master
Conversation
Walkthrough
Reduced retry counts and batching sizes; added queue obliteration and auto-remove job flags; introduced generic retry helpers and Chronik tx retry/cleanup; made price fetching and transaction-price linking network-aware with batched upserts; tightened mempool/error handling; updated tests and logging.
Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Init as initJobs()
    participant Queue as BullMQ
    participant Worker as sync worker
    Init->>Queue: create & obliterate queues (pricesSync, blockchainSync, clientPaymentCleanup)
    Init->>Queue: add jobs with removeOnComplete/removeOnFail
    Queue->>Worker: dispatch job
    alt success
        Worker->>Queue: complete (auto-removed)
    else fail
        Worker->>Worker: log error
        Worker->>Worker: await multiBlockchainClient.destroy()
        Worker->>Queue: fail (auto-removed)
    end
```

```mermaid
sequenceDiagram
    autonumber
    participant Worker as sync worker
    participant Chronik as ChronikClient
    participant Retry as fetchTxWithRetry()
    participant DB as DB/Commit
    participant Logger as Logger
    Worker->>Chronik: WS message requests tx
    Chronik->>Retry: fetchTxWithRetry(txid)
    alt transient 404 / error
        Retry->>Retry: backoff & retry
    end
    Retry-->>Worker: tx or throw
    alt tx obtained
        Worker->>DB: enqueue for batched commit (non-FINAL / FINAL)
        DB->>Logger: emit commit and triggers
    else thrown
        Worker->>Logger: log error
        Worker->>Worker: ensure mempool counter decremented (finally)
    end
```
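The second diagram describes a generic retry-with-backoff shape. Below is a minimal sketch of that pattern; the real `fetchTxWithRetry` lives in `services/chronikService.ts`, and the parameter names and the "not found" detection here are assumptions for illustration only, not the project's implementation.

```typescript
// Generic retry-with-exponential-backoff sketch (illustrative only).
async function fetchWithRetry<T> (
  fetchOnce: () => Promise<T>,
  tries = 3,
  delayMs = 400
): Promise<T> {
  for (let attempt = 0; attempt < tries; attempt++) {
    try {
      return await fetchOnce()
    } catch (err) {
      const isTransient404 = err instanceof Error && /not found/i.test(err.message)
      // non-transient errors (or the final failed attempt) are surfaced to the caller
      if (!isTransient404 || attempt === tries - 1) throw err
      // exponential backoff: delayMs, 2*delayMs, 4*delayMs, ...
      await new Promise(resolve => setTimeout(resolve, delayMs * 2 ** attempt))
    }
  }
  throw new Error('unreachable')
}
```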
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~50 minutes
Potential attention points:
Possibly related PRs
Suggested reviewers
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
📜 Recent review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
🧰 Additional context used
🧬 Code graph analysis (1)
tests/unittests/chronikService.test.ts (4)
🪛 ESLint
tests/unittests/chronikService.test.ts
[error] 1323-1324: More than 1 blank line not allowed. (no-multiple-empty-lines)
[error] 1332-1332: Extra space before value for key 'confirmed'. (key-spacing)
[error] 1387-1387: Require statement not part of import statement. (@typescript-eslint/no-var-requires)
[error] 1429-1429: Require statement not part of import statement. (@typescript-eslint/no-var-requires)
[error] 1430-1430: Require statement not part of import statement. (@typescript-eslint/no-var-requires)
[error] 1481-1481: Expected a line break after this opening brace. (object-curly-newline)
[error] 1485-1485: Expected a line break before this closing brace. (object-curly-newline)
[error] 1485-1485: A space is required before '}'. (object-curly-spacing)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
🔇 Additional comments (7)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
prisma-local/seeds/prices.ts (1)
96-103: Timezone mismatch: store daily CSV timestamps in UTC to match flattenTimestamp.
Prices are later fetched by UTC-flattened timestamp. Using `moment(price.date).unix()` stores local midnight, which can be off by a day and break joins.
Apply this diff:
```diff
-      timestamp: moment(price.date).unix(),
+      // Store price-day at UTC midnight to align with flattenTimestamp()
+      timestamp: moment.utc(price.date, PRICE_API_DATE_FORMAT).unix(),
```
services/priceService.ts (1)
81-85: Unify day handling in UTC to prevent off-by-one price lookups and mislinked rows.
Local time is used in multiple places while flattenTimestamp is UTC. This can fetch the wrong API day and store prices under the wrong timestamp.
Apply these diffs:
```diff
--- a/services/priceService.ts
+++ b/services/priceService.ts
@@
 function getPriceURLForDayAndNetworkTicker (day: moment.Moment, networkTicker: string): string {
   validatePriceAPIUrlAndToken()
   validateNetworkTicker(networkTicker)
-  return `${config.priceAPIURL}/pricebydate/${process.env.PRICE_API_TOKEN!}/${networkTicker}+${day.format(PRICE_API_DATE_FORMAT)}`
+  // Always use UTC day when addressing the Price API
+  const dayUTC = day.clone().utc()
+  return `${config.priceAPIURL}/pricebydate/${process.env.PRICE_API_TOKEN!}/${networkTicker}+${dayUTC.format(PRICE_API_DATE_FORMAT)}`
 }
@@
   await Promise.all(
     allXECPrices
       .filter(p => daysToRetrieve.includes(p.day))
-      .map(async price => await upsertPricesForNetworkId(price, XEC_NETWORK_ID, moment(price.day).unix()))
+      // Persist daily price at UTC midnight
+      .map(async price => await upsertPricesForNetworkId(price, XEC_NETWORK_ID, moment.utc(price.day, PRICE_API_DATE_FORMAT).unix()))
   )
@@
   await Promise.all(
     allBCHPrices
       .filter(p => daysToRetrieve.includes(p.day))
-      .map(async price => await upsertPricesForNetworkId(price, BCH_NETWORK_ID, moment(price.day).unix()))
+      // Persist daily price at UTC midnight
+      .map(async price => await upsertPricesForNetworkId(price, BCH_NETWORK_ID, moment.utc(price.day, PRICE_API_DATE_FORMAT).unix()))
   )
@@
 async function renewPricesForTimestamp (timestamp: number): Promise<boolean> {
   try {
-    const xecPrice = await getPriceForDayAndNetworkTicker(moment(timestamp * 1000), NETWORK_TICKERS.ecash)
+    const xecPrice = await getPriceForDayAndNetworkTicker(moment.unix(timestamp).utc(), NETWORK_TICKERS.ecash)
     await upsertPricesForNetworkId(xecPrice, XEC_NETWORK_ID, timestamp)
-    const bchPrice = await getPriceForDayAndNetworkTicker(moment(timestamp * 1000), NETWORK_TICKERS.bitcoincash)
+    const bchPrice = await getPriceForDayAndNetworkTicker(moment.unix(timestamp).utc(), NETWORK_TICKERS.bitcoincash)
     await upsertPricesForNetworkId(bchPrice, BCH_NETWORK_ID, timestamp)
```
Also applies to: 268-275, 173-191
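To make the local-vs-UTC pitfall concrete, here is a small, self-contained illustration. It uses moment only; the date-format string is an assumption standing in for `PRICE_API_DATE_FORMAT`, and it is not code from this repository.

```typescript
import moment from 'moment'

// Hypothetical stand-in for PRICE_API_DATE_FORMAT
const DATE_FORMAT = 'YYYY-MM-DD'

// On a host running in, say, UTC-5, local midnight and UTC midnight differ by 5 hours,
// so the two timestamps fall on different UTC days and break joins on flattened timestamps.
const localMidnight = moment('2024-01-15', DATE_FORMAT).unix() // parsed in local time
const utcMidnight = moment.utc('2024-01-15', DATE_FORMAT).unix() // parsed as UTC

console.log(localMidnight === utcMidnight) // false unless the host timezone is UTC
```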
🧹 Nitpick comments (8)
services/chronikService.ts (1)
817-883: Consider extracting duplicated commit logic.
The commit and trigger logic is duplicated between batch processing (lines 817-843) and final flush (lines 857-883). Consider extracting it to a helper method to improve maintainability:
```typescript
private async commitBatchAndTrigger (
  commitPairs: RowWithRaw[],
  productionAddressesIds: string[],
  runTriggers: boolean,
  isFinal: boolean
): Promise<TransactionWithAddressAndPrices[]> {
  const rows = commitPairs.map(p => p.row)
  const createdTxs = await createManyTransactions(rows)
  console.log(`${this.CHRONIK_MSG_PREFIX} committed${isFinal ? ' FINAL' : ''} — created=${createdTxs.length}`)
  const createdForProd = createdTxs.filter(t => productionAddressesIds.includes(t.addressId))
  if (createdForProd.length > 0) {
    await appendTxsToFile(createdForProd as unknown as Prisma.TransactionCreateManyInput[])
  }
  if (createdTxs.length > 0) {
    const rawByHash = new Map(commitPairs.map(p => [p.raw.txid, p.raw]))
    const triggerBatch: BroadcastTxData[] = []
    for (const createdTx of createdTxs) {
      const raw = rawByHash.get(createdTx.hash)
      if (raw == null) continue
      const bd = this.broadcastIncomingTx(createdTx.address.address, raw, createdTx)
      triggerBatch.push(bd)
    }
    if (runTriggers && triggerBatch.length > 0) {
      await executeTriggersBatch(triggerBatch, this.networkId)
    }
  }
  return createdTxs
}
```
constants/index.ts (1)
273-276: Extract chronik retry parameters as constants in constants/index.ts for consistency.
The retry logic exists at `services/chronikService.ts:594` with hardcoded defaults (tries = 3, delayMs = 400), but these values are not extracted as constants. Since the PR adds related constants (`MEMPOOL_PROCESS_DELAY`, `MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME`), consider adding `CHRONIK_TX_RETRY_ATTEMPTS` and `CHRONIK_TX_RETRY_DELAY_MS` for consistency and maintainability.
services/priceService.ts (3)
147-153: Harden mapping to exclude non-day entries.
If the API includes extra keys (e.g., success), Object.entries will incorrectly map them. Filter for objects containing both price fields.
```diff
-  return Object.entries<IResponseData>(res.data).map(([day, priceData]) => ({ day, ...priceData }))
+  const entries = Object.entries(res.data)
+    .filter(([, v]) => v && typeof v === 'object' && 'Price_in_CAD' in v && 'Price_in_USD' in v)
+  return entries.map(([day, priceData]) => ({ day, ...(priceData as IResponseData) }))
```
114-120: Improve retry context for logs.
Add networkTicker/day to the retry context for easier tracing.
```diff
 export async function getPriceForDayAndNetworkTicker (day: moment.Moment, networkTicker: string): Promise<IResponseData> {
   return await withRetries(async () => {
@@
-  }, { context: { day } })
+  }, { context: { networkTicker, day: day.clone().utc().format(PRICE_API_DATE_FORMAT) } })
 }
```
93-112: Optional: add backoff to withRetries and guard zero retries.
Small resilience boost and safer default.
```diff
-async function withRetries<T> (
-  fn: () => Promise<T>,
-  { maxRetries = PRICE_API_MAX_RETRIES, throwOnFailure = true, context = {} } = {}
-): Promise<T | null> {
+async function withRetries<T> (
+  fn: () => Promise<T>,
+  { maxRetries = PRICE_API_MAX_RETRIES, throwOnFailure = true, context = {}, delayMs = 0 } = {}
+): Promise<T | null> {
   let lastError: any
-  for (let attempt = 1; attempt <= maxRetries; attempt++) {
+  const retries = Math.max(1, maxRetries)
+  for (let attempt = 1; attempt <= retries; attempt++) {
     try {
       return await fn()
     } catch (error) {
       lastError = error
       console.error(`[Retry ${attempt}/${maxRetries}] ${String(error)}`, { ...context })
-      if (attempt < maxRetries) continue
+      if (attempt < retries) {
+        if (delayMs > 0) await new Promise(r => setTimeout(r, delayMs))
+        continue
+      }
       if (throwOnFailure) throw lastError
       console.error(`[Retry ${attempt}/${maxRetries}] skipping error:`)
       console.error(String(lastError))
       return null
     }
   }
   return null
 }
```
services/transactionService.ts (3)
641-654: Generalize "irregular prices" to any count != N_OF_QUOTES.
The current equals: 1 check misses duplicates and future quote expansions.
```diff
 export async function fetchAllTransactionsWithIrregularPrices (): Promise<TransactionWithNetwork[]> {
   const grouped = await prisma.pricesOnTransactions.groupBy({
     by: ['transactionId'],
     _count: { transactionId: true },
-    having: { transactionId: { _count: { equals: 1 } } }
+    having: { transactionId: { _count: { not: N_OF_QUOTES } } }
   })
```
398-399: Type nit: accept both Prisma.TransactionClient and PrismaClient.
You pass prisma (the client) here; widen the fetchPricesForNetworkAndTimestamp param type for clarity.
```diff
 export async function fetchPricesForNetworkAndTimestamp (
   networkId: number,
   timestamp: number,
-  prismaTx: Prisma.TransactionClient,
+  prismaTx: Prisma.TransactionClient | typeof prisma,
   tryRenewing = true
 ): Promise<AllPrices> {
```
520-531: Operational note: the transaction can grow large; consider chunking deleteMany.
Deleting rows for thousands of txs at once may exceed statement timeouts on some DBs. Optional: split ids into chunks similar to the inserts.
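A minimal sketch of the chunking idea, assuming a Prisma client exposing the `pricesOnTransactions` model referenced elsewhere in this review; the chunk-size constant and function name are illustrative, not the project's actual identifiers.

```typescript
import { PrismaClient } from '@prisma/client'

const prisma = new PrismaClient()
const DELETE_CHUNK_SIZE = 1000 // illustrative value

async function deletePricesOnTransactionsInChunks (transactionIds: string[]): Promise<void> {
  for (let i = 0; i < transactionIds.length; i += DELETE_CHUNK_SIZE) {
    const chunk = transactionIds.slice(i, i + DELETE_CHUNK_SIZE)
    // each chunk is a separate, smaller statement, keeping it under DB statement timeouts
    await prisma.pricesOnTransactions.deleteMany({
      where: { transactionId: { in: chunk } }
    })
  }
}
```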
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
- prisma-local/seeds/prices.csv is excluded by !**/*.csv
- prisma-local/seeds/productionTxs.csv is excluded by !**/*.csv
📒 Files selected for processing (9)
- constants/index.ts (2 hunks)
- jobs/initJobs.ts (2 hunks)
- jobs/workers.ts (1 hunks)
- prisma-local/seeds/prices.ts (1 hunks)
- scripts/updateAllPriceConnections.ts (1 hunks)
- services/chronikService.ts (12 hunks)
- services/priceService.ts (6 hunks)
- services/transactionService.ts (10 hunks)
- services/triggerService.ts (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (6)
- jobs/workers.ts (1)
  - services/chronikService.ts (1)
    - multiBlockchainClient (1164-1164)
- jobs/initJobs.ts (2)
  - redis/clientInstance.ts (1)
    - redisBullMQ (54-54)
  - jobs/workers.ts (1)
    - syncBlockchainAndPricesWorker (34-66)
- prisma-local/seeds/prices.ts (1)
  - services/priceService.ts (1)
    - getAllPricesByNetworkTicker (143-154)
- services/transactionService.ts (2)
  - services/priceService.ts (3)
    - fetchPricesForNetworkAndTimestamp (241-266)
    - flattenTimestamp (9-13)
    - AllPrices (21-24)
  - constants/index.ts (6)
    - PRICES_CONNECTION_BATCH_SIZE (291-291)
    - CAD_QUOTE_ID (152-152)
    - USD_QUOTE_ID (151-151)
    - HUMAN_READABLE_DATE_FORMAT (165-165)
    - PRICES_CONNECTION_TIMEOUT (293-293)
    - UPSERT_TRANSACTION_PRICES_ON_DB_TIMEOUT (200-200)
- services/chronikService.ts (5)
  - constants/index.ts (6)
    - CHRONIK_MESSAGE_CACHE_DELAY (229-229)
    - INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY (282-282)
    - TX_EMIT_BATCH_SIZE (283-283)
    - MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME (273-273)
    - MEMPOOL_PROCESS_DELAY (275-275)
    - DB_COMMIT_BATCH_SIZE (284-284)
  - services/transactionService.ts (2)
    - upsertTransaction (374-406)
    - createManyTransactions (554-607)
  - services/triggerService.ts (2)
    - executeAddressTriggers (234-243)
    - executeTriggersBatch (501-606)
  - prisma-local/seeds/transactions.ts (1)
    - appendTxsToFile (12-20)
  - ws-service/types.ts (1)
    - BroadcastTxData (5-9)
- services/priceService.ts (2)
  - constants/index.ts (10)
    - PRICE_API_MAX_RETRIES (169-169)
    - PRICE_API_TIMEOUT (168-168)
    - RESPONSE_MESSAGES (8-108)
    - PRICE_API_DATE_FORMAT (167-167)
    - NETWORK_TICKERS (181-184)
    - XEC_NETWORK_ID (148-148)
    - BCH_NETWORK_ID (149-149)
    - N_OF_QUOTES (153-153)
    - USD_QUOTE_ID (151-151)
    - CAD_QUOTE_ID (152-152)
  - prisma-local/seeds/prices.ts (1)
    - getPrices (87-108)
🪛 GitHub Actions: Pull Request Tests
services/priceService.ts
[error] 250-250: The provided database string is invalid. Error parsing connection string: invalid port number in database URL.
🔇 Additional comments (19)
services/triggerService.ts (3)
523-523: Clearer batch context and quantified metrics in preparation log.The change adds currency context and transaction/address counts, improving observability for batch execution. This aligns with the PR objective to clarify logging.
554-554: Detailed task breakdown improves visibility into trigger execution.The updated log clearly delineates post vs. email task counts and affected user counts, making it easier to monitor batch execution characteristics and identify distribution anomalies.
605-605: Summary metrics in completion log provide actionable completion status.The updated log includes logs created, users charged, and breakdown of accepted triggers (email vs. post), enabling quick assessment of batch execution results and success rates.
scripts/updateAllPriceConnections.ts (1)
61-62: LGTM – adding network context to price data.The addition of
networkIdto the price selection aligns with the PR's broader introduction of network-aware price data. WhilenetworkIdisn't directly consumed in this script's logic (line 82 only usestimestamp), it's included in the data passed toconnectTransactionsListToPrices(line 90), which may require or benefit from network context.jobs/workers.ts (1)
58-65: LGTM! Good symmetric cleanup on failure.The failure handler now mirrors the completed handler by wrapping in an async IIFE and calling
multiBlockchainClient.destroy(). This ensures proper resource cleanup regardless of job outcome.services/chronikService.ts (7)
183-201: LGTM! Simplified timestamp arithmetic.Replacing moment-based time diff with direct arithmetic (
now - timestamp) is more efficient and clearer. The age calculation is correct.
594-606: LGTM! Well-implemented retry logic for mempool race conditions.
The exponential backoff strategy (400ms, 800ms, 1600ms) with 404-specific handling correctly addresses the mempool race condition described in the PR objectives. The logic properly distinguishes between "not found yet" (404) and genuine errors.
633-663: LGTM! Critical fix for transaction processing freeze.
Wrapping the entire mempool processing path in try/catch/finally directly addresses the freeze bug described in the PR objectives. The `finally` block ensures `mempoolTxsBeingProcessed` is always decremented, preventing the counter from becoming permanently stuck and blocking future transactions.
Key improvements:
- Line 643: uses `fetchTxWithRetry()` to handle mempool race conditions
- Lines 658-662: the `finally` block guarantees cleanup even on error
- Defensive `Math.max(0, ...)` prevents negative counter values
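The counter-protection pattern described above, as a stripped-down sketch: the counter name matches the review, everything else is simplified and is not the project's actual handler.

```typescript
class MempoolProcessor {
  private mempoolTxsBeingProcessed = 0

  async handleMempoolTx (txid: string, fetchAndCommit: (txid: string) => Promise<void>): Promise<void> {
    this.mempoolTxsBeingProcessed += 1
    try {
      await fetchAndCommit(txid) // may throw (e.g. a transient 404 from the indexer)
    } catch (err) {
      console.error(`failed to process mempool tx ${txid}`, err)
    } finally {
      // always runs, so one bad tx can never leave the counter stuck and block later txs
      this.mempoolTxsBeingProcessed = Math.max(0, this.mempoolTxsBeingProcessed - 1)
    }
  }
}
```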
673-674: LGTM! Improved observability.
Adding subscription count logging helps monitor the health of address subscriptions after block processing completes.
301-410: LGTM! Enhanced progress tracking and clearer logging.
The improvements to logging and progress counters significantly enhance observability during parallel fetching:
- Lines 320-326: Progress counters show sync state
- Lines 336-383: Per-address transaction counts
- Lines 393-404: Clarified comments explain batch yielding and final flush logic
801-853: LGTM! Robust batch-scoped error handling.
The introduction of `involvedAddrIds` (line 801) and batch-scoped try/catch (lines 803-853) is excellent. When a batch fails:
- Only addresses in that specific batch are marked as failed (lines 847-851)
- Processing continues with remaining batches (line 852)
- Other addresses aren't penalized by one batch's failure
This directly addresses the Prisma timeout issues mentioned in the PR objectives by containing failures and allowing partial success.
892-902: LGTM! Clearer error and completion messages.The logging improvements make error severity and completion status more explicit:
- Line 892: "FATAL ERROR in parallel sync" clearly indicates scope and severity
- Line 902: Removed potentially confusing "PARALLEL" prefix from completion message
constants/index.ts (2)
282-284: Concurrency and batch size reductions align with fixing Prisma timeouts.These reductions (75-80%) should effectively address Prisma timeout errors by reducing parallel database load and committing smaller batches more frequently. All three constants are used correctly throughout the codebase for their intended batching and concurrency control purposes. Since the PR indicates these changes ran successfully in production for weeks, the performance trade-off appears acceptable.
169-169: Verify: Confirm the intentional trade-off of eliminating price API retries.
Based on code analysis, `PRICE_API_MAX_RETRIES = 1` means zero actual retries, only a single attempt before failure. This is a significant reduction in resilience compared to `SYNC_TXS_JOBS_MAX_RETRIES = 3`. No documentation in the code explains this choice.
While this may intentionally prevent cascading timeouts, it makes the price fetch fail immediately on any transient API issue. Please confirm this trade-off aligns with your PR objectives and that the price API is sufficiently reliable to warrant this reduced tolerance.
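To make the "one means zero retries" point concrete, here is a reduced sketch of the loop shape being discussed. It is not the project's actual `withRetries` body; the function name and simplified error handling are assumptions.

```typescript
// With maxRetries = 1 the loop body runs exactly once, so any failure is terminal.
async function withSingleAttempt<T> (fn: () => Promise<T>, maxRetries = 1): Promise<T> {
  let lastError: unknown
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await fn()
    } catch (error) {
      lastError = error
      if (attempt < maxRetries) continue // never reached when maxRetries === 1
    }
  }
  throw lastError
}
```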
jobs/initJobs.ts (3)
32-39: Cleanup configuration aligns with the fresh-start strategy.
The addition of `removeOnComplete: true` and `removeOnFail: true` ensures no job history accumulates. This is consistent with the obliterate-on-startup approach and helps prevent stale job state from interfering with new executions.
This configuration pairs well with the worker's cleanup logic (in workers.ts lines 33-65) that destroys the `multiBlockchainClient` on both completion and failure.
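For context, a minimal sketch of the job options being discussed, assuming a standard BullMQ Queue; the queue name, job name, and Redis connection details here are illustrative, not the project's configuration.

```typescript
import { Queue } from 'bullmq'

async function scheduleSync (): Promise<void> {
  const queue = new Queue('blockchainSync', { connection: { host: 'localhost', port: 6379 } })
  await queue.add(
    'syncBlockchainAndPrices',
    {},
    // completed and failed jobs are deleted right away, so no job history accumulates
    { removeOnComplete: true, removeOnFail: true }
  )
}
```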
47-48: Standard cleanup configuration for repeating jobs.Adding
removeOnComplete: trueandremoveOnFail: trueto this repeating job prevents unbounded accumulation of job history records. This is a best practice for long-running repeating jobs and aligns with the overall cleanup strategy.
10-17: No issues found after verification — resolve as approved.
The obliterate behavior is acceptable because both `syncMissedTransactions()` and `connectAllTransactionsToPrices()` are idempotent operations:
- They query for work to do (addresses to sync, transactions needing price connections) rather than resuming from checkpoints
- Restarting mid-process simply re-fetches and re-processes the same data
- The inline comment "// --- force fresh start ---" already documents the intent
This design intentionally sacrifices job history and aggressive cleanup to fix the "permanently block" bug by ensuring a clean state on every startup. The trade-off is acceptable.
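A sketch of the fresh-start approach described above. The `obliterate` call is standard BullMQ; the queue name and connection are illustrative and the placement relative to initJobs() is an assumption.

```typescript
import { Queue } from 'bullmq'

async function freshStart (): Promise<Queue> {
  const queue = new Queue('pricesSync', { connection: { host: 'localhost', port: 6379 } })
  // --- force fresh start ---
  // drop all pending, delayed, completed and failed jobs before re-adding the repeating job
  await queue.obliterate({ force: true })
  return queue
}
```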
prisma-local/seeds/prices.ts (1)
36-36: Seed call should throw on failures — good change.
Passing throwOnFailure=true is appropriate for seeding; it surfaces API issues early.
services/transactionService.ts (1)
559-595: Good: batched upserts inside a single transaction with a timeout.
BATCH_SIZE=50 and the timeout guard match the PR goal to reduce Prisma timeouts.
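A reduced sketch of the shape being praised here, batched writes inside one interactive Prisma transaction with an explicit timeout. The batch size mirrors the review; the timeout constant stands in for `UPSERT_TRANSACTION_PRICES_ON_DB_TIMEOUT`, and the model/field names are assumptions rather than the project's exact code.

```typescript
import { PrismaClient, Prisma } from '@prisma/client'

const prisma = new PrismaClient()
const BATCH_SIZE = 50
const UPSERT_TIMEOUT_MS = 60_000 // placeholder for UPSERT_TRANSACTION_PRICES_ON_DB_TIMEOUT

async function writeInBatches (rows: Prisma.TransactionCreateManyInput[]): Promise<void> {
  await prisma.$transaction(async tx => {
    for (let i = 0; i < rows.length; i += BATCH_SIZE) {
      const batch = rows.slice(i, i + BATCH_SIZE)
      // smaller statements per round trip keep the interactive transaction under its timeout
      await tx.transaction.createMany({ data: batch, skipDuplicates: true })
    }
  }, { timeout: UPSERT_TIMEOUT_MS })
}
```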
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- tests/unittests/chronikService.test.ts (2 hunks)
- tests/unittests/transactionService.test.ts (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
tests/unittests/transactionService.test.ts (2)
prisma-local/mockedClient.ts (1)
prismaMock(19-19)tests/mockedObjects.ts (1)
mockedUSDPriceOnTransaction(384-398)
🪛 ESLint
tests/unittests/chronikService.test.ts
[error] 1323-1324: More than 1 blank line not allowed.
(no-multiple-empty-lines)
[error] 1332-1332: Extra space before value for key 'confirmed'.
(key-spacing)
🔇 Additional comments (4)
tests/unittests/transactionService.test.ts (1)
51-52: LGTM! Improved error handling with findUniqueOrThrow.
The change from `findUnique` to `findUniqueOrThrow` provides better fail-fast behavior when price data is missing, making error conditions more explicit rather than silently continuing with null values.
tests/unittests/chronikService.test.ts (3)
24-24: LGTM! Mock addition enables transaction-related testing.The
txmock addition to the ChronikClient enables the new regression tests to validate transaction fetching and retry behavior.
1274-1297: Excellent regression test for counter leak prevention.
This test validates the critical fix for the transaction processing freeze bug by ensuring the `mempoolTxsBeingProcessed` counter is properly decremented in the finally block even when errors occur. The test correctly:
- Simulates a failing chronik.tx call
- Verifies counter returns to 0 after error
- Confirms subsequent transactions can still be processed
1299-1321: Strong validation of retry logic for mempool race conditions.
This test effectively validates that the `fetchTxWithRetry` method handles transient 404 errors (mempool race conditions) by retrying until success. The use of `setImmediate` to wait for async constructor initialization is good practice.
Actionable comments posted: 5
🧹 Nitpick comments (4)
services/userService.ts (1)
75-80: Simplify by using Buffer.concat directly.
The manual Uint8Array construction with runtime type checks is unnecessarily complex. Both `prefixPrivateEd25519` (line 74) and `seed` (line 69) are always Buffer instances, so the `instanceof Uint8Array` checks are redundant. The original `Buffer.concat([prefixPrivateEd25519, seed])` would be simpler and equally correct.
Apply this diff to simplify:
```diff
-  const der = new Uint8Array(prefixPrivateEd25519.length + seed.length)
-  der.set(prefixPrivateEd25519 instanceof Uint8Array ? prefixPrivateEd25519 : new Uint8Array(prefixPrivateEd25519))
-  der.set(seed instanceof Uint8Array ? seed : new Uint8Array(seed), prefixPrivateEd25519.length)
-  const derBuf = Buffer.from(der)
-  return crypto.createPrivateKey({ key: derBuf, format: 'der', type: 'pkcs8' })
+  const der = Buffer.concat([prefixPrivateEd25519, seed])
+  return crypto.createPrivateKey({ key: der, format: 'der', type: 'pkcs8' })
```
utils/validators.ts (1)
251-258: Remove dead code branch.
The conditional check at line 252 is unnecessary because `payload` is always a string (returned by `getSignaturePayload` at line 245 via `.join('+')`). The non-string branch (line 254) with the `as unknown as Uint8Array` cast is unreachable dead code. Both `Buffer.from(payload)` and `TextEncoder().encode(payload)` use UTF-8 encoding for strings, so the encoding behavior is equivalent.
Apply this diff to remove the dead code:
```diff
-  const data: Uint8Array = typeof payload === 'string'
-    ? new TextEncoder().encode(payload)
-    : payload as unknown as Uint8Array
   const signature = crypto.sign(
     null,
-    data,
+    Buffer.from(payload),
     pk
   )
```
Alternatively, if you prefer TextEncoder:
```diff
-  const data: Uint8Array = typeof payload === 'string'
-    ? new TextEncoder().encode(payload)
-    : payload as unknown as Uint8Array
+  const data = new TextEncoder().encode(payload)
   const signature = crypto.sign(
     null,
     data,
     pk
   )
```
services/chronikService.ts (2)
594-608: Resilient fetchTxWithRetry with backoff — nice.
Small nit: consider `console.warn` for expected 404-retry logs to reduce noise in error monitoring.
725-733: Deduplicate related addresses before DB lookups.
Avoid redundant queries and processing:
```diff
-    const inputAddresses = transaction.inputs.map(inp => outputScriptToAddress(this.networkSlug, inp.outputScript))
-    const outputAddresses = transaction.outputs.map(out => outputScriptToAddress(this.networkSlug, out.outputScript))
-    return [...inputAddresses, ...outputAddresses].filter(a => a !== undefined)
+    const inputAddresses = transaction.inputs.map(inp => outputScriptToAddress(this.networkSlug, inp.outputScript))
+    const outputAddresses = transaction.outputs.map(out => outputScriptToAddress(this.networkSlug, out.outputScript))
+    return Array.from(
+      new Set(
+        [...inputAddresses, ...outputAddresses].filter((a): a is string => a !== undefined)
+      )
+    )
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- yarn.lock is excluded by !**/yarn.lock, !**/*.lock
📒 Files selected for processing (5)
- package.json (1 hunks)
- services/chronikService.ts (12 hunks)
- services/userService.ts (1 hunks)
- tests/unittests/chronikService.test.ts (2 hunks)
- utils/validators.ts (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
services/chronikService.ts (5)
- constants/index.ts (6)
  - CHRONIK_MESSAGE_CACHE_DELAY (229-229)
  - INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY (282-282)
  - TX_EMIT_BATCH_SIZE (283-283)
  - MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME (273-273)
  - MEMPOOL_PROCESS_DELAY (275-275)
  - DB_COMMIT_BATCH_SIZE (284-284)
- services/transactionService.ts (2)
  - upsertTransaction (374-406)
  - createManyTransactions (557-610)
- services/triggerService.ts (2)
  - executeAddressTriggers (234-243)
  - executeTriggersBatch (501-606)
- prisma-local/seeds/transactions.ts (1)
  - appendTxsToFile (12-20)
- ws-service/types.ts (1)
  - BroadcastTxData (5-9)
tests/unittests/chronikService.test.ts (3)
- services/addressService.ts (1)
  - fetchAddressesArray (189-201)
- services/transactionService.ts (3)
  - fetchUnconfirmedTransactions (612-620)
  - deleteTransactions (622-630)
  - upsertTransaction (374-406)
- services/triggerService.ts (1)
  - executeAddressTriggers (234-243)
🪛 ESLint
tests/unittests/chronikService.test.ts
[error] 1323-1324: More than 1 blank line not allowed.
(no-multiple-empty-lines)
[error] 1332-1332: Extra space before value for key 'confirmed'.
(key-spacing)
[error] 1387-1387: Require statement not part of import statement.
(@typescript-eslint/no-var-requires)
[error] 1429-1429: Require statement not part of import statement.
(@typescript-eslint/no-var-requires)
[error] 1430-1430: Require statement not part of import statement.
(@typescript-eslint/no-var-requires)
[error] 1481-1481: Expected a line break after this opening brace.
(object-curly-newline)
[error] 1485-1485: Expected a line break before this closing brace.
(object-curly-newline)
[error] 1485-1485: A space is required before '}'.
(object-curly-spacing)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Run Tests
🔇 Additional comments (3)
tests/unittests/chronikService.test.ts (1)
1273-1344: Great regression coverage for mempool, retries, TTL, and WS matrix.The tests exercise the retry/backoff, mempool counter finally, and block finalization flow well.
Also applies to: 1347-1520
services/chronikService.ts (2)
183-201: TTL unit mismatch risk in clearOldMessages (seconds vs milliseconds).
You store timestamps via `moment().unix()` (seconds) and compare `ageDiff` to `CHRONIK_MESSAGE_CACHE_DELAY`. If that constant is in milliseconds (e.g., 3000 = 3s), entries will live ~50 minutes. Either rename the constant to clarify units or convert consistently.
Option A: keep seconds everywhere, convert the constant:
```diff
-        const ageDiff = now - Number(this.lastProcessedMessages.unconfirmed[key])
-        if (ageDiff > CHRONIK_MESSAGE_CACHE_DELAY) {
+        const ageSec = now - Number(this.lastProcessedMessages.unconfirmed[key])
+        if (ageSec > CHRONIK_MESSAGE_CACHE_DELAY / 1000) {
           const { [key]: _, ...rest } = this.lastProcessedMessages.unconfirmed
           this.lastProcessedMessages.unconfirmed = rest
         }
```
Repeat for the confirmed loop. Alternatively, switch to Date.now() and store ms.
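A tiny illustration of the unit pitfall, using plain `Date.now()` so both sides are milliseconds. The constant value mirrors the 3000 ms mentioned above; the helper name is illustrative.

```typescript
const CHRONIK_MESSAGE_CACHE_DELAY = 3000 // milliseconds

function isExpired (storedAtMs: number): boolean {
  const ageMs = Date.now() - storedAtMs
  return ageMs > CHRONIK_MESSAGE_CACHE_DELAY
}

// If storedAt had instead been recorded in *seconds* (e.g. moment().unix()), the same
// comparison would treat 3000 as 3000 seconds (~50 minutes) rather than 3 seconds.
```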
635-663: Mempool handler finally-decrements counter — good safeguard.
This prevents freezing under exceptions and caps concurrency via `MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME`.
```diff
   },
   "devDependencies": {
-    "@types/jest": "^27.5.1",
+    "@types/jest": "^30.0.0",
```
Align Jest types with Jest/ts-jest versions.
@types/jest is ^30 while jest is ^29.7 and ts-jest is ^29.2.5. This can cause TS inference/compile breakage.
Two safe paths:
- Keep Jest 29: downgrade types.
- "@types/jest": "^30.0.0",
+ "@types/jest": "^29.5.12",- Or upgrade the stack together (only if you intend to move): bump
jestandts-jestto matching 30.x.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| "@types/jest": "^30.0.0", | |
| "@types/jest": "^29.5.12", |
🤖 Prompt for AI Agents
In package.json around line 75, @types/jest is ^30 while jest is ^29.7 and
ts-jest is ^29.2.5 causing type/compile mismatch; fix by either downgrading
@types/jest to a 29.x version that aligns with jest (e.g., ^29.0.0) or by
upgrading both jest and ts-jest to matching 30.x versions (bump jest and ts-jest
to 30.x, update any config if required), then run install and the test/type
checks to verify no remaining type errors.
```typescript
  it('clearOldMessages expires entries by TTL and from the correct maps', () => {
    process.env.WS_AUTH_KEY = 'test-auth-key'
    const client = new ChronikBlockchainClient('ecash')

    const nowSec = Math.floor(Date.now() / 1000)
    // build state: one old and one fresh in each map
    ;(client as any).lastProcessedMessages = {
      unconfirmed: { u_old: nowSec - 999999, u_new: nowSec - 1 },
      confirmed: { c_old: nowSec - 999999, c_new: nowSec - 1 }
    }

    // run cleanup
    ;(client as any).clearOldMessages()

    // old entries should be gone, fresh ones should remain
    expect((client as any).lastProcessedMessages.unconfirmed.u_old).toBeUndefined()
    expect((client as any).lastProcessedMessages.confirmed.c_old).toBeUndefined()
    expect((client as any).lastProcessedMessages.unconfirmed.u_new).toBeDefined()
    expect((client as any).lastProcessedMessages.confirmed.c_new).toBeDefined()
  })
})
```
Fix ESLint formatting: blank lines, key spacing, object-curly newlines/spacing.
Apply these minimal fixes:
- Collapse extra blank line and fix key spacing.
```diff
-    ;(client as any).lastProcessedMessages = {
-      unconfirmed: { u_old: nowSec - 999999, u_new: nowSec - 1 },
-      confirmed:   { c_old: nowSec - 999999, c_new: nowSec - 1 }
-    }
+    ;(client as any).lastProcessedMessages = {
+      unconfirmed: { u_old: nowSec - 999999, u_new: nowSec - 1 },
+      confirmed: { c_old: nowSec - 999999, c_new: nowSec - 1 }
+    }
```
- Conform object-curly-newline/spacing when overriding `client.chronik`.
```diff
-      ;(client.chronik as any) = { tx: jest.fn(async () => {
-        attempts += 1
-        if (attempts < 3) throw new Error('Transaction not found in the index')
-        return { txid: 'tx404', inputs: [], outputs: [] }
-      })}
+      ;(client.chronik as any) = {
+        tx: jest.fn(async () => {
+          attempts += 1
+          if (attempts < 3) throw new Error('Transaction not found in the index')
+          return { txid: 'tx404', inputs: [], outputs: [] }
+        })
+      }
```
Also applies to: 1481-1485
🧰 Tools
🪛 ESLint
[error] 1323-1324: More than 1 blank line not allowed.
(no-multiple-empty-lines)
[error] 1332-1332: Extra space before value for key 'confirmed'.
(key-spacing)
🤖 Prompt for AI Agents
In tests/unittests/chronikService.test.ts around lines 1323-1344 (also apply
same fixes at 1481-1485): remove the extra blank line, normalize key spacing so
object keys align as { u_old: ..., u_new: ... } and { c_old: ..., c_new: ... },
and fix object-curly-newline/spacing when overriding client.chronik (ensure
braces and properties are on the same line or follow the repo's
object-curly-newline rule consistently). Make only these minimal formatting
changes to satisfy ESLint.
```typescript
      const { fetchUnconfirmedTransactions, deleteTransactions } = require('../../services/transactionService')
      fetchUnconfirmedTransactions.mockResolvedValueOnce(['tx-to-del'])
      deleteTransactions.mockResolvedValueOnce(undefined)

      await client.processWsMessage({ type: 'Tx', msgType: 'TX_REMOVED_FROM_MEMPOOL', txid: 'deadbeef' })

      expect(fetchUnconfirmedTransactions).toHaveBeenCalledWith('deadbeef')
      expect(deleteTransactions).toHaveBeenCalledWith(['tx-to-del'])
    })
```
Replace inline require with typed imports (or silence with ESLint directive).
Prefer imports for mocked modules to satisfy @typescript-eslint/no-var-requires:
Add near the top (module scope):
```typescript
import * as addressService from '../../services/addressService'
import * as txService from '../../services/transactionService'
import * as triggerService from '../../services/triggerService'
```
Then update usages in the tests:
```diff
-      const { fetchUnconfirmedTransactions, deleteTransactions } = require('../../services/transactionService')
-      fetchUnconfirmedTransactions.mockResolvedValueOnce(['tx-to-del'])
-      deleteTransactions.mockResolvedValueOnce(undefined)
+      const fetchUnconfirmedTransactions = jest.spyOn(txService, 'fetchUnconfirmedTransactions').mockResolvedValueOnce(['tx-to-del'] as any)
+      const deleteTransactions = jest.spyOn(txService, 'deleteTransactions').mockResolvedValueOnce(undefined as any)
```
```diff
-    ;({ fetchAddressesArray } = require('../../services/addressService'))
-    fetchAddressesArray.mockResolvedValue([
+    fetchAddressesArray = jest.spyOn(addressService, 'fetchAddressesArray').mockResolvedValue([
       { id: 'addr-1', address: 'ecash:qqkv9...', networkId: 1, syncing: false, lastSynced: new Date().toISOString() }
-    ])
+    ] as any)
```
```diff
-      const { upsertTransaction } = require('../../services/transactionService')
-      const { executeAddressTriggers } = require('../../services/triggerService')
+      const upsertTransaction = jest.spyOn(txService, 'upsertTransaction')
+      const executeAddressTriggers = jest.spyOn(triggerService, 'executeAddressTriggers')
```
If you prefer to keep require, add per-line disables:
```typescript
// eslint-disable-next-line @typescript-eslint/no-var-requires
```
Also applies to: 1429-1435
🧰 Tools
🪛 ESLint
[error] 1387-1387: Require statement not part of import statement.
(@typescript-eslint/no-var-requires)
🤖 Prompt for AI Agents
In tests/unittests/chronikService.test.ts around lines 1387 to 1395, the test
uses inline require(...) to import mocked services which violates
@typescript-eslint/no-var-requires; replace the inline require calls with
module-scope typed imports (e.g. import * as txService from
'../../services/transactionService' and likewise for addressService and
triggerService) and update the mocked calls to use the imported names (e.g.
txService.fetchUnconfirmedTransactions). Alternatively, if you must keep
require, add an eslint disable comment directly above each require line to
silence the rule. Also apply the same change to the similar block at lines
1429-1435.
Actionable comments posted: 1
♻️ Duplicate comments (1)
services/chronikService.ts (1)
625-634: Wrap TX_CONFIRMED handler with try/catch to prevent unhandled rejections.
The TX_CONFIRMED path is not protected by error handling. If `fetchTxWithRetry` exhausts retries, or if `getAddressesForTransaction` or `handleUpdateClientPaymentStatus` fail, the rejection will be unhandled since `processWsMessage` is called fire-and-forget (line 504). This is the same issue flagged in the previous review.
Apply this diff to add error handling:
```diff
     } else if (msg.msgType === 'TX_CONFIRMED') {
+      try {
         const transaction = await this.fetchTxWithRetry(msg.txid)
         const addressesWithTransactions = await this.getAddressesForTransaction(transaction)
         console.log(`${this.CHRONIK_MSG_PREFIX}: [${msg.msgType}] ${msg.txid}`)
         this.confirmedTxsHashesFromLastBlock = [...this.confirmedTxsHashesFromLastBlock, msg.txid]
         for (const addressWithTransaction of addressesWithTransactions) {
           const { amount, opReturn } = addressWithTransaction.transaction
           await this.handleUpdateClientPaymentStatus(amount, opReturn, 'CONFIRMED' as ClientPaymentStatus, addressWithTransaction.address.address)
         }
+      } catch (e) {
+        console.error(`${this.CHRONIK_MSG_PREFIX}: confirmed handler failed for ${msg.txid}`, e)
+      }
     } else if (msg.msgType === 'TX_ADDED_TO_MEMPOOL') {
```
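For background, a tiny generic illustration of why a fire-and-forget async call needs its own try/catch. This is not the project's code; the function names are made up for the example.

```typescript
async function handleMessage (msg: string): Promise<void> {
  try {
    await processMessage(msg)
  } catch (err) {
    // without this catch, the rejection from the un-awaited call below is unhandled
    console.error('handler failed', err)
  }
}

async function processMessage (msg: string): Promise<void> {
  if (msg === 'bad') throw new Error('boom')
}

// the caller does not await (fire-and-forget), so errors must be contained inside handleMessage
void handleMessage('bad')
```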
🧹 Nitpick comments (1)
services/chronikService.ts (1)
376-378: Redundant stopping condition check.
Lines 376-378 check `if (newTxsInThisPage === 0 && oldestTs < lastSyncedTimestampSeconds)`, but lines 371-373 already set `hasReachedStoppingCondition = true` when `oldestTs < lastSyncedTimestampSeconds`. This second check is redundant and can be removed for clarity.
Apply this diff to remove the redundant check:
```diff
       if (oldestTs < lastSyncedTimestampSeconds) {
         hasReachedStoppingCondition = true
       }
       nextBurstBasePageIndex += 1
-      if (newTxsInThisPage === 0 && oldestTs < lastSyncedTimestampSeconds) {
-        hasReachedStoppingCondition = true
-      }
       newTxs += newTxsInThisPage
     }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
services/chronikService.ts(13 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
services/chronikService.ts (6)
components/Transaction/PaybuttonTransactions.tsx (1)
moment(40-124)constants/index.ts (6)
CHRONIK_MESSAGE_CACHE_DELAY(229-229)INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY(282-282)TX_EMIT_BATCH_SIZE(283-283)MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME(273-273)MEMPOOL_PROCESS_DELAY(275-275)DB_COMMIT_BATCH_SIZE(284-284)services/transactionService.ts (2)
upsertTransaction(374-406)createManyTransactions(557-610)services/triggerService.ts (2)
executeAddressTriggers(234-243)executeTriggersBatch(501-606)prisma-local/seeds/transactions.ts (1)
appendTxsToFile(12-20)ws-service/types.ts (1)
BroadcastTxData(5-9)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Run Tests
🔇 Additional comments (7)
services/chronikService.ts (7)
594-608: LGTM! Solid retry implementation with exponential backoff.
The `fetchTxWithRetry` helper correctly implements exponential backoff for 404 errors (mempool race conditions), while immediately throwing for other error types. This addresses the PR's objective of fixing mempool race conditions by retrying Chronik transaction lookups.
634-664: LGTM! Robust error handling prevents tx processing freeze.
The TX_ADDED_TO_MEMPOOL handler now properly wraps all processing in try/catch/finally, ensuring that:
- Errors don't block subsequent transaction processing (addressing the PR's freeze bug)
- The `mempoolTxsBeingProcessed` counter is always decremented via the `finally` block
- `fetchTxWithRetry` handles mempool race conditions with retries
The use of `Math.max(0, ...)` on line 662 is defensive and prevents counter corruption.
802-854: LGTM! Improved error isolation in batch processing.The addition of
involvedAddrIds(line 802) and scoped error handling (lines 845-854) ensures that when a batch fails, only the addresses involved in that batch are marked as failed, allowing other addresses to continue processing. This improves resilience and addresses the Prisma timeout concerns mentioned in the PR objectives.
858-884: LGTM! Final flush properly handles remaining transactions.The final flush logic correctly processes any remaining transactions after all address batches complete, maintaining consistency with the in-loop commit logic and properly executing triggers for newly created transactions.
892-899: LGTM! Fatal error handler preserves partial success.The fatal error handler (lines 892-899) only marks addresses that made zero progress as failed, allowing addresses that successfully synced some transactions to be considered successful. This is a good resilience pattern.
903-903: LGTM! Clear summary logging.The final log statement provides a clear summary of the sync operation with transaction count, address count, and error count.
Actionable comments posted: 0
♻️ Duplicate comments (6)
services/chronikService.ts (1)
625-637: Excellent: TX_CONFIRMED error handling implemented.The try-catch block addresses the critical issue flagged in previous reviews. Failures in
fetchTxWithRetryor downstream processing are now caught and logged, preventing unhandled rejections that could stall the processing flow.tests/unittests/chronikService.test.ts (5)
1323-1343: Fix ESLint formatting issues.The formatting issues flagged in previous reviews remain unresolved.
Apply this diff to fix blank line and key spacing:
}) - it('clearOldMessages expires entries by TTL and from the correct maps', () => { process.env.WS_AUTH_KEY = 'test-auth-key' const client = new ChronikBlockchainClient('ecash') const nowSec = Math.floor(Date.now() / 1000) // build state: one old and one fresh in each map ;(client as any).lastProcessedMessages = { unconfirmed: { u_old: nowSec - 999999, u_new: nowSec - 1 }, - confirmed: { c_old: nowSec - 999999, c_new: nowSec - 1 } + confirmed: { c_old: nowSec - 999999, c_new: nowSec - 1 } }
1370-1381: Replace inlinerequirewith typed imports.The inline
requirestatements violate@typescript-eslint/no-var-requires. As noted in previous reviews, prefer module-scope imports:+import * as addressService from '../../services/addressService' beforeEach(() => { jest.clearAllMocks() // ... client setup ... - ;({ fetchAddressesArray } = require('../../services/addressService')) + fetchAddressesArray = jest.spyOn(addressService, 'fetchAddressesArray').mockResolvedValue([ - fetchAddressesArray.mockResolvedValue([ { id: 'addr-1', address: 'ecash:qqkv9wr69ry2p9l53lxp635va4h86wv435995w8p2h', networkId: 1, syncing: false, lastSynced: new Date().toISOString() } - ]) + ] as any)
1428-1465: Replace inlinerequirewith typed imports.Same issue as earlier - use module-scope imports instead of
require:+import * as transactionService from '../../services/transactionService' +import * as triggerService from '../../services/triggerService' it('handles TX_ADDED_TO_MEMPOOL → calls fetchTxWithRetry and upserts, triggers once', async () => { - const { upsertTransaction } = require('../../services/transactionService') - const { executeAddressTriggers } = require('../../services/triggerService') + const upsertTransaction = jest.spyOn(transactionService, 'upsertTransaction') + const executeAddressTriggers = jest.spyOn(triggerService, 'executeAddressTriggers')
1476-1501: Fix object-curly-newline and spacing issues.The ESLint errors flagged in previous reviews remain:
let attempts = 0 // drive underlying chronik.tx via the real fetchTxWithRetry - ;(client.chronik as any) = { tx: jest.fn(async () => { - attempts += 1 - if (attempts < 3) throw new Error('Transaction not found in the index') - return { txid: 'tx404', inputs: [], outputs: [] } - })} + ;(client.chronik as any) = { + tx: jest.fn(async () => { + attempts += 1 + if (attempts < 3) throw new Error('Transaction not found in the index') + return { txid: 'tx404', inputs: [], outputs: [] } + }) + }
1386-1395: Use explicit module imports with jest.spyOn() for reliable mock setupThe implementation imports
fetchUnconfirmedTransactionsanddeleteTransactionsat module scope from'./transactionService'(line 6-15 of services/chronikService.ts), but the test requires them inside the test function using'../../services/transactionService'. While both paths resolve to the same file, this creates a potential module reference mismatch.The more reliable approach is to import the module explicitly at test scope and use
jest.spyOn():+import * as transactionService from '../../services/transactionService' + it('handles TX_REMOVED_FROM_MEMPOOL → deletes unconfirmed txs', async () => { - const { fetchUnconfirmedTransactions, deleteTransactions } = require('../../services/transactionService') - fetchUnconfirmedTransactions.mockResolvedValueOnce(['tx-to-del']) - deleteTransactions.mockResolvedValueOnce(undefined) + jest.spyOn(transactionService, 'fetchUnconfirmedTransactions') + .mockResolvedValueOnce(['tx-to-del'] as any) + jest.spyOn(transactionService, 'deleteTransactions') + .mockResolvedValueOnce(undefined as any) await client.processWsMessage({ type: 'Tx', msgType: 'TX_REMOVED_FROM_MEMPOOL', txid: 'deadbeef' }) expect(transactionService.fetchUnconfirmedTransactions).toHaveBeenCalledWith('deadbeef') expect(transactionService.deleteTransactions).toHaveBeenCalledWith(['tx-to-del']) })Apply the same pattern to line 1429 and other tests using the require() pattern within test bodies.
🧹 Nitpick comments (3)
services/chronikService.ts (3)
594-608: Well-implemented retry logic with exponential backoff.The retry mechanism correctly:
- Detects 404-ish errors via regex pattern matching
- Implements exponential backoff (1s, 2s, 4s delays)
- Re-throws non-retryable errors immediately
- Re-throws after exhausting retries
The pipeline "failure" showing the log at line 603 is expected behavior during retries, not an actual error.
Consider reducing log verbosity in production by changing from
console.errortoconsole.logorconsole.warn:- console.error(`Got a 404 Error trying to fetch tx ${txid} on the attempt number ${i + 1}, waiting ${(delay / 1000).toFixed(1)}s...`) + console.log(`${this.CHRONIK_MSG_PREFIX}: Retrying tx ${txid} (attempt ${i + 1}/${tries}), waiting ${(delay / 1000).toFixed(1)}s...`)
307-410: Improved observability and batching logic.The updates enhance logging and progress tracking, making it easier to monitor sync operations. The batching logic properly yields when the buffer reaches
TX_EMIT_BATCH_SIZEand handles final flushes correctly.Lines 376-378 contain a stopping condition that appears redundant with the check at lines 371-372:
if (oldestTs < lastSyncedTimestampSeconds) { hasReachedStoppingCondition = true } // Later... if (newTxsInThisPage === 0 && oldestTs < lastSyncedTimestampSeconds) { hasReachedStoppingCondition = true }Consider simplifying to:
if (oldestTs < lastSyncedTimestampSeconds) { hasReachedStoppingCondition = true } nextBurstBasePageIndex += 1 -if (newTxsInThisPage === 0 && oldestTs < lastSyncedTimestampSeconds) { - hasReachedStoppingCondition = true -}
806-858: Excellent scoped error handling for batch processing.The introduction of
involvedAddrIds(line 806) enables precise error isolation - only addresses in a failing batch are marked as failed, while others can continue processing. This significantly improves resilience compared to failing the entire sync on a single batch error.The try-catch structure properly:
- Processes transactions in parallel (lines 809-814)
- Commits in batches when reaching
DB_COMMIT_BATCH_SIZE(lines 822-848)- Isolates failures to affected addresses only (lines 852-856)
- Continues processing remaining batches after errors (line 857)
Consider extracting the trigger batching logic that's duplicated between partial commits (lines 835-847) and final flush (lines 875-887):
private async processTriggerBatch( createdTxs: TransactionWithAddressAndPrices[], commitPairs: RowWithRaw[], runTriggers: boolean ): Promise<void> { if (createdTxs.length === 0) return const rawByHash = new Map(commitPairs.map(p => [p.raw.txid, p.raw])) const triggerBatch: BroadcastTxData[] = [] for (const createdTx of createdTxs) { const raw = rawByHash.get(createdTx.hash) if (raw == null) continue const bd = this.broadcastIncomingTx(createdTx.address.address, raw, createdTx) triggerBatch.push(bd) } if (runTriggers && triggerBatch.length > 0) { await executeTriggersBatch(triggerBatch, this.networkId) } }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
services/chronikService.ts(13 hunks)tests/unittests/chronikService.test.ts(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
tests/unittests/chronikService.test.ts (2)
services/addressService.ts (1)
fetchAddressesArray(189-201)services/transactionService.ts (3)
fetchUnconfirmedTransactions(612-620)deleteTransactions(622-630)upsertTransaction(374-406)
services/chronikService.ts (5)
constants/index.ts (6)
CHRONIK_MESSAGE_CACHE_DELAY(229-229)INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY(282-282)TX_EMIT_BATCH_SIZE(283-283)MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME(273-273)MEMPOOL_PROCESS_DELAY(275-275)DB_COMMIT_BATCH_SIZE(284-284)services/transactionService.ts (2)
upsertTransaction(374-406)createManyTransactions(557-610)services/triggerService.ts (2)
executeAddressTriggers(234-243)executeTriggersBatch(501-606)prisma-local/seeds/transactions.ts (1)
appendTxsToFile(12-20)ws-service/types.ts (1)
BroadcastTxData(5-9)
🪛 ESLint
tests/unittests/chronikService.test.ts
[error] 1323-1324: More than 1 blank line not allowed.
(no-multiple-empty-lines)
[error] 1332-1332: Extra space before value for key 'confirmed'.
(key-spacing)
[error] 1387-1387: Require statement not part of import statement.
(@typescript-eslint/no-var-requires)
[error] 1429-1429: Require statement not part of import statement.
(@typescript-eslint/no-var-requires)
[error] 1430-1430: Require statement not part of import statement.
(@typescript-eslint/no-var-requires)
[error] 1481-1481: Expected a line break after this opening brace.
(object-curly-newline)
[error] 1485-1485: Expected a line break before this closing brace.
(object-curly-newline)
[error] 1485-1485: A space is required before '}'.
(object-curly-spacing)
🪛 GitHub Actions: Pull Request Tests
tests/unittests/chronikService.test.ts
[error] 1393-1393: TX_REMOVED_FROM_MEMPOOL test failed: expected fetchUnconfirmedTransactions to be called with 'deadbeef' but it was not called (Number of calls: 0).
[error] 1424-1424: TX_CONFIRMED test did not trigger fetchTxWithRetry or payments update as expected (no calls were observed).
services/chronikService.ts
[error] 603-603: FetchTxWithRetry encountered a 404 error: Got a 404 Error trying to fetch tx txABC on the attempt number 1, waiting 1.0s...
🔇 Additional comments (8)
tests/unittests/chronikService.test.ts (4)
23-25: LGTM!The addition of the
tx: jest.fn()mock to the ChronikClient strategy enables testing of transaction fetch paths, including the newfetchTxWithRetrylogic.
1275-1297: Excellent test coverage for counter leak prevention.This test correctly validates that
mempoolTxsBeingProcessedis decremented in thefinallyblock even when errors occur, directly addressing the PR objective of preventing transaction processing from freezing on errors.
1299-1321: Good coverage of retry logic.This test validates that
fetchTxWithRetryretries on 404-ish errors and eventually succeeds, which addresses the mempool race condition mentioned in the PR objectives.
1397-1426: Remove incorrect mocks and add the missinggetAddressesForTransactionmock.The test fails because it mocks the wrong methods. The
TX_CONFIRMEDhandler callsgetAddressesForTransaction, notgetRelatedAddressesForTransactionorgetTransactionFromChronikTransaction. The missing mock forgetAddressesForTransactioncauses the loop to iterate over undefined, failing the test.Replace lines 1405–1418 with:
- // deterministic related addresses - jest.spyOn(client as any, 'getRelatedAddressesForTransaction') - .mockReturnValue(['ecash:qqkv9wr69ry2p9l53lxp635va4h86wv435995w8p2h']) - - // minimal transaction shape for downstream - jest.spyOn(client as any, 'getTransactionFromChronikTransaction') - .mockResolvedValue({ - hash: 'txCONF', - amount: '0.01', - timestamp: Math.floor(Date.now() / 1000), - addressId: 'addr-1', - confirmed: false, - opReturn: JSON.stringify({ message: { type: 'PAY', paymentId: 'pid-1' } }) - }) + + jest.spyOn(client as any, 'getAddressesForTransaction').mockResolvedValue([ + { + address: { address: 'ecash:qqkv9wr69ry2p9l53lxp635va4h86wv435995w8p2h', networkId: 1 }, + transaction: { + hash: 'txCONF', + amount: '0.01', + timestamp: Math.floor(Date.now() / 1000), + addressId: 'addr-1', + confirmed: false, + opReturn: JSON.stringify({ message: { type: 'PAY', paymentId: 'pid-1' } }) + } + } + ])Likely an incorrect or invalid review comment.
services/chronikService.ts (4)
183-201: LGTM: Unit mismatch resolved.The time unit consistency issue flagged in previous reviews has been properly addressed. The code now correctly:
- Stores timestamps in seconds via
moment().unix()(lines 213, 220)- Computes age difference in seconds:
(now - lastTimestamp)- Converts to milliseconds:
* 1000- Compares against
CHRONIK_MESSAGE_CACHE_DELAY(3000 ms)This ensures cache entries expire after 3 seconds as intended.
638-667: Robust mempool processing with proper counter management.The implementation correctly addresses the PR objectives:
- Early return (line 639) prevents duplicate processing
- Backpressure (lines 641-643) limits concurrent mempool transactions
- Counter increment (line 645) tracks in-flight processing
- Try-catch-finally (lines 646-667) ensures counter is always decremented
- fetchTxWithRetry (line 648) handles transient 404 errors
The
Math.max(0, ...)guard in the finally block (line 666) is a good defensive practice to prevent negative counters.
862-888: Clear final flush with proper trigger execution.The final commit properly handles remaining transactions with "FINAL" logging (line 868) and executes triggers for newly created transactions (lines 884-886). The logic correctly mirrors the partial commit flow while marking it as the final operation.
1067-1067: Condition logic is correct; no issues found.The environment condition verification confirms the intended behavior is implemented correctly:
- When
isRunningApp()returnstrue(normal app, not in test, no JOBS_ENV), subscriptions occur- When
NODE_ENV === 'test'ORJOBS_ENV !== undefined, only latency test waits—subscriptions are skippedThis satisfies the requirement that jobs should not subscribe to initial addresses during startup. When
JOBS_ENVis defined, the code routes to theelse ifbranch that excludessubscribeInitialAddresses().
Related to #
Description
`chronik.tx` threw an error that would permanently block new txs from being processed
Test plan
For each of the points above:
1:
Just make sure you agree the logs are better to read now
2:
For this one, try FIRST ON MASTER to apply the diff:
(this patch can be applied by saving it to `2.patch` in the root directory of the repository and then running `patch -p1 < 2.patch`) ... then make a tx to any subscribed address. It should throw `MOCK 404`. If you try making a tx again, you can check that it won't be processed, thus reproducing the bug.
Then, for THIS BRANCH, do the same with the patch:
...and notice that even though it throws an error, if you keep making txs, it will keep throwing the error (thus not freezing)
3
Hard to test, will have to see if it happens again. In any case, if it does, it won't break the flow as per 2.
4
Also hard to test, but this has been running for some weeks now and looks like it has stopped (tweaking the parallelization parameters fixed it by avoiding trying to do too much at once.)
Summary by CodeRabbit
Bug Fixes
Chores
Behavior
Tests