Skip to content

Commit

Permalink
feat: optimize/rework init for replaced transactions fetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
zachdaniel committed Feb 25, 2019
1 parent 3b56b23 commit 47032c0
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 5 deletions.
63 changes: 63 additions & 0 deletions apps/explorer/lib/explorer/chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule Explorer.Chain do
order_by: 2,
order_by: 3,
preload: 2,
select: 2,
subquery: 1,
union_all: 2,
where: 2,
Expand Down Expand Up @@ -1271,6 +1272,37 @@ defmodule Explorer.Chain do
Repo.stream_reduce(query, initial, reducer)
end

@spec stream_pending_transactions(
fields :: [
:block_hash
| :internal_transactions_indexed_at
| :created_contract_code_indexed_at
| :from_address_hash
| :gas
| :gas_price
| :hash
| :index
| :input
| :nonce
| :r
| :s
| :to_address_hash
| :v
| :value
],
initial :: accumulator,
reducer :: (entry :: term(), accumulator -> accumulator)
) :: {:ok, accumulator}
when accumulator: term()
def stream_pending_transactions(fields, initial, reducer) when is_function(reducer, 2) do
query =
Transaction
|> pending_transactions_query()
|> select(^fields)

Repo.stream_reduce(query, initial, reducer)
end

@doc """
Returns a stream of all `t:Explorer.Chain.Block.t/0` `hash`es that are marked as unfetched in
`t:Explorer.Chain.Block.SecondDegreeRelation.t/0`.
Expand Down Expand Up @@ -2265,6 +2297,37 @@ defmodule Explorer.Chain do
|> Repo.all()
end

@spec find_and_update_replaced_transactions([
%{
required(:nonce) => non_neg_integer,
required(:from_address_hash) => Hash.Address.t(),
required(:hash) => Hash.t()
}
]) :: {integer(), nil | [term()]}
def find_and_update_replaced_transactions(transactions, timeout \\ :infinity) do
query =
Enum.reduce(transactions, Transaction, fn %{hash: hash, nonce: nonce, from_address_hash: from_address_hash},
query ->
from(t in query,
or_where:
t.nonce == ^nonce and t.from_address_hash == ^from_address_hash and t.hash != ^hash and
not is_nil(t.block_number)
)
end)

hashes = Enum.map(transactions, & &1.hash)

transactions_to_update =
from(pending in Transaction,
join: duplicate in subquery(query),
on: duplicate.nonce == pending.nonce,
on: duplicate.from_address_hash == pending.from_address_hash,
where: pending.hash in ^hashes
)

Repo.update_all(transactions_to_update, [set: [error: "dropped/replaced", status: :error]], timeout: timeout)
end

@spec update_replaced_transactions([
%{
required(:nonce) => non_neg_integer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ defmodule Explorer.Chain.Import.Runner.Transactions do
conflict_target: :hash,
on_conflict: on_conflict,
for: Transaction,
returning: ~w(block_number index hash internal_transactions_indexed_at)a,
returning: ~w(block_number index hash internal_transactions_indexed_at block_hash nonce from_address_hash)a,
timeout: timeout,
timestamps: timestamps
)
Expand Down
26 changes: 22 additions & 4 deletions apps/indexer/lib/indexer/replaced_transaction/fetcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ defmodule Indexer.ReplacedTransaction.Fetcher do
@impl BufferedTask
def init(initial, reducer, _) do
{:ok, final} =
Chain.stream_mined_transactions(
[:block_hash, :nonce, :from_address_hash],
[:block_hash, :nonce, :from_address_hash, :hash]
|> Chain.stream_pending_transactions(
initial,
fn transaction_fields, acc ->
transaction_fields
|> entry()
|> pending_entry()
|> reducer.(acc)
end
)
Expand All @@ -75,6 +75,10 @@ defmodule Indexer.ReplacedTransaction.Fetcher do
{block_hash_bytes, nonce, from_address_hash_bytes}
end

defp pending_entry(%{hash: %Hash{bytes: hash}, nonce: nonce, from_address_hash: %Hash{bytes: from_address_hash_bytes}}) do
{:pending, nonce, from_address_hash_bytes, hash}
end

defp params({block_hash_bytes, nonce, from_address_hash_bytes})
when is_integer(nonce) do
{:ok, from_address_hash} = Hash.Address.cast(from_address_hash_bytes)
Expand All @@ -83,6 +87,10 @@ defmodule Indexer.ReplacedTransaction.Fetcher do
%{nonce: nonce, from_address_hash: from_address_hash, block_hash: block_hash}
end

defp pending_params({:pending, nonce, from_address_hash, hash}) do
%{nonce: nonce, from_address_hash: from_address_hash, hash: hash}
end

@impl BufferedTask
@decorate trace(
name: "fetch",
Expand All @@ -94,7 +102,17 @@ defmodule Indexer.ReplacedTransaction.Fetcher do
Logger.debug("fetching replaced transactions for transactions")

try do
entries
{pending, realtime} =
entries
|> Enum.split_with(fn entry ->
match?({:pending, _, _, _}, entry)
end)

pending
|> Enum.map(&pending_params/1)
|> Chain.find_and_update_replaced_transactions()

realtime
|> Enum.map(&params/1)
|> Chain.update_replaced_transactions()

Expand Down
89 changes: 89 additions & 0 deletions apps/indexer/test/indexer/replaced_transaction/fetcher_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,95 @@ defmodule Indexer.ReplacedTransaction.FetcherTest do
assert %Transaction{error: nil, status: nil} =
Repo.one!(from(t in Transaction, where: t.hash == ^second_replaced_transaction_hash))
end

test "updates a replaced transaction on init" do
replaced_transaction_hash = "0x2a263224a95275d77bc30a7e131bc64d948777946a790c0915ab293791fbcb61"

address = insert(:address, hash: "0xb7cffe2ac19b9d5705a24cbe14fef5663af905a6")

insert(:transaction,
from_address: address,
nonce: 1,
block_hash: nil,
index: nil,
block_number: nil,
hash: replaced_transaction_hash
)

mined_transaction_hash = "0x1a263224a95275d77bc30a7e131bc64d948777946a790c0915ab293791fbcb61"
block = insert(:block)

mined_transaction =
insert(:transaction,
from_address: address,
nonce: 1,
index: 0,
block_hash: block.hash,
block_number: block.number,
cumulative_gas_used: 1,
gas_used: 1,
hash: mined_transaction_hash
)

second_mined_transaction_hash = "0x3a263224a95275d77bc30a7e131bc64d948777946a790c0915ab293791fbcb61"
second_block = insert(:block)

insert(:transaction,
from_address: address,
nonce: 1,
index: 0,
block_hash: second_block.hash,
block_number: second_block.number,
cumulative_gas_used: 1,
gas_used: 1,
hash: second_mined_transaction_hash
)

second_replaced_transaction_hash = "0x7a263224a95275d77bc30a7e131bc64d948777946a790c0915ab293791fbcb61"
second_address = insert(:address, hash: "0xc7cffe2ac19b9d5705a24cbe14fef5663af905a6")

insert(:transaction,
from_address: second_address,
nonce: 1,
block_hash: nil,
index: nil,
block_number: nil,
hash: second_replaced_transaction_hash
)

insert(:transaction,
from_address: mined_transaction.from_address,
nonce: mined_transaction.nonce
)
|> with_block(block)

ReplacedTransaction.Supervisor.Case.start_supervised!()

# assert :ok =
# ReplacedTransaction.Fetcher.async_fetch([
# %{
# block_hash: mined_transaction.block_hash,
# nonce: mined_transaction.nonce,
# from_address_hash: mined_transaction.from_address_hash
# }
# ])

found_replaced_transaction =
wait(fn ->
Repo.one!(from(t in Transaction, where: t.hash == ^replaced_transaction_hash and t.status == ^:error))
end)

assert found_replaced_transaction.error == "dropped/replaced"

assert %Transaction{error: nil, status: nil} =
Repo.one!(from(t in Transaction, where: t.hash == ^mined_transaction_hash))

assert %Transaction{error: nil, status: nil} =
Repo.one!(from(t in Transaction, where: t.hash == ^second_mined_transaction_hash))

assert %Transaction{error: nil, status: nil} =
Repo.one!(from(t in Transaction, where: t.hash == ^second_replaced_transaction_hash))
end
end

defp wait(producer) do
Expand Down

0 comments on commit 47032c0

Please sign in to comment.