Skip to content

Commit af72ccc

Browse files
authored
Merge pull request #1066 from PayButton/feat/clean-client-payment-instances-after-a-while
feat: job to clean up pending old payments
2 parents 7bea95a + ec9ac14 commit af72ccc

File tree

8 files changed

+138
-64
lines changed

8 files changed

+138
-64
lines changed

constants/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,3 +291,5 @@ export const TRIGGER_LOG_BATCH_SIZE = 200
291291
export const PRICES_CONNECTION_BATCH_SIZE = 1_000
292292
// interactive $transaction timeout in ms (for the single delete + several createMany of prices)
293293
export const PRICES_CONNECTION_TIMEOUT = 30_000
294+
295+
export const CLIENT_PAYMENT_EXPIRATION_TIME = (7) * (24 * 60 * 60 * 1000) // (number of days) * (24 * 60 * 60 * 1000)

jobs/initJobs.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import { CURRENT_PRICE_REPEAT_DELAY } from 'constants/index'
1+
import { CLIENT_PAYMENT_EXPIRATION_TIME, CURRENT_PRICE_REPEAT_DELAY } from 'constants/index'
22
import { Queue } from 'bullmq'
33
import { redisBullMQ } from 'redis/clientInstance'
44
import EventEmitter from 'events'
5-
import { syncCurrentPricesWorker, syncBlockchainAndPricesWorker } from './workers'
5+
import { syncCurrentPricesWorker, syncBlockchainAndPricesWorker, cleanupClientPaymentsWorker } from './workers'
66

77
EventEmitter.defaultMaxListeners = 20
88

@@ -24,6 +24,22 @@ const main = async (): Promise<void> => {
2424
const blockchainQueue = new Queue('blockchainSync', { connection: redisBullMQ })
2525
await blockchainQueue.add('syncBlockchainAndPrices', {}, { jobId: 'syncBlockchainAndPrices' })
2626
await syncBlockchainAndPricesWorker(blockchainQueue.name)
27+
28+
const cleanupQueue = new Queue('clientPaymentCleanup', { connection: redisBullMQ })
29+
30+
await cleanupQueue.add(
31+
'cleanupClientPayments',
32+
{},
33+
{
34+
jobId: 'cleanupClientPayments',
35+
removeOnFail: false,
36+
repeat: {
37+
every: CLIENT_PAYMENT_EXPIRATION_TIME
38+
}
39+
}
40+
)
41+
42+
await cleanupClientPaymentsWorker(cleanupQueue.name)
2743
}
2844

2945
void main()

jobs/workers.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { redisBullMQ } from 'redis/clientInstance'
33
import { DEFAULT_WORKER_LOCK_DURATION } from 'constants/index'
44
import { multiBlockchainClient } from 'services/chronikService'
55
import { connectAllTransactionsToPrices } from 'services/transactionService'
6+
import { cleanupExpiredClientPayments } from 'services/clientPaymentService'
67

78
import * as priceService from 'services/priceService'
89

@@ -60,3 +61,26 @@ export const syncBlockchainAndPricesWorker = async (queueName: string): Promise<
6061
}
6162
})
6263
}
64+
65+
export const cleanupClientPaymentsWorker = async (queueName: string): Promise<void> => {
66+
const worker = new Worker(
67+
queueName,
68+
async (job) => {
69+
console.log(`[CLIENT_PAYMENT CLEANUP] job ${job.id as string}: running expired payment cleanup...`)
70+
await cleanupExpiredClientPayments()
71+
console.log('[CLIENT_PAYMENT CLEANUP] cleanup finished.')
72+
},
73+
{
74+
connection: redisBullMQ,
75+
lockDuration: DEFAULT_WORKER_LOCK_DURATION
76+
}
77+
)
78+
79+
worker.on('completed', job => {
80+
console.log(`[CLIENT_PAYMENT CLEANUP] job ${job.id as string}: completed successfully`)
81+
})
82+
83+
worker.on('failed', (job, err) => {
84+
console.error(`[CLIENT_PAYMENT CLEANUP] job ${job?.id as string}: FAILED — ${err.message}`)
85+
})
86+
}

pages/api/payments/paymentId/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Decimal } from '@prisma/client/runtime/library'
2-
import { generatePaymentId } from 'services/transactionService'
2+
import { generatePaymentId } from 'services/clientPaymentService'
33
import { parseAddress, parseCreatePaymentIdPOSTRequest } from 'utils/validators'
44
import { RESPONSE_MESSAGES } from 'constants/index'
55
import { runMiddleware } from 'utils/index'

services/chronikService.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@ import {
1010
fetchUnconfirmedTransactions,
1111
upsertTransaction,
1212
getSimplifiedTransactions,
13-
getSimplifiedTrasaction,
13+
getSimplifiedTrasaction
14+
} from './transactionService'
15+
import {
1416
updateClientPaymentStatus,
1517
getClientPayment
16-
} from './transactionService'
18+
} from './clientPaymentService'
1719
import { Address, Prisma, ClientPaymentStatus } from '@prisma/client'
1820
import xecaddr from 'xecaddrjs'
1921
import { getAddressPrefix, satoshisToUnit } from 'utils/index'

services/clientPaymentService.ts

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import prisma from 'prisma-local/clientInstance'
2+
import { v4 as uuidv4 } from 'uuid'
3+
import { multiBlockchainClient } from 'services/chronikService'
4+
import { Prisma, ClientPaymentStatus } from '@prisma/client'
5+
import { NETWORK_IDS_FROM_SLUGS, CLIENT_PAYMENT_EXPIRATION_TIME } from 'constants/index'
6+
import { parseAddress } from 'utils/validators'
7+
import { addressExists } from './addressService'
8+
import moment from 'moment'
9+
10+
export const generatePaymentId = async (address: string, amount?: Prisma.Decimal): Promise<string> => {
11+
const rawUUID = uuidv4()
12+
const cleanUUID = rawUUID.replace(/-/g, '')
13+
const status = 'PENDING' as ClientPaymentStatus
14+
address = parseAddress(address)
15+
const prefix = address.split(':')[0].toLowerCase()
16+
const networkId = NETWORK_IDS_FROM_SLUGS[prefix]
17+
const isAddressRegistered = await addressExists(address)
18+
19+
const clientPayment = await prisma.clientPayment.create({
20+
data: {
21+
address: {
22+
connectOrCreate: {
23+
where: { address },
24+
create: {
25+
address,
26+
networkId
27+
}
28+
}
29+
},
30+
paymentId: cleanUUID,
31+
status,
32+
amount
33+
},
34+
include: {
35+
address: true
36+
}
37+
})
38+
39+
if (!isAddressRegistered) {
40+
void multiBlockchainClient.syncAndSubscribeAddresses([clientPayment.address])
41+
}
42+
43+
return clientPayment.paymentId
44+
}
45+
46+
export const updateClientPaymentStatus = async (paymentId: string, status: ClientPaymentStatus): Promise<void> => {
47+
await prisma.clientPayment.update({
48+
where: { paymentId },
49+
data: { status }
50+
})
51+
}
52+
53+
export const getClientPayment = async (paymentId: string): Promise<Prisma.ClientPaymentGetPayload<{ include: { address: true } }> | null> => {
54+
return await prisma.clientPayment.findUnique({
55+
where: { paymentId },
56+
include: { address: true }
57+
})
58+
}
59+
60+
export const cleanupExpiredClientPayments = async (): Promise<void> => {
61+
const cutoff = moment.utc().subtract(CLIENT_PAYMENT_EXPIRATION_TIME, 'milliseconds').toDate()
62+
63+
const oldPaymentsUnpaid = await prisma.clientPayment.findMany({
64+
where: {
65+
status: 'PENDING',
66+
createdAt: { lt: cutoff }
67+
},
68+
select: { paymentId: true }
69+
})
70+
71+
if (oldPaymentsUnpaid.length === 0) {
72+
console.log('[CLIENT_PAYMENT CLEANUP] no expired pending payments found.')
73+
return
74+
}
75+
76+
console.log(`[CLIENT_PAYMENT CLEANUP] deleting ${oldPaymentsUnpaid.length} expired pending payments...`)
77+
await prisma.clientPayment.deleteMany({
78+
where: {
79+
paymentId: { in: oldPaymentsUnpaid.map(p => p.paymentId) }
80+
}
81+
})
82+
}

services/transactionService.ts

Lines changed: 2 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import prisma from 'prisma-local/clientInstance'
2-
import { Prisma, Transaction, ClientPaymentStatus } from '@prisma/client'
3-
import { RESPONSE_MESSAGES, USD_QUOTE_ID, CAD_QUOTE_ID, N_OF_QUOTES, UPSERT_TRANSACTION_PRICES_ON_DB_TIMEOUT, SupportedQuotesType, NETWORK_IDS, NETWORK_IDS_FROM_SLUGS, PRICES_CONNECTION_BATCH_SIZE, PRICES_CONNECTION_TIMEOUT } from 'constants/index'
2+
import { Prisma, Transaction } from '@prisma/client'
3+
import { RESPONSE_MESSAGES, USD_QUOTE_ID, CAD_QUOTE_ID, N_OF_QUOTES, UPSERT_TRANSACTION_PRICES_ON_DB_TIMEOUT, SupportedQuotesType, NETWORK_IDS, PRICES_CONNECTION_BATCH_SIZE, PRICES_CONNECTION_TIMEOUT } from 'constants/index'
44
import { fetchAddressBySubstring, fetchAddressById, fetchAddressesByPaybuttonId, addressExists } from 'services/addressService'
55
import { AllPrices, QuoteValues, fetchPricesForNetworkAndTimestamp, flattenTimestamp } from 'services/priceService'
66
import _ from 'lodash'
@@ -9,8 +9,6 @@ import { SimplifiedTransaction } from 'ws-service/types'
99
import { OpReturnData, parseAddress } from 'utils/validators'
1010
import { generatePaymentFromTxWithInvoices } from 'redis/paymentCache'
1111
import { ButtonDisplayData, Payment } from 'redis/types'
12-
import { v4 as uuidv4 } from 'uuid'
13-
import { multiBlockchainClient } from 'services/chronikService'
1412

1513
export function getTransactionValue (transaction: TransactionWithPrices | TransactionsWithPaybuttonsAndPrices | SimplifiedTransaction): QuoteValues {
1614
const ret: QuoteValues = {
@@ -976,53 +974,3 @@ export const fetchDistinctPaymentYearsByUser = async (userId: string): Promise<n
976974

977975
return years.map(y => y.year)
978976
}
979-
980-
export const generatePaymentId = async (address: string, amount?: Prisma.Decimal): Promise<string> => {
981-
const rawUUID = uuidv4()
982-
const cleanUUID = rawUUID.replace(/-/g, '')
983-
const status = 'PENDING' as ClientPaymentStatus
984-
address = parseAddress(address)
985-
const prefix = address.split(':')[0].toLowerCase()
986-
const networkId = NETWORK_IDS_FROM_SLUGS[prefix]
987-
const isAddressRegistered = await addressExists(address)
988-
989-
const clientPayment = await prisma.clientPayment.create({
990-
data: {
991-
address: {
992-
connectOrCreate: {
993-
where: { address },
994-
create: {
995-
address,
996-
networkId
997-
}
998-
}
999-
},
1000-
paymentId: cleanUUID,
1001-
status,
1002-
amount
1003-
},
1004-
include: {
1005-
address: true
1006-
}
1007-
})
1008-
1009-
if (!isAddressRegistered) {
1010-
void multiBlockchainClient.syncAndSubscribeAddresses([clientPayment.address])
1011-
}
1012-
1013-
return clientPayment.paymentId
1014-
}
1015-
1016-
export const updateClientPaymentStatus = async (paymentId: string, status: ClientPaymentStatus): Promise<void> => {
1017-
await prisma.clientPayment.update({
1018-
where: { paymentId },
1019-
data: { status }
1020-
})
1021-
}
1022-
1023-
export const getClientPayment = async (paymentId: string): Promise<Prisma.ClientPaymentGetPayload<{ include: { address: true } }> | null> => {
1024-
return await prisma.clientPayment.findUnique({
1025-
where: { paymentId },
1026-
include: { address: true }
1027-
})
1028-
}

tests/unittests/handleUpdateClientPaymentStatus.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import { ChronikBlockchainClient } from '../../services/chronikService'
44

55
process.env.WS_AUTH_KEY = 'test-auth-key'
66

7-
// Mock the transactionService functions
8-
jest.mock('../../services/transactionService', () => ({
7+
// Mock the clientPayment functions
8+
jest.mock('../../services/clientPaymentService', () => ({
99
getClientPayment: jest.fn(),
1010
updateClientPaymentStatus: jest.fn()
1111
}))
@@ -71,9 +71,9 @@ describe('handleUpdateClientPaymentStatus tests', () => {
7171

7272
// Get the mocked functions
7373
// eslint-disable-next-line @typescript-eslint/no-var-requires
74-
const transactionService = require('../../services/transactionService')
75-
mockGetClientPayment = transactionService.getClientPayment
76-
mockUpdateClientPaymentStatus = transactionService.updateClientPaymentStatus
74+
const clientPaymentService = require('../../services/clientPaymentService')
75+
mockGetClientPayment = clientPaymentService.getClientPayment
76+
mockUpdateClientPaymentStatus = clientPaymentService.updateClientPaymentStatus
7777

7878
// Clear all mocks before each test
7979
jest.clearAllMocks()

0 commit comments

Comments
 (0)