Skip to content

Commit

Permalink
fix: validate ts client predicates before registering (#639)
Browse files Browse the repository at this point in the history
Changes the TS client server code so it now checks if a predicate UUID
already exists and is active in the target Chainhook node. If it is, it
reuses it. If it isn't it removes it and registers again.
  • Loading branch information
rafaelcr authored Aug 15, 2024
1 parent fb1e9ac commit 2d08f72
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 40 deletions.
4 changes: 2 additions & 2 deletions components/client/typescript/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion components/client/typescript/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@hirosystems/chainhook-client",
"version": "1.11.0",
"version": "1.12.0",
"description": "Chainhook TypeScript client",
"main": "./dist/index.js",
"typings": "./dist/index.d.ts",
Expand Down
69 changes: 69 additions & 0 deletions components/client/typescript/src/schemas/predicate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,72 @@ export const PredicateSchema = Type.Composite([
}),
]);
export type Predicate = Static<typeof PredicateSchema>;

export const PredicateExpiredDataSchema = Type.Object({
expired_at_block_height: Type.Integer(),
last_evaluated_block_height: Type.Integer(),
last_occurrence: Type.Optional(Type.Integer()),
number_of_blocks_evaluated: Type.Integer(),
number_of_times_triggered: Type.Integer(),
});
export type PredicateExpiredData = Static<typeof PredicateExpiredDataSchema>;

export const PredicateStatusSchema = Type.Union([
Type.Object({
info: Type.Object({
number_of_blocks_to_scan: Type.Integer(),
number_of_blocks_evaluated: Type.Integer(),
number_of_times_triggered: Type.Integer(),
last_occurrence: Type.Optional(Type.Integer()),
last_evaluated_block_height: Type.Integer(),
}),
type: Type.Literal('scanning'),
}),
Type.Object({
info: Type.Object({
last_occurrence: Type.Optional(Type.Integer()),
last_evaluation: Type.Integer(),
number_of_times_triggered: Type.Integer(),
number_of_blocks_evaluated: Type.Integer(),
last_evaluated_block_height: Type.Integer(),
}),
type: Type.Literal('streaming'),
}),
Type.Object({
info: PredicateExpiredDataSchema,
type: Type.Literal('unconfirmed_expiration'),
}),
Type.Object({
info: PredicateExpiredDataSchema,
type: Type.Literal('confirmed_expiration'),
}),
Type.Object({
info: Type.String(),
type: Type.Literal('interrupted'),
}),
Type.Object({
type: Type.Literal('new'),
}),
]);
export type PredicateStatus = Static<typeof PredicateStatusSchema>;

export const SerializedPredicateSchema = Type.Object({
chain: Type.Union([Type.Literal('stacks'), Type.Literal('bitcoin')]),
uuid: Type.String(),
network: Type.Union([Type.Literal('mainnet'), Type.Literal('testnet')]),
predicate: Type.Any(),
status: PredicateStatusSchema,
enabled: Type.Boolean(),
});
export type SerializedPredicate = Static<typeof SerializedPredicateSchema>;

export const SerializedPredicateResponseSchema = Type.Union([
Type.Object({
status: Type.Literal(404),
}),
Type.Object({
result: SerializedPredicateSchema,
status: Type.Literal(200),
}),
]);
export type SerializedPredicateResponse = Static<typeof SerializedPredicateResponseSchema>;
114 changes: 77 additions & 37 deletions components/client/typescript/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@ import { request } from 'undici';
import { logger, PINO_CONFIG } from './util/logger';
import { timeout } from './util/helpers';
import { Payload, PayloadSchema } from './schemas/payload';
import { Predicate, PredicateHeaderSchema, ThenThatHttpPost } from './schemas/predicate';
import {
Predicate,
PredicateHeaderSchema,
SerializedPredicate,
SerializedPredicateResponse,
ThenThatHttpPost,
} from './schemas/predicate';
import { BitcoinIfThisOptionsSchema, BitcoinIfThisSchema } from './schemas/bitcoin/if_this';
import { StacksIfThisOptionsSchema, StacksIfThisSchema } from './schemas/stacks/if_this';

Expand Down Expand Up @@ -104,9 +110,7 @@ export async function buildServer(
callback: OnEventCallback
) {
async function waitForNode(this: FastifyInstance) {
logger.info(
`ChainhookEventObserver connecting to chainhook node at ${chainhookOpts.base_url}...`
);
logger.info(`ChainhookEventObserver looking for chainhook node at ${chainhookOpts.base_url}`);
while (true) {
try {
await request(`${chainhookOpts.base_url}/ping`, { method: 'GET', throwOnError: true });
Expand All @@ -118,16 +122,61 @@ export async function buildServer(
}
}

async function registerPredicates(this: FastifyInstance) {
async function isPredicateActive(predicate: ServerPredicate): Promise<boolean | undefined> {
try {
const result = await request(`${chainhookOpts.base_url}/v1/chainhooks/${predicate.uuid}`, {
method: 'GET',
headers: { accept: 'application/json' },
throwOnError: true,
});
const response = (await result.body.json()) as SerializedPredicateResponse;
if (response.status == 404) return undefined;
if (
response.result.enabled == false ||
response.result.status.type == 'interrupted' ||
response.result.status.type == 'unconfirmed_expiration' ||
response.result.status.type == 'confirmed_expiration'
) {
return false;
}
return true;
} catch (error) {
logger.error(
error,
`ChainhookEventObserver unable to check if predicate ${predicate.uuid} is active`
);
return false;
}
}

async function registerAllPredicates(this: FastifyInstance) {
logger.info(predicates, `ChainhookEventObserver connected to ${chainhookOpts.base_url}`);
if (predicates.length === 0) {
logger.info(`ChainhookEventObserver does not have predicates to register`);
return;
}
const nodeType = serverOpts.node_type ?? 'chainhook';
const path = nodeType === 'chainhook' ? `/v1/chainhooks` : `/v1/observers`;
const registerUrl = `${chainhookOpts.base_url}${path}`;
logger.info(predicates, `ChainhookEventObserver registering predicates at ${registerUrl}`);
for (const predicate of predicates) {
if (nodeType === 'chainhook') {
switch (await isPredicateActive(predicate)) {
case undefined:
// Predicate doesn't exist.
break;
case true:
logger.info(
`ChainhookEventObserver predicate ${predicate.uuid} is already active, skipping registration`
);
continue;
case false:
logger.info(
`ChainhookEventObserver predicate ${predicate.uuid} was being used but is now inactive, removing for re-regristration`
);
await removePredicate(predicate);
}
}
logger.info(`ChainhookEventObserver registering predicate ${predicate.uuid}`);
const thenThat: ThenThatHttpPost = {
http_post: {
url: `${serverOpts.external_base_url}/payload`,
Expand All @@ -144,46 +193,37 @@ export async function buildServer(
headers: { 'content-type': 'application/json' },
throwOnError: true,
});
logger.info(
`ChainhookEventObserver registered '${predicate.name}' predicate (${predicate.uuid})`
);
} catch (error) {
logger.error(error, `ChainhookEventObserver unable to register predicate`);
}
}
}

async function removePredicates(this: FastifyInstance) {
async function removePredicate(predicate: ServerPredicate): Promise<void> {
const nodeType = serverOpts.node_type ?? 'chainhook';
const path =
nodeType === 'chainhook'
? `/v1/chainhooks/${predicate.chain}/${encodeURIComponent(predicate.uuid)}`
: `/v1/observers/${encodeURIComponent(predicate.uuid)}`;
try {
await request(`${chainhookOpts.base_url}${path}`, {
method: 'DELETE',
headers: { 'content-type': 'application/json' },
throwOnError: true,
});
logger.info(`ChainhookEventObserver removed predicate ${predicate.uuid}`);
} catch (error) {
logger.error(error, `ChainhookEventObserver unable to deregister predicate`);
}
}

async function removeAllPredicates(this: FastifyInstance) {
if (predicates.length === 0) {
logger.info(`ChainhookEventObserver does not have predicates to close`);
return;
}
logger.info(`ChainhookEventObserver closing predicates at ${chainhookOpts.base_url}`);
const nodeType = serverOpts.node_type ?? 'chainhook';
const removals = predicates.map(
predicate =>
new Promise<void>((resolve, reject) => {
const path =
nodeType === 'chainhook'
? `/v1/chainhooks/${predicate.chain}/${encodeURIComponent(predicate.uuid)}`
: `/v1/observers/${encodeURIComponent(predicate.uuid)}`;
request(`${chainhookOpts.base_url}${path}`, {
method: 'DELETE',
headers: { 'content-type': 'application/json' },
throwOnError: true,
})
.then(() => {
logger.info(
`ChainhookEventObserver removed '${predicate.name}' predicate (${predicate.uuid})`
);
resolve();
})
.catch(error => {
logger.error(error, `ChainhookEventObserver unable to deregister predicate`);
reject(error);
});
})
);
const removals = predicates.map(predicate => removePredicate(predicate));
await Promise.allSettled(removals);
}

Expand Down Expand Up @@ -242,8 +282,8 @@ export async function buildServer(
if (serverOpts.wait_for_chainhook_node ?? true) {
fastify.addHook('onReady', waitForNode);
}
fastify.addHook('onReady', registerPredicates);
fastify.addHook('onClose', removePredicates);
fastify.addHook('onReady', registerAllPredicates);
fastify.addHook('onClose', removeAllPredicates);

await fastify.register(ChainhookEventObserver);
return fastify;
Expand Down

0 comments on commit 2d08f72

Please sign in to comment.