11// Copyright (c) 2025 IOTA Stiftung
22// SPDX-License-Identifier: Apache-2.0
33
4+ use std:: num:: NonZeroUsize ;
5+
46use async_graphql:: { Context , OutputType , ResultExt , SimpleObject , Subscription , Union } ;
57use futures:: { Stream , StreamExt , TryStreamExt , future} ;
68use iota_indexer:: read:: IndexerReader ;
7- use iota_indexer_streaming:: { memory:: InMemory , metrics:: InMemoryStreamMetrics } ;
9+ use iota_indexer_streaming:: {
10+ memory:: { Config , InMemory } ,
11+ metrics:: InMemoryStreamMetrics ,
12+ } ;
813use iota_json_rpc_types:: Filter ;
914use iota_types:: supported_protocol_versions:: Chain ;
1015use prometheus:: Registry ;
@@ -23,6 +28,10 @@ use crate::{
2328
2429mod filter;
2530
31+ /// Represents the channel size of the [`InMemory`] broker.
32+ const BROKER_CHANNEL_SIZE : NonZeroUsize =
33+ NonZeroUsize :: new ( 10_000 ) . expect ( "value should be greater than 0" ) ;
34+
2635/// Notifies that the subscription consumer has fallen behind the live
2736/// subscription stream and missed one or more payloads.
2837#[ derive( SimpleObject , Clone ) ]
@@ -134,9 +143,13 @@ impl GraphQLStream {
134143 indexer_reader : IndexerReader ,
135144 registry : & Registry ,
136145 ) -> Result < Self , Error > {
146+ let config = Config {
147+ channel_buffer_size : BROKER_CHANNEL_SIZE ,
148+ ..Default :: default ( )
149+ } ;
137150 let streamer = InMemory :: new (
138151 db_url,
139- Default :: default ( ) ,
152+ config ,
140153 indexer_reader,
141154 InMemoryStreamMetrics :: new ( registry) ,
142155 )
0 commit comments