Skip to content

Commit 76269c4

Browse files
committed
wip2
1 parent 5ce5f92 commit 76269c4

File tree

1 file changed

+60
-77
lines changed

1 file changed

+60
-77
lines changed

services/chronikService.ts

Lines changed: 60 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ export function getNullDataScriptData (outputScript: string): OpReturnData | nul
106106
return ret
107107
}
108108

109-
type ChronikTxWithAddress = { tx: Tx; address: Address }
109+
interface ChronikTxWithAddress { tx: Tx, address: Address }
110110
interface FetchedTxsBatch {
111111
chronikTxs: ChronikTxWithAddress[]
112112
addressesSynced: string[]
@@ -310,8 +310,8 @@ export class ChronikBlockchainClient {
310310
)
311311

312312
let chronikTxs: ChronikTxWithAddress[] = []
313-
314313
let lastBatchAddresses: string[] = []
314+
315315
for (let i = 0; i < addresses.length; i += INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY) {
316316
const addressBatchSlice = addresses.slice(i, i + INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY)
317317
lastBatchAddresses = addressBatchSlice.map(a => a.address)
@@ -328,89 +328,69 @@ export class ChronikBlockchainClient {
328328

329329
while (!hasReachedStoppingCondition) {
330330
const pageIndex = nextBurstBasePageIndex
331-
332-
// Fetch the single page for this burst; swallow page errors.
333331
let pageTxs: Tx[] = []
332+
334333
try {
335334
pageTxs = await this.getPaginatedTxs(address.address, pageIndex, CHRONIK_FETCH_N_TXS_PER_PAGE)
336335
} catch (err: any) {
337-
console.warn(`${addrLogPrefix} page=${pageIndex} failed: ${err?.message as string ?? err as string}`)
336+
console.warn(`${addrLogPrefix} page=${pageIndex} failed: ${err.message as string}`)
338337
pageTxs = []
339338
}
340339

341-
// If the page is empty, treat as "EMPTY ADDRESS" and stop.
342340
if (pageTxs.length === 0) {
343341
console.log(`${addrLogPrefix} EMPTY ADDRESS`)
344342
break
345343
}
346344

347-
// Burst-level bounds collapse to this single page
348-
const newestTimestampAcrossBurst = Number(pageTxs[0].block?.timestamp ?? pageTxs[0].timeFirstSeen)
349-
const lastTxInPage = pageTxs[pageTxs.length - 1]
350-
const oldestTimestampAcrossBurst = Number(lastTxInPage.block?.timestamp ?? lastTxInPage.timeFirstSeen)
345+
const newestTs = Number(pageTxs[0].block?.timestamp ?? pageTxs[0].timeFirstSeen)
346+
const oldestTs = Number(pageTxs[pageTxs.length - 1].block?.timestamp ?? pageTxs[pageTxs.length - 1].timeFirstSeen)
351347

352-
// If even the newest is older than lastSync, stop.
353-
if (newestTimestampAcrossBurst < lastSyncedTimestampSeconds) {
348+
if (newestTs < lastSyncedTimestampSeconds) {
354349
console.log(`${addrLogPrefix} NO NEW TXS`)
355350
break
356351
}
357352

358-
// Process this page
359-
let keptTransactionsInBurstCount = 0
360-
361353
pageTxs = pageTxs
362354
.filter(txThresholdFilter)
363355
.filter(t => t.block === undefined || t.block.timestamp >= lastSyncedTimestampSeconds)
364356

365357
if (pageTxs.length > 0) {
366-
chronikTxs.push(...pageTxs.map(tx => ({ tx, address: address })))
367-
keptTransactionsInBurstCount += pageTxs.length
358+
chronikTxs.push(...pageTxs.map(tx => ({ tx, address })))
368359
}
369360

370-
// If the page’s oldest tx is older than lastSync, stop after this burst.
371-
const oldestTimestampInPage = oldestTimestampAcrossBurst
372-
if (oldestTimestampInPage < lastSyncedTimestampSeconds) {
361+
if (oldestTs < lastSyncedTimestampSeconds) {
373362
hasReachedStoppingCondition = true
374363
}
375364

376-
console.log(`${addrLogPrefix} ${keptTransactionsInBurstCount} new txs...`)
377-
378365
nextBurstBasePageIndex += 1
379-
380-
if (keptTransactionsInBurstCount === 0 && oldestTimestampAcrossBurst < lastSyncedTimestampSeconds) {
366+
if (pageTxs.length === 0 && oldestTs < lastSyncedTimestampSeconds) {
381367
hasReachedStoppingCondition = true
382368
}
383369
}
384370
})
385371

386-
// Wait for this slice of address workers; continue even if some workers throw.
387372
await Promise.all(
388373
perAddressWorkers.map(async worker =>
389-
await worker.catch(err => {
390-
console.error(`${logPrefix}: address job failed: ${err?.message as string ?? err as string}`)
391-
})
374+
await worker.catch(err => console.error(`${logPrefix}: address job failed: ${err.message as string}`))
392375
)
393376
)
394377

395-
// Emit any full chunks we’ve accumulated after this slice
378+
// Emit full chunks of chronik txs (addressesSynced vazio)
396379
while (chronikTxs.length >= TX_EMIT_BATCH_SIZE) {
397380
const chronikTxsSlice = chronikTxs.slice(0, TX_EMIT_BATCH_SIZE)
398381
chronikTxs = chronikTxs.slice(TX_EMIT_BATCH_SIZE)
399-
yield {
400-
chronikTxs: chronikTxsSlice,
401-
addressesSynced: lastBatchAddresses
402-
}
382+
yield { chronikTxs: chronikTxsSlice, addressesSynced: [] }
403383
}
384+
385+
// Emit marcador para este slice (sem txs, só addressesSynced)
386+
yield { chronikTxs: [], addressesSynced: lastBatchAddresses }
404387
}
405388

406-
// Final flush
389+
// Final flush de txs (addressesSynced vazio)
407390
if (chronikTxs.length > 0) {
408391
const remaining = chronikTxs
409392
chronikTxs = []
410-
yield {
411-
chronikTxs: remaining,
412-
addressesSynced: lastBatchAddresses
413-
}
393+
yield { chronikTxs: remaining, addressesSynced: [] }
414394
}
415395
}
416396

@@ -733,79 +713,85 @@ export class ChronikBlockchainClient {
733713
const perAddrCount = new Map<string, number>()
734714
addresses.forEach(a => perAddrCount.set(a.id, 0))
735715

736-
let txsToCreate: Prisma.TransactionUncheckedCreateInput[] = []
716+
interface RowWithRaw { row: Prisma.TransactionUncheckedCreateInput, raw: Tx }
717+
let toCommit: RowWithRaw[] = []
737718

738719
try {
739720
const pfx = `${this.CHRONIK_MSG_PREFIX}[PARALLEL FETCHING]`
740721
console.log(`${pfx} will fetch batches of ${INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY} addresses from chronik`)
722+
741723
for await (const batch of this.fetchLatestTxsForAddresses(addresses)) {
742-
console.log(`${pfx} fetched batch of ${batch.chronikTxs.length} txs from chronik`)
743-
// count per address before committing
744-
const txRowsToCreate = await Promise.all(
745-
batch.chronikTxs.map(({ tx, address }) =>
746-
this.getTransactionFromChronikTransaction(tx, address)
747-
)
724+
if (batch.addressesSynced.length > 0) {
725+
// marcador de slice => desmarca syncing
726+
await setSyncingBatch(batch.addressesSynced, false)
727+
continue
728+
}
729+
730+
// são txs de fato
731+
const pairsFromBatch: RowWithRaw[] = await Promise.all(
732+
batch.chronikTxs.map(async ({ tx, address }) => {
733+
const row = await this.getTransactionFromChronikTransaction(tx, address)
734+
return { row, raw: tx }
735+
})
748736
)
749737

750-
for (const tx of txRowsToCreate) {
751-
perAddrCount.set(tx.addressId, (perAddrCount.get(tx.addressId) ?? 0) + 1)
738+
for (const { row } of pairsFromBatch) {
739+
perAddrCount.set(row.addressId, (perAddrCount.get(row.addressId) ?? 0) + 1)
752740
}
753-
txsToCreate.push(...txRowsToCreate)
754-
await setSyncingBatch(batch.addressesSynced, false)
755741

756-
if (txsToCreate.length >= DB_COMMIT_BATCH_SIZE) {
757-
console.log(`${this.CHRONIK_MSG_PREFIX} committing batch to DB — txs=${txsToCreate.length}`)
758-
const createdTxs = await createManyTransactions(txsToCreate)
742+
toCommit.push(...pairsFromBatch)
743+
744+
if (toCommit.length >= DB_COMMIT_BATCH_SIZE) {
745+
const commitPairs = toCommit.slice(0, DB_COMMIT_BATCH_SIZE)
746+
toCommit = toCommit.slice(DB_COMMIT_BATCH_SIZE)
747+
748+
const rows = commitPairs.map(p => p.row)
749+
const createdTxs = await createManyTransactions(rows)
759750
console.log(`${this.CHRONIK_MSG_PREFIX} committed — created=${createdTxs.length}`)
760-
txsToCreate = []
761751

762-
// optional append for production addresses
763752
const createdForProd = createdTxs.filter(t => productionAddressesIds.includes(t.addressId))
764753
if (createdForProd.length > 0) {
765754
await appendTxsToFile(createdForProd as unknown as Prisma.TransactionCreateManyInput[])
766755
}
767756

768-
// broadcast/triggers after commit
769757
if (createdTxs.length > 0) {
758+
const rawByHash = new Map(commitPairs.map(p => [p.raw.txid, p.raw]))
770759
const triggerBatch: BroadcastTxData[] = []
771-
const rawByHash = new Map(
772-
batch.chronikTxs.map(({ tx }) => [tx.txid, tx])
773-
)
774-
for (const createdTx of created) {
775-
const raw = rawByHash.get(createdTx.txId) // ajuste: campo do seu modelo
776-
if (!raw) continue
760+
for (const createdTx of createdTxs) {
761+
const raw = rawByHash.get(createdTx.hash)
762+
if (raw == null) continue
777763
const bd = this.broadcastIncomingTx(createdTx.address.address, raw, createdTx)
778764
triggerBatch.push(bd)
779765
}
780-
if (runTriggers) {
781-
console.log(`${this.CHRONIK_MSG_PREFIX} executing trigger batch — broadcasts=${triggerBatch.length}`)
766+
if (runTriggers && triggerBatch.length > 0) {
782767
await executeTriggersBatch(triggerBatch, this.networkId)
783768
}
784769
}
785770
}
786771
}
787772

788-
// final DB flush
789-
if (txsToCreate.length > 0) {
790-
console.log(`${this.CHRONIK_MSG_PREFIX} committing FINAL batch to DB — txs=${txsToCreate.length}`)
791-
const createdTxs = await createManyTransactions(txsToCreate)
792-
console.log(`${this.CHRONIK_MSG_PREFIX} committed FINAL — createdTxs=${createdTxs.length}`)
793-
txsToCreate = []
773+
// final DB flush (se sobrou menos que DB_COMMIT_BATCH_SIZE)
774+
if (toCommit.length > 0) {
775+
const rows = toCommit.map(p => p.row)
776+
const createdTxs = await createManyTransactions(rows)
777+
console.log(`${this.CHRONIK_MSG_PREFIX} committed FINAL — created=${createdTxs.length}`)
778+
toCommit = []
794779

795780
const createdForProd = createdTxs.filter(t => productionAddressesIds.includes(t.addressId))
796781
if (createdForProd.length > 0) {
797782
await appendTxsToFile(createdForProd as unknown as Prisma.TransactionCreateManyInput[])
798783
}
799784

800785
if (createdTxs.length > 0) {
786+
const rawByHash = new Map(toCommit.map(p => [p.raw.txid, p.raw]))
801787
const triggerBatch: BroadcastTxData[] = []
802-
for (const tx of createdTxs) {
803-
// WIP FIX
804-
const bd = this.broadcastIncomingTx(tx.address.address, tx)
788+
for (const createdTx of createdTxs) {
789+
const raw = rawByHash.get(createdTx.hash)
790+
if (raw == null) continue
791+
const bd = this.broadcastIncomingTx(createdTx.address.address, raw, createdTx)
805792
triggerBatch.push(bd)
806793
}
807-
if (runTriggers) {
808-
console.log(`${this.CHRONIK_MSG_PREFIX} executing FINAL trigger batch — broadcasts=${triggerBatch.length}`)
794+
if (runTriggers && triggerBatch.length > 0) {
809795
await executeTriggersBatch(triggerBatch, this.networkId)
810796
}
811797
}
@@ -830,9 +816,6 @@ export class ChronikBlockchainClient {
830816
const total = Object.values(successfulAddressesWithCount).reduce((p, c) => p + c, 0)
831817
console.log(`${this.CHRONIK_MSG_PREFIX} (PARALLEL) Finished syncing ${total} txs for ${addresses.length} addresses with ${failed.length} errors.`)
832818
console.timeEnd(`${this.CHRONIK_MSG_PREFIX} syncAddresses`)
833-
if (failed.length > 0) {
834-
console.log(`${this.CHRONIK_MSG_PREFIX} Failed addresses were:\n- ${Object.entries(failedAddressesWithErrors).map(([k, v]) => `${k}: ${v}`).join('\n- ')}`)
835-
}
836819

837820
return { failedAddressesWithErrors, successfulAddressesWithCount }
838821
}

0 commit comments

Comments
 (0)