Skip to content

Commit 36afa02

Browse files
authored
Merge pull request #26 from windingtree/feat/new-queue
New Queue library
2 parents 7fcdb1f + e1fdc80 commit 36afa02

File tree

15 files changed

+1843
-771
lines changed

15 files changed

+1843
-771
lines changed

.github/workflows/test.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ on:
33
push:
44
branches:
55
- develop
6+
pull_request:
7+
branches:
8+
- develop
69
jobs:
710
test:
811
name: Test

examples/client/src/App.tsx

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,7 @@ export const App = () => {
4545
const requestsManager = useRef<
4646
ClientRequestsManager<RequestQuery, OfferOptions> | undefined
4747
>();
48-
const dealsManager = useRef<ClientDealsManager<
49-
RequestQuery,
50-
OfferOptions
51-
>>();
48+
const dealsManager = useRef<ClientDealsManager<RequestQuery, OfferOptions>>();
5249
const { publicClient } = useWallet();
5350
const [connected, setConnected] = useState<boolean>(false);
5451
const [selectedTab, setSelectedTab] = useState<number>(0);
@@ -105,19 +102,27 @@ export const App = () => {
105102
);
106103
};
107104

108-
const onRequestSubscribe: EventHandler<CustomEvent<ClientRequestRecord>> = ({ detail }) => {
105+
const onRequestSubscribe: EventHandler<
106+
CustomEvent<ClientRequestRecord>
107+
> = ({ detail }) => {
109108
client.current?.subscribe(detail.data.id);
110109
};
111110

112-
const onRequestUnsubscribe: EventHandler<CustomEvent<ClientRequestRecord>> = ({ detail }) => {
111+
const onRequestUnsubscribe: EventHandler<
112+
CustomEvent<ClientRequestRecord>
113+
> = ({ detail }) => {
113114
client.current?.unsubscribe(detail.data.id);
114115
};
115116

116-
const onRequestPublish: EventHandler<CustomEvent<RequestData<RequestQuery>>> = ({ detail }) => {
117+
const onRequestPublish: EventHandler<
118+
CustomEvent<RequestData<RequestQuery>>
119+
> = ({ detail }) => {
117120
requestsManager.current?.add(detail);
118121
};
119122

120-
const onOffer: EventHandler<CustomEvent<OfferData<RequestQuery, OfferOptions>>> = ({ detail }) => {
123+
const onOffer: EventHandler<
124+
CustomEvent<OfferData<RequestQuery, OfferOptions>>
125+
> = ({ detail }) => {
121126
requestsManager.current?.addOffer(detail);
122127
};
123128

@@ -168,8 +173,14 @@ export const App = () => {
168173
requestsManager.current.addEventListener('delete', updateRequests);
169174
requestsManager.current.addEventListener('clear', updateRequests);
170175
requestsManager.current.addEventListener('offer', updateRequests);
171-
requestsManager.current.addEventListener('subscribe', onRequestSubscribe);
172-
requestsManager.current.addEventListener('unsubscribe', onRequestUnsubscribe);
176+
requestsManager.current.addEventListener(
177+
'subscribe',
178+
onRequestSubscribe,
179+
);
180+
requestsManager.current.addEventListener(
181+
'unsubscribe',
182+
onRequestUnsubscribe,
183+
);
173184

174185
dealsManager.current.addEventListener('changed', updateDeals);
175186

@@ -200,8 +211,14 @@ export const App = () => {
200211
requestsManager.current?.removeEventListener('delete', updateRequests);
201212
requestsManager.current?.removeEventListener('clear', updateRequests);
202213
requestsManager.current?.removeEventListener('offer', updateRequests);
203-
requestsManager.current?.removeEventListener('subscribe', onRequestSubscribe);
204-
requestsManager.current?.removeEventListener('unsubscribe', onRequestUnsubscribe);
214+
requestsManager.current?.removeEventListener(
215+
'subscribe',
216+
onRequestSubscribe,
217+
);
218+
requestsManager.current?.removeEventListener(
219+
'unsubscribe',
220+
onRequestUnsubscribe,
221+
);
205222

206223
dealsManager.current?.removeEventListener('changed', updateDeals);
207224

examples/client/src/components/Deals.tsx

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
import { useState, useCallback, useEffect } from 'react';
22
import { DateTime } from 'luxon';
33
import { Address } from 'viem';
4-
import { ClientDealsManager, DealRecord, DealStatus } from '../../../../src/index.js'; // @windingtree/sdk
4+
import {
5+
ClientDealsManager,
6+
DealRecord,
7+
DealStatus,
8+
} from '../../../../src/index.js'; // @windingtree/sdk
59
import { RequestQuery, OfferOptions } from '../../../shared/index.js';
610
import { centerEllipsis, formatBalance, parseWalletError } from '../utils.js';
711
import { useWallet } from '../providers/WalletProvider/WalletProviderContext.js';
@@ -12,27 +16,18 @@ export type DealsRegistryRecord = Required<
1216

1317
export interface DealsProps {
1418
deals: DealsRegistryRecord[];
15-
manager?: ClientDealsManager<
16-
RequestQuery,
17-
OfferOptions
18-
>;
19+
manager?: ClientDealsManager<RequestQuery, OfferOptions>;
1920
}
2021

2122
export interface TransferFormProps {
2223
deal?: DealsRegistryRecord;
23-
manager?: ClientDealsManager<
24-
RequestQuery,
25-
OfferOptions
26-
>;
24+
manager?: ClientDealsManager<RequestQuery, OfferOptions>;
2725
onClose: () => void;
2826
}
2927

3028
export interface CancelProps {
3129
deal?: DealsRegistryRecord;
32-
manager?: ClientDealsManager<
33-
RequestQuery,
34-
OfferOptions
35-
>;
30+
manager?: ClientDealsManager<RequestQuery, OfferOptions>;
3631
onClose: () => void;
3732
}
3833

@@ -68,12 +63,7 @@ export const TransferForm = ({ deal, manager, onClose }: TransferFormProps) => {
6863
throw new Error('Ethereum client not ready');
6964
}
7065

71-
await manager.transfer(
72-
deal.offer,
73-
to as Address,
74-
walletClient,
75-
setTx,
76-
);
66+
await manager.transfer(deal.offer, to as Address, walletClient, setTx);
7767
setLoading(false);
7868
setSuccess(true);
7969
} catch (err) {

examples/client/src/components/MakeDeal.tsx

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,7 @@ import { ConnectButton } from '../providers/WalletProvider/ConnectButton.js';
1414

1515
interface MakeDealProps {
1616
offer?: OfferData<RequestQuery, OfferOptions>;
17-
manager?: ClientDealsManager<
18-
RequestQuery,
19-
OfferOptions
20-
>;
17+
manager?: ClientDealsManager<RequestQuery, OfferOptions>;
2118
}
2219

2320
/**
@@ -56,13 +53,7 @@ export const MakeDeal = ({ offer, manager }: MakeDealProps) => {
5653
throw new Error('Invalid deal configuration');
5754
}
5855

59-
await manager.create(
60-
offer,
61-
paymentId,
62-
ZeroHash,
63-
walletClient,
64-
setTx,
65-
);
56+
await manager.create(offer, paymentId, ZeroHash, walletClient, setTx);
6657
setLoading(false);
6758
setSuccess(true);
6859
} catch (err) {

examples/node/index.ts

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ import {
1616
NodeOptions,
1717
NodeRequestManager,
1818
Queue,
19+
JobHandler,
1920
createNode,
20-
createJobHandler,
2121
} from '../../src/index.js';
2222
import { OfferData } from '../../src/shared/types.js';
2323
import { DealStatus, ProtocolContracts } from '../../src/shared/contracts.js';
@@ -63,6 +63,14 @@ process.once('unhandledRejection', (error) => {
6363
process.exit(1);
6464
});
6565

66+
const createJobHandler =
67+
<JobData = unknown, HandlerOptions = unknown>(
68+
handler: JobHandler<JobData, HandlerOptions>,
69+
) =>
70+
(options: HandlerOptions = {} as HandlerOptions) =>
71+
(data: JobData) =>
72+
handler(data, options);
73+
6674
/**
6775
* This is interface of object that you want to pass to the job handler as options
6876
*/
@@ -76,8 +84,18 @@ interface DealHandlerOptions {
7684
const dealHandler = createJobHandler<
7785
OfferData<RequestQuery, OfferOptions>,
7886
DealHandlerOptions
79-
>(async ({ name, id, data: offer }, { contracts }) => {
80-
logger.trace(`Job "${name}" #${id} Checking for a deal. Offer #${offer.id}`);
87+
>(async (offer, options) => {
88+
if (!offer || !options) {
89+
throw new Error('Invalid job execution configuration');
90+
}
91+
92+
const { contracts } = options;
93+
94+
if (!contracts) {
95+
throw new Error('Contracts manager must be provided to job handler config');
96+
}
97+
98+
logger.trace(`Checking for a deal. Offer #${offer.id}`);
8199

82100
// Check for a deal
83101
const [, , , buyer, , , status] = await contracts.getDeal(offer);
@@ -98,10 +116,10 @@ const dealHandler = createJobHandler<
98116
},
99117
);
100118

101-
return true; // Returning true means that the job must be stopped
119+
return false; // Returning true means that the job must be stopped
102120
}
103121

104-
return; // Job continuing
122+
return true; // Job continuing
105123
});
106124

107125
/**
@@ -155,17 +173,20 @@ const createRequestsHandler =
155173
checkOut: BigInt(nowSec() + 2000),
156174
});
157175

158-
queue.addEventListener('expired', ({ detail: job }) => {
159-
logger.trace(`Job #${job.id} is expired`);
176+
queue.addEventListener('status', ({ detail: job }) => {
177+
logger.trace(`Job #${job.id} status changed`, job);
160178
});
161179

162180
/**
163181
* On every published offer we expecting a deal.
164182
* So, we add a job for detection of deals
165183
*/
166-
queue.addJob('deal', offer, {
184+
queue.add({
185+
handlerName: 'deal',
186+
data: offer,
187+
isRecurrent: true,
188+
recurrenceInterval: 5000,
167189
expire: Number(offer.expire),
168-
every: 5000, // 5 sec
169190
});
170191
};
171192

@@ -211,8 +232,8 @@ const main = async (): Promise<void> => {
211232

212233
const queue = new Queue({
213234
storage,
214-
hashKey: 'jobs',
215-
concurrentJobsNumber: 3,
235+
idsKeyName: 'jobsIds',
236+
concurrencyLimit: 3,
216237
});
217238

218239
const requestManager = new NodeRequestManager<RequestQuery>({
@@ -234,7 +255,7 @@ const main = async (): Promise<void> => {
234255
requestManager.add(topic, data);
235256
});
236257

237-
queue.addJobHandler('deal', dealHandler({ contracts: contractsManager }));
258+
queue.registerHandler('deal', dealHandler({ contracts: contractsManager }));
238259

239260
/**
240261
* Graceful Shutdown handler

src/client/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { EventEmitter, CustomEvent } from '@libp2p/interfaces/events';
22
import { createLibp2p, Libp2pOptions, Libp2p, Libp2pInit } from 'libp2p';
33
import { noise } from '@chainsafe/libp2p-noise';
44
import { mplex } from '@libp2p/mplex';
5+
import { yamux } from '@chainsafe/libp2p-yamux';
56
import { webSockets } from '@libp2p/websockets';
67
import { all } from '@libp2p/websockets/filters';
78
import { multiaddr, Multiaddr } from '@multiformats/multiaddr';
@@ -19,7 +20,6 @@ import { centerSub, CenterSub } from '../shared/pubsub.js';
1920
import { ServerAddressOption } from '../shared/options.js';
2021
import { encodeText, decodeText } from '../utils/text.js';
2122
import { createLogger } from '../utils/logger.js';
22-
import { yamux } from '@chainsafe/libp2p-yamux';
2323

2424
const logger = createLogger('Client');
2525

src/node/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { createLibp2p, Libp2pInit, Libp2pOptions, Libp2p } from 'libp2p';
22
import { noise } from '@chainsafe/libp2p-noise';
33
import { mplex } from '@libp2p/mplex';
4+
import { yamux } from '@chainsafe/libp2p-yamux';
45
import { webSockets } from '@libp2p/websockets';
56
import { all } from '@libp2p/websockets/filters';
67
import { EventEmitter, CustomEvent } from '@libp2p/interfaces/events';
@@ -32,7 +33,6 @@ import { CenterSub, centerSub } from '../shared/pubsub.js';
3233
import { RequestEvent } from './requestManager.js';
3334
import { decodeText, encodeText } from '../utils/text.js';
3435
import { createLogger } from '../utils/logger.js';
35-
import { yamux } from '@chainsafe/libp2p-yamux';
3636

3737
const logger = createLogger('Node');
3838

src/server/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { createLibp2p, Libp2pOptions, Libp2p } from 'libp2p';
22
import { createFromJSON } from '@libp2p/peer-id-factory';
33
import { noise } from '@chainsafe/libp2p-noise';
44
import { mplex } from '@libp2p/mplex';
5+
import { yamux } from '@chainsafe/libp2p-yamux';
56
import { webSockets } from '@libp2p/websockets';
67
import { all } from '@libp2p/websockets/filters';
78
import { EventEmitter, CustomEvent } from '@libp2p/interfaces/events';
@@ -10,7 +11,6 @@ import { centerSub, CenterSub } from '../shared/pubsub.js';
1011
import { decodeText } from '../utils/text.js';
1112
import { StorageInitializer } from '../storage/abstract.js';
1213
import { createLogger } from '../utils/logger.js';
13-
import { yamux } from '@chainsafe/libp2p-yamux';
1414

1515
const logger = createLogger('Server');
1616

0 commit comments

Comments
 (0)