This is a flow-based project for creating an event listener on the Provenance blockchain and receiving block information.
In pom.xml:
<dependencies>
    <dependency>
        <groupId>tech.figure.eventstream</groupId>
        <artifactId>es-core</artifactId>
        <version>${version}</version>
    </dependency>
    <dependency>
        <groupId>tech.figure.eventstream</groupId>
        <artifactId>es-api</artifactId>
        <version>${version}</version>
    </dependency>
    <dependency>
        <groupId>tech.figure.eventstream</groupId>
        <artifactId>es-api-model</artifactId>
        <version>${version}</version>
    </dependency>
</dependencies>
In build.gradle:
implementation 'tech.figure.eventstream:es-core:${version}'
implementation 'tech.figure.eventstream:es-api:${version}'
implementation 'tech.figure.eventstream:es-api-model:${version}'
In build.gradle.kts:
implementation("tech.figure.eventstream:es-core:${version}")
implementation("tech.figure.eventstream:es-api:${version}")
implementation("tech.figure.eventstream:es-api-model:${version}")
To get started with the Provenance event stream library, create a net adapter, which sets up both the RPC client and the websocket client for your query node of choice.
The host value must include a protocol, which can be one of http | https | tcp | tcp+tls.
val host = "https://rpc.test.provenance.io"
val netAdapter = okHttpNetAdapter(host)
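Since the host must carry one of the listed protocols, it can help to check the value before building the adapter. The following is a minimal sketch using only the Kotlin standard library; `supportedProtocols`, `protocolOf`, and `requireSupportedHost` are hypothetical names for illustration and are not part of the event stream library.

```kotlin
// Protocols listed in this README as valid for the host value.
val supportedProtocols = setOf("http", "https", "tcp", "tcp+tls")

// Hypothetical helper (not part of the library): returns the protocol prefix
// of a host string, or null when there is no "://" separator.
fun protocolOf(host: String): String? =
    if ("://" in host) host.substringBefore("://").lowercase() else null

// Hypothetical helper: fails fast with a readable message for unsupported hosts
// and returns the host unchanged when the protocol is accepted.
fun requireSupportedHost(host: String): String {
    val protocol = protocolOf(host)
    require(protocol != null && protocol in supportedProtocols) {
        "Host '$host' must start with one of: ${supportedProtocols.joinToString(" | ")}"
    }
    return host
}

fun main() {
    // Accepted: the protocol is present and supported.
    println(requireSupportedHost("https://rpc.test.provenance.io"))
    // Rejected: no protocol on the host value.
    println(runCatching { requireSupportedHost("rpc.test.provenance.io") }.isFailure)
}
```

The validated string could then be passed to okHttpNetAdapter in place of the raw host value.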
With this adapter we can create streams for live data, historical data, metadata, or any combination of these.
Historical flows require a fromHeight parameter that sets the block height where the stream starts.
You can optionally supply toHeight. If it is not supplied, the stream runs to the current block height.
Get block header flows:
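val log = KotlinLogging.logger {}
// Stream block headers with fromHeight = 1 and toHeight = 100.
// collect() is a suspending call, so run this inside a coroutine.
historicalBlockHeaderFlow(netAdapter, 1, 100)
    .onEach { log.info { "oldHeader: ${it.height}" } }
    .collect()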
Get block data flows:
val log = KotlinLogging.logger {}
historicalBlockDataFlow(netAdapter, 1, 100)
    .onEach { log.info { "oldBlock: ${it.height}" } }
    .collect()
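The fromHeight and toHeight rules for historical flows can be modelled as a small range calculation. This is only a sketch of the described behavior, not the library's implementation: `resolveHeightRange` is a hypothetical helper, heights are assumed to be Long values, and the first block is assumed to be at height 1.

```kotlin
// Hypothetical helper (not part of the library): models the range of heights a
// historical stream covers. A null toHeight means "up to the current height".
fun resolveHeightRange(fromHeight: Long, toHeight: Long?, currentHeight: Long): LongRange {
    // Assumption: block heights start at 1.
    require(fromHeight >= 1) { "fromHeight must be at least 1, got $fromHeight" }
    val end = toHeight ?: currentHeight
    require(end >= fromHeight) { "toHeight ($end) must not be below fromHeight ($fromHeight)" }
    return fromHeight..end
}

fun main() {
    // Explicit toHeight: the range stops at the requested height.
    println(resolveHeightRange(fromHeight = 1, toHeight = 100, currentHeight = 5000))
    // No toHeight: the range runs to the current block height.
    println(resolveHeightRange(fromHeight = 4990, toHeight = null, currentHeight = 5000))
}
```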
Live flows require an adapter to decode the JSON responses from the chain.
The project includes a Moshi adapter configured to decode the RPC responses.
Get live block headers:
val log = KotlinLogging.logger {}
val decoderAdapter = moshiDecoderAdapter()
liveBlockHeaderFlow(netAdapter, decoderAdapter)
    .onEach { log.info { "liveHeader: ${it.height}" } }
    .collect()
Get live block data:
val log = KotlinLogging.logger {}
val decoderAdapter = moshiDecoderAdapter()
liveBlockDataFlow(netAdapter, decoderAdapter)
    .onEach { log.info { "liveBlock: $it" } }
    .collect()
These flows can also be combined to create historical + live flows.
Get block headers:
val log = KotlinLogging.logger {}
// get the current block height from the node
val current = netAdapter.rpcAdapter.getCurrentHeight()!!
val decoderAdapter = moshiDecoderAdapter()
blockHeaderFlow(netAdapter, decoderAdapter, from = current - 1000, to = current)
    .onEach { log.info { "received: ${it.height}" } }
    .collect()
Get block data:
val log = KotlinLogging.logger {}
// get the current block height from the node
val current = netAdapter.rpcAdapter.getCurrentHeight()!!
val decoderAdapter = moshiDecoderAdapter()
blockDataFlow(netAdapter, decoderAdapter, from = current - 1000, to = current)
    .onEach { log.info { "received: ${it.height}" } }
    .collect()
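The examples above start 1000 blocks behind the current height. On a chain with fewer than 1000 blocks, that subtraction would produce a height of zero or below, so one option is to clamp the start height. The sketch below assumes heights are Long values and that the first block is at height 1; `lookbackStart` is a hypothetical helper, not part of the library.

```kotlin
// Hypothetical helper (not part of the library): start height for a stream that
// should cover the last `lookback` blocks, never going below the first block.
fun lookbackStart(current: Long, lookback: Long, firstHeight: Long = 1): Long {
    require(lookback >= 0) { "lookback must not be negative, got $lookback" }
    return maxOf(firstHeight, current - lookback)
}

fun main() {
    // Long-running chain: the start is simply current - lookback.
    println(lookbackStart(current = 250_000, lookback = 1000))
    // Short chain: the start is clamped to the first block.
    println(lookbackStart(current = 400, lookback = 1000))
}
```

The result could be passed as the from argument in place of current - 1000.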
We can additionally subscribe to certain events on the node.
Currently, only MessageType.NewBlock and MessageType.NewBlockHeader are supported.
val log = KotlinLogging.logger {}
val decoderAdapter = moshiDecoderAdapter()
nodeEventStream<MessageType.NewBlock>(netAdapter, decoderAdapter)
    .onEach { log.info { "liveBlock: $it" } }
    .collect()