Skip to content

Commit

Permalink
Store beacon aggregate after self-repair (#606)
Browse files Browse the repository at this point in the history
* Handle DB operations for summaries aggregate
* Add message to get beacon summaries aggregate
* Store beacon aggregate after self repair
* Split self repair for summary and aggregate fetch
* Fix some self-repair tests
* Adapt explorer
* Fix ordering of retrieved aggregates
* Clear beacon summaries just before the next beacon summary
* Refactor beacon aggregate address derivation
* Adapt the fetch of the transaction summaries based on date in explore live
* Improve Self-Repair sync to avoid missed data
* Add script to migrate summaries to aggregate
  • Loading branch information
samuelmanzanera authored Oct 12, 2022
1 parent fa5fad4 commit df6fd91
Show file tree
Hide file tree
Showing 25 changed files with 908 additions and 386 deletions.
97 changes: 69 additions & 28 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ defmodule Archethic.BeaconChain do
alias Archethic.P2P
alias Archethic.P2P.Node
alias Archethic.P2P.Message.GetBeaconSummaries
alias Archethic.P2P.Message.GetBeaconSummariesAggregate
alias Archethic.P2P.Message.BeaconSummaryList
alias Archethic.P2P.Message.NotFound

alias Archethic.TaskSupervisor

Expand Down Expand Up @@ -274,10 +276,6 @@ defmodule Archethic.BeaconChain do
[ ] [ ] [ ] Aggregate addresses
| | |
[ ] [ ] [ ] Fetch summaries
|\ /|\ /|
| \/ | \/ |
| /\ | /\ |
[D1] [D2] [D3] Partition by date
| | |
[ ] [ ] [ ] Aggregate and consolidate summaries
\ | /
Expand All @@ -287,18 +285,17 @@ defmodule Archethic.BeaconChain do
[ ]
```
"""
@spec fetch_summary_aggregates(list(DateTime.t()) | Enumerable.t()) ::
list(SummaryAggregate.t())
def fetch_summary_aggregates(dates) do
authorized_nodes = P2P.authorized_and_available_nodes()
@spec fetch_and_aggregate_summaries(DateTime.t()) :: SummaryAggregate.t()
def fetch_and_aggregate_summaries(date = %DateTime{}) do
authorized_nodes =
P2P.authorized_and_available_nodes()
|> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key()))

list_subsets()
|> Flow.from_enumerable(stages: 256)
|> Flow.flat_map(fn subset ->
# Foreach subset and date we compute concurrently the node election
dates
|> Stream.map(&get_summary_address_by_node(&1, subset, authorized_nodes))
|> Enum.flat_map(& &1)
# Foreach subset we compute concurrently the node election
get_summary_address_by_node(date, subset, authorized_nodes)
end)
# We partition by node
|> Flow.partition(key: {:elem, 0})
Expand All @@ -310,23 +307,16 @@ defmodule Archethic.BeaconChain do
# For this node we fetch the summaries
fetch_summaries(node, addresses)
end)
# We repartition by summary time to aggregate summaries for a date
|> Flow.partition(stages: System.schedulers_online() * 4, key: {:key, :summary_time})
|> Flow.reduce(
fn -> %{} end,
fn summary = %Summary{summary_time: time}, acc ->
Map.update(
acc,
time,
%SummaryAggregate{summary_time: time} |> SummaryAggregate.add_summary(summary),
&SummaryAggregate.add_summary(&1, summary)
)
end
# We departition to build the final summarie aggregate
|> Flow.departition(
fn -> %SummaryAggregate{summary_time: date} end,
fn summaries, acc ->
Enum.reduce(summaries, acc, &SummaryAggregate.add_summary(&2, &1))
end,
& &1
)
|> Flow.on_trigger(&{Map.values(&1), &1})
|> Stream.reject(&SummaryAggregate.empty?/1)
|> Stream.map(&SummaryAggregate.aggregate/1)
|> Enum.sort_by(& &1.summary_time, {:asc, DateTime})
|> Enum.to_list()
|> Enum.at(0)
end

defp get_summary_address_by_node(date, subset, authorized_nodes) do
Expand Down Expand Up @@ -373,4 +363,55 @@ defmodule Archethic.BeaconChain do
)
end)
end

@doc """
Get a beacon summaries aggregate for a given date
"""
@spec get_summaries_aggregate(DateTime.t()) ::
{:ok, SummaryAggregate.t()} | {:error, :not_exists}
defdelegate get_summaries_aggregate(datetime), to: DB, as: :get_beacon_summaries_aggregate

@doc """
Persists a beacon summaries aggregate
"""
@spec write_summaries_aggregate(SummaryAggregate.t()) :: :ok
defdelegate write_summaries_aggregate(aggregate), to: DB, as: :write_beacon_summaries_aggregate

@doc """
Fetch a summaries aggregate for a given date
"""
@spec fetch_summaries_aggregate(DateTime.t()) ::
{:ok, SummaryAggregate.t()} | {:error, :not_exists} | {:error, :network_issue}
def fetch_summaries_aggregate(summary_time = %DateTime{}) do
storage_nodes =
summary_time
|> Crypto.derive_beacon_aggregate_address()
|> Election.chain_storage_nodes(P2P.authorized_and_available_nodes())

conflict_resolver = fn results ->
# Prioritize results over not found
with nil <- Enum.find(results, &match?(%SummaryAggregate{}, &1)),
nil <- Enum.find(results, &match?(%NotFound{}, &1)) do
%NotFound{}
else
res ->
res
end
end

case P2P.quorum_read(
storage_nodes,
%GetBeaconSummariesAggregate{date: summary_time},
conflict_resolver
) do
{:ok, aggregate = %SummaryAggregate{}} ->
{:ok, aggregate}

{:ok, %NotFound{}} ->
{:error, :not_exists}

{:error, :network_issue} = e ->
e
end
end
end
18 changes: 13 additions & 5 deletions lib/archethic/beacon_chain/slot_timer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,23 @@ defmodule Archethic.BeaconChain.SlotTimer do

use GenServer

alias Crontab.CronExpression.Parser, as: CronParser
alias Crontab.Scheduler, as: CronScheduler

alias Archethic.BeaconChain
alias Archethic.BeaconChain.SubsetRegistry
alias Archethic.BeaconChain.SummaryTimer

alias Archethic.P2P
alias Archethic.P2P.Node
alias Archethic.DB

alias Archethic.Crypto

alias Archethic.P2P
alias Archethic.P2P.Node
alias Archethic.PubSub

alias Archethic.Utils

alias Crontab.CronExpression.Parser, as: CronParser
alias Crontab.Scheduler, as: CronScheduler

require Logger

@slot_timer_ets :archethic_slot_timer
Expand Down Expand Up @@ -136,6 +138,12 @@ defmodule Archethic.BeaconChain.SlotTimer do

PubSub.notify_current_epoch_of_slot_timer(slot_time)

if SummaryTimer.match_interval?(slot_time) do
# We clean the previously stored summaries - The retention time is for a self repair cycle
# as the aggregates will be handle for long term storage.
DB.clear_beacon_summaries()
end

case Crypto.first_node_public_key() |> P2P.get_node_info() |> elem(1) do
%Node{authorized?: true, available?: true} ->
Logger.debug("Trigger beacon slots creation at #{Utils.time_to_string(slot_time)}")
Expand Down
Loading

0 comments on commit df6fd91

Please sign in to comment.