Skip to content

Commit a363260

Browse files
committed
Use RpcRequest in PubSubSubscriptionPlan
1 parent db144da commit a363260

File tree

5 files changed

+23
-33
lines changed

5 files changed

+23
-33
lines changed

packages/rpc-subscriptions-api/src/__tests__/index-test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ describe('createSolanaRpcSubscriptionsApi', () => {
3131
});
3232
expect(executeRpcPubSubSubscriptionPlan).toHaveBeenCalledWith(
3333
expect.objectContaining({
34-
subscribeMethodName: 'thingSubscribe',
34+
subscribeRequest: { methodName: 'thingSubscribe', params: [] },
3535
unsubscribeMethodName: 'thingUnsubscribe',
3636
}),
3737
);

packages/rpc-subscriptions-api/src/index.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,25 +51,25 @@ type Config = RequestTransformerConfig;
5151
function createSolanaRpcSubscriptionsApi_INTERNAL<TApi extends RpcSubscriptionsApiMethods>(
5252
config?: Config,
5353
): RpcSubscriptionsApi<TApi> {
54+
const requestTransformer = getDefaultRequestTransformerForSolanaRpc(config);
5455
const responseTransformer = getDefaultResponseTransformerForSolanaRpcSubscriptions({
5556
allowedNumericKeyPaths: getAllowedNumericKeypaths(),
5657
});
57-
// TODO(loris): Replace with request transformer.
58-
const parametersTransformer = <T extends unknown[]>(notificationName: string, params?: T) => {
59-
return getDefaultRequestTransformerForSolanaRpc(config)({ methodName: notificationName, params })
60-
.params as unknown[];
61-
};
6258
return createRpcSubscriptionsApi<TApi>({
6359
getSubscriptionConfigurationHash({ notificationName, params }) {
6460
return fastStableStringify([notificationName, params]);
6561
},
6662
planExecutor({ notificationName, params, ...rest }) {
63+
const request = { methodName: notificationName, params };
64+
const transformedRequest = requestTransformer(request);
6765
return executeRpcPubSubSubscriptionPlan({
6866
...rest,
6967
responseTransformer,
70-
subscribeMethodName: notificationName.replace(/Notifications$/, 'Subscribe'),
71-
subscribeParams: parametersTransformer(notificationName, params),
72-
unsubscribeMethodName: notificationName.replace(/Notifications$/, 'Unsubscribe'),
68+
subscribeRequest: {
69+
...transformedRequest,
70+
methodName: transformedRequest.methodName.replace(/Notifications$/, 'Subscribe'),
71+
},
72+
unsubscribeMethodName: transformedRequest.methodName.replace(/Notifications$/, 'Unsubscribe'),
7373
});
7474
},
7575
});

packages/rpc-subscriptions-spec/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,11 @@ Subscription channels publish events on two channel names:
5858

5959
## Functions
6060

61-
### `executeRpcPubSubSubscriptionPlan({ channel, responseTransformer, signal, subscribeMethodName, subscribeParams, unsubscribeMethodName })`
61+
### `executeRpcPubSubSubscriptionPlan({ channel, responseTransformer, signal, subscribeRequest, unsubscribeMethodName })`
6262

6363
Given a channel, this function executes the particular subscription plan required by the Solana JSON RPC Subscriptions API.
6464

65-
1. Calls the `subscribeMethodName` on the remote RPC
65+
1. Calls the `subscribeRequest` on the remote RPC
6666
2. Waits for a response containing the subscription id
6767
3. Returns a `DataPublisher` that publishes notifications related to that subscriptions id, filtering out all others
6868
4. Calls the `unsubscribeMethodName` on the remote RPC when the abort signal is fired.

packages/rpc-subscriptions-spec/src/__tests__/rpc-subscriptions-pubsub-plan-test.ts

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,7 @@ describe('executeRpcPubSubSubscriptionPlan', () => {
4646
const publisherPromise = executeRpcPubSubSubscriptionPlan({
4747
channel: mockChannel as RpcSubscriptionsChannel<unknown, unknown>,
4848
signal: abortController.signal,
49-
subscribeMethodName: 'thingSubscribe',
50-
subscribeParams: [],
49+
subscribeRequest: { methodName: 'thingSubscribe', params: [] },
5150
unsubscribeMethodName: 'thingUnsubscribe',
5251
});
5352
await expect(publisherPromise).rejects.toThrow();
@@ -56,8 +55,7 @@ describe('executeRpcPubSubSubscriptionPlan', () => {
5655
executeRpcPubSubSubscriptionPlan({
5756
channel: mockChannel as RpcSubscriptionsChannel<unknown, unknown>,
5857
signal: abortController.signal,
59-
subscribeMethodName: 'thingSubscribe',
60-
subscribeParams: [],
58+
subscribeRequest: { methodName: 'thingSubscribe', params: [] },
6159
unsubscribeMethodName: 'thingUnsubscribe',
6260
}).catch(() => {});
6361
expect(mockChannel.on).toHaveBeenCalledWith('error', expect.any(Function), {
@@ -69,8 +67,7 @@ describe('executeRpcPubSubSubscriptionPlan', () => {
6967
executeRpcPubSubSubscriptionPlan({
7068
channel: mockChannel as RpcSubscriptionsChannel<unknown, unknown>,
7169
signal: abortController.signal,
72-
subscribeMethodName: 'thingSubscribe',
73-
subscribeParams: expectedParams,
70+
subscribeRequest: { methodName: 'thingSubscribe', params: expectedParams },
7471
unsubscribeMethodName: 'thingUnsubscribe',
7572
}).catch(() => {});
7673
expect(mockSend).toHaveBeenCalledWith(
@@ -91,8 +88,7 @@ describe('executeRpcPubSubSubscriptionPlan', () => {
9188
const publisherPromise = executeRpcPubSubSubscriptionPlan({
9289
channel: mockChannel as RpcSubscriptionsChannel<unknown, unknown>,
9390
signal: abortController.signal,
94-
subscribeMethodName: 'thingSubscribe',
95-
subscribeParams: [],
91+
subscribeRequest: { methodName: 'thingSubscribe', params: [] },
9692
unsubscribeMethodName: 'thingUnsubscribe',
9793
});
9894
await expect(publisherPromise).rejects.toBe('o no');
@@ -110,8 +106,7 @@ describe('executeRpcPubSubSubscriptionPlan', () => {
110106
publisherPromise = executeRpcPubSubSubscriptionPlan({
111107
channel: mockChannel as RpcSubscriptionsChannel<unknown, unknown>,
112108
signal: abortController.signal,
113-
subscribeMethodName: 'thingSubscribe',
114-
subscribeParams: [],
109+
subscribeRequest: { methodName: 'thingSubscribe', params: [] },
115110
unsubscribeMethodName: 'thingUnsubscribe',
116111
});
117112
});
@@ -135,8 +130,7 @@ describe('executeRpcPubSubSubscriptionPlan', () => {
135130
const publisherPromise = executeRpcPubSubSubscriptionPlan({
136131
channel: mockChannel as RpcSubscriptionsChannel<unknown, unknown>,
137132
signal: abortController.signal,
138-
subscribeMethodName: 'thingSubscribe',
139-
subscribeParams: [],
133+
subscribeRequest: { methodName: 'thingSubscribe', params: [] },
140134
unsubscribeMethodName: 'thingUnsubscribe',
141135
});
142136
await Promise.resolve();
@@ -158,8 +152,7 @@ describe('executeRpcPubSubSubscriptionPlan', () => {
158152
channel: mockChannel as RpcSubscriptionsChannel<unknown, unknown>,
159153
responseTransformer: mockResponseTransformer,
160154
signal: abortController.signal,
161-
subscribeMethodName: 'thingSubscribe',
162-
subscribeParams: [],
155+
subscribeRequest: { methodName: 'thingSubscribe', params: [] },
163156
unsubscribeMethodName: 'thingUnsubscribe',
164157
});
165158
await jest.runAllTimersAsync();
@@ -285,8 +278,7 @@ describe('executeRpcPubSubSubscriptionPlan', () => {
285278
executeRpcPubSubSubscriptionPlan({
286279
channel: mockChannel as RpcSubscriptionsChannel<unknown, unknown>,
287280
signal: secondAbortController.signal,
288-
subscribeMethodName: 'thingSubscribe',
289-
subscribeParams: [],
281+
subscribeRequest: { methodName: 'thingSubscribe', params: [] },
290282
unsubscribeMethodName: 'thingUnsubscribe',
291283
}).catch(() => {});
292284
await jest.runAllTimersAsync();

packages/rpc-subscriptions-spec/src/rpc-subscriptions-pubsub-plan.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import {
55
SolanaError,
66
} from '@solana/errors';
77
import { safeRace } from '@solana/promises';
8-
import { createRpcMessage, RpcResponseData } from '@solana/rpc-spec-types';
8+
import { createRpcMessage, RpcRequest, RpcResponseData } from '@solana/rpc-spec-types';
99
import { DataPublisher } from '@solana/subscribable';
1010
import { demultiplexDataPublisher } from '@solana/subscribable';
1111

@@ -16,8 +16,7 @@ type Config<TNotification> = Readonly<{
1616
channel: RpcSubscriptionsChannel<unknown, RpcNotification<TNotification> | RpcResponseData<RpcSubscriptionId>>;
1717
responseTransformer?: <T>(response: unknown, notificationName: string) => T;
1818
signal: AbortSignal;
19-
subscribeMethodName: string;
20-
subscribeParams?: unknown[];
19+
subscribeRequest: RpcRequest;
2120
unsubscribeMethodName: string;
2221
}>;
2322

@@ -98,8 +97,7 @@ export async function executeRpcPubSubSubscriptionPlan<TNotification>({
9897
channel,
9998
responseTransformer,
10099
signal,
101-
subscribeMethodName,
102-
subscribeParams,
100+
subscribeRequest,
103101
unsubscribeMethodName,
104102
}: Config<TNotification>): Promise<DataPublisher<RpcSubscriptionNotificationEvents<TNotification>>> {
105103
let subscriptionId: number | undefined;
@@ -147,7 +145,7 @@ export async function executeRpcPubSubSubscriptionPlan<TNotification>({
147145
* STEP 2
148146
* Send the subscription request.
149147
*/
150-
const subscribePayload = createRpcMessage({ methodName: subscribeMethodName, params: subscribeParams });
148+
const subscribePayload = createRpcMessage(subscribeRequest);
151149
await channel.send(subscribePayload);
152150
/**
153151
* STEP 3

0 commit comments

Comments
 (0)