9 changes: 9 additions & 0 deletions src/datastore/pg-store.ts
@@ -4524,4 +4524,13 @@ export class PgStore extends BasePgStore {
`;
if (result.count) return result[0];
}

async getStacksBlockCountAtPreviousBurnBlock(): Promise<number> {
const result = await this.sql<{ count: string }[]>`
SELECT COUNT(*) AS count
FROM blocks
WHERE burn_block_height = (SELECT burn_block_height - 1 FROM chain_tip) AND canonical = TRUE
`;
return parseInt(result[0]?.count ?? '0');
}
}
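
For context, a minimal usage sketch of the new read method (the calling code is illustrative and not part of this change; it assumes `db` is a connected PgStore and that chain_tip is the single-row table holding the current burn block height):

  const count = await db.getStacksBlockCountAtPreviousBurnBlock();
  // COUNT(*) is returned by Postgres as a string, hence the parseInt in the method;
  // an empty result set (e.g. a fresh database) falls back to 0.
  console.log(`Stacks blocks anchored to the previous burn block: ${count}`);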
6 changes: 6 additions & 0 deletions src/datastore/pg-write-store.ts
@@ -1780,6 +1780,12 @@ export class PgWriteStore extends PgStore {
});
}

async updateBurnChainBlockHeight(args: { blockHeight: number }): Promise<void> {
await this.sql`
UPDATE chain_tip SET burn_block_height = GREATEST(${args.blockHeight}, burn_block_height)
`;
}

async insertSlotHoldersBatch(sql: PgSqlClient, slotHolders: DbRewardSlotHolder[]): Promise<void> {
const slotValues: RewardSlotHolderInsertValues[] = slotHolders.map(slot => ({
canonical: true,
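For context, the GREATEST(...) expression above keeps the stored burn_block_height monotonically non-decreasing, so a stale or replayed burn block event cannot move the recorded tip backwards. A small illustrative sketch (the call sites are hypothetical; `db` is assumed to be a PgWriteStore):

  await db.updateBurnChainBlockHeight({ blockHeight: 800_000 });
  await db.updateBurnChainBlockHeight({ blockHeight: 799_995 }); // stale or out-of-order event
  // chain_tip.burn_block_height remains 800000, because GREATEST keeps the larger value.
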
35 changes: 25 additions & 10 deletions src/event-stream/event-server.ts
@@ -130,6 +130,7 @@ async function handleBurnBlockMessage(
burnchainBlockHeight: burnBlockMsg.burn_block_height,
slotHolders: slotHolders,
});
await db.updateBurnChainBlockHeight({ blockHeight: burnBlockMsg.burn_block_height });
}

async function handleMempoolTxsMessage(rawTxs: string[], db: PgWriteStore): Promise<void> {
@@ -631,18 +632,32 @@ interface EventMessageHandler {
handleNewAttachment(msg: CoreNodeAttachmentMessage[], db: PgWriteStore): Promise<void> | void;
}

function createMessageProcessorQueue(): EventMessageHandler {
function createMessageProcessorQueue(db: PgWriteStore): EventMessageHandler {
// Create a promise queue so that only one message is handled at a time.
const processorQueue = new PQueue({ concurrency: 1 });

let eventTimer: prom.Histogram<'event'> | undefined;
let metrics:
| {
eventTimer: prom.Histogram;
blocksInPreviousBurnBlock: prom.Gauge;
}
| undefined;
if (isProdEnv) {
eventTimer = new prom.Histogram({
name: 'stacks_event_ingestion_timers',
help: 'Event ingestion timers',
labelNames: ['event'],
buckets: prom.exponentialBuckets(50, 3, 10), // 10 buckets, from 50 ms to 15 minutes
});
metrics = {
eventTimer: new prom.Histogram({
name: 'stacks_event_ingestion_timers',
help: 'Event ingestion timers',
labelNames: ['event'],
buckets: prom.exponentialBuckets(50, 3, 10), // 10 buckets, from 50 ms to 15 minutes
}),
blocksInPreviousBurnBlock: new prom.Gauge({
name: 'stacks_blocks_in_previous_burn_block',
help: 'Number of Stacks blocks produced in the previous burn block',
async collect() {
this.set(await db.getStacksBlockCountAtPreviousBurnBlock());
},
}),
};
}

const observeEvent = async (event: string, fn: () => Promise<void>) => {
@@ -651,7 +666,7 @@ function createMessageProcessorQueue(): EventMessageHandler {
await fn();
} finally {
const elapsedMs = timer.getElapsed();
eventTimer?.observe({ event }, elapsedMs);
metrics?.eventTimer.observe({ event }, elapsedMs);
}
};

@@ -738,7 +753,7 @@ export async function startEventServer(opts: {
serverPort?: number;
}): Promise<EventStreamServer> {
const db = opts.datastore;
const messageHandler = opts.messageHandler ?? createMessageProcessorQueue();
const messageHandler = opts.messageHandler ?? createMessageProcessorQueue(db);

let eventHost = opts.serverHost ?? process.env['STACKS_CORE_EVENT_HOST'];
const eventPort = opts.serverPort ?? parseInt(process.env['STACKS_CORE_EVENT_PORT'] ?? '', 10);
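
One design note on the metrics change in src/event-stream/event-server.ts: prom-client calls a gauge's collect() hook at scrape time, so the block-count query runs only when /metrics is requested, not on every ingested event. A minimal sketch of that flow (the handler wiring is illustrative and assumes a prom-client version whose register.metrics() is async):

  import * as prom from 'prom-client';

  // Hypothetical scrape handler; the API server wiring is not part of this diff.
  async function metricsHandler(): Promise<string> {
    // register.metrics() awaits each metric's collect() hook, including the async
    // gauge above that calls db.getStacksBlockCountAtPreviousBurnBlock().
    return await prom.register.metrics();
  }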