Skip to content

Commit

Permalink
feat: rpc batch requests
Browse files Browse the repository at this point in the history
  • Loading branch information
ugur-eren committed May 13, 2024
1 parent d002bae commit ce48afc
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 1 deletion.
75 changes: 75 additions & 0 deletions src/batch/default.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { BatchClientOptions } from './interface';
import { DelayedAction } from './delayedAction';
import { stringify } from '../utils/json';
import { RPC } from '../types';
import { JRPC } from '../types/api';

export class BatchClient extends DelayedAction {
public nodeUrl: string;

public headers: object;

public interval: number;

public requestId: number = 0;

private pendingRequests: Record<string | number, JRPC.RequestBody> = {};

private batchPromise?: Promise<any>;

constructor(options: BatchClientOptions) {
super(options.interval);

this.nodeUrl = options.nodeUrl;
this.headers = options.headers;
this.interval = options.interval;
}

private addPendingRequest<T extends keyof RPC.Methods>(
method: T,
params?: RPC.Methods[T]['params'],
id?: string | number
) {
const request: JRPC.RequestBody = {
id: id ?? `batched_${(this.requestId += 1)}`,
jsonrpc: '2.0',
method,
params: params ?? undefined,
};

this.pendingRequests[request.id] = request;

return request.id;
}

private async sendBatch(requests: JRPC.RequestBody[]) {
const raw = await fetch(this.nodeUrl, {
method: 'POST',
body: stringify(requests),
headers: this.headers as Record<string, string>,
});

return raw.json();
}

public async fetch<T extends keyof RPC.Methods>(
method: T,
params?: RPC.Methods[T]['params'],
id?: string | number
) {
const requestId = this.addPendingRequest(method, params, id);

await this.wait();

const requests = this.pendingRequests;
this.pendingRequests = {};

if (!this.batchPromise) {
this.batchPromise = this.sendBatch(Object.values(requests));
}
const results = await this.batchPromise;
this.batchPromise = undefined;

return results.find((result: any) => result.id === requestId);
}
}
46 changes: 46 additions & 0 deletions src/batch/delayedAction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
export class DelayedAction {
private delay: number;

private timer: NodeJS.Timeout | null;

private promise?: Promise<void>;

private promiseResolve?: () => void;

constructor(delay: number = 5000) {
this.delay = delay;
this.timer = null;
}

/**
* Waits for the delay to pass, then resolves the promise.
* All calls to this method will return the same promise until the delay has passed
*
* @returns {Promise<void>}
*/
public async wait(): Promise<void> {
// If the promise is not set, create a new one and store the resolve function
if (!this.promise || !this.promiseResolve) {
this.promise = new Promise((resolve) => {
this.promiseResolve = resolve;
});
}

if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}

this.timer = setTimeout(() => {
if (this.promiseResolve) {
this.promiseResolve();

// Reset the promise and resolve function so that a new promise is created next time
this.promise = undefined;
this.promiseResolve = undefined;
}
}, this.delay);

return this.promise;
}
}
2 changes: 2 additions & 0 deletions src/batch/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './interface';
export * from './default';
32 changes: 32 additions & 0 deletions src/batch/interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { RPC } from '../types';
import { JRPC } from '../types/api';

export type BatchClientOptions = {
nodeUrl: string;
headers: object;
interval: number;
};

export abstract class BatchClientInterface {
/**
* Fetch batched JSON-RPC requests
*
* @param body - JSON-RPC request body
* @returns JSON-RPC response
*/
public abstract fetch<T extends keyof RPC.Methods>(
method: T,
params?: RPC.Methods[T]['params'],
id?: string | number
): Promise<
JRPC.ResponseBody &
(
| {
result?: RPC.Methods[T]['result'];
}
| {
error?: RPC.Methods[T] extends { error: infer E } ? E : never;
}
)
>;
}
23 changes: 22 additions & 1 deletion src/channel/rpc_0_6.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { BatchClient } from '../batch';
import { NetworkName, StarknetChainId } from '../constants';
import { LibraryError } from '../provider/errors';
import {
Expand Down Expand Up @@ -52,8 +53,10 @@ export class RpcChannel {

readonly waitMode: Boolean; // behave like web2 rpc and return when tx is processed

private batchClient?: BatchClient;

constructor(optionsOrProvider?: RpcProviderOptions) {
const { nodeUrl, retries, headers, blockIdentifier, chainId, specVersion, waitMode } =
const { nodeUrl, retries, headers, blockIdentifier, chainId, specVersion, waitMode, batch } =
optionsOrProvider || {};
if (Object.values(NetworkName).includes(nodeUrl as NetworkName)) {
this.nodeUrl = getDefaultNodeUrl(nodeUrl as NetworkName, optionsOrProvider?.default);
Expand All @@ -69,6 +72,14 @@ export class RpcChannel {
this.specVersion = specVersion;
this.waitMode = waitMode || false;
this.requestId = 0;

if (typeof batch === 'number') {
this.batchClient = new BatchClient({
nodeUrl: this.nodeUrl,
headers: this.headers,
interval: batch,
});
}
}

public setChainId(chainId: StarknetChainId) {
Expand Down Expand Up @@ -110,6 +121,16 @@ export class RpcChannel {
params?: RPC.Methods[T]['params']
): Promise<RPC.Methods[T]['result']> {
try {
if (this.batchClient) {
const { error, result } = await this.batchClient.fetch(
method,
params,
(this.requestId += 1)
);
this.errorHandler(method, params, error);
return result as RPC.Methods[T]['result'];
}

const rawResult = await this.fetch(method, params, (this.requestId += 1));
const { error, result } = await rawResult.json();
this.errorHandler(method, params, error);
Expand Down
22 changes: 22 additions & 0 deletions src/channel/rpc_0_7.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { BatchClient } from '../batch';
import { NetworkName, StarknetChainId } from '../constants';
import { LibraryError } from '../provider/errors';
import {
Expand Down Expand Up @@ -54,6 +55,8 @@ export class RpcChannel {

readonly waitMode: Boolean; // behave like web2 rpc and return when tx is processed

private batchClient?: BatchClient;

constructor(optionsOrProvider?: RpcProviderOptions) {
const {
nodeUrl,
Expand All @@ -64,6 +67,7 @@ export class RpcChannel {
specVersion,
waitMode,
transactionRetryIntervalFallback,
batch,
} = optionsOrProvider || {};
if (Object.values(NetworkName).includes(nodeUrl as NetworkName)) {
this.nodeUrl = getDefaultNodeUrl(nodeUrl as NetworkName, optionsOrProvider?.default);
Expand All @@ -80,6 +84,14 @@ export class RpcChannel {
this.waitMode = waitMode || false;
this.requestId = 0;
this.transactionRetryIntervalFallback = transactionRetryIntervalFallback;

if (typeof batch === 'number') {
this.batchClient = new BatchClient({
nodeUrl: this.nodeUrl,
headers: this.headers,
interval: batch,
});
}
}

private get transactionRetryIntervalDefault() {
Expand Down Expand Up @@ -125,6 +137,16 @@ export class RpcChannel {
params?: RPC.Methods[T]['params']
): Promise<RPC.Methods[T]['result']> {
try {
if (this.batchClient) {
const { error, result } = await this.batchClient.fetch(
method,
params,
(this.requestId += 1)
);
this.errorHandler(method, params, error);
return result as RPC.Methods[T]['result'];
}

const rawResult = await this.fetch(method, params, (this.requestId += 1));
const { error, result } = await rawResult.json();
this.errorHandler(method, params, error);
Expand Down
1 change: 1 addition & 0 deletions src/types/provider/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ export type RpcProviderOptions = {
l1BoundMaxPricePerUnit: number;
maxFee: number;
};
batch?: false | number;
};

0 comments on commit ce48afc

Please sign in to comment.