Skip to content

Commit

Permalink
Most of EthereumSingleNodeGraphQLConnector is covered by unit tests w…
Browse files Browse the repository at this point in the history
…ith feature equivalence to EthereumSingleNodeJsonRpcConnector;

only handling of transaction logs is remaining.
  • Loading branch information
amyodov committed Dec 16, 2021
1 parent d5b1348 commit 7915b86
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class EthereumTransaction(
val to: Option[String],
val gas: BigInt,
val gasPrice: BigInt,
val nonce: Int,
val nonce: Int, // account nonce max value 2^64 - 2: TODO: https://github.com/ethereum/go-ethereum/pull/23853
val value: BigInt
) {
require(txhash != null && EthUtils.Hashes.isValidTransactionHash(txhash), txhash)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import com.myodov.unicherrygarden.api.dlt
import com.myodov.unicherrygarden.api.dlt.{EthereumBlock, EthereumMinedTransaction}
import com.myodov.unicherrygarden.ethereum.EthUtils

import scala.concurrent.duration._
import scala.language.postfixOps

/** Any implementation of Ethereum node connector, no matter of the underlying network mechanism. */
abstract class AbstractEthereumNodeConnector(protected[this] val nodeUrl: String)

Expand All @@ -18,6 +21,9 @@ trait Web3ReadOperations {
* and the number of the last block synced by this Ethereum node (`eth.blockNumber`),
* simultaneously in a single call.
*
* @note using Int for block number should be fine up to 2B blocks;
* it must be fixed in about 1657 years.
*
* @return The option of the tuple with two elements:
* <ol>
* <li>the data about the syncing process (`eth.syncing`);</li>
Expand All @@ -26,7 +32,7 @@ trait Web3ReadOperations {
* The Option is empty if the data could not be received
* (probably due to some network error).
*/
def ethSyncingBlockNumber: Option[(SyncingStatusResult, BigInt)]
def ethSyncingBlockNumber: Option[(SyncingStatusResult, Int)]

/** Read the block from Ethereum node (by the block number), returning all parseable data.
*
Expand Down Expand Up @@ -153,7 +159,10 @@ trait Web3ReadOperations {
private[connectors] def createNotSyncing(): SyncingStatusResult =
new SyncingStatusResult(None)
}
}

object AbstractEthereumNodeConnector {
val NETWORK_TIMEOUT: FiniteDuration = 10 seconds
}

private object Web3ReadOperations {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package com.myodov.unicherrygarden.connectors

import java.time.Instant

import akka.actor.ActorSystem
import caliban.client.CalibanClientError
import com.myodov.unicherrygarden.api.dlt
import com.myodov.unicherrygarden.connectors.graphql.{BlockBasic, BlockBasicView, TransactionFullView}
import com.typesafe.scalalogging.LazyLogging
import sttp.capabilities
import sttp.capabilities.akka.AkkaStreams
import sttp.client3.akkahttp.AkkaHttpBackend
import sttp.client3.{Response, SttpBackend, UriContext}
import sttp.model.Uri

import scala.concurrent.{Await, Future}
import scala.util.control.NonFatal

/** Connector that communicates with a single Ethereum node using GraphQL (via Caliban library). */
class EthereumSingleNodeGraphQLConnector(nodeUrl: String,
preferredActorSystem: Option[ActorSystem] = None)
extends AbstractEthereumNodeConnector(nodeUrl)
with Web3ReadOperations
with LazyLogging {

override def toString: String = s"EthereumSingleNodeGraphQLConnector($nodeUrl)"

protected val graphQLUri: Uri = uri"$nodeUrl/graphql"

/** Backend used for sending out queries. */
protected val sttpBackend: SttpBackend[Future, AkkaStreams with capabilities.WebSockets] =
preferredActorSystem match {
case None => AkkaHttpBackend()
case Some(actorSystem) => AkkaHttpBackend.usingActorSystem(actorSystem)
}

override def ethSyncingBlockNumber: Option[(SyncingStatusResult, Int)] = {
import caliban.Geth._

val query = Query.syncing {
SyncState.view
}

val rq = query.toRequest(graphQLUri)
try {
val value: Response[Either[CalibanClientError, Option[SyncState.SyncStateView]]] =
Await.result(rq.send(sttpBackend), AbstractEthereumNodeConnector.NETWORK_TIMEOUT)

value.body match {
case Left(err) =>
logger.error(s"Error for GraphQL querying sync state", err)
None
case Right(optSyncing) =>
// If optSyncing is None, it means the network request failed.
// So it’s a `map`.
val maybeResult = optSyncing.map { syncing =>
val current = Math.toIntExact(syncing.currentBlock)
val highest = Math.toIntExact(syncing.highestBlock)
(
SyncingStatusResult.createSyncing(
currentBlock = current,
highestBlock = highest
),
highest
)
}
maybeResult
case other =>
logger.error(s"Unhandled GraphQL response for GraphQL querying sync state: $other")
None
}
} catch {
case NonFatal(e) =>
logger.error(s"Some nonfatal error happened during GraphQL querying sync state", e)
None
}
}

override def readBlock(blockNumber: BigInt): Option[(dlt.EthereumBlock, Seq[dlt.EthereumMinedTransaction])] = {
import caliban.Geth._

val query = Query.block(number = Some(blockNumber.longValue)) {
BlockBasic.view
}

val rq = query.toRequest(graphQLUri)

try {
val value: Response[Either[CalibanClientError, Option[BlockBasicView]]] =
Await.result(rq.send(sttpBackend), AbstractEthereumNodeConnector.NETWORK_TIMEOUT)

value.body match {
case Left(err) =>
logger.error(s"Error for GraphQL querying block $blockNumber", err)
None
case Right(optBlockBasic) =>
// This is a legit response; but it may have no contents.
// For None, return None; for Some return a result,... hey it’s a map!
optBlockBasic.map { blockBasic =>
// Validate block
{
// Different validations depending on whether parent is Some(block) or None:
// “parent is absent” may happen only on the block 0;
// “parent is not absent” implies the parent block has number lower by one.
require(blockBasic.parent match {
case None => blockBasic.number == 0
case Some(parentBlock) => parentBlock.number == blockBasic.number - 1
},
blockBasic)
require(
blockBasic.transactions match {
// If the transactions are not available at all – that’s legit
case None => true
// If the transactions are available - all of them must refer to the same block
case Some(trs) => trs.forall { tr =>
// Inner block must refer to the outer block
(tr.block match {
case Some(innerBlock) => innerBlock == blockBasic.asMinimalBlock
case None => false
}) &&
// All inner logs must refer to the outer transaction
(tr.logs match {
// If there are no logs at all, that’s okay
case None => true
// But if there are some logs, all of them must refer to the same transaction
case Some(logs) => logs.forall(_.transaction == tr.asMinimalTransaction)
})
}
},
blockBasic
)
}

val block = dlt.EthereumBlock(
number = blockBasic.number.toInt,
hash = blockBasic.hash,
parentHash = blockBasic.parent match {
// We need some custom handling of parent
// to make it compatible with RPC/block explorers
case None => Some("0x0000000000000000000000000000000000000000000000000000000000000000")
case Some(parent) => Some(parent.hash)
},
timestamp = Instant.ofEpochSecond(blockBasic.timestamp)
)
val transactions = blockBasic.transactions match {
case None => Seq()
case Some(transactions) => transactions.map { (tr: TransactionFullView) =>
dlt.EthereumMinedTransaction(
// *** Before-mined transaction ***
txhash = tr.hash,
from = tr.from.address,
to = tr.to.map(_.address), // Option(nullable)
gas = tr.gas,
gasPrice = tr.gasPrice,
nonce = Math.toIntExact(tr.nonce),
value = tr.value,
// *** Mined transaction ***
// "status" – EIP 658, since Byzantium fork
// using Option(nullable)
status = tr.status.map(Math.toIntExact), // Option[Long] to Option[Int]
blockNumber = tr.block.get.number, // block must exist!
transactionIndex = tr.index.get, // transaction must exist!
gasUsed = tr.gasUsed.get, // presumed non-null if mined
effectiveGasPrice = tr.effectiveGasPrice.get, // presumed non-null if mined
cumulativeGasUsed = tr.cumulativeGasUsed.get, // presumed non-null if mined
txLogs = tr.logs match {
case None => Seq.empty
case Some(logs) => logs.map { log =>
dlt.EthereumTxLog(
logIndex = log.index,
address = log.account.address,
topics = log.topics,
data = log.data
)
}
}
)
}
}
(block, transactions)
}
case other =>
logger.error(s"Unhandled GraphQL response for block $blockNumber: $other")
None
}
} catch {
case NonFatal(e) =>
logger.error(s"Some nonfatal error happened during GraphQL query for $blockNumber", e)
None
}
}
}

/** Connector that handles a connection to single Ethereum node via RPC, and communicates with it. */
object EthereumSingleNodeGraphQLConnector {
@inline def apply(nodeUrl: String,
preferredActorSystem: Option[ActorSystem] = None): EthereumSingleNodeGraphQLConnector =
new EthereumSingleNodeGraphQLConnector(nodeUrl, preferredActorSystem)
}
Loading

0 comments on commit 7915b86

Please sign in to comment.