
Conversation


@chedieck chedieck commented Oct 22, 2025

Related to #

Description

  1. Improve many confusing logs
  2. Fix a bug where an error thrown by chronik.tx would permanently block new txs from being processed
  3. Fix a bug where a mempool race condition could make chronik.tx fail, by retrying up to three more times with a delay between attempts
  4. Fix some Prisma timeout errors
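
The retry described in point 3 can be sketched as a small generic helper. This is a hedged sketch, not the actual fetchTxWithRetry code: the name withRetry, its defaults, and the flaky example are all illustrative.

```typescript
// Minimal sketch of a retry-with-delay helper: try the call, and on failure
// wait delayMs before the next attempt; rethrow only after the last attempt.
async function withRetry<T> (
  fn: () => Promise<T>,
  tries = 3,
  delayMs = 400
): Promise<T> {
  let lastError: unknown
  for (let attempt = 1; attempt <= tries; attempt++) {
    try {
      return await fn()
    } catch (error) {
      lastError = error
      if (attempt < tries) {
        // give the mempool time to index the tx before asking again
        await new Promise(resolve => setTimeout(resolve, delayMs))
      }
    }
  }
  throw lastError
}

// Example: a call that fails twice before succeeding on the third attempt.
let calls = 0
const flaky = async (): Promise<string> => {
  calls += 1
  if (calls < 3) throw new Error('tx not found in the index')
  return 'tx-payload'
}

withRetry(flaky, 4, 10).then(result => {
  console.log(result, calls) // tx-payload 3
})
```

In the real service, a 404-style "not found" error is presumably the transient case worth retrying, while other errors should still surface immediately.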

Test plan

For each of the points above:

1:

Just make sure you agree that the logs are easier to read now.

2:

For this one, FIRST ON MASTER, apply the following diff:
(the patch can be applied by saving it as 2.patch in the root directory of the repository and then running patch -p1 < 2.patch)

diff --git a/constants/index.ts b/constants/index.ts
index 72ae4c3a..d56cda19 100644
--- a/constants/index.ts
+++ b/constants/index.ts
@@ -270,7 +270,7 @@ export const MAX_DAILY_EMAILS = 100 // If changed, update the DB default accordi
 export const XEC_TX_EXPLORER_URL = 'https://explorer.e.cash/tx/'
 export const BCH_TX_EXPLORER_URL = 'https://blockchair.com/bitcoin-cash/transaction/'
 
-export const MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME = 2
+export const MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME = 1
 export const CHRONIK_INITIALIZATION_DELAY = 2000
 export const MEMPOOL_PROCESS_DELAY = 100
 
diff --git a/services/chronikService.ts b/services/chronikService.ts
index 64330f95..c346c02a 100644
--- a/services/chronikService.ts
+++ b/services/chronikService.ts
@@ -33,6 +33,7 @@ import { AddressType } from 'ecashaddrjs/dist/types'
 import { DecimalJsLike } from '@prisma/client/runtime/library'
 
 const decoder = new TextDecoder()
+let _counter = 0
 
 export function getNullDataScriptData (outputScript: string): OpReturnData | null {
   if (outputScript.length < 2 || outputScript.length % 2 !== 0) {
@@ -613,6 +614,13 @@ export class ChronikBlockchainClient {
         }
         this.mempoolTxsBeingProcessed += 1
         console.log(`${this.CHRONIK_MSG_PREFIX}: [${msg.msgType}] ${msg.txid}`)
+        console.log("OIA", _counter)
+        if (_counter === 0) {
+          _counter +=1
+          throw new Error("MOCK 404")
+        }
+        _counter +=1
+
         const transaction = await this.chronik.tx(msg.txid)
         const addressesWithTransactions = await this.getAddressesForTransaction(transaction)
         await this.waitForSyncing(msg.txid, addressesWithTransactions.map(obj => obj.address.address))

... then make a tx to any subscribed address. It should throw MOCK 404. If you then make another tx, you can check that it won't be processed, thus reproducing the bug.

Then, for THIS BRANCH, do the same with the patch:

diff --git a/constants/index.ts b/constants/index.ts
index a874db7f..7148f045 100644
--- a/constants/index.ts
+++ b/constants/index.ts
@@ -270,7 +270,7 @@ export const MAX_DAILY_EMAILS = 100 // If changed, update the DB default accordi
 export const XEC_TX_EXPLORER_URL = 'https://explorer.e.cash/tx/'
 export const BCH_TX_EXPLORER_URL = 'https://blockchair.com/bitcoin-cash/transaction/'
 
-export const MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME = 2
+export const MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME = 1
 export const CHRONIK_INITIALIZATION_DELAY = 2000
 export const MEMPOOL_PROCESS_DELAY = 100
 
diff --git a/services/chronikService.ts b/services/chronikService.ts
index 1c3ec27f..d2b35b90 100644
--- a/services/chronikService.ts
+++ b/services/chronikService.ts
@@ -33,6 +33,7 @@ import { AddressType } from 'ecashaddrjs/dist/types'
 import { DecimalJsLike } from '@prisma/client/runtime/library'
 
 const decoder = new TextDecoder()
+let _counter = 0
 
 export function getNullDataScriptData (outputScript: string): OpReturnData | null {
   if (outputScript.length < 2 || outputScript.length % 2 !== 0) {
@@ -640,6 +641,12 @@ export class ChronikBlockchainClient {
         this.mempoolTxsBeingProcessed += 1
         try {
           console.log(`${this.CHRONIK_MSG_PREFIX}: [${msg.msgType}] ${msg.txid}`)
+          console.log("OIA", _counter)
+          if (_counter === 0) {
+            _counter +=1
+            throw new Error("MOCK 404")
+          }
+          _counter +=1
           const transaction = await this.fetchTxWithRetry(msg.txid)
           const addressesWithTransactions = await this.getAddressesForTransaction(transaction)
           await this.waitForSyncing(msg.txid, addressesWithTransactions.map(obj => obj.address.address))

...and notice that even though it throws an error, if you keep making txs they keep being processed (thus processing no longer freezes)
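
The shape of the fix in point 2 is a try/catch/finally around the processing path so the in-flight counter is always released. A minimal sketch under assumed names (MempoolProcessor and txsBeingProcessed are illustrative, not the real chronikService members):

```typescript
// Sketch: the counter that gates how many mempool txs are processed at a
// time is decremented in `finally`, so a thrown error can no longer leave
// it stuck and silently block all future txs.
class MempoolProcessor {
  private txsBeingProcessed = 0

  async process (fetchTx: () => Promise<string>): Promise<string | null> {
    this.txsBeingProcessed += 1
    try {
      return await fetchTx()
    } catch (error) {
      console.error('failed to process tx:', String(error))
      return null
    } finally {
      // always runs, whether fetchTx resolved or threw
      this.txsBeingProcessed = Math.max(0, this.txsBeingProcessed - 1)
    }
  }

  get inFlight (): number {
    return this.txsBeingProcessed
  }
}

const processor = new MempoolProcessor()
processor.process(async () => { throw new Error('MOCK 404') }).then(result => {
  console.log(result, processor.inFlight) // null 0
})
```

Before the fix, an error thrown between the increment and the decrement left the counter permanently at its cap, which is why no new txs were ever picked up.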

3:

Hard to test; we will have to see if it happens again. In any case, if it does, it won't break the flow, as per point 2.

4:

Also hard to test, but this has been running for some weeks now and the errors look like they have stopped (tweaking the parallelization parameters fixed it by avoiding doing too much at once).

Summary by CodeRabbit

  • Bug Fixes

    • Improved transaction fetch reliability with retries/backoff, safer mempool handling, ensured worker cleanup on failures, and finer-grained batch error containment.
  • Chores

    • Lowered retry counts, batch sizes, and concurrency to reduce resource usage and speed up startup.
    • Reset processing queues on startup and consolidated batch-oriented logging.
  • Behavior

    • More resilient price fetching/renewal; transaction–price linking now includes network context and stricter completeness checks.
    • Improved sync progress reporting and commit semantics.
  • Tests

    • Added unit coverage for mempool retries, fetch retries, message TTL cleanup and related workflows.


coderabbitai bot commented Oct 22, 2025

Walkthrough

Reduced retry counts and batching sizes; added queue obliteration and auto-remove job flags; introduced generic retry helpers and Chronik tx retry/cleanup; made price fetching and transaction-price linking network-aware with batched upserts; tightened mempool/error handling; updated tests and logging.

Changes

  • Configuration — constants/index.ts: Reduced PRICE_API_MAX_RETRIES 3→1, INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY 512→128, TX_EMIT_BATCH_SIZE 10000→2000, DB_COMMIT_BATCH_SIZE 10000→2000.
  • Queue setup & worker lifecycle — jobs/initJobs.ts, jobs/workers.ts: Create/obliterate queues (pricesSync, blockchainSync, clientPaymentCleanup) at startup; enqueue jobs with removeOnComplete: true/removeOnFail: true; worker failure handler now awaits multiBlockchainClient.destroy() on error.
  • Chronik client & mempool processing — services/chronikService.ts, tests/unittests/chronikService.test.ts: Added fetchTxWithRetry(txid, tries, delayMs) with backoff; rewrote parallel/address-batch processing and mempool counters; isolated batch error handling and refined FINAL vs non-FINAL commit/flush semantics; added/updated WS and TTL tests.
  • Price service & seeds — services/priceService.ts, prisma-local/seeds/prices.ts, scripts/updateAllPriceConnections.ts: Added generic withRetries; changed public signatures/overloads for price fetchers; renamed prisma → prismaTx; added tryRenewing; renewPricesForTimestamp now returns Promise<boolean>; seed call updated to pass extra arg; scripts now include networkId in selections.
  • Transaction batching & network context — services/transactionService.ts, tests/unittests/transactionService.test.ts: Introduced TransactionWithNetwork/includeNetwork; refactored connectTransactionsListToPrices to bulk-fetch per-network and validate CAD+USD presence; createManyTransactions batches upserts (size 50); tests updated to use findUniqueOrThrow.
  • Trigger logs and utilities — services/triggerService.ts, services/userService.ts, utils/validators.ts: Trigger log prefix changed to "BATCH TRIGGER"; DER private-key assembly refactored to manual Uint8Array→Buffer; signPostData now encodes payloads as Uint8Array for crypto.sign.
  • Tests & deps — tests/unittests/*, package.json: Added/expanded unit tests (Chronik/transaction flows, mempool/retries, WS matrix); bumped devDependency @types/jest ^27.5.1→^30.0.0.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Init as initJobs()
  participant Queue as BullMQ
  participant Worker as sync worker
  Init->>Queue: create & obliterate queues (`pricesSync`, `blockchainSync`, `clientPaymentCleanup`)
  Init->>Queue: add jobs with removeOnComplete/removeOnFail
  Queue->>Worker: dispatch job
  alt success
    Worker->>Queue: complete (auto-removed)
  else fail
    Worker->>Worker: log error
    Worker->>Worker: await multiBlockchainClient.destroy()
    Worker->>Queue: fail (auto-removed)
  end
sequenceDiagram
  autonumber
  participant Worker as sync worker
  participant Chronik as ChronikClient
  participant Retry as fetchTxWithRetry()
  participant DB as DB/Commit
  participant Logger as Logger

  Worker->>Chronik: WS message requests tx
  Chronik->>Retry: fetchTxWithRetry(txid)
  alt transient 404 / error
    Retry->>Retry: backoff & retry
  end
  Retry-->>Worker: tx or throw
  alt tx obtained
    Worker->>DB: enqueue for batched commit (non-FINAL / FINAL)
    DB->>Logger: emit commit and triggers
  else thrown
    Worker->>Logger: log error
    Worker->>Worker: ensure mempool counter decremented (finally)
  end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Potential attention points:

  • Chronik: fetchTxWithRetry correctness, mempool counter decrement in all paths, batch isolation and FINAL commit behavior.
  • Transaction/price bulk-join logic and network-aware mapping/validation.
  • Job lifecycle changes: queue obliterate on startup and safe destruction of multiBlockchainClient.
  • Tests: ensure updated mocks (tx mock, findUniqueOrThrow) align with new behavior.

Suggested reviewers

  • lissavxo
  • Klakurka

"🐰 I shrank retries and trimmed the batch,
reset the queues to clear the cache,
retried a tx till truth came through,
joined prices by network, neat and true,
I log and hop — a tidy patch! 🥕"

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage — ⚠️ Warning: docstring coverage is 0.00%, which is insufficient; the required threshold is 80.00%. Resolution: run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
  • Title Check — ✅ Passed: The PR title "fix: timeout prisma, mempool race conditions & freezing of tx processing upon erroring" is directly related to the main changes in the changeset. It clearly summarizes the primary objectives: addressing Prisma timeout issues, fixing a mempool race condition, and resolving a bug where transaction processing freezes on errors. The title is concise, uses the conventional "fix:" prefix, and provides enough specificity that a teammate scanning history would understand the core issues being addressed.
  • Description Check — ✅ Passed: The PR description follows the template structure with required sections present: it includes a Description section with four clearly numbered points explaining each fix, and a comprehensive Test plan section with detailed reproduction steps and patches for verifying the fixes. However, the "Related to #" field is empty and lacks an issue number link, which is a minor metadata gap. The core content is well-documented and substantive, with specific test instructions that demonstrate good understanding of the changes.

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3637b2f and 7704f33.

📒 Files selected for processing (1)
  • tests/unittests/chronikService.test.ts (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
tests/unittests/chronikService.test.ts (4)
services/chronikService.ts (1)
  • ChronikBlockchainClient (118-945)
services/addressService.ts (1)
  • fetchAddressesArray (189-201)
services/transactionService.ts (3)
  • fetchUnconfirmedTransactions (612-620)
  • deleteTransactions (622-630)
  • upsertTransaction (374-406)
services/triggerService.ts (1)
  • executeAddressTriggers (234-243)
🪛 ESLint
tests/unittests/chronikService.test.ts

[error] 1323-1324: More than 1 blank line not allowed.

(no-multiple-empty-lines)


[error] 1332-1332: Extra space before value for key 'confirmed'.

(key-spacing)


[error] 1387-1387: Require statement not part of import statement.

(@typescript-eslint/no-var-requires)


[error] 1429-1429: Require statement not part of import statement.

(@typescript-eslint/no-var-requires)


[error] 1430-1430: Require statement not part of import statement.

(@typescript-eslint/no-var-requires)


[error] 1481-1481: Expected a line break after this opening brace.

(object-curly-newline)


[error] 1485-1485: Expected a line break before this closing brace.

(object-curly-newline)


[error] 1485-1485: A space is required before '}'.

(object-curly-spacing)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run Tests
🔇 Additional comments (7)
tests/unittests/chronikService.test.ts (7)

23-24: LGTM: Essential mock for tx method.

Adding tx: jest.fn() to the ChronikClient mock ensures tests can properly mock transaction fetching behavior, which is exercised by the new regression tests.


333-333: LGTM: Correct mock cleanup approach.

Changing to jest.clearAllMocks() is appropriate here since the mocks are defined at module level with jest.mock(). Using resetModules() would clear the module cache and undo the mock setup.


1275-1297: LGTM: Critical regression test for mempool counter leak.

This test correctly verifies that mempoolTxsBeingProcessed is always decremented in the finally block, even when errors occur. This prevents the freeze bug described in the PR objectives.


1299-1321: LGTM: Validates retry logic for transient 404 errors.

The test properly verifies that fetchTxWithRetry retries on "not found in the index" errors and eventually succeeds, addressing the mempool race condition mentioned in the PR objectives.


1346-1385: LGTM: Well-structured test suite setup.

The test setup properly initializes the client, disables initialization delays with setInitialized(), and mocks all dependencies to enable isolated unit testing of message handling logic.


1467-1474: LGTM: Validates deduplication logic.

This test correctly verifies that isAlreadyBeingProcessed prevents duplicate processing when a transaction is already being handled.


1503-1519: LGTM: Complete coverage of remaining message types.

The BLK_FINALIZED and Error message type tests properly validate block synchronization and error logging behavior, completing the comprehensive message handling test matrix.




@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
prisma-local/seeds/prices.ts (1)

96-103: Timezone mismatch: store daily CSV timestamps in UTC to match flattenTimestamp.

Prices are later fetched by UTC‑flattened timestamp. Using moment(price.date).unix() stores local midnight, which can be off by a day and break joins.

Apply this diff:

-        timestamp: moment(price.date).unix(),
+        // Store price-day at UTC midnight to align with flattenTimestamp()
+        timestamp: moment.utc(price.date, PRICE_API_DATE_FORMAT).unix(),
services/priceService.ts (1)

81-85: Unify day handling in UTC to prevent off‑by‑one price lookups and mislinked rows.

Local time is used in multiple places while flattenTimestamp is UTC. This can fetch the wrong API day and store prices under the wrong timestamp.

Apply these diffs:

--- a/services/priceService.ts
+++ b/services/priceService.ts
@@
 function getPriceURLForDayAndNetworkTicker (day: moment.Moment, networkTicker: string): string {
   validatePriceAPIUrlAndToken()
   validateNetworkTicker(networkTicker)
-  return `${config.priceAPIURL}/pricebydate/${process.env.PRICE_API_TOKEN!}/${networkTicker}+${day.format(PRICE_API_DATE_FORMAT)}`
+  // Always use UTC day when addressing the Price API
+  const dayUTC = day.clone().utc()
+  return `${config.priceAPIURL}/pricebydate/${process.env.PRICE_API_TOKEN!}/${networkTicker}+${dayUTC.format(PRICE_API_DATE_FORMAT)}`
 }
@@
-    await Promise.all(
+    await Promise.all(
       allXECPrices
         .filter(p => daysToRetrieve.includes(p.day))
-        .map(async price => await upsertPricesForNetworkId(price, XEC_NETWORK_ID, moment(price.day).unix()))
+        // Persist daily price at UTC midnight
+        .map(async price => await upsertPricesForNetworkId(price, XEC_NETWORK_ID, moment.utc(price.day, PRICE_API_DATE_FORMAT).unix()))
     )
@@
-    await Promise.all(
+    await Promise.all(
       allBCHPrices
         .filter(p => daysToRetrieve.includes(p.day))
-        .map(async price => await upsertPricesForNetworkId(price, BCH_NETWORK_ID, moment(price.day).unix()))
+        // Persist daily price at UTC midnight
+        .map(async price => await upsertPricesForNetworkId(price, BCH_NETWORK_ID, moment.utc(price.day, PRICE_API_DATE_FORMAT).unix()))
     )
@@
 async function renewPricesForTimestamp (timestamp: number): Promise<boolean> {
   try {
-    const xecPrice = await getPriceForDayAndNetworkTicker(moment(timestamp * 1000), NETWORK_TICKERS.ecash)
+    const xecPrice = await getPriceForDayAndNetworkTicker(moment.unix(timestamp).utc(), NETWORK_TICKERS.ecash)
     await upsertPricesForNetworkId(xecPrice, XEC_NETWORK_ID, timestamp)
 
-    const bchPrice = await getPriceForDayAndNetworkTicker(moment(timestamp * 1000), NETWORK_TICKERS.bitcoincash)
+    const bchPrice = await getPriceForDayAndNetworkTicker(moment.unix(timestamp).utc(), NETWORK_TICKERS.bitcoincash)
     await upsertPricesForNetworkId(bchPrice, BCH_NETWORK_ID, timestamp)

Also applies to: 268-275, 173-191

🧹 Nitpick comments (8)
services/chronikService.ts (1)

817-883: Consider extracting duplicated commit logic.

The commit and trigger logic is duplicated between batch processing (lines 817-843) and final flush (lines 857-883). Consider extracting to a helper method to improve maintainability:

private async commitBatchAndTrigger(
  commitPairs: RowWithRaw[],
  productionAddressesIds: string[],
  runTriggers: boolean,
  isFinal: boolean
): Promise<TransactionWithAddressAndPrices[]> {
  const rows = commitPairs.map(p => p.row)
  const createdTxs = await createManyTransactions(rows)
  console.log(`${this.CHRONIK_MSG_PREFIX} committed${isFinal ? ' FINAL' : ''} — created=${createdTxs.length}`)
  
  const createdForProd = createdTxs.filter(t => productionAddressesIds.includes(t.addressId))
  if (createdForProd.length > 0) {
    await appendTxsToFile(createdForProd as unknown as Prisma.TransactionCreateManyInput[])
  }
  
  if (createdTxs.length > 0) {
    const rawByHash = new Map(commitPairs.map(p => [p.raw.txid, p.raw]))
    const triggerBatch: BroadcastTxData[] = []
    for (const createdTx of createdTxs) {
      const raw = rawByHash.get(createdTx.hash)
      if (raw == null) continue
      const bd = this.broadcastIncomingTx(createdTx.address.address, raw, createdTx)
      triggerBatch.push(bd)
    }
    if (runTriggers && triggerBatch.length > 0) {
      await executeTriggersBatch(triggerBatch, this.networkId)
    }
  }
  
  return createdTxs
}
constants/index.ts (1)

273-276: Extract chronik retry parameters as constants in constants/index.ts for consistency.

The retry logic exists at services/chronikService.ts:594 with hardcoded defaults (tries = 3, delayMs = 400), but these values are not extracted as constants. Since the PR adds related constants (MEMPOOL_PROCESS_DELAY, MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME), consider adding CHRONIK_TX_RETRY_ATTEMPTS and CHRONIK_TX_RETRY_DELAY_MS for consistency and maintainability.

services/priceService.ts (3)

147-153: Harden mapping to exclude non‑day entries.

If the API includes extra keys (e.g., success), Object.entries will incorrectly map them. Filter by objects containing both price fields.

-      return Object.entries<IResponseData>(res.data).map(([day, priceData]) => ({ day, ...priceData }))
+      const entries = Object.entries(res.data)
+        .filter(([, v]) => v && typeof v === 'object' && 'Price_in_CAD' in v && 'Price_in_USD' in v)
+      return entries.map(([day, priceData]) => ({ day, ...(priceData as IResponseData) }))

114-120: Improve retry context for logs.

Add networkTicker/day to retry context for easier tracing.

 export async function getPriceForDayAndNetworkTicker (day: moment.Moment, networkTicker: string): Promise<IResponseData> {
   return await withRetries(async () => {
@@
-  }, { context: { day } })
+  }, { context: { networkTicker, day: day.clone().utc().format(PRICE_API_DATE_FORMAT) } })
 }

93-112: Optional: add backoff to withRetries and guard zero retries.

Small resilience boost and safer default.

-async function withRetries<T> (
-  fn: () => Promise<T>,
-  { maxRetries = PRICE_API_MAX_RETRIES, throwOnFailure = true, context = {} } = {}
-): Promise<T | null> {
+async function withRetries<T> (
+  fn: () => Promise<T>,
+  { maxRetries = PRICE_API_MAX_RETRIES, throwOnFailure = true, context = {}, delayMs = 0 } = {}
+): Promise<T | null> {
   let lastError: any
-  for (let attempt = 1; attempt <= maxRetries; attempt++) {
+  const retries = Math.max(1, maxRetries)
+  for (let attempt = 1; attempt <= retries; attempt++) {
     try {
       return await fn()
     } catch (error) {
       lastError = error
       console.error(`[Retry ${attempt}/${maxRetries}] ${String(error)}`, { ...context })
-      if (attempt < maxRetries) continue
+      if (attempt < retries) {
+        if (delayMs > 0) await new Promise(r => setTimeout(r, delayMs))
+        continue
+      }
       if (throwOnFailure) throw lastError
       console.error(`[Retry ${attempt}/${maxRetries}] skipping error:`)
       console.error(String(lastError))
       return null
     }
   }
   return null
 }
services/transactionService.ts (3)

641-654: Generalize “irregular prices” to any count != N_OF_QUOTES.

Current equals:1 misses duplicates or future quote expansions.

 export async function fetchAllTransactionsWithIrregularPrices (): Promise<TransactionWithNetwork[]> {
-  const grouped = await prisma.pricesOnTransactions.groupBy({
+  const grouped = await prisma.pricesOnTransactions.groupBy({
     by: ['transactionId'],
     _count: { transactionId: true },
-    having: { transactionId: { _count: { equals: 1 } } }
+    having: { transactionId: { _count: { not: N_OF_QUOTES } } }
   })

398-399: Type nit: accept both Prisma.TransactionClient and PrismaClient.

You pass prisma (client) here; widen the fetchPricesForNetworkAndTimestamp param type for clarity.

-export async function fetchPricesForNetworkAndTimestamp (
+export async function fetchPricesForNetworkAndTimestamp (
   networkId: number,
   timestamp: number,
-  prismaTx: Prisma.TransactionClient,
+  prismaTx: Prisma.TransactionClient | typeof prisma,
   tryRenewing = true
 ): Promise<AllPrices> {

520-531: Operational note: transaction can grow large; consider chunking deleteMany.

Deleting for thousands of txs at once may exceed statement timeouts on some DBs. Optional: split ids in chunks similar to inserts.
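
The chunking suggested above could be a tiny generic helper (a sketch; the name chunk and the batch size are illustrative, not code from the repository):

```typescript
// Split a list of ids into fixed-size chunks so each deleteMany statement
// operates on a bounded number of rows.
function chunk<T> (items: T[], size: number): T[][] {
  const out: T[][] = []
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size))
  }
  return out
}

console.log(chunk([1, 2, 3, 4, 5], 2)) // [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]
```

Each chunk would then be passed to its own deleteMany call, mirroring how the inserts are already batched.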

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between af72ccc and b17fb30.

⛔ Files ignored due to path filters (2)
  • prisma-local/seeds/prices.csv is excluded by !**/*.csv
  • prisma-local/seeds/productionTxs.csv is excluded by !**/*.csv
📒 Files selected for processing (9)
  • constants/index.ts (2 hunks)
  • jobs/initJobs.ts (2 hunks)
  • jobs/workers.ts (1 hunks)
  • prisma-local/seeds/prices.ts (1 hunks)
  • scripts/updateAllPriceConnections.ts (1 hunks)
  • services/chronikService.ts (12 hunks)
  • services/priceService.ts (6 hunks)
  • services/transactionService.ts (10 hunks)
  • services/triggerService.ts (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (6)
jobs/workers.ts (1)
services/chronikService.ts (1)
  • multiBlockchainClient (1164-1164)
jobs/initJobs.ts (2)
redis/clientInstance.ts (1)
  • redisBullMQ (54-54)
jobs/workers.ts (1)
  • syncBlockchainAndPricesWorker (34-66)
prisma-local/seeds/prices.ts (1)
services/priceService.ts (1)
  • getAllPricesByNetworkTicker (143-154)
services/transactionService.ts (2)
services/priceService.ts (3)
  • fetchPricesForNetworkAndTimestamp (241-266)
  • flattenTimestamp (9-13)
  • AllPrices (21-24)
constants/index.ts (6)
  • PRICES_CONNECTION_BATCH_SIZE (291-291)
  • CAD_QUOTE_ID (152-152)
  • USD_QUOTE_ID (151-151)
  • HUMAN_READABLE_DATE_FORMAT (165-165)
  • PRICES_CONNECTION_TIMEOUT (293-293)
  • UPSERT_TRANSACTION_PRICES_ON_DB_TIMEOUT (200-200)
services/chronikService.ts (5)
constants/index.ts (6)
  • CHRONIK_MESSAGE_CACHE_DELAY (229-229)
  • INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY (282-282)
  • TX_EMIT_BATCH_SIZE (283-283)
  • MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME (273-273)
  • MEMPOOL_PROCESS_DELAY (275-275)
  • DB_COMMIT_BATCH_SIZE (284-284)
services/transactionService.ts (2)
  • upsertTransaction (374-406)
  • createManyTransactions (554-607)
services/triggerService.ts (2)
  • executeAddressTriggers (234-243)
  • executeTriggersBatch (501-606)
prisma-local/seeds/transactions.ts (1)
  • appendTxsToFile (12-20)
ws-service/types.ts (1)
  • BroadcastTxData (5-9)
services/priceService.ts (2)
constants/index.ts (10)
  • PRICE_API_MAX_RETRIES (169-169)
  • PRICE_API_TIMEOUT (168-168)
  • RESPONSE_MESSAGES (8-108)
  • PRICE_API_DATE_FORMAT (167-167)
  • NETWORK_TICKERS (181-184)
  • XEC_NETWORK_ID (148-148)
  • BCH_NETWORK_ID (149-149)
  • N_OF_QUOTES (153-153)
  • USD_QUOTE_ID (151-151)
  • CAD_QUOTE_ID (152-152)
prisma-local/seeds/prices.ts (1)
  • getPrices (87-108)
🪛 GitHub Actions: Pull Request Tests
services/priceService.ts

[error] 250-250: The provided database string is invalid. Error parsing connection string: invalid port number in database URL.

🔇 Additional comments (19)
services/triggerService.ts (3)

523-523: Clearer batch context and quantified metrics in preparation log.

The change adds currency context and transaction/address counts, improving observability for batch execution. This aligns with the PR objective to clarify logging.


554-554: Detailed task breakdown improves visibility into trigger execution.

The updated log clearly delineates post vs. email task counts and affected user counts, making it easier to monitor batch execution characteristics and identify distribution anomalies.


605-605: Summary metrics in completion log provide actionable completion status.

The updated log includes logs created, users charged, and breakdown of accepted triggers (email vs. post), enabling quick assessment of batch execution results and success rates.

scripts/updateAllPriceConnections.ts (1)

61-62: LGTM – adding network context to price data.

The addition of networkId to the price selection aligns with the PR's broader introduction of network-aware price data. While networkId isn't directly consumed in this script's logic (line 82 only uses timestamp), it's included in the data passed to connectTransactionsListToPrices (line 90), which may require or benefit from network context.

jobs/workers.ts (1)

58-65: LGTM! Good symmetric cleanup on failure.

The failure handler now mirrors the completed handler by wrapping in an async IIFE and calling multiBlockchainClient.destroy(). This ensures proper resource cleanup regardless of job outcome.

services/chronikService.ts (7)

183-201: LGTM! Simplified timestamp arithmetic.

Replacing moment-based time diff with direct arithmetic (now - timestamp) is more efficient and clearer. The age calculation is correct.


594-606: LGTM! Well-implemented retry logic for mempool race conditions.

The exponential backoff strategy (400ms, 800ms, 1600ms) with 404-specific handling correctly addresses the mempool race condition described in the PR objectives. The logic properly distinguishes between "not found yet" (404) and genuine errors.


633-663: LGTM! Critical fix for transaction processing freeze.

Wrapping the entire mempool processing path in try/catch/finally directly addresses the freeze bug described in the PR objectives. The finally block ensures mempoolTxsBeingProcessed is always decremented, preventing the counter from becoming permanently stuck and blocking future transactions.

Key improvements:

  • Line 643: Uses fetchTxWithRetry() to handle mempool race conditions
  • Lines 658-662: finally block guarantees cleanup even on error
  • Defensive Math.max(0, ...) prevents negative counter values
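
The 400ms / 800ms / 1600ms schedule noted above is a simple doubling (exponential) backoff; sketched generically (backoffDelays is an illustrative name, not a function in the codebase):

```typescript
// Delay before the i-th retry attempt (0-based) doubles each time: base * 2^i.
function backoffDelays (baseMs: number, attempts: number): number[] {
  return Array.from({ length: attempts }, (_, i) => baseMs * 2 ** i)
}

console.log(backoffDelays(400, 3)) // [ 400, 800, 1600 ]
```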

673-674: LGTM! Improved observability.

Adding subscription count logging helps monitor the health of address subscriptions after block processing completes.


301-410: LGTM! Enhanced progress tracking and clearer logging.

The improvements to logging and progress counters significantly enhance observability during parallel fetching:

  • Lines 320-326: Progress counters show sync state
  • Lines 336-383: Per-address transaction counts
  • Lines 393-404: Clarified comments explain batch yielding and final flush logic

801-853: LGTM! Robust batch-scoped error handling.

The introduction of involvedAddrIds (line 801) and batch-scoped try/catch (lines 803-853) is excellent. When a batch fails:

  • Only addresses in that specific batch are marked as failed (lines 847-851)
  • Processing continues with remaining batches (line 852)
  • Other addresses aren't penalized by one batch's failure

This directly addresses the Prisma timeout issues mentioned in the PR objectives by containing failures and allowing partial success.


892-902: LGTM! Clearer error and completion messages.

The logging improvements make error severity and completion status more explicit:

  • Line 892: "FATAL ERROR in parallel sync" clearly indicates scope and severity
  • Line 902: Removed potentially confusing "PARALLEL" prefix from completion message
constants/index.ts (2)

282-284: Concurrency and batch size reductions align with fixing Prisma timeouts.

These reductions (75-80%) should effectively address Prisma timeout errors by reducing parallel database load and committing smaller batches more frequently. All three constants are used correctly throughout the codebase for their intended batching and concurrency control purposes. Since the PR indicates these changes ran successfully in production for weeks, the performance trade-off appears acceptable.


169-169: Verify: Confirm the intentional trade-off of eliminating price API retries.

Based on code analysis, PRICE_API_MAX_RETRIES = 1 means zero actual retries—only a single attempt before failure. This is a significant reduction in resilience compared to SYNC_TXS_JOBS_MAX_RETRIES = 3. No documentation in the code explains this choice.

While this may intentionally prevent cascading timeouts, it makes the price fetch fail immediately on any transient API issue. Please confirm this trade-off aligns with your PR objectives and that the price API is sufficiently reliable to warrant this reduced tolerance.

jobs/initJobs.ts (3)

32-39: Cleanup configuration aligns with the fresh-start strategy.

The addition of removeOnComplete: true and removeOnFail: true ensures no job history accumulates. This is consistent with the obliterate-on-startup approach and helps prevent stale job state from interfering with new executions.

This configuration pairs well with the worker's cleanup logic (in workers.ts lines 33-65) that destroys the multiBlockchainClient on both completion and failure.


47-48: Standard cleanup configuration for repeating jobs.

Adding removeOnComplete: true and removeOnFail: true to this repeating job prevents unbounded accumulation of job history records. This is a best practice for long-running repeating jobs and aligns with the overall cleanup strategy.


10-17: No issues found after verification — resolve as approved.

The obliterate behavior is acceptable because both syncMissedTransactions() and connectAllTransactionsToPrices() are idempotent operations:

  • They query for work to do (addresses to sync, transactions needing price connections) rather than resuming from checkpoints
  • Restarting mid-process simply re-fetches and re-processes the same data
  • The inline comment "// --- force fresh start ---" already documents the intent

This design intentionally sacrifices job history and aggressive cleanup to fix the "permanently block" bug by ensuring a clean state on every startup. The trade-off is acceptable.
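The idempotency argument can be made concrete with a toy version of the price-connection pass (names hypothetical, not the service's actual API): because the job derives its work set from the data itself, re-running it after an obliterated restart is a no-op.

```typescript
// Toy sketch (hypothetical names) of why obliterate-on-startup is safe:
// the job derives pending work from the data rather than from a checkpoint,
// so re-running it touches only rows that still need the change.
interface Tx { id: number, priceId: number | null }

function connectTransactionsToPrices (txs: Tx[], priceId: number): Tx[] {
  // idempotent: only rows still missing a price are modified
  return txs.map(tx => tx.priceId === null ? { ...tx, priceId } : tx)
}

const once = connectTransactionsToPrices([{ id: 1, priceId: null }, { id: 2, priceId: 7 }], 9)
const twice = connectTransactionsToPrices(once, 9)
console.log(JSON.stringify(once) === JSON.stringify(twice)) // true
```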

prisma-local/seeds/prices.ts (1)

36-36: Seed call should throw on failures — good change.

Passing throwOnFailure=true is appropriate for seeding; it surfaces API issues early.

services/transactionService.ts (1)

559-595: Good: batched upserts inside a single transaction with a timeout.

BATCH_SIZE=50 and timeout guard match the PR goal to reduce Prisma timeouts.
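The batching shape is easy to see in isolation; the `chunk` helper below is illustrative, and in the real code each chunk would be upserted inside a `prisma.$transaction` call carrying the timeout option.

```typescript
// Sketch of the batching shape used by createManyTransactions: split rows
// into BATCH_SIZE chunks so each DB round-trip stays small. Illustrative only.
const BATCH_SIZE = 50

function chunk<T> (rows: T[], size: number): T[][] {
  const out: T[][] = []
  for (let i = 0; i < rows.length; i += size) {
    out.push(rows.slice(i, i + size))
  }
  return out
}

// 120 rows split into batches of 50, 50 and 20:
const batches = chunk(Array.from({ length: 120 }, (_, i) => i), BATCH_SIZE)
console.log(batches.map(b => b.length)) // [ 50, 50, 20 ]
```

Keeping each round-trip at BATCH_SIZE rows bounds how long any single interactive transaction holds locks, which is the lever against Prisma's transaction timeout.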

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b17fb30 and af60833.

📒 Files selected for processing (2)
  • tests/unittests/chronikService.test.ts (2 hunks)
  • tests/unittests/transactionService.test.ts (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
tests/unittests/transactionService.test.ts (2)
prisma-local/mockedClient.ts (1)
  • prismaMock (19-19)
tests/mockedObjects.ts (1)
  • mockedUSDPriceOnTransaction (384-398)
🪛 ESLint
tests/unittests/chronikService.test.ts

[error] 1323-1324: More than 1 blank line not allowed.

(no-multiple-empty-lines)


[error] 1332-1332: Extra space before value for key 'confirmed'.

(key-spacing)

🔇 Additional comments (4)
tests/unittests/transactionService.test.ts (1)

51-52: LGTM! Improved error handling with findUniqueOrThrow.

The change from findUnique to findUniqueOrThrow provides better fail-fast behavior when price data is missing, making error conditions more explicit rather than silently continuing with null values.

tests/unittests/chronikService.test.ts (3)

24-24: LGTM! Mock addition enables transaction-related testing.

The tx mock addition to the ChronikClient enables the new regression tests to validate transaction fetching and retry behavior.


1274-1297: Excellent regression test for counter leak prevention.

This test validates the critical fix for the transaction processing freeze bug by ensuring the mempoolTxsBeingProcessed counter is properly decremented in the finally block even when errors occur. The test correctly:

  • Simulates a failing chronik.tx call
  • Verifies counter returns to 0 after error
  • Confirms subsequent transactions can still be processed

1299-1321: Strong validation of retry logic for mempool race conditions.

This test effectively validates the fetchTxWithRetry method handles transient 404 errors (mempool race conditions) by retrying until success. The use of setImmediate to wait for async constructor initialization is good practice.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

🧹 Nitpick comments (4)
services/userService.ts (1)

75-80: Simplify by using Buffer.concat directly.

The manual Uint8Array construction with runtime type checks is unnecessarily complex. Both prefixPrivateEd25519 (line 74) and seed (line 69) are always Buffer instances, so the instanceof Uint8Array checks are redundant. The original Buffer.concat([prefixPrivateEd25519, seed]) would be simpler and equally correct.

Apply this diff to simplify:

-
-  const der = new Uint8Array(prefixPrivateEd25519.length + seed.length)
-  der.set(prefixPrivateEd25519 instanceof Uint8Array ? prefixPrivateEd25519 : new Uint8Array(prefixPrivateEd25519))
-  der.set(seed instanceof Uint8Array ? seed : new Uint8Array(seed), prefixPrivateEd25519.length)
-  const derBuf = Buffer.from(der)
-  return crypto.createPrivateKey({ key: derBuf, format: 'der', type: 'pkcs8' })
+  const der = Buffer.concat([prefixPrivateEd25519, seed])
+  return crypto.createPrivateKey({ key: der, format: 'der', type: 'pkcs8' })
utils/validators.ts (1)

251-258: Remove dead code branch.

The conditional check at line 252 is unnecessary because payload is always a string (returned by getSignaturePayload at line 245 via .join('+')). The non-string branch (line 254) with the as unknown as Uint8Array cast is unreachable dead code. Both Buffer.from(payload) and TextEncoder().encode(payload) use UTF-8 encoding for strings, so the encoding behavior is equivalent.

Apply this diff to remove the dead code:

-
-  const data: Uint8Array = typeof payload === 'string'
-    ? new TextEncoder().encode(payload)
-    : payload as unknown as Uint8Array
-
   const signature = crypto.sign(
     null,
-    data,
+    Buffer.from(payload),
     pk
   )

Alternatively, if you prefer TextEncoder:

-
-  const data: Uint8Array = typeof payload === 'string'
-    ? new TextEncoder().encode(payload)
-    : payload as unknown as Uint8Array
-
+  const data = new TextEncoder().encode(payload)
   const signature = crypto.sign(
     null,
     data,
     pk
   )
services/chronikService.ts (2)

594-608: Resilient fetchTxWithRetry with backoff — nice.

Small nit: consider console.warn for expected 404-retry logs to reduce noise in error monitoring.
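For readers outside the diff, the reviewed retry shape can be sketched as below; `fetchTx`, the `tries` count, and the base delay are hypothetical parameters for illustration, not the method's actual signature.

```typescript
// Sketch of fetchTxWithRetry's shape (hypothetical signature): only
// "not found" errors are retried, with the delay doubling each attempt.
async function fetchTxWithRetry (
  fetchTx: (txid: string) => Promise<object>,
  txid: string,
  tries = 4,
  baseDelayMs = 1000
): Promise<object> {
  for (let i = 0; i < tries; i++) {
    try {
      return await fetchTx(txid)
    } catch (e) {
      const notFound = /not found/i.test((e as Error).message)
      if (!notFound || i === tries - 1) throw e // non-retryable, or exhausted
      const delay = baseDelayMs * 2 ** i // 1s, 2s, 4s, ...
      await new Promise(resolve => setTimeout(resolve, delay))
    }
  }
  throw new Error('unreachable')
}
```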


725-733: Deduplicate related addresses before DB lookups.

Avoid redundant queries and processing:

-    const inputAddresses = transaction.inputs.map(inp => outputScriptToAddress(this.networkSlug, inp.outputScript))
-    const outputAddresses = transaction.outputs.map(out => outputScriptToAddress(this.networkSlug, out.outputScript))
-    return [...inputAddresses, ...outputAddresses].filter(a => a !== undefined)
+    const inputAddresses = transaction.inputs.map(inp => outputScriptToAddress(this.networkSlug, inp.outputScript))
+    const outputAddresses = transaction.outputs.map(out => outputScriptToAddress(this.networkSlug, out.outputScript))
+    return Array.from(
+      new Set(
+        [...inputAddresses, ...outputAddresses].filter((a): a is string => a !== undefined)
+      )
+    )
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5394bf9 and c2b0ccc.

⛔ Files ignored due to path filters (1)
  • yarn.lock is excluded by !**/yarn.lock, !**/*.lock
📒 Files selected for processing (5)
  • package.json (1 hunks)
  • services/chronikService.ts (12 hunks)
  • services/userService.ts (1 hunks)
  • tests/unittests/chronikService.test.ts (2 hunks)
  • utils/validators.ts (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
services/chronikService.ts (5)
constants/index.ts (6)
  • CHRONIK_MESSAGE_CACHE_DELAY (229-229)
  • INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY (282-282)
  • TX_EMIT_BATCH_SIZE (283-283)
  • MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME (273-273)
  • MEMPOOL_PROCESS_DELAY (275-275)
  • DB_COMMIT_BATCH_SIZE (284-284)
services/transactionService.ts (2)
  • upsertTransaction (374-406)
  • createManyTransactions (557-610)
services/triggerService.ts (2)
  • executeAddressTriggers (234-243)
  • executeTriggersBatch (501-606)
prisma-local/seeds/transactions.ts (1)
  • appendTxsToFile (12-20)
ws-service/types.ts (1)
  • BroadcastTxData (5-9)
tests/unittests/chronikService.test.ts (3)
services/addressService.ts (1)
  • fetchAddressesArray (189-201)
services/transactionService.ts (3)
  • fetchUnconfirmedTransactions (612-620)
  • deleteTransactions (622-630)
  • upsertTransaction (374-406)
services/triggerService.ts (1)
  • executeAddressTriggers (234-243)
🪛 ESLint
tests/unittests/chronikService.test.ts

[error] 1323-1324: More than 1 blank line not allowed.

(no-multiple-empty-lines)


[error] 1332-1332: Extra space before value for key 'confirmed'.

(key-spacing)


[error] 1387-1387: Require statement not part of import statement.

(@typescript-eslint/no-var-requires)


[error] 1429-1429: Require statement not part of import statement.

(@typescript-eslint/no-var-requires)


[error] 1430-1430: Require statement not part of import statement.

(@typescript-eslint/no-var-requires)


[error] 1481-1481: Expected a line break after this opening brace.

(object-curly-newline)


[error] 1485-1485: Expected a line break before this closing brace.

(object-curly-newline)


[error] 1485-1485: A space is required before '}'.

(object-curly-spacing)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run Tests
🔇 Additional comments (3)
tests/unittests/chronikService.test.ts (1)

1273-1344: Great regression coverage for mempool, retries, TTL, and WS matrix.

The tests exercise the retry/backoff, mempool counter finally, and block finalization flow well.

Also applies to: 1347-1520

services/chronikService.ts (2)

183-201: TTL unit mismatch risk in clearOldMessages (seconds vs milliseconds).

You store timestamps via moment().unix() (seconds) and compare ageDiff to CHRONIK_MESSAGE_CACHE_DELAY. If that constant is in milliseconds (e.g., 3000 = 3s), entries will live ~50 minutes. Either rename the constant to clarify units or convert consistently.

Option A: keep seconds everywhere, convert constant:

-      const ageDiff = now - Number(this.lastProcessedMessages.unconfirmed[key])
-      if (ageDiff > CHRONIK_MESSAGE_CACHE_DELAY) {
+      const ageSec = now - Number(this.lastProcessedMessages.unconfirmed[key])
+      if (ageSec > CHRONIK_MESSAGE_CACHE_DELAY / 1000) {
         const { [key]: _, ...rest } = this.lastProcessedMessages.unconfirmed
         this.lastProcessedMessages.unconfirmed = rest
       }

Repeat for the confirmed loop. Alternatively, switch to Date.now() and store ms.
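The millisecond alternative can be sketched as a pure function (names hypothetical): storing `Date.now()` values lets entries compare directly against a millisecond TTL constant with no unit conversion.

```typescript
// Sketch of the "store milliseconds everywhere" option: timestamps come from
// Date.now(), so they compare directly against a millisecond TTL constant.
const CHRONIK_MESSAGE_CACHE_DELAY_MS = 3000 // assumed to be milliseconds

function clearOldMessages (
  cache: Record<string, number>, // txid -> last-seen timestamp in ms
  nowMs: number = Date.now()
): Record<string, number> {
  return Object.fromEntries(
    Object.entries(cache).filter(([, seenAt]) => nowMs - seenAt <= CHRONIK_MESSAGE_CACHE_DELAY_MS)
  )
}

const now = Date.now()
const kept = clearOldMessages({ fresh: now - 1000, stale: now - 10000 }, now)
console.log(Object.keys(kept)) // [ 'fresh' ]
```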


635-663: Mempool handler finally-decrements counter — good safeguard.

This prevents freezing under exceptions and caps concurrency via MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME.
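The safeguard reduces to a small pattern worth stating on its own (the class and names are illustrative, not the service's actual API): the in-flight counter always returns to its prior value, even when the handler throws.

```typescript
// Illustrative sketch of the finally-decrement safeguard: the counter is
// decremented on every exit path, so a throwing handler cannot leak a slot.
class MempoolGate {
  inFlight = 0

  async process (handler: () => Promise<void>): Promise<void> {
    this.inFlight += 1
    try {
      await handler()
    } finally {
      this.inFlight = Math.max(0, this.inFlight - 1) // defensive clamp, as in the PR
    }
  }
}

const gate = new MempoolGate()
gate.process(async () => { throw new Error('boom') })
  .catch(() => console.log(gate.inFlight)) // 0
```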

 },
 "devDependencies": {
-    "@types/jest": "^27.5.1",
+    "@types/jest": "^30.0.0",

⚠️ Potential issue | 🟠 Major

Align Jest types with Jest/ts-jest versions.

@types/jest is ^30 while jest is ^29.7 and ts-jest is ^29.2.5. This can cause TS inference/compile breakage.

Two safe paths:

  • Keep Jest 29: downgrade types.
-    "@types/jest": "^30.0.0",
+    "@types/jest": "^29.5.12",
  • Or upgrade the stack together (only if you intend to move): bump jest and ts-jest to matching 30.x.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"@types/jest": "^30.0.0",
"@types/jest": "^29.5.12",
🤖 Prompt for AI Agents
In package.json around line 75, @types/jest is ^30 while jest is ^29.7 and
ts-jest is ^29.2.5 causing type/compile mismatch; fix by either downgrading
@types/jest to a 29.x version that aligns with jest (e.g., ^29.0.0) or by
upgrading both jest and ts-jest to matching 30.x versions (bump jest and ts-jest
to 30.x, update any config if required), then run install and the test/type
checks to verify no remaining type errors.

Comment on lines +1323 to +1344

it('clearOldMessages expires entries by TTL and from the correct maps', () => {
process.env.WS_AUTH_KEY = 'test-auth-key'
const client = new ChronikBlockchainClient('ecash')

const nowSec = Math.floor(Date.now() / 1000)
// build state: one old and one fresh in each map
;(client as any).lastProcessedMessages = {
unconfirmed: { u_old: nowSec - 999999, u_new: nowSec - 1 },
confirmed: { c_old: nowSec - 999999, c_new: nowSec - 1 }
}

// run cleanup
;(client as any).clearOldMessages()

// old entries should be gone, fresh ones should remain
expect((client as any).lastProcessedMessages.unconfirmed.u_old).toBeUndefined()
expect((client as any).lastProcessedMessages.confirmed.c_old).toBeUndefined()
expect((client as any).lastProcessedMessages.unconfirmed.u_new).toBeDefined()
expect((client as any).lastProcessedMessages.confirmed.c_new).toBeDefined()
})
})

⚠️ Potential issue | 🟡 Minor

Fix ESLint formatting: blank lines, key spacing, object-curly newlines/spacing.

Apply these minimal fixes:

  • Collapse extra blank line and fix key spacing.
-    ;(client as any).lastProcessedMessages = {
-      unconfirmed: { u_old: nowSec - 999999, u_new: nowSec - 1 },
-      confirmed:   { c_old: nowSec - 999999, c_new: nowSec - 1 }
-    }
+    ;(client as any).lastProcessedMessages = {
+      unconfirmed: { u_old: nowSec - 999999, u_new: nowSec - 1 },
+      confirmed: { c_old: nowSec - 999999, c_new: nowSec - 1 }
+    }
  • Conform object-curly-newline/spacing when overriding client.chronik.
-    ;(client.chronik as any) = { tx: jest.fn(async () => {
-      attempts += 1
-      if (attempts < 3) throw new Error('Transaction not found in the index')
-      return { txid: 'tx404', inputs: [], outputs: [] }
-    })}
+    ;(client.chronik as any) = {
+      tx: jest.fn(async () => {
+        attempts += 1
+        if (attempts < 3) throw new Error('Transaction not found in the index')
+        return { txid: 'tx404', inputs: [], outputs: [] }
+      })
+    }

Also applies to: 1481-1485

🧰 Tools
🪛 ESLint

[error] 1323-1324: More than 1 blank line not allowed.

(no-multiple-empty-lines)


[error] 1332-1332: Extra space before value for key 'confirmed'.

(key-spacing)

🤖 Prompt for AI Agents
In tests/unittests/chronikService.test.ts around lines 1323-1344 (also apply
same fixes at 1481-1485): remove the extra blank line, normalize key spacing so
object keys align as { u_old: ..., u_new: ... } and { c_old: ..., c_new: ... },
and fix object-curly-newline/spacing when overriding client.chronik (ensure
braces and properties are on the same line or follow the repo's
object-curly-newline rule consistently). Make only these minimal formatting
changes to satisfy ESLint.

Comment on lines +1387 to +1395
const { fetchUnconfirmedTransactions, deleteTransactions } = require('../../services/transactionService')
fetchUnconfirmedTransactions.mockResolvedValueOnce(['tx-to-del'])
deleteTransactions.mockResolvedValueOnce(undefined)

await client.processWsMessage({ type: 'Tx', msgType: 'TX_REMOVED_FROM_MEMPOOL', txid: 'deadbeef' })

expect(fetchUnconfirmedTransactions).toHaveBeenCalledWith('deadbeef')
expect(deleteTransactions).toHaveBeenCalledWith(['tx-to-del'])
})

⚠️ Potential issue | 🟡 Minor

Replace inline require with typed imports (or silence with ESLint directive).

Prefer imports for mocked modules to satisfy @typescript-eslint/no-var-requires:

Add near the top (module scope):

import * as addressService from '../../services/addressService'
import * as txService from '../../services/transactionService'
import * as triggerService from '../../services/triggerService'

Then update usages in the tests:

-    const { fetchUnconfirmedTransactions, deleteTransactions } = require('../../services/transactionService')
-    fetchUnconfirmedTransactions.mockResolvedValueOnce(['tx-to-del'])
-    deleteTransactions.mockResolvedValueOnce(undefined)
+    const fetchUnconfirmedTransactions = jest.spyOn(txService, 'fetchUnconfirmedTransactions').mockResolvedValueOnce(['tx-to-del'] as any)
+    const deleteTransactions = jest.spyOn(txService, 'deleteTransactions').mockResolvedValueOnce(undefined as any)
-    ;({ fetchAddressesArray } = require('../../services/addressService'))
-    fetchAddressesArray.mockResolvedValue([
+    fetchAddressesArray = jest.spyOn(addressService, 'fetchAddressesArray').mockResolvedValue([
       { id: 'addr-1', address: 'ecash:qqkv9...', networkId: 1, syncing: false, lastSynced: new Date().toISOString() }
     ] as any)
-    const { upsertTransaction } = require('../../services/transactionService')
-    const { executeAddressTriggers } = require('../../services/triggerService')
+    const upsertTransaction = jest.spyOn(txService, 'upsertTransaction')
+    const executeAddressTriggers = jest.spyOn(triggerService, 'executeAddressTriggers')

If you prefer to keep require, add per-line disables:

// eslint-disable-next-line @typescript-eslint/no-var-requires

Also applies to: 1429-1435

🧰 Tools
🪛 ESLint

[error] 1387-1387: Require statement not part of import statement.

(@typescript-eslint/no-var-requires)

🤖 Prompt for AI Agents
In tests/unittests/chronikService.test.ts around lines 1387 to 1395, the test
uses inline require(...) to import mocked services which violates
@typescript-eslint/no-var-requires; replace the inline require calls with
module-scope typed imports (e.g. import * as txService from
'../../services/transactionService' and likewise for addressService and
triggerService) and update the mocked calls to use the imported names (e.g.
txService.fetchUnconfirmedTransactions). Alternatively, if you must keep
require, add an eslint disable comment directly above each require line to
silence the rule. Also apply the same change to the similar block at lines
1429-1435.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
services/chronikService.ts (1)

625-634: Wrap TX_CONFIRMED handler with try/catch to prevent unhandled rejections.

The TX_CONFIRMED path is not protected by error handling. If fetchTxWithRetry exhausts retries, or if getAddressesForTransaction or handleUpdateClientPaymentStatus fail, the rejection will be unhandled since processWsMessage is called fire-and-forget (line 504). This is the same issue flagged in the previous review.

Apply this diff to add error handling:

     } else if (msg.msgType === 'TX_CONFIRMED') {
+      try {
         const transaction = await this.fetchTxWithRetry(msg.txid)
         const addressesWithTransactions = await this.getAddressesForTransaction(transaction)
         console.log(`${this.CHRONIK_MSG_PREFIX}: [${msg.msgType}] ${msg.txid}`)
         this.confirmedTxsHashesFromLastBlock = [...this.confirmedTxsHashesFromLastBlock, msg.txid]
         for (const addressWithTransaction of addressesWithTransactions) {
           const { amount, opReturn } = addressWithTransaction.transaction
           await this.handleUpdateClientPaymentStatus(amount, opReturn, 'CONFIRMED' as ClientPaymentStatus, addressWithTransaction.address.address)
         }
+      } catch (e) {
+        console.error(`${this.CHRONIK_MSG_PREFIX}: confirmed handler failed for ${msg.txid}`, e)
+      }
     } else if (msg.msgType === 'TX_ADDED_TO_MEMPOOL') {
🧹 Nitpick comments (1)
services/chronikService.ts (1)

376-378: Redundant stopping condition check.

Lines 376-378 check if (newTxsInThisPage === 0 && oldestTs < lastSyncedTimestampSeconds) but line 371-373 already sets hasReachedStoppingCondition = true when oldestTs < lastSyncedTimestampSeconds. This second check is redundant and can be removed for clarity.

Apply this diff to remove the redundant check:

       if (oldestTs < lastSyncedTimestampSeconds) {
         hasReachedStoppingCondition = true
       }
 
       nextBurstBasePageIndex += 1
-      if (newTxsInThisPage === 0 && oldestTs < lastSyncedTimestampSeconds) {
-        hasReachedStoppingCondition = true
-      }
       newTxs += newTxsInThisPage
     }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c2b0ccc and 92f403c.

📒 Files selected for processing (1)
  • services/chronikService.ts (13 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
services/chronikService.ts (6)
components/Transaction/PaybuttonTransactions.tsx (1)
  • moment (40-124)
constants/index.ts (6)
  • CHRONIK_MESSAGE_CACHE_DELAY (229-229)
  • INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY (282-282)
  • TX_EMIT_BATCH_SIZE (283-283)
  • MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME (273-273)
  • MEMPOOL_PROCESS_DELAY (275-275)
  • DB_COMMIT_BATCH_SIZE (284-284)
services/transactionService.ts (2)
  • upsertTransaction (374-406)
  • createManyTransactions (557-610)
services/triggerService.ts (2)
  • executeAddressTriggers (234-243)
  • executeTriggersBatch (501-606)
prisma-local/seeds/transactions.ts (1)
  • appendTxsToFile (12-20)
ws-service/types.ts (1)
  • BroadcastTxData (5-9)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run Tests
🔇 Additional comments (7)
services/chronikService.ts (7)

594-608: LGTM! Solid retry implementation with exponential backoff.

The fetchTxWithRetry helper correctly implements exponential backoff for 404 errors (mempool race conditions), while immediately throwing for other error types. This addresses the PR's objective of fixing mempool race conditions by retrying Chronik transaction lookups.


634-664: LGTM! Robust error handling prevents tx processing freeze.

The TX_ADDED_TO_MEMPOOL handler now properly wraps all processing in try/catch/finally, ensuring that:

  1. Errors don't block subsequent transaction processing (addressing the PR's freeze bug)
  2. The mempoolTxsBeingProcessed counter is always decremented via finally block
  3. fetchTxWithRetry handles mempool race conditions with retries

The use of Math.max(0, ...) on line 662 is defensive and prevents counter corruption.


674-676: LGTM! Useful diagnostic logging.

Adding subscription count logging after block finalization helps monitor the health of WebSocket subscriptions.


802-854: LGTM! Improved error isolation in batch processing.

The addition of involvedAddrIds (line 802) and scoped error handling (lines 845-854) ensures that when a batch fails, only the addresses involved in that batch are marked as failed, allowing other addresses to continue processing. This improves resilience and addresses the Prisma timeout concerns mentioned in the PR objectives.


858-884: LGTM! Final flush properly handles remaining transactions.

The final flush logic correctly processes any remaining transactions after all address batches complete, maintaining consistency with the in-loop commit logic and properly executing triggers for newly created transactions.


892-899: LGTM! Fatal error handler preserves partial success.

The fatal error handler (lines 892-899) only marks addresses that made zero progress as failed, allowing addresses that successfully synced some transactions to be considered successful. This is a good resilience pattern.


903-903: LGTM! Clear summary logging.

The final log statement provides a clear summary of the sync operation with transaction count, address count, and error count.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (6)
services/chronikService.ts (1)

625-637: Excellent: TX_CONFIRMED error handling implemented.

The try-catch block addresses the critical issue flagged in previous reviews. Failures in fetchTxWithRetry or downstream processing are now caught and logged, preventing unhandled rejections that could stall the processing flow.

tests/unittests/chronikService.test.ts (5)

1323-1343: Fix ESLint formatting issues.

The formatting issues flagged in previous reviews remain unresolved.

Apply this diff to fix blank line and key spacing:

   })

-

   it('clearOldMessages expires entries by TTL and from the correct maps', () => {
     process.env.WS_AUTH_KEY = 'test-auth-key'
     const client = new ChronikBlockchainClient('ecash')

     const nowSec = Math.floor(Date.now() / 1000)
     // build state: one old and one fresh in each map
     ;(client as any).lastProcessedMessages = {
       unconfirmed: { u_old: nowSec - 999999, u_new: nowSec - 1 },
-      confirmed:   { c_old: nowSec - 999999, c_new: nowSec - 1 }
+      confirmed: { c_old: nowSec - 999999, c_new: nowSec - 1 }
     }

1370-1381: Replace inline require with typed imports.

The inline require statements violate @typescript-eslint/no-var-requires. As noted in previous reviews, prefer module-scope imports:

+import * as addressService from '../../services/addressService'

   beforeEach(() => {
     jest.clearAllMocks()
     // ... client setup ...
     
-    ;({ fetchAddressesArray } = require('../../services/addressService'))
+    fetchAddressesArray = jest.spyOn(addressService, 'fetchAddressesArray').mockResolvedValue([
-    fetchAddressesArray.mockResolvedValue([
       {
         id: 'addr-1',
         address: 'ecash:qqkv9wr69ry2p9l53lxp635va4h86wv435995w8p2h',
         networkId: 1,
         syncing: false,
         lastSynced: new Date().toISOString()
       }
-    ])
+    ] as any)

1428-1465: Replace inline require with typed imports.

Same issue as earlier - use module-scope imports instead of require:

+import * as transactionService from '../../services/transactionService'
+import * as triggerService from '../../services/triggerService'

 it('handles TX_ADDED_TO_MEMPOOL → calls fetchTxWithRetry and upserts, triggers once', async () => {
-  const { upsertTransaction } = require('../../services/transactionService')
-  const { executeAddressTriggers } = require('../../services/triggerService')
+  const upsertTransaction = jest.spyOn(transactionService, 'upsertTransaction')
+  const executeAddressTriggers = jest.spyOn(triggerService, 'executeAddressTriggers')

1476-1501: Fix object-curly-newline and spacing issues.

The ESLint errors flagged in previous reviews remain:

   let attempts = 0
   // drive underlying chronik.tx via the real fetchTxWithRetry
-  ;(client.chronik as any) = { tx: jest.fn(async () => {
-    attempts += 1
-    if (attempts < 3) throw new Error('Transaction not found in the index')
-    return { txid: 'tx404', inputs: [], outputs: [] }
-  })}
+  ;(client.chronik as any) = {
+    tx: jest.fn(async () => {
+      attempts += 1
+      if (attempts < 3) throw new Error('Transaction not found in the index')
+      return { txid: 'tx404', inputs: [], outputs: [] }
+    })
+  }

1386-1395: Use explicit module imports with jest.spyOn() for reliable mock setup

The implementation imports fetchUnconfirmedTransactions and deleteTransactions at module scope from './transactionService' (lines 6-15 of services/chronikService.ts), but the test loads them with require() inside the test function using '../../services/transactionService'. While both paths resolve to the same file, this creates a potential module reference mismatch.

The more reliable approach is to import the module explicitly at test scope and use jest.spyOn():

+import * as transactionService from '../../services/transactionService'
+
 it('handles TX_REMOVED_FROM_MEMPOOL → deletes unconfirmed txs', async () => {
-  const { fetchUnconfirmedTransactions, deleteTransactions } = require('../../services/transactionService')
-  fetchUnconfirmedTransactions.mockResolvedValueOnce(['tx-to-del'])
-  deleteTransactions.mockResolvedValueOnce(undefined)
+  jest.spyOn(transactionService, 'fetchUnconfirmedTransactions')
+    .mockResolvedValueOnce(['tx-to-del'] as any)
+  jest.spyOn(transactionService, 'deleteTransactions')
+    .mockResolvedValueOnce(undefined as any)

   await client.processWsMessage({ type: 'Tx', msgType: 'TX_REMOVED_FROM_MEMPOOL', txid: 'deadbeef' })

   expect(transactionService.fetchUnconfirmedTransactions).toHaveBeenCalledWith('deadbeef')
   expect(transactionService.deleteTransactions).toHaveBeenCalledWith(['tx-to-del'])
 })

Apply the same pattern to line 1429 and other tests using the require() pattern within test bodies.

🧹 Nitpick comments (3)
services/chronikService.ts (3)

594-608: Well-implemented retry logic with exponential backoff.

The retry mechanism correctly:

  • Detects 404-ish errors via regex pattern matching
  • Implements exponential backoff (1s, 2s, 4s delays)
  • Re-throws non-retryable errors immediately
  • Re-throws after exhausting retries

The pipeline "failure" showing the log at line 603 is expected behavior during retries, not an actual error.

Consider reducing log verbosity in production by changing from console.error to console.log or console.warn:

-        console.error(`Got a 404 Error trying to fetch tx ${txid} on the attempt number ${i + 1}, waiting ${(delay / 1000).toFixed(1)}s...`)
+        console.log(`${this.CHRONIK_MSG_PREFIX}: Retrying tx ${txid} (attempt ${i + 1}/${tries}), waiting ${(delay / 1000).toFixed(1)}s...`)

307-410: Improved observability and batching logic.

The updates enhance logging and progress tracking, making it easier to monitor sync operations. The batching logic properly yields when the buffer reaches TX_EMIT_BATCH_SIZE and handles final flushes correctly.

Lines 376-378 contain a stopping condition that appears redundant with the check at lines 371-372:

if (oldestTs < lastSyncedTimestampSeconds) {
  hasReachedStoppingCondition = true
}

// Later...
if (newTxsInThisPage === 0 && oldestTs < lastSyncedTimestampSeconds) {
  hasReachedStoppingCondition = true
}

Consider simplifying to:

 if (oldestTs < lastSyncedTimestampSeconds) {
   hasReachedStoppingCondition = true
 }

 nextBurstBasePageIndex += 1
-if (newTxsInThisPage === 0 && oldestTs < lastSyncedTimestampSeconds) {
-  hasReachedStoppingCondition = true
-}

806-858: Excellent scoped error handling for batch processing.

The introduction of involvedAddrIds (line 806) enables precise error isolation - only addresses in a failing batch are marked as failed, while others can continue processing. This significantly improves resilience compared to failing the entire sync on a single batch error.

The try-catch structure properly:

  • Processes transactions in parallel (lines 809-814)
  • Commits in batches when reaching DB_COMMIT_BATCH_SIZE (lines 822-848)
  • Isolates failures to affected addresses only (lines 852-856)
  • Continues processing remaining batches after errors (line 857)

Consider extracting the trigger batching logic that's duplicated between partial commits (lines 835-847) and final flush (lines 875-887):

private async processTriggerBatch(
  createdTxs: TransactionWithAddressAndPrices[],
  commitPairs: RowWithRaw[],
  runTriggers: boolean
): Promise<void> {
  if (createdTxs.length === 0) return
  
  const rawByHash = new Map(commitPairs.map(p => [p.raw.txid, p.raw]))
  const triggerBatch: BroadcastTxData[] = []
  
  for (const createdTx of createdTxs) {
    const raw = rawByHash.get(createdTx.hash)
    if (raw == null) continue
    const bd = this.broadcastIncomingTx(createdTx.address.address, raw, createdTx)
    triggerBatch.push(bd)
  }
  
  if (runTriggers && triggerBatch.length > 0) {
    await executeTriggersBatch(triggerBatch, this.networkId)
  }
}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 92f403c and 3637b2f.

📒 Files selected for processing (2)
  • services/chronikService.ts (13 hunks)
  • tests/unittests/chronikService.test.ts (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
tests/unittests/chronikService.test.ts (2)
services/addressService.ts (1)
  • fetchAddressesArray (189-201)
services/transactionService.ts (3)
  • fetchUnconfirmedTransactions (612-620)
  • deleteTransactions (622-630)
  • upsertTransaction (374-406)
services/chronikService.ts (5)
constants/index.ts (6)
  • CHRONIK_MESSAGE_CACHE_DELAY (229-229)
  • INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY (282-282)
  • TX_EMIT_BATCH_SIZE (283-283)
  • MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME (273-273)
  • MEMPOOL_PROCESS_DELAY (275-275)
  • DB_COMMIT_BATCH_SIZE (284-284)
services/transactionService.ts (2)
  • upsertTransaction (374-406)
  • createManyTransactions (557-610)
services/triggerService.ts (2)
  • executeAddressTriggers (234-243)
  • executeTriggersBatch (501-606)
prisma-local/seeds/transactions.ts (1)
  • appendTxsToFile (12-20)
ws-service/types.ts (1)
  • BroadcastTxData (5-9)
🪛 ESLint
tests/unittests/chronikService.test.ts

[error] 1323-1324: More than 1 blank line not allowed.

(no-multiple-empty-lines)


[error] 1332-1332: Extra space before value for key 'confirmed'.

(key-spacing)


[error] 1387-1387: Require statement not part of import statement.

(@typescript-eslint/no-var-requires)


[error] 1429-1429: Require statement not part of import statement.

(@typescript-eslint/no-var-requires)


[error] 1430-1430: Require statement not part of import statement.

(@typescript-eslint/no-var-requires)


[error] 1481-1481: Expected a line break after this opening brace.

(object-curly-newline)


[error] 1485-1485: Expected a line break before this closing brace.

(object-curly-newline)


[error] 1485-1485: A space is required before '}'.

(object-curly-spacing)

🪛 GitHub Actions: Pull Request Tests
tests/unittests/chronikService.test.ts

[error] 1393-1393: TX_REMOVED_FROM_MEMPOOL test failed: expected fetchUnconfirmedTransactions to be called with 'deadbeef' but it was not called (Number of calls: 0).


[error] 1424-1424: TX_CONFIRMED test did not trigger fetchTxWithRetry or payments update as expected (no calls were observed).

services/chronikService.ts

[error] 603-603: FetchTxWithRetry encountered a 404 error: Got a 404 Error trying to fetch tx txABC on the attempt number 1, waiting 1.0s...

🔇 Additional comments (8)
tests/unittests/chronikService.test.ts (4)

23-25: LGTM!

The addition of the tx: jest.fn() mock to the ChronikClient strategy enables testing of transaction fetch paths, including the new fetchTxWithRetry logic.


1275-1297: Excellent test coverage for counter leak prevention.

This test correctly validates that mempoolTxsBeingProcessed is decremented in the finally block even when errors occur, directly addressing the PR objective of preventing transaction processing from freezing on errors.
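The decrement-in-finally pattern this test guards can be sketched as follows (the class shape here is illustrative, not the actual ChronikBlockchainClient API):

```typescript
// Sketch of the decrement-in-finally pattern; the class shape is
// illustrative, not the actual ChronikBlockchainClient API.
class MempoolProcessor {
  mempoolTxsBeingProcessed = 0

  async process (work: () => Promise<void>): Promise<void> {
    this.mempoolTxsBeingProcessed += 1
    try {
      await work()
    } finally {
      // always runs, even when work() throws, so the counter cannot leak
      this.mempoolTxsBeingProcessed = Math.max(0, this.mempoolTxsBeingProcessed - 1)
    }
  }
}
```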


1299-1321: Good coverage of retry logic.

This test validates that fetchTxWithRetry retries on 404-ish errors and eventually succeeds, which addresses the mempool race condition mentioned in the PR objectives.
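The retry behavior being tested reduces to a loop like the one below (a hedged sketch: the real `fetchTxWithRetry` signature, attempt count, and delay may differ):

```typescript
// Hedged sketch of retry-on-transient-error logic; the real
// fetchTxWithRetry signature, attempt count and delay may differ.
async function fetchWithRetry<T> (
  fetchFn: () => Promise<T>,
  maxAttempts = 4,
  delayMs = 1000
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await fetchFn()
    } catch (err) {
      if (attempt >= maxAttempts) throw err
      // e.g. give a mempool race a moment to settle before retrying
      await new Promise(resolve => setTimeout(resolve, delayMs))
    }
  }
}
```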


1397-1426: Remove incorrect mocks and add the missing getAddressesForTransaction mock.

The test fails because it mocks the wrong methods. The TX_CONFIRMED handler calls getAddressesForTransaction, not getRelatedAddressesForTransaction or getTransactionFromChronikTransaction. The missing mock for getAddressesForTransaction causes the loop to iterate over undefined, failing the test.

Replace lines 1405–1418 with:

- // deterministic related addresses
- jest.spyOn(client as any, 'getRelatedAddressesForTransaction')
-   .mockReturnValue(['ecash:qqkv9wr69ry2p9l53lxp635va4h86wv435995w8p2h'])
-
- // minimal transaction shape for downstream
- jest.spyOn(client as any, 'getTransactionFromChronikTransaction')
-   .mockResolvedValue({
-     hash: 'txCONF',
-     amount: '0.01',
-     timestamp: Math.floor(Date.now() / 1000),
-     addressId: 'addr-1',
-     confirmed: false,
-     opReturn: JSON.stringify({ message: { type: 'PAY', paymentId: 'pid-1' } })
-   })
+
+ jest.spyOn(client as any, 'getAddressesForTransaction').mockResolvedValue([
+   {
+     address: { address: 'ecash:qqkv9wr69ry2p9l53lxp635va4h86wv435995w8p2h', networkId: 1 },
+     transaction: {
+       hash: 'txCONF',
+       amount: '0.01',
+       timestamp: Math.floor(Date.now() / 1000),
+       addressId: 'addr-1',
+       confirmed: false,
+       opReturn: JSON.stringify({ message: { type: 'PAY', paymentId: 'pid-1' } })
+     }
+   }
+ ])

Likely an incorrect or invalid review comment.

services/chronikService.ts (4)

183-201: LGTM: Unit mismatch resolved.

The time unit consistency issue flagged in previous reviews has been properly addressed. The code now correctly:

  1. Stores timestamps in seconds via moment().unix() (lines 213, 220)
  2. Computes age difference in seconds: (now - lastTimestamp)
  3. Converts to milliseconds: * 1000
  4. Compares against CHRONIK_MESSAGE_CACHE_DELAY (3000 ms)

This ensures cache entries expire after 3 seconds as intended.
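The expiry check described above reduces to a one-line comparison (a sketch; the function name is illustrative, and the 3000 ms value is CHRONIK_MESSAGE_CACHE_DELAY as quoted in the review):

```typescript
// Sketch of the seconds-to-milliseconds expiry check; the 3000 ms value
// is CHRONIK_MESSAGE_CACHE_DELAY as quoted in the review.
const CHRONIK_MESSAGE_CACHE_DELAY = 3000 // ms

function isCacheEntryExpired (lastTimestampSeconds: number, nowSeconds: number): boolean {
  // age is computed in seconds, then converted to ms before comparing
  return (nowSeconds - lastTimestampSeconds) * 1000 > CHRONIK_MESSAGE_CACHE_DELAY
}
```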


638-667: Robust mempool processing with proper counter management.

The implementation correctly addresses the PR objectives:

  1. Early return (line 639) prevents duplicate processing
  2. Backpressure (lines 641-643) limits concurrent mempool transactions
  3. Counter increment (line 645) tracks in-flight processing
  4. Try-catch-finally (lines 646-667) ensures counter is always decremented
  5. fetchTxWithRetry (line 648) handles transient 404 errors

The Math.max(0, ...) guard in the finally block (line 666) is a good defensive practice to prevent negative counters.
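The backpressure plus guarded-decrement combination can be sketched as below (constant names mirror the review; the values and helper functions are assumptions, not the chronikService implementation):

```typescript
// Illustrative backpressure sketch; constant names mirror the review,
// but the values and helper functions are assumptions.
const MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME = 2
const MEMPOOL_PROCESS_DELAY = 10 // ms (illustrative; smaller than the real value)

let mempoolTxsBeingProcessed = 0

async function acquireSlot (): Promise<void> {
  // backpressure: poll until a processing slot frees up
  while (mempoolTxsBeingProcessed >= MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME) {
    await new Promise(resolve => setTimeout(resolve, MEMPOOL_PROCESS_DELAY))
  }
  mempoolTxsBeingProcessed += 1
}

function releaseSlot (): void {
  // the Math.max guard prevents the counter from ever going negative
  mempoolTxsBeingProcessed = Math.max(0, mempoolTxsBeingProcessed - 1)
}
```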


862-888: Clear final flush with proper trigger execution.

The final commit properly handles remaining transactions with "FINAL" logging (line 868) and executes triggers for newly created transactions (lines 884-886). The logic correctly mirrors the partial commit flow while marking it as the final operation.


1067-1067: Condition logic is correct; no issues found.

The environment condition verification confirms the intended behavior is implemented correctly:

  • When isRunningApp() returns true (normal app, not in test, no JOBS_ENV), subscriptions occur
  • When NODE_ENV === 'test' OR JOBS_ENV !== undefined, only the latency test waits; subscriptions are skipped

This satisfies the requirement that jobs should not subscribe to initial addresses during startup. When JOBS_ENV is defined, the code routes to the else if branch that excludes subscribeInitialAddresses().
