diff --git a/scala/api/src/main/scala/com/myodov/unicherrygarden/api/dlt/EthereumTransaction.scala b/scala/api/src/main/scala/com/myodov/unicherrygarden/api/dlt/EthereumTransaction.scala index 0360c7ea..abf760d1 100644 --- a/scala/api/src/main/scala/com/myodov/unicherrygarden/api/dlt/EthereumTransaction.scala +++ b/scala/api/src/main/scala/com/myodov/unicherrygarden/api/dlt/EthereumTransaction.scala @@ -27,7 +27,7 @@ class EthereumTransaction( val to: Option[String], val gas: BigInt, val gasPrice: BigInt, - val nonce: Int, + val nonce: Int, // account nonce max value 2^64 - 2: TODO: https://github.com/ethereum/go-ethereum/pull/23853 val value: BigInt ) { require(txhash != null && EthUtils.Hashes.isValidTransactionHash(txhash), txhash) diff --git a/scala/connectors/ethereum_rpc_connector/src/main/scala/com/myodov/unicherrygarden/connectors/AbstractEthereumNodeConnector.scala b/scala/connectors/ethereum_rpc_connector/src/main/scala/com/myodov/unicherrygarden/connectors/AbstractEthereumNodeConnector.scala index e65372f4..fd0fd85b 100644 --- a/scala/connectors/ethereum_rpc_connector/src/main/scala/com/myodov/unicherrygarden/connectors/AbstractEthereumNodeConnector.scala +++ b/scala/connectors/ethereum_rpc_connector/src/main/scala/com/myodov/unicherrygarden/connectors/AbstractEthereumNodeConnector.scala @@ -5,6 +5,9 @@ import com.myodov.unicherrygarden.api.dlt import com.myodov.unicherrygarden.api.dlt.{EthereumBlock, EthereumMinedTransaction} import com.myodov.unicherrygarden.ethereum.EthUtils +import scala.concurrent.duration._ +import scala.language.postfixOps + /** Any implementation of Ethereum node connector, no matter of the underlying network mechanism. */ abstract class AbstractEthereumNodeConnector(protected[this] val nodeUrl: String) @@ -18,6 +21,9 @@ trait Web3ReadOperations { * and the number of the last block synced by this Ethereum node (`eth.blockNumber`), * simultaneously in a single call. * + * @note using Int for block number should be fine up to 2B blocks; + * it must be fixed in about 1657 years. + * * @return The option of the tuple with two elements: *
    *
  1. the data about the syncing process (`eth.syncing`);
  2. @@ -26,7 +32,7 @@ trait Web3ReadOperations { * The Option is empty if the data could not be received * (probably due to some network error). */ - def ethSyncingBlockNumber: Option[(SyncingStatusResult, BigInt)] + def ethSyncingBlockNumber: Option[(SyncingStatusResult, Int)] /** Read the block from Ethereum node (by the block number), returning all parseable data. * @@ -153,7 +159,10 @@ trait Web3ReadOperations { private[connectors] def createNotSyncing(): SyncingStatusResult = new SyncingStatusResult(None) } +} +object AbstractEthereumNodeConnector { + val NETWORK_TIMEOUT: FiniteDuration = 10 seconds } private object Web3ReadOperations { diff --git a/scala/connectors/ethereum_rpc_connector/src/main/scala/com/myodov/unicherrygarden/connectors/EthereumSingleNodeGraphQLConnector.scala b/scala/connectors/ethereum_rpc_connector/src/main/scala/com/myodov/unicherrygarden/connectors/EthereumSingleNodeGraphQLConnector.scala new file mode 100644 index 00000000..eae5ce43 --- /dev/null +++ b/scala/connectors/ethereum_rpc_connector/src/main/scala/com/myodov/unicherrygarden/connectors/EthereumSingleNodeGraphQLConnector.scala @@ -0,0 +1,199 @@ +package com.myodov.unicherrygarden.connectors + +import java.time.Instant + +import akka.actor.ActorSystem +import caliban.client.CalibanClientError +import com.myodov.unicherrygarden.api.dlt +import com.myodov.unicherrygarden.connectors.graphql.{BlockBasic, BlockBasicView, TransactionFullView} +import com.typesafe.scalalogging.LazyLogging +import sttp.capabilities +import sttp.capabilities.akka.AkkaStreams +import sttp.client3.akkahttp.AkkaHttpBackend +import sttp.client3.{Response, SttpBackend, UriContext} +import sttp.model.Uri + +import scala.concurrent.{Await, Future} +import scala.util.control.NonFatal + +/** Connector that communicates with a single Ethereum node using GraphQL (via Caliban library). */ +class EthereumSingleNodeGraphQLConnector(nodeUrl: String, + preferredActorSystem: Option[ActorSystem] = None) + extends AbstractEthereumNodeConnector(nodeUrl) + with Web3ReadOperations + with LazyLogging { + + override def toString: String = s"EthereumSingleNodeGraphQLConnector($nodeUrl)" + + protected val graphQLUri: Uri = uri"$nodeUrl/graphql" + + /** Backend used for sending out queries. */ + protected val sttpBackend: SttpBackend[Future, AkkaStreams with capabilities.WebSockets] = + preferredActorSystem match { + case None => AkkaHttpBackend() + case Some(actorSystem) => AkkaHttpBackend.usingActorSystem(actorSystem) + } + + override def ethSyncingBlockNumber: Option[(SyncingStatusResult, Int)] = { + import caliban.Geth._ + + val query = Query.syncing { + SyncState.view + } + + val rq = query.toRequest(graphQLUri) + try { + val value: Response[Either[CalibanClientError, Option[SyncState.SyncStateView]]] = + Await.result(rq.send(sttpBackend), AbstractEthereumNodeConnector.NETWORK_TIMEOUT) + + value.body match { + case Left(err) => + logger.error(s"Error for GraphQL querying sync state", err) + None + case Right(optSyncing) => + // If optSyncing is None, it means the network request failed. + // So it’s a `map`. + val maybeResult = optSyncing.map { syncing => + val current = Math.toIntExact(syncing.currentBlock) + val highest = Math.toIntExact(syncing.highestBlock) + ( + SyncingStatusResult.createSyncing( + currentBlock = current, + highestBlock = highest + ), + highest + ) + } + maybeResult + case other => + logger.error(s"Unhandled GraphQL response for GraphQL querying sync state: $other") + None + } + } catch { + case NonFatal(e) => + logger.error(s"Some nonfatal error happened during GraphQL querying sync state", e) + None + } + } + + override def readBlock(blockNumber: BigInt): Option[(dlt.EthereumBlock, Seq[dlt.EthereumMinedTransaction])] = { + import caliban.Geth._ + + val query = Query.block(number = Some(blockNumber.longValue)) { + BlockBasic.view + } + + val rq = query.toRequest(graphQLUri) + + try { + val value: Response[Either[CalibanClientError, Option[BlockBasicView]]] = + Await.result(rq.send(sttpBackend), AbstractEthereumNodeConnector.NETWORK_TIMEOUT) + + value.body match { + case Left(err) => + logger.error(s"Error for GraphQL querying block $blockNumber", err) + None + case Right(optBlockBasic) => + // This is a legit response; but it may have no contents. + // For None, return None; for Some return a result,... hey it’s a map! + optBlockBasic.map { blockBasic => + // Validate block + { + // Different validations depending on whether parent is Some(block) or None: + // “parent is absent” may happen only on the block 0; + // “parent is not absent” implies the parent block has number lower by one. + require(blockBasic.parent match { + case None => blockBasic.number == 0 + case Some(parentBlock) => parentBlock.number == blockBasic.number - 1 + }, + blockBasic) + require( + blockBasic.transactions match { + // If the transactions are not available at all – that’s legit + case None => true + // If the transactions are available - all of them must refer to the same block + case Some(trs) => trs.forall { tr => + // Inner block must refer to the outer block + (tr.block match { + case Some(innerBlock) => innerBlock == blockBasic.asMinimalBlock + case None => false + }) && + // All inner logs must refer to the outer transaction + (tr.logs match { + // If there are no logs at all, that’s okay + case None => true + // But if there are some logs, all of them must refer to the same transaction + case Some(logs) => logs.forall(_.transaction == tr.asMinimalTransaction) + }) + } + }, + blockBasic + ) + } + + val block = dlt.EthereumBlock( + number = blockBasic.number.toInt, + hash = blockBasic.hash, + parentHash = blockBasic.parent match { + // We need some custom handling of parent + // to make it compatible with RPC/block explorers + case None => Some("0x0000000000000000000000000000000000000000000000000000000000000000") + case Some(parent) => Some(parent.hash) + }, + timestamp = Instant.ofEpochSecond(blockBasic.timestamp) + ) + val transactions = blockBasic.transactions match { + case None => Seq() + case Some(transactions) => transactions.map { (tr: TransactionFullView) => + dlt.EthereumMinedTransaction( + // *** Before-mined transaction *** + txhash = tr.hash, + from = tr.from.address, + to = tr.to.map(_.address), // Option(nullable) + gas = tr.gas, + gasPrice = tr.gasPrice, + nonce = Math.toIntExact(tr.nonce), + value = tr.value, + // *** Mined transaction *** + // "status" – EIP 658, since Byzantium fork + // using Option(nullable) + status = tr.status.map(Math.toIntExact), // Option[Long] to Option[Int] + blockNumber = tr.block.get.number, // block must exist! + transactionIndex = tr.index.get, // transaction must exist! + gasUsed = tr.gasUsed.get, // presumed non-null if mined + effectiveGasPrice = tr.effectiveGasPrice.get, // presumed non-null if mined + cumulativeGasUsed = tr.cumulativeGasUsed.get, // presumed non-null if mined + txLogs = tr.logs match { + case None => Seq.empty + case Some(logs) => logs.map { log => + dlt.EthereumTxLog( + logIndex = log.index, + address = log.account.address, + topics = log.topics, + data = log.data + ) + } + } + ) + } + } + (block, transactions) + } + case other => + logger.error(s"Unhandled GraphQL response for block $blockNumber: $other") + None + } + } catch { + case NonFatal(e) => + logger.error(s"Some nonfatal error happened during GraphQL query for $blockNumber", e) + None + } + } +} + +/** Connector that handles a connection to single Ethereum node via RPC, and communicates with it. */ +object EthereumSingleNodeGraphQLConnector { + @inline def apply(nodeUrl: String, + preferredActorSystem: Option[ActorSystem] = None): EthereumSingleNodeGraphQLConnector = + new EthereumSingleNodeGraphQLConnector(nodeUrl, preferredActorSystem) +} diff --git a/scala/connectors/ethereum_rpc_connector/src/main/scala/com/myodov/unicherrygarden/connectors/EthereumSingleNodeJsonRpcConnector.scala b/scala/connectors/ethereum_rpc_connector/src/main/scala/com/myodov/unicherrygarden/connectors/EthereumSingleNodeJsonRpcConnector.scala index 287b02cc..6321c1ee 100644 --- a/scala/connectors/ethereum_rpc_connector/src/main/scala/com/myodov/unicherrygarden/connectors/EthereumSingleNodeJsonRpcConnector.scala +++ b/scala/connectors/ethereum_rpc_connector/src/main/scala/com/myodov/unicherrygarden/connectors/EthereumSingleNodeJsonRpcConnector.scala @@ -3,9 +3,7 @@ package com.myodov.unicherrygarden.connectors import java.time.Instant import java.util.concurrent.TimeUnit -import caliban.client.CalibanClientError import com.myodov.unicherrygarden.api.dlt -import com.myodov.unicherrygarden.connectors.graphql._ import com.typesafe.scalalogging.LazyLogging import org.web3j.protocol.Web3j import org.web3j.protocol.core.DefaultBlockParameterNumber @@ -13,8 +11,6 @@ import org.web3j.protocol.core.methods.response.EthBlock.TransactionObject import org.web3j.protocol.core.methods.response._ import org.web3j.protocol.http.HttpService import org.web3j.utils.Numeric.decodeQuantity -import sttp.client3._ -import sttp.client3.akkahttp.AkkaHttpBackend import scala.concurrent.ExecutionContext.Implicits._ import scala.concurrent.duration._ @@ -22,7 +18,6 @@ import scala.concurrent.{Await, Future} import scala.jdk.CollectionConverters._ import scala.jdk.FutureConverters._ import scala.jdk.OptionConverters._ -import scala.language.postfixOps import scala.util.control.NonFatal /** Connector that communicates with a single Ethereum node using JSON-RPC (via Web3J library). */ @@ -31,7 +26,7 @@ class EthereumSingleNodeJsonRpcConnector(nodeUrl: String) with Web3ReadOperations with LazyLogging { - override def toString: String = s"EthereumWeb3jSingleNodeConnector($nodeUrl)" + override def toString: String = s"EthereumSingleNodeJsonRpcConnector($nodeUrl)" private[this] var web3j: Web3j = rebuildWeb3j() @@ -69,9 +64,9 @@ class EthereumSingleNodeJsonRpcConnector(nodeUrl: String) } } - private[this] def ethBlockNumber: Option[BigInt] = { + private[this] def ethBlockNumber: Option[Int] = { try { - Some(web3j.ethBlockNumber.send.getBlockNumber) + Some(web3j.ethBlockNumber.send.getBlockNumber.intValueExact) } catch { case NonFatal(e) => { logger.error("Cannot call eth.blockNumber!", e) @@ -80,7 +75,7 @@ class EthereumSingleNodeJsonRpcConnector(nodeUrl: String) } } - override def ethSyncingBlockNumber: Option[(SyncingStatusResult, BigInt)] = + override def ethSyncingBlockNumber: Option[(SyncingStatusResult, Int)] = ethSyncing zip ethBlockNumber /** Get the Ethereum data for a block, by its number; the data is returned in Web3j-style classes. @@ -373,103 +368,15 @@ class EthereumSingleNodeJsonRpcConnector(nodeUrl: String) //// val resultList = FunctionReturnDecoder.decode(l.getData, Erc20TransferEvent.eventNonIndexedParametersJava).asScala.toList //// val transferAmountType = resultList(0) //// val transferAmount = transferAmountType.getValue - - /** Using GraphQL, read the block from Ethereum node (by the block number), filtered for specific addresses. - * - * @param blockNumber what block to read (by its number). - * @param addressesOfInterest list of address hashes (all lowercased); only these addresses are returned. - */ - def readBlockGraphQL(blockNumber: BigInt, - addressesOfInterest: Set[String]): Option[(dlt.EthereumBlock, Seq[dlt.EthereumMinedTransaction])] = { - import caliban.Geth._ - - val query = Query.block(number = Some(blockNumber.longValue)) { - BlockBasic.view - } - - val rq = query.toRequest(uri"$nodeUrl/graphql") - - // val backend = AkkaHttpBackend.usingActorSystem() - val backend = AkkaHttpBackend() - - try { - val value: Response[Either[CalibanClientError, Option[BlockBasicView]]] = - Await.result(rq.send(backend), EthereumSingleNodeJsonRpcConnector.NETWORK_TIMEOUT) - - value.body match { - case Left(err) => - logger.error(s"Error for GraphQL querying block $blockNumber", err) - None - case Right(optBlockBasic) => - // This is a legit response; but it may have no contents. - // For None, return None; for Some return a result,... hey it’s a flatMap! - optBlockBasic.flatMap { blockBasic => - // Validate block - { - // Different validations depending on whether parent is Some(block) or None: - // “parent is absent” may happen only on the block 0; - // “parent is not absent” implies the parent block has number lower by one. - require(blockBasic.parent match { - case None => blockBasic.number == 0 - case Some(parentBlock) => parentBlock.number == blockBasic.number - 1 - }, - blockBasic) - require( - blockBasic.transactions match { - // If the transactions are not available at all – that’s legit - case None => true - // If the transactions are available - all of them must refer to the same block - case Some(trs) => trs.forall { tr => - // Inner block must refer to the outer block - (tr.block match { - case Some(innerBlock) => innerBlock == blockBasic.asMinimalBlock - case None => false - }) && - // All inner logs must refer to the outer transaction - (tr.logs match { - // If there are no logs at all, that’s okay - case None => true - // But if there are some logs, all of them must refer to the same transaction - case Some(logs) => logs.forall(_.transaction == tr.asMinimalTransaction) - }) - } - }, - blockBasic - ) - } - - System.err.println(s"Received block $blockBasic") - - val block = dlt.EthereumBlock( - number = blockBasic.number.toInt, - hash = blockBasic.hash, - parentHash = Some(blockBasic.parent.get.hash), - timestamp = Instant.ofEpochSecond(blockBasic.timestamp) - ) - val transactions = Seq.empty[dlt.EthereumMinedTransaction] - Some((block, transactions)) - } - None - case other => - logger.error(s"Unhandled GraphQL response for block $blockNumber: $other") - None - } - } catch { - case NonFatal(e) => - logger.error(s"Some nonfatar error happened during GraphQL query for $blockNumber", e) - None - } - } } /** Connector that handles a connection to single Ethereum node via RPC, and communicates with it. */ object EthereumSingleNodeJsonRpcConnector { - val NETWORK_TIMEOUT: FiniteDuration = 10 seconds - - @inline def apply(nodeUrl: String): EthereumSingleNodeJsonRpcConnector = new EthereumSingleNodeJsonRpcConnector(nodeUrl) + @inline def apply(nodeUrl: String): EthereumSingleNodeJsonRpcConnector = + new EthereumSingleNodeJsonRpcConnector(nodeUrl) /** Convert the web3j-provided [[TransactionReceipt]] to the [[Seq]] of [[dlt.EthereumTxLog]]. */ - def getLogsFromTransactionReceipt(trReceipt: TransactionReceipt): Seq[dlt.EthereumTxLog] = trReceipt + private def getLogsFromTransactionReceipt(trReceipt: TransactionReceipt): Seq[dlt.EthereumTxLog] = trReceipt .getLogs .asScala .map((l: Log) => dlt.EthereumTxLog( diff --git a/scala/connectors/ethereum_rpc_connector/src/test/scala/com/myodov/unicherrygarden/connectors/EthereumSingleNodeGraphQLConnectorSpec.scala b/scala/connectors/ethereum_rpc_connector/src/test/scala/com/myodov/unicherrygarden/connectors/EthereumSingleNodeGraphQLConnectorSpec.scala index 6a1bf4cb..50bc4418 100644 --- a/scala/connectors/ethereum_rpc_connector/src/test/scala/com/myodov/unicherrygarden/connectors/EthereumSingleNodeGraphQLConnectorSpec.scala +++ b/scala/connectors/ethereum_rpc_connector/src/test/scala/com/myodov/unicherrygarden/connectors/EthereumSingleNodeGraphQLConnectorSpec.scala @@ -1,5 +1,5 @@ package com.myodov.unicherrygarden.connectors class EthereumSingleNodeGraphQLConnectorSpec extends AbstractEthereumNodeConnectorSpec { - lazy val sharedConnector = EthereumSingleNodeJsonRpcConnector(config.getStringList("ethereum.rpc_servers").get(0)) + lazy val sharedConnector = EthereumSingleNodeGraphQLConnector(config.getStringList("ethereum.rpc_servers").get(0)) } diff --git a/scala/connectors/ethereum_rpc_connector/src/test/scala/com/myodov/unicherrygarden/connectors/EthereumSingleNodeJsonRpcConnectorSpec.scala b/scala/connectors/ethereum_rpc_connector/src/test/scala/com/myodov/unicherrygarden/connectors/EthereumSingleNodeJsonRpcConnectorSpec.scala index 046c8192..9637f1c4 100644 --- a/scala/connectors/ethereum_rpc_connector/src/test/scala/com/myodov/unicherrygarden/connectors/EthereumSingleNodeJsonRpcConnectorSpec.scala +++ b/scala/connectors/ethereum_rpc_connector/src/test/scala/com/myodov/unicherrygarden/connectors/EthereumSingleNodeJsonRpcConnectorSpec.scala @@ -1,29 +1,5 @@ package com.myodov.unicherrygarden.connectors -import java.time.Instant - -import com.myodov.unicherrygarden.api.dlt.EthereumBlock - class EthereumSingleNodeJsonRpcConnectorSpec extends AbstractEthereumNodeConnectorSpec { lazy val sharedConnector = EthereumSingleNodeJsonRpcConnector(config.getStringList("ethereum.rpc_servers").get(0)) - - "readBlockGraphQL(12345)" should "read and parse a block" in { - assertResult( - Some( // Option of Tuple - ( - EthereumBlock( - 0, - hash = "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3", - parentHash = Some("0x0000000000000000000000000000000000000000000000000000000000000000"), - Instant.parse("1970-01-01T00:00:00Z") - ), - List() - ) - ) - )( - sharedConnector.readBlockGraphQL( - blockNumber = 10381084, - addressesOfInterest = Set.empty) - ) - } }