Skip to content

feat: Include support for getblockreceipts #3521

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Mar 27, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
f82c7dd
Adds new getBlockReceipts endpoint
Feb 25, 2025
b3ec3dd
sorts cache_key enum in constants alphabetically
Feb 25, 2025
9d97ae8
Adds draft unit tests for getBlockReceipts
Feb 26, 2025
4b4e9ca
Fixes wrong way of importing in eth.ts
Feb 26, 2025
b8d280b
remove unecessary edge cases
Feb 26, 2025
4fa1028
Adds e2e tests; improves logic on endpoint and improve unit tests
Feb 27, 2025
69772a5
improves getBlockReceipts implementation
Mar 5, 2025
0b8447d
fixes unit test
Mar 5, 2025
260808e
Reduces duplications in unit tests; adds more coverage
Mar 6, 2025
d3b724d
Improves construction of cache key
Mar 6, 2025
15f5891
removes redundant getCacheKeyFromLog method; adjusts unit test
Mar 10, 2025
b6f42c5
Adds new method to rate limiting config
Mar 11, 2025
23ef432
Changes allowed parameters to not include blockHash object
Mar 17, 2025
c8c1718
fixes getBlockReceipts acceptance test
Mar 17, 2025
0b27481
adds k6 test
Mar 17, 2025
cf9b398
improves endpoint performance using improved req flow to MN
konstantinabl Mar 18, 2025
d2a191c
fixes unit tests after improving perf
konstantinabl Mar 18, 2025
aa0e286
adds default param to getHistoricalBlockResponse
konstantinabl Mar 18, 2025
806df94
fix k6 tests
konstantinabl Mar 18, 2025
3ac03d9
removes unecessary filter
konstantinabl Mar 18, 2025
44d8daa
fixes problem with k6 test
konstantinabl Mar 18, 2025
a3fe8ca
improves code readability, removed unecessary functions and fixes uni…
konstantinabl Mar 19, 2025
c09a3ad
moves poller and subscriptions controller to ws-server
konstantinabl Mar 19, 2025
f4858e4
Revert "moves poller and subscriptions controller to ws-server"
konstantinabl Mar 19, 2025
358558d
Fixes acceptance test
konstantinabl Mar 21, 2025
b79a315
address PR comments
konstantinabl Mar 21, 2025
8c39bea
fixes acceptance test
konstantinabl Mar 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
moves poller and subscriptions controller to ws-server
Signed-off-by: Konstantina Blazhukova <konstantina.blajukova@gmail.com>
  • Loading branch information
konstantinabl committed Mar 26, 2025
commit c09a3ad2d9e84351b4137e15c5da6d30f4e8febe
18 changes: 0 additions & 18 deletions packages/relay/src/lib/relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ import { DebugImpl } from './debug';
import { EthImpl } from './eth';
import { AdminImpl } from './admin';
import { NetImpl } from './net';
import { Poller } from './poller';
import { CacheService } from './services/cacheService/cacheService';
import HAPIService from './services/hapiService/hapiService';
import { HbarLimitService } from './services/hbarLimitService';
import MetricService from './services/metricService/metricService';
import { SubscriptionController } from './subscriptionController';
import { RequestDetails } from './types';
import { Web3Impl } from './web3';

Expand Down Expand Up @@ -70,13 +68,6 @@ export class Relay {
*/
private readonly ethImpl: Eth;

/**
* @private
* @readonly
* @property {Subs} [subImpl] - An optional implementation for handling subscription-related JSON-RPC requests.
*/
private readonly subImpl?: Subs;

/**
* @private
* @readonly
Expand Down Expand Up @@ -197,11 +188,6 @@ export class Relay {
ipAddressHbarSpendingPlanRepository,
);

if (ConfigService.get('SUBSCRIPTIONS_ENABLED')) {
const poller = new Poller(this.ethImpl, logger.child({ name: `poller` }), register);
this.subImpl = new SubscriptionController(poller, logger.child({ name: `subscr-ctrl` }), register);
}

this.initOperatorMetric(this.clientMain, this.mirrorNodeClient, logger, register);

this.populatePreconfiguredSpendingPlans().then();
Expand Down Expand Up @@ -292,10 +278,6 @@ export class Relay {
return this.ethImpl;
}

subs(): Subs | undefined {
return this.subImpl;
}

mirrorClient(): MirrorNodeClient {
return this.mirrorNodeClient;
}
Expand Down
53 changes: 16 additions & 37 deletions packages/ws-server/src/controllers/eth_subscribe.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
// SPDX-License-Identifier: Apache-2.0

import { ConfigService } from '@hashgraph/json-rpc-config-service/dist/services';
import { predefined, Relay } from '@hashgraph/json-rpc-relay/dist';
import { validateSubscribeEthLogsParams } from '../utils/validators';
import constants from '@hashgraph/json-rpc-relay/dist/lib/constants';
import { MirrorNodeClient } from '@hashgraph/json-rpc-relay/dist/lib/clients';
import jsonResp from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcResponse';
import { constructValidLogSubscriptionFilter, getMultipleAddressesEnabled } from '../utils/utils';
import { ConfigService } from '@hashgraph/json-rpc-config-service/dist/services';
import constants from '@hashgraph/json-rpc-relay/dist/lib/constants';
import { RequestDetails } from '@hashgraph/json-rpc-relay/dist/lib/types';
import { ISharedParams } from './index';
import { IJsonRpcRequest } from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/IJsonRpcRequest';
import { IJsonRpcResponse } from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/IJsonRpcResponse';
import jsonResp from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcResponse';
import { Context } from 'koa';
import { Logger } from 'pino';

import { getSubscriptionController } from '../subscriptionsManager';
import { constructValidLogSubscriptionFilter, getMultipleAddressesEnabled } from '../utils/utils';
import { validateSubscribeEthLogsParams } from '../utils/validators';
import { ISharedParams } from './index';
/**
* Subscribes to new block headers (newHeads) events and returns the response and subscription ID.
* @param {any} filters - The filters object specifying criteria for the subscription.
Expand All @@ -23,16 +24,12 @@ import { Logger } from 'pino';
* @param {Logger} logger - The logger object used for logging subscription information.
* @returns {string | undefined} Returns the subscription ID.
*/
const subscribeToNewHeads = (
filters: any,
ctx: Context,
event: string,
relay: Relay,
logger: Logger,
): string | undefined => {
const subscriptionId = relay.subs()?.subscribe(ctx.websocket, event, filters);
logger.info(`Subscribed to newHeads, subscriptionId: ${subscriptionId}`);
return subscriptionId;
const subscribeToNewHeads = (filters: any, ctx: Context, event: string, logger: Logger): string | undefined => {
const subscriptionController = getSubscriptionController();

const id = subscriptionController?.subscribe(ctx.websocket, event, filters);
logger.info(`Subscribed to newHeads, subscriptionId: ${id}`);
return id;
};

/**
Expand Down Expand Up @@ -65,7 +62,7 @@ const handleEthSubscribeNewHeads = (
throw predefined.UNSUPPORTED_METHOD;
}

const subscriptionId = subscribeToNewHeads(filters, ctx, event, relay, logger);
const subscriptionId = subscribeToNewHeads(filters, ctx, event, logger);
return jsonResp(request.id, null, subscriptionId);
};

Expand Down Expand Up @@ -101,8 +98,8 @@ const handleEthSubscribeLogs = async (
) {
throw predefined.INVALID_PARAMETER('filters.address', 'Only one contract address is allowed');
}

const subscriptionId = relay.subs()?.subscribe(ctx.websocket, event, validFiltersObject);
const subscriptionController = getSubscriptionController();
const subscriptionId = subscriptionController?.subscribe(ctx.websocket, event, validFiltersObject);
return jsonResp(request.id, null, subscriptionId);
};

Expand Down Expand Up @@ -151,21 +148,3 @@ export const handleEthSubscribe = async ({

return response;
};

/**
* Handles unsubscription requests for on-chain events.
* Unsubscribes the WebSocket from the specified subscription ID and returns the response.
* @param {object} args - An object containing the function parameters as properties.
* @param {Context} args.ctx - The context object containing information about the WebSocket connection.
* @param {any[]} args.params - The parameters of the unsubscription request.
* @param {IJsonRpcRequest} args.request - The request object received from the client.
* @param {Relay} args.relay - The relay object used for managing WebSocket subscriptions.
* @param {ConnectionLimiter} args.limiter - The limiter object used for rate limiting WebSocket connections.
* @returns {IJsonRpcResponse} Returns the response to the unsubscription request.
*/
export const handleEthUnsubscribe = ({ ctx, params, request, relay, limiter }: ISharedParams): IJsonRpcResponse => {
const subId = params[0];
const unsubbedCount = relay.subs()?.unsubscribe(ctx.websocket, subId);
limiter.decrementSubs(ctx, unsubbedCount);
return jsonResp(request.id, null, unsubbedCount !== 0);
};
25 changes: 25 additions & 0 deletions packages/ws-server/src/controllers/eth_unsubscribe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// SPDX-License-Identifier: Apache-2.0
import { IJsonRpcResponse } from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/IJsonRpcResponse';
import jsonResp from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcResponse';

import { getSubscriptionController } from '../subscriptionsManager';
import { ISharedParams } from '.';

/**
* Handles unsubscription requests for on-chain events.
* Unsubscribes the WebSocket from the specified subscription ID and returns the response.
* @param {object} args - An object containing the function parameters as properties.
* @param {Context} args.ctx - The context object containing information about the WebSocket connection.
* @param {any[]} args.params - The parameters of the unsubscription request.
* @param {IJsonRpcRequest} args.request - The request object received from the client.
* @param {Relay} args.relay - The relay object used for managing WebSocket subscriptions.
* @param {ConnectionLimiter} args.limiter - The limiter object used for rate limiting WebSocket connections.
* @returns {IJsonRpcResponse} Returns the response to the unsubscription request.
*/
export const handleEthUnsubscribe = ({ ctx, params, request, limiter }: ISharedParams): IJsonRpcResponse => {
const subscriptionsController = getSubscriptionController();
const subId = params[0];
const unsubbedCount = subscriptionsController?.unsubscribe(ctx.websocket, subId);
limiter.decrementSubs(ctx, unsubbedCount);
return jsonResp(request.id, null, unsubbedCount !== 0);
};
25 changes: 14 additions & 11 deletions packages/ws-server/src/controllers/index.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
// SPDX-License-Identifier: Apache-2.0

import { WS_CONSTANTS } from '../utils/constants';
import WsMetricRegistry from '../metrics/wsMetricRegistry';
import ConnectionLimiter from '../metrics/connectionLimiter';
import { Validator } from '@hashgraph/json-rpc-server/dist/validator';
import { handleEthSubscribe, handleEthUnsubscribe } from './eth_subscribe';
import { JsonRpcError, predefined, Relay } from '@hashgraph/json-rpc-relay/dist';
import { MirrorNodeClient } from '@hashgraph/json-rpc-relay/dist/lib/clients';
import jsonResp from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcResponse';
import { paramRearrangementMap, validateJsonRpcRequest, verifySupportedMethod } from '../utils/utils';
import { RequestDetails } from '@hashgraph/json-rpc-relay/dist/lib/types';
import { IJsonRpcRequest } from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/IJsonRpcRequest';
import { IJsonRpcResponse } from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/IJsonRpcResponse';
import {
InvalidRequest,
IPRateLimitExceeded,
MethodNotFound,
} from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcError';
import { RequestDetails } from '@hashgraph/json-rpc-relay/dist/lib/types';
import { Logger } from 'pino';
import { IJsonRpcRequest } from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/IJsonRpcRequest';
import jsonResp from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcResponse';
import { Validator } from '@hashgraph/json-rpc-server/dist/validator';
import Koa from 'koa';
import { IJsonRpcResponse } from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/IJsonRpcResponse';
import { Logger } from 'pino';

import ConnectionLimiter from '../metrics/connectionLimiter';
import WsMetricRegistry from '../metrics/wsMetricRegistry';
import { WS_CONSTANTS } from '../utils/constants';
import { paramRearrangementMap, validateJsonRpcRequest, verifySupportedMethod } from '../utils/utils';
import { handleEthSubscribe } from './eth_subscribe';
import { handleEthUnsubscribe } from './eth_unsubscribe';

export type ISharedParams = {
request: IJsonRpcRequest;
Expand Down Expand Up @@ -110,6 +112,7 @@ export const getRequestResult = async (
requestDetails: RequestDetails,
): Promise<any> => {
// Extract the method and parameters from the received request
// eslint-disable-next-line
let { method, params } = request;

// support go-ethereum client by turning undefined into empty array
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
// SPDX-License-Identifier: Apache-2.0

import { ConfigService } from '@hashgraph/json-rpc-config-service/dist/services';
import { generateRandomHex } from '@hashgraph/json-rpc-relay/dist/formatters';
import crypto from 'crypto';
import { Context } from 'koa';
import LRU from 'lru-cache';
import { Logger } from 'pino';
import { Counter, Histogram, Registry } from 'prom-client';

import { generateRandomHex } from '../formatters';
import { Subs } from '../index';
import constants from './constants';
import { Poller } from './poller';
import { Poller } from '../poller';

export interface Subscriber {
connection: any;
Expand All @@ -19,7 +18,7 @@ export interface Subscriber {

const CACHE_TTL = ConfigService.get('WS_CACHE_TTL');

export class SubscriptionController implements Subs {
export class SubscriptionController {
private poller: Poller;
private logger: Logger;
private subscriptions: { [key: string]: Subscriber[] };
Expand Down Expand Up @@ -63,16 +62,16 @@ export class SubscriptionController implements Subs {
});
}

createHash(data) {
private createHash(data: string) {
return crypto.createHash('sha256').update(data.toString()).digest('hex');
}

// Generates a random 16 byte hex string
generateId() {
public generateId() {
return generateRandomHex();
}

subscribe(connection, event: string, filters?: {}) {
public subscribe(connection: Context, event: string, filters?: {}) {
let tag: any = { event };
if (filters && Object.keys(filters).length) {
tag.filters = filters;
Expand Down Expand Up @@ -110,7 +109,7 @@ export class SubscriptionController implements Subs {
return subId;
}

unsubscribe(connection, subId?: string) {
public unsubscribe(connection, subId?: string) {
const { id } = connection;

if (subId) {
Expand Down Expand Up @@ -148,7 +147,7 @@ export class SubscriptionController implements Subs {
return subCount;
}

notifySubscribers(tag, data) {
public notifySubscribers(tag, data) {
if (this.subscriptions[tag] && this.subscriptions[tag].length) {
this.subscriptions[tag].forEach((sub) => {
const subscriptionData = {
Expand Down
Loading