Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lock chain before start mining #1394

Merged
merged 6 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ defmodule Archethic do
GetBalance,
NewTransaction,
Ok,
StartMining
StartMining,
ValidationError
}

alias Archethic.SelfRepair
Expand Down Expand Up @@ -68,13 +69,17 @@ defmodule Archethic do
P2P.authorized_and_available_node?() and shared_secret_synced?() ->
validation_nodes = Mining.get_validation_nodes(tx)

responses = do_send_transaction(tx, validation_nodes, welcome_node_key, contract_context)
responses =
%{already_locked?: already_locked?} =
do_send_transaction(tx, validation_nodes, welcome_node_key, contract_context)

maybe_start_resync(responses)

if forward? and not enough_ack?(responses, length(validation_nodes)),
do: forward_transaction(tx, welcome_node_key, contract_context)

if already_locked?, do: notify_welcome_node(welcome_node_key, address, :already_locked)

forward? ->
forward_transaction(tx, welcome_node_key, contract_context)

Expand Down Expand Up @@ -131,7 +136,8 @@ defmodule Archethic do
%{
ok: 0,
network_chains_resync_needed: false,
p2p_resync_needed: false
p2p_resync_needed: false,
already_locked?: false
},
&reduce_start_mining_responses/2
)
Expand Down Expand Up @@ -215,6 +221,12 @@ defmodule Archethic do
%{acc | network_chains_resync_needed: true, p2p_resync_needed: true}
end

defp reduce_start_mining_responses({:ok, %Error{reason: :already_locked}}, acc) do
# In this case we don't want to forward transaction since one is already being valided.
# But we want to notify user that this new transaction is not being mined
%{acc | ok: acc.ok + 1, already_locked?: true}
end

defp reduce_start_mining_responses(_, acc) do
acc
end
Expand Down Expand Up @@ -249,6 +261,20 @@ defmodule Archethic do

defp get_welcome_node_public_key(_, key), do: key

defp notify_welcome_node(welcome_node_key, address, :already_locked) do
Task.Supervisor.start_child(TaskSupervisor, fn ->
message = %ValidationError{
context: :invalid_transaction,
reason: "Transaction already in mining with different data",
address: address
}

P2P.send_message(welcome_node_key, message)
end)

:ok
end

@doc """
Retrieve the last transaction for a chain from the closest nodes
"""
Expand Down
49 changes: 49 additions & 0 deletions lib/archethic/mining.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ defmodule Archethic.Mining do

alias Archethic.P2P
alias Archethic.P2P.Message
alias Archethic.P2P.Message.Ok
alias Archethic.P2P.Message.ReplicationError
alias Archethic.P2P.Message.RequestChainLock

alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.CrossValidationStamp
Expand All @@ -30,6 +32,8 @@ defmodule Archethic.Mining do

@protocol_version 5

@lock_threshold 0.75

def protocol_version, do: @protocol_version

@doc """
Expand Down Expand Up @@ -96,6 +100,51 @@ defmodule Archethic.Mining do
validation_node_public_keys == Enum.map(validation_nodes, & &1.last_public_key)
end

@doc """
Request storage node to lock the mining of this transaction address and hash
"""
@spec request_chain_lock(tx :: Transaction.t()) :: :ok | {:error, :already_locked}
def request_chain_lock(tx = %Transaction{address: address, type: type}) do
storage_nodes =
address
|> Election.storage_nodes(P2P.authorized_and_available_nodes())
|> Enum.filter(&P2P.node_connected?/1)

nb_storage_nodes = length(storage_nodes)

hash =
tx
|> Transaction.to_pending()
|> Transaction.serialize()
|> Crypto.hash()

message = %RequestChainLock{address: address, hash: hash}

aggregated_responses =
Task.Supervisor.async_stream_nolink(
Archethic.TaskSupervisor,
storage_nodes,
Neylix marked this conversation as resolved.
Show resolved Hide resolved
&P2P.send_message(&1, message),
max_concurrency: nb_storage_nodes,
timeout: Message.get_timeout(message) + 500,
on_timeout: :kill_task,
ordered: false
)
|> Stream.filter(&match?({:ok, {:ok, _}}, &1))
|> Stream.map(fn {:ok, {:ok, res}} -> res end)
|> Enum.frequencies()

nb_ok = Map.get(aggregated_responses, %Ok{}, 0)
total_response = Map.values(aggregated_responses) |> Enum.sum()

Logger.debug("Received #{nb_ok} lock confirmation on #{total_response}",
transaction_address: Base.encode16(address),
transaction_type: type
)

if nb_ok / total_response >= @lock_threshold, do: :ok, else: {:error, :already_locked}
end

@doc """
Add transaction mining context which built by another cross validation node
"""
Expand Down
100 changes: 100 additions & 0 deletions lib/archethic/mining/chain_lock.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
defmodule Archethic.Mining.ChainLock do
@moduledoc false

alias Archethic.Crypto
alias Archethic.PubSub

use GenServer
require Logger

@vsn 1

def start_link(args \\ [], opts \\ []) do
GenServer.start_link(__MODULE__, args, opts)
end

@spec lock(address :: Crypto.prepended_hash(), tx_hash :: binary()) ::
:ok | {:error, :already_locked}
def lock(address, tx_hash) do
address |> via_tuple() |> GenServer.call({:lock, address, tx_hash})
end

@spec unlock(address :: Crypto.prepended_hash()) :: :ok
def unlock(address) do
address |> via_tuple() |> GenServer.cast({:unlock, address})
end

defp via_tuple(address) do
{:via, PartitionSupervisor, {ChainLockSupervisor, address}}
end

def init(args) do
timeout = Keyword.fetch!(args, :mining_timeout)
{:ok, %{timeout: timeout, addresses_locked: Map.new()}}
end

def handle_call(
{:lock, address, tx_hash},
_from,
state = %{timeout: timeout, addresses_locked: addresses_locked}
) do
case Map.get(addresses_locked, address) do
nil ->
Logger.info("Lock transaction chain", transaction_address: Base.encode16(address))
PubSub.register_to_new_transaction_by_address(address)
timer = Process.send_after(self(), {:unlock, address}, timeout)

new_state = Map.update!(state, :addresses_locked, &Map.put(&1, address, {tx_hash, timer}))

{:reply, :ok, new_state}

{hash, _timer} when hash == tx_hash ->
{:reply, :ok, state}

_ ->
Logger.debug("Received lock with different transaction hash",
transaction_address: Base.encode16(address)
)

{:reply, {:error, :already_locked}, state}
end
end

# Unlock from message UnlockChain
def handle_cast({:unlock, address}, state) do
new_state = Map.update!(state, :addresses_locked, &unlock_address(&1, address))
{:noreply, new_state}
end

# Unlock from self unlock after timeout
def handle_info({:unlock, address}, state) do
new_state = Map.update!(state, :addresses_locked, &unlock_address(&1, address))
{:noreply, new_state}
end

# Unlock from transaction being replicated
def handle_info({:new_transaction, address}, state) do
new_state = Map.update!(state, :addresses_locked, &unlock_address(&1, address))
{:noreply, new_state}
end

# Unlock from transaction being replicated
def handle_info({:new_transaction, address, _type, _timestamp}, state) do
new_state = Map.update!(state, :addresses_locked, &unlock_address(&1, address))
{:noreply, new_state}
end

defp unlock_address(addresses_locked, address) do
PubSub.unregister_to_new_transaction_by_address(address)

case Map.pop(addresses_locked, address) do
{nil, _} ->
addresses_locked

{{_hash, timer}, new_addresses_locked} ->
Process.cancel_timer(timer)
Logger.debug("Unlock transaction chain", transaction_address: Base.encode16(address))
new_addresses_locked
end
end
end
17 changes: 12 additions & 5 deletions lib/archethic/mining/distributed_workflow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ defmodule Archethic.Mining.DistributedWorkflow do
alias Archethic.P2P.Message.ReplicationError
alias Archethic.P2P.Message.ValidateTransaction
alias Archethic.P2P.Message.ValidationError
alias Archethic.P2P.Message.UnlockChain
alias Archethic.P2P.Node

alias Archethic.TransactionChain
Expand Down Expand Up @@ -1158,11 +1159,12 @@ defmodule Archethic.Mining.DistributedWorkflow do
end

defp notify_error(reason, %{
context: %ValidationContext{
welcome_node: welcome_node = %Node{},
transaction: %Transaction{address: tx_address},
pending_transaction_error_detail: pending_error_detail
}
context:
context = %ValidationContext{
welcome_node: welcome_node = %Node{},
transaction: %Transaction{address: tx_address},
pending_transaction_error_detail: pending_error_detail
}
}) do
{error_context, error_reason} =
case reason do
Expand Down Expand Up @@ -1201,5 +1203,10 @@ defmodule Archethic.Mining.DistributedWorkflow do

:ok
end)

# Notify storage nodes to unlock chain
message = %UnlockChain{address: tx_address}

context |> ValidationContext.get_chain_replication_nodes() |> P2P.broadcast_message(message)
end
end
15 changes: 11 additions & 4 deletions lib/archethic/mining/standalone_workflow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ defmodule Archethic.Mining.StandaloneWorkflow do
alias Archethic.P2P.Message.ReplicationError
alias Archethic.P2P.Message.ValidationError
alias Archethic.P2P.Message.ValidateTransaction
alias Archethic.P2P.Message.UnlockChain
alias Archethic.P2P.Node

alias Archethic.TransactionChain
Expand Down Expand Up @@ -213,10 +214,11 @@ defmodule Archethic.Mining.StandaloneWorkflow do
def handle_info(
{:replication_error, reason},
state = %{
context: %ValidationContext{
transaction: %Transaction{address: tx_address},
pending_transaction_error_detail: pending_error_detail
}
context:
context = %ValidationContext{
transaction: %Transaction{address: tx_address},
pending_transaction_error_detail: pending_error_detail
}
}
) do
{error_context, error_reason} =
Expand Down Expand Up @@ -255,6 +257,11 @@ defmodule Archethic.Mining.StandaloneWorkflow do
)
end)

# Notify storage nodes to unlock chain
message = %UnlockChain{address: tx_address}

context |> ValidationContext.get_chain_replication_nodes() |> P2P.broadcast_message(message)

{:stop, :normal, state}
end

Expand Down
13 changes: 12 additions & 1 deletion lib/archethic/mining/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ defmodule Archethic.Mining.Supervisor do

use Supervisor

alias Archethic.Mining.ChainLock

@mining_timeout Application.compile_env!(:archethic, [
Archethic.Mining.DistributedWorkflow,
:global_timeout
])

def start_link(opts) do
Supervisor.start_link(__MODULE__, opts, name: Archethic.MiningSupervisor)
end
Expand All @@ -13,7 +20,11 @@ defmodule Archethic.Mining.Supervisor do
name: Archethic.Mining.WorkflowRegistry,
keys: :unique,
partitions: System.schedulers_online()},
{DynamicSupervisor, strategy: :one_for_one, name: Archethic.Mining.WorkerSupervisor}
{DynamicSupervisor, strategy: :one_for_one, name: Archethic.Mining.WorkerSupervisor},
{PartitionSupervisor,
child_spec: {ChainLock, mining_timeout: @mining_timeout},
name: ChainLockSupervisor,
partitions: 20}
]

Supervisor.init(children, strategy: :rest_for_one)
Expand Down
6 changes: 5 additions & 1 deletion lib/archethic/p2p/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ defmodule Archethic.P2P.Message do
RegisterBeaconUpdates,
ReplicateTransaction,
ReplicationError,
RequestChainLock,
ShardRepair,
StartMining,
TransactionChainLength,
Expand All @@ -79,7 +80,8 @@ defmodule Archethic.P2P.Message do
ValidateSmartContractCall,
SmartContractCallValidation,
GetDashboardData,
DashboardData
DashboardData,
UnlockChain
}

require Logger
Expand Down Expand Up @@ -131,6 +133,8 @@ defmodule Archethic.P2P.Message do
| GetNetworkStats.t()
| ValidateSmartContractCall.t()
| GetDashboardData.t()
| RequestChainLock.t()
| UnlockChain.t()

@type response ::
Ok.t()
Expand Down
3 changes: 3 additions & 0 deletions lib/archethic/p2p/message/error.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ defmodule Archethic.P2P.Message.Error do
| :network_chains_sync
| :p2p_sync
| :both_sync
| :already_locked

@type t :: %__MODULE__{
reason: reason()
Expand All @@ -39,6 +40,7 @@ defmodule Archethic.P2P.Message.Error do
def serialize_reason(:network_chains_sync), do: 4
def serialize_reason(:p2p_sync), do: 5
def serialize_reason(:both_sync), do: 6
def serialize_reason(:already_locked), do: 7

@doc """
Deserialize an error reason
Expand All @@ -51,4 +53,5 @@ defmodule Archethic.P2P.Message.Error do
def deserialize_reason(4), do: :network_chains_sync
def deserialize_reason(5), do: :p2p_sync
def deserialize_reason(6), do: :both_sync
def deserialize_reason(7), do: :already_locked
end
Loading
Loading