Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store beacon aggregate after self-repair #606

Merged
merged 15 commits into from
Oct 12, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 69 additions & 28 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ defmodule Archethic.BeaconChain do
alias Archethic.P2P
alias Archethic.P2P.Node
alias Archethic.P2P.Message.GetBeaconSummaries
alias Archethic.P2P.Message.GetBeaconSummariesAggregate
alias Archethic.P2P.Message.BeaconSummaryList
alias Archethic.P2P.Message.NotFound

alias Archethic.TaskSupervisor

Expand Down Expand Up @@ -274,10 +276,6 @@ defmodule Archethic.BeaconChain do
[ ] [ ] [ ] Aggregate addresses
| | |
[ ] [ ] [ ] Fetch summaries
|\ /|\ /|
| \/ | \/ |
| /\ | /\ |
[D1] [D2] [D3] Partition by date
| | |
[ ] [ ] [ ] Aggregate and consolidate summaries
\ | /
Expand All @@ -287,18 +285,17 @@ defmodule Archethic.BeaconChain do
[ ]
```
"""
@spec fetch_summary_aggregates(list(DateTime.t()) | Enumerable.t()) ::
list(SummaryAggregate.t())
def fetch_summary_aggregates(dates) do
authorized_nodes = P2P.authorized_and_available_nodes()
@spec fetch_and_aggregate_summaries(DateTime.t()) :: SummaryAggregate.t()
def fetch_and_aggregate_summaries(date = %DateTime{}) do
authorized_nodes =
P2P.authorized_and_available_nodes()
|> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key()))

list_subsets()
|> Flow.from_enumerable(stages: 256)
|> Flow.flat_map(fn subset ->
# Foreach subset and date we compute concurrently the node election
dates
|> Stream.map(&get_summary_address_by_node(&1, subset, authorized_nodes))
|> Enum.flat_map(& &1)
# Foreach subset we compute concurrently the node election
get_summary_address_by_node(date, subset, authorized_nodes)
end)
# We partition by node
|> Flow.partition(key: {:elem, 0})
Expand All @@ -310,23 +307,16 @@ defmodule Archethic.BeaconChain do
# For this node we fetch the summaries
fetch_summaries(node, addresses)
end)
# We repartition by summary time to aggregate summaries for a date
|> Flow.partition(stages: System.schedulers_online() * 4, key: {:key, :summary_time})
|> Flow.reduce(
fn -> %{} end,
fn summary = %Summary{summary_time: time}, acc ->
Map.update(
acc,
time,
%SummaryAggregate{summary_time: time} |> SummaryAggregate.add_summary(summary),
&SummaryAggregate.add_summary(&1, summary)
)
end
# We departition to build the final summarie aggregate
|> Flow.departition(
fn -> %SummaryAggregate{summary_time: date} end,
fn summaries, acc ->
Enum.reduce(summaries, acc, &SummaryAggregate.add_summary(&2, &1))
end,
& &1
)
|> Flow.on_trigger(&{Map.values(&1), &1})
|> Stream.reject(&SummaryAggregate.empty?/1)
|> Stream.map(&SummaryAggregate.aggregate/1)
|> Enum.sort_by(& &1.summary_time, {:asc, DateTime})
|> Enum.to_list()
|> Enum.at(0)
end

defp get_summary_address_by_node(date, subset, authorized_nodes) do
Expand Down Expand Up @@ -373,4 +363,55 @@ defmodule Archethic.BeaconChain do
)
end)
end

@doc """
Get a beacon summaries aggregate for a given date
"""
@spec get_summaries_aggregate(DateTime.t()) ::
{:ok, SummaryAggregate.t()} | {:error, :not_exists}
defdelegate get_summaries_aggregate(datetime), to: DB, as: :get_beacon_summaries_aggregate

@doc """
Persists a beacon summaries aggregate
"""
@spec write_summaries_aggregate(SummaryAggregate.t()) :: :ok
defdelegate write_summaries_aggregate(aggregate), to: DB, as: :write_beacon_summaries_aggregate

@doc """
Fetch a summaries aggregate for a given date
"""
@spec fetch_summaries_aggregate(DateTime.t()) ::
{:ok, SummaryAggregate.t()} | {:error, :not_exists} | {:error, :network_issue}
def fetch_summaries_aggregate(summary_time = %DateTime{}) do
storage_nodes =
summary_time
|> Crypto.derive_beacon_aggregate_address()
|> Election.chain_storage_nodes(P2P.authorized_and_available_nodes())

conflict_resolver = fn results ->
# Prioritize results over not found
with nil <- Enum.find(results, &match?(%SummaryAggregate{}, &1)),
nil <- Enum.find(results, &match?(%NotFound{}, &1)) do
%NotFound{}
else
res ->
res
end
end

case P2P.quorum_read(
storage_nodes,
%GetBeaconSummariesAggregate{date: summary_time},
conflict_resolver
) do
{:ok, aggregate = %SummaryAggregate{}} ->
{:ok, aggregate}

{:ok, %NotFound{}} ->
{:error, :not_exists}

{:error, :network_issue} = e ->
e
end
end
end
18 changes: 13 additions & 5 deletions lib/archethic/beacon_chain/slot_timer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,23 @@ defmodule Archethic.BeaconChain.SlotTimer do

use GenServer

alias Crontab.CronExpression.Parser, as: CronParser
alias Crontab.Scheduler, as: CronScheduler

alias Archethic.BeaconChain
alias Archethic.BeaconChain.SubsetRegistry
alias Archethic.BeaconChain.SummaryTimer

alias Archethic.P2P
alias Archethic.P2P.Node
alias Archethic.DB

alias Archethic.Crypto

alias Archethic.P2P
alias Archethic.P2P.Node
alias Archethic.PubSub

alias Archethic.Utils

alias Crontab.CronExpression.Parser, as: CronParser
alias Crontab.Scheduler, as: CronScheduler

require Logger

@slot_timer_ets :archethic_slot_timer
Expand Down Expand Up @@ -136,6 +138,12 @@ defmodule Archethic.BeaconChain.SlotTimer do

PubSub.notify_current_epoch_of_slot_timer(slot_time)

if SummaryTimer.match_interval?(slot_time) do
# We clean the previously stored summaries - The retention time is for a self repair cycle
# as the aggregates will be handle for long term storage.
DB.clear_beacon_summaries()
end

case Crypto.first_node_public_key() |> P2P.get_node_info() |> elem(1) do
%Node{authorized?: true, available?: true} ->
Logger.debug("Trigger beacon slots creation at #{Utils.time_to_string(slot_time)}")
Expand Down
Loading