Skip to content

Commit 0eaa052

Browse files
fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! feat(indexer): add streaming support for events and transactions
1 parent 81c226c commit 0eaa052

File tree

1 file changed

+63
-33
lines changed

1 file changed

+63
-33
lines changed

crates/iota-indexer/src/stream.rs

Lines changed: 63 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ use crate::{
2828

2929
/// Buffer size used in the bounded channels.
3030
const BUFFER_SIZE: usize = 1000;
31+
/// Transaction batch size to send to subscribers.
32+
const TRANSACTION_BATCH_SIZE: i64 = 50;
3133
/// Postgres Notify channel name.
3234
const CHANNEL_NAME: &str = "checkpoint_committed";
3335

@@ -263,49 +265,77 @@ impl IndexerStreamer {
263265
event_tx: broadcast::Sender<StoredEvent>,
264266
transaction_tx: broadcast::Sender<StoredTransaction>,
265267
) -> Result<(), IndexerError> {
266-
// Batch checkpoints in smaller chunks to prevent subscriber overload.
267-
// Large batches (1000 checkpoints = ~4000-5000 transactions) would cause
268-
// immediate subscriber lag. Smaller batches (~10 checkpoints = ~40-50 tx) keep
269-
// pace.
270-
let buffer_size = BUFFER_SIZE / 100;
271268
// create a stream from the connection that forwards messages to the channel.
272-
let mut stream =
273-
stream::poll_fn(move |cx| connection.poll_message(cx)).ready_chunks(buffer_size);
269+
let mut stream = stream::poll_fn(move |cx| connection.poll_message(cx)).ready_chunks(10);
274270

275271
while let Some(messages) = stream.next().await {
276-
let mut filtered_messages = Self::filter_checkpoint_notifications(messages);
277-
278-
let first = filtered_messages.next().transpose()?;
279-
let last = filtered_messages.last().transpose()?;
280-
281-
if let Some((first, last)) = first.map(|f| (f, last.unwrap_or(f))) {
282-
debug!(notification = ?last);
283-
284-
let instant = Instant::now();
285-
let transactions: Vec<StoredTransaction> = indexer_reader
286-
.spawn_blocking(move |this| {
287-
this.multi_get_transactions_by_sequence_numbers_range(
288-
first.min_tx_sequence_number,
289-
last.max_tx_sequence_number,
290-
)
291-
})
292-
.await?;
272+
if let Some((min_tx_sequence_number, max_tx_sequence_number)) =
273+
Self::resolve_tx_bounds(messages)?
274+
{
275+
let mut start = min_tx_sequence_number;
276+
277+
while start <= max_tx_sequence_number {
278+
let end = (start + TRANSACTION_BATCH_SIZE - 1).min(max_tx_sequence_number);
293279

294-
let duration = instant.elapsed();
295-
debug!(
296-
"transactions query took: {duration:?}, tx: {}",
297-
transactions.len()
298-
);
280+
Self::process_transaction_batch(
281+
start,
282+
end,
283+
&indexer_reader,
284+
&event_tx,
285+
&transaction_tx,
286+
)
287+
.await?;
299288

300-
let instant = Instant::now();
301-
Self::publish_tx_and_events(transactions, &event_tx, &transaction_tx).await?;
302-
let duration = instant.elapsed();
303-
debug!("broadcast data took: {duration:?}");
289+
start = end + 1;
290+
}
304291
}
305292
}
306293
Ok(())
307294
}
308295

296+
fn resolve_tx_bounds(
297+
messages: Vec<Result<AsyncMessage, tokio_postgres::Error>>,
298+
) -> Result<Option<(i64, i64)>, IndexerError> {
299+
let mut filtered_messages = Self::filter_checkpoint_notifications(messages);
300+
301+
let first = filtered_messages.next().transpose()?;
302+
let last = filtered_messages.last().transpose()?;
303+
304+
Ok(first.map(|f| {
305+
(
306+
f.min_tx_sequence_number,
307+
last.unwrap_or(f).max_tx_sequence_number,
308+
)
309+
}))
310+
}
311+
312+
async fn process_transaction_batch(
313+
start: i64,
314+
end: i64,
315+
indexer_reader: &IndexerReader,
316+
event_tx: &broadcast::Sender<StoredEvent>,
317+
transaction_tx: &broadcast::Sender<StoredTransaction>,
318+
) -> Result<(), IndexerError> {
319+
let instant = Instant::now();
320+
let transactions: Vec<StoredTransaction> = indexer_reader
321+
.spawn_blocking(move |this| {
322+
this.multi_get_transactions_by_sequence_numbers_range(start, end)
323+
})
324+
.await?;
325+
326+
debug!(
327+
"transactions query took: {:?}, tx: {}",
328+
instant.elapsed(),
329+
transactions.len()
330+
);
331+
332+
let instant = Instant::now();
333+
Self::publish_tx_and_events(transactions, event_tx, transaction_tx).await?;
334+
debug!("broadcast data took: {:?}", instant.elapsed());
335+
336+
Ok(())
337+
}
338+
309339
async fn publish_tx_and_events(
310340
transactions: Vec<StoredTransaction>,
311341
event_tx: &broadcast::Sender<StoredEvent>,

0 commit comments

Comments
 (0)