Skip to content
This repository was archived by the owner on Apr 22, 2024. It is now read-only.

standalone libp2p-peerbook integration #265

Open
wants to merge 37 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
6baf501
initial pass at libp2p-peerbook integration
andymck Mar 20, 2020
437f35f
fix erl version
andymck Mar 25, 2020
1d49396
bump libp2p_peerbook version
andymck Mar 25, 2020
c4313f8
fix incorrect 1918 check, update notify and peer times. tidy up
andymck Mar 26, 2020
51046dd
adding debug
andymck Mar 26, 2020
0c941cd
adding debug and pushing down notify/peer times as part of test failu…
andymck Mar 26, 2020
1473c78
ensure peerbook options supplied to swarm are passed to peerbook
andymck Mar 27, 2020
f5324f0
fix test contagion
andymck Mar 27, 2020
7a1f71e
support is_seed_nodes in peerbook. tidy up, test tweaks
andymck Mar 27, 2020
a3a716d
bump to lastest peerbook
andymck Mar 27, 2020
37e8959
tidy up and adding debug
andymck Mar 30, 2020
a01dec0
bump peerbook version
andymck Mar 30, 2020
ce5b0ec
override default peerbook timer values
andymck Mar 30, 2020
6c5ca65
restore makefile, removed debug from tests
andymck Mar 30, 2020
498e71a
override default wait until timings
andymck Mar 30, 2020
246e1fc
override default wait until timings
andymck Mar 30, 2020
6a9e1ee
test stability tweaks
andymck Mar 31, 2020
8a91c4d
test stability tweaks
andymck Mar 31, 2020
c4477c7
test stability tweaks
andymck Mar 31, 2020
8b8ee9c
fix test to be compatible with multiple listen addrs
andymck Mar 31, 2020
cfbd20a
really fix test
andymck Mar 31, 2020
26c5aea
fix moved function
andymck Mar 31, 2020
d85dbc8
test tweaks
andymck Apr 1, 2020
4b5bee4
unload all mecked mods at end per testcase, tweak test timings
andymck Apr 1, 2020
f6f059d
add artifact support
andymck Apr 1, 2020
7377d97
tmp remove non ct suite
andymck Apr 1, 2020
dbc0399
tmp run only proxy suite
andymck Apr 1, 2020
f92367c
testing
andymck Apr 1, 2020
3bfcdb5
restore makefile, fix specs
andymck Apr 1, 2020
e176d43
fix makefile, remove space indents
andymck Apr 1, 2020
910dbe8
fix makefile
andymck Apr 1, 2020
c14a1ee
ignore unknown functions
andymck Apr 1, 2020
161c704
add peerbook app dep
andymck Apr 7, 2020
90f3834
fix proto deps location
andymck Apr 7, 2020
1703404
tidy up logging
andymck Apr 7, 2020
6eb646b
remove tmp added artifact support. Fix dialyzer warning
andymck Apr 7, 2020
eeba5fe
tweaking test wait time
andymck Apr 7, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .buildkite/hooks/pre-command
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ set -e
echo '--- :house_with_garden: Setting up the environment'

. "$HOME/.asdf/asdf.sh"
asdf local erlang 22.1
asdf local erlang 22.1.8
asdf local python 3.7.3
asdf local ruby 2.6.2

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ typecheck:
$(REBAR) dialyzer

doc:
$(REBAR) edoc
$(REBAR) edoc
3 changes: 2 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
{enacl, "0.17.2"},
{erl_base58, "0.0.1"},
{libp2p_crypto, "1.0.1"},
{libp2p_peerbook, ".*", {git, "https://github.com/helium/libp2p-peerbook", {branch, "andymck/use-peerbook-in-libp2p"}}},
{nat, ".*", {git, "https://github.com/benoitc/erlang-nat", {branch, "master"}}},
{gpb, "4.2.1"},
{backoff, "1.1.6"},
Expand Down Expand Up @@ -54,6 +55,7 @@

{gpb_opts, [
{i, "src"},
{i, {deps,"./libp2p_peerbook/src"}},
{o_erl, "src/pb"},
{o_hrl, "src/pb"},
{msg_name_prefix, "libp2p_"},
Expand All @@ -66,7 +68,6 @@
{shell, [{apps, [lager, ranch]}]}.

{dialyzer, [
{warnings, [unknown]},
{plt_apps,all_deps}
]}.

Expand Down
4 changes: 4 additions & 0 deletions rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
{<<"intercept">>,{pkg,<<"intercept">>,<<"1.0.0">>},1},
{<<"lager">>,{pkg,<<"lager">>,<<"3.6.5">>},0},
{<<"libp2p_crypto">>,{pkg,<<"libp2p_crypto">>,<<"1.0.1">>},0},
{<<"libp2p_peerbook">>,
{git,"https://github.com/helium/libp2p-peerbook",
{ref,"d198028c356521d40389f5c75e187c1cfc065a6b"}},
0},
{<<"multiaddr">>,{pkg,<<"multiaddr">>,<<"1.1.3">>},0},
{<<"nat">>,
{git,"https://github.com/benoitc/erlang-nat",
Expand Down
100 changes: 70 additions & 30 deletions src/group/libp2p_group_gossip_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
-define(DEFAULT_DROP_TIMEOUT, 5 * 60 * 1000).
-define(GROUP_ID, "gossip").
-define(GROUP_PATH, "gossip/1.0.0").
-define(DEFAULT_NOTIFY_PEER_GOSSIP_LIMIT, 100).
-define(GOSSIP_GROUP_PEER_ADD_KEY, "peer_add").
-define(GOSSIP_GROUP_PEER_REMOVE_KEY, "peer_remove").


%% API
%%
Expand Down Expand Up @@ -69,6 +73,7 @@ accept_stream(Pid, SessionPid, StreamPid) ->

init([Sup, TID]) ->
erlang:process_flag(trap_exit, true),
lager:debug("starting gossip server for tid ~p", [TID]),
libp2p_swarm_sup:register_gossip_group(TID),
Opts = libp2p_swarm:opts(TID),
PeerBookCount = get_opt(Opts, peerbook_connections, ?DEFAULT_PEERBOOK_CONNECTIONS),
Expand All @@ -84,6 +89,12 @@ init([Sup, TID]) ->
DropTimeOut = get_opt(Opts, drop_timeout, ?DEFAULT_DROP_TIMEOUT),

self() ! start_workers,

PeerBook = libp2p_swarm:peerbook(TID),
ok = libp2p_peer_add_gossip:install_handler(TID),
ok = libp2p_peer_remove_gossip:install_handler(TID),
ok = libp2p_peer_resolution:install_handler(TID),
ok = libp2p_peerbook:join_notify(PeerBook, self()),
{ok, update_metadata(#state{sup=Sup, tid=TID,
seed_nodes=SeedNodes,
max_inbound_connections=InboundCount,
Expand Down Expand Up @@ -111,7 +122,8 @@ handle_call(Msg, _From, State) ->
handle_cast({handle_data, StreamPid, Key, ListOrData}, State=#state{}) ->
%% Incoming message from a gossip stream for a given key
case maps:find(Key, State#state.handlers) of
error -> {noreply, State};
error ->
{noreply, State};
{ok, {M, S}} ->
%% Catch the callback response. This avoids a crash in the
%% handler taking down the gossip_server itself.
Expand Down Expand Up @@ -162,34 +174,17 @@ handle_cast({request_target, seed, WorkerPid}, State=#state{tid=TID, seed_nodes=
sets:from_list(ExcludedAddrs))),
{noreply, assign_target(WorkerPid, TargetAddrs, State)};
handle_cast({send, Key, Fun}, State=#state{}) when is_function(Fun, 0) ->
%% use a fun to generate the send data for each gossip peer
%% this can be helpful to send a unique random subset of data to each peer
{_, Pids} = lists:unzip(connections(all, State)),
lists:foreach(fun(Pid) ->
Data = Fun(),
%% Catch errors encoding the given arguments to avoid a bad key or
%% value taking down the gossip server
case (catch libp2p_gossip_stream:encode(Key, Data)) of
{'EXIT', Error} ->
lager:warning("Error encoding gossip data ~p", [Error]);
Msg ->
libp2p_group_worker:send(Pid, Key, Msg)
end
end, Pids),
%% handle send accepts a fun which is used to derive the data to be sent to each worker
%% this permits each worker to potentially receive differing data
%% in this case the fun is generated by the sender, so we just pass it along to handle_send/3
ok = handle_send(Key, Fun, State),
{noreply, State};

handle_cast({send, Key, Data}, State=#state{}) ->
{_, Pids} = lists:unzip(connections(all, State)),
%% Catch errors encoding the given arguments to avoid a bad key or
%% value taking down the gossip server
case (catch libp2p_gossip_stream:encode(Key, Data)) of
{'EXIT', Error} ->
lager:warning("Error encoding gossip data ~p", [Error]);
Msg ->
lists:foreach(fun(Pid) ->
libp2p_group_worker:send(Pid, Key, Msg)
end, Pids)
end,
%% handle send accepts a fun which is used to derive the final data to be sent to each worker
%% this permits each worker to potentially receive differing data
%% in this case we want all workers to receive the same data, so the fun here just returns Data
Fun = fun() -> Data end,
ok = handle_send(Key, Fun, State),
{noreply, State};
handle_cast({send_ready, _target, _Ref, false}, State=#state{}) ->
%% Ignore any not ready messages from group workers. The gossip
Expand Down Expand Up @@ -222,6 +217,18 @@ handle_cast(Msg, State) ->
lager:warning("Unhandled cast: ~p", [Msg]),
{noreply, State}.

handle_info({changed_peers, {{add, NewPeersToAdd0}, {remove, PeersToRemove}}}, #state{tid=TID}=State) ->
%% NOTE: NewPeersToAdd is a map where key is the peer id and the value is the peer
%% PeersRemoved meanwhile is a list of peer IDs
Opts = libp2p_swarm:opts(TID),
MaxPeerCount = libp2p_config:get_opt(Opts, [?MODULE, notify_peer_gossip_limit], ?DEFAULT_NOTIFY_PEER_GOSSIP_LIMIT),
%% Gossip a random subset of the newly added peers
AddPeerList = maps:values(NewPeersToAdd0),
ok = handle_changed_peers(?GOSSIP_GROUP_PEER_ADD_KEY, AddPeerList, MaxPeerCount, State),
%% and a subset of the removed peers, PeersRemoved is a list unlike PeersToAdd which is a map
ok = handle_changed_peers(?GOSSIP_GROUP_PEER_REMOVE_KEY, PeersToRemove, MaxPeerCount, State),
{noreply, State};

handle_info(start_workers, State=#state{tid=TID, seednode_connections=SeedCount, peerbook_connections=PeerCount}) ->
PeerBookWorkers = [start_worker(peerbook, State) || _ <- lists:seq(1, PeerCount)],
SeedWorkers = [start_worker(seed, State) || _ <- lists:seq(1, SeedCount)],
Expand Down Expand Up @@ -271,14 +278,47 @@ handle_info(Msg, State) ->
lager:warning("Unhandled cast: ~p", [Msg]),
{noreply, State}.


terminate(_Reason, #state{tid=TID}) ->
libp2p_swarm:remove_stream_handler(TID, ?GROUP_PATH).

libp2p_swarm:remove_stream_handler(TID, ?GROUP_PATH),
lager:debug("terminating with reason ~p",[_Reason]),
ok.


%% Internal
%%
handle_changed_peers(_Key, [] = _PeerList, _MaxPeerCount, _State)->
ok;
handle_changed_peers(?GOSSIP_GROUP_PEER_ADD_KEY = Key, PeerList, MaxPeerCount, State)->
%% Gossip a random subset of the specified peer list
%% PeerList here will be a list of peers
Fun = fun() ->
{_, RandomNPeers} = lists:unzip(lists:sublist(lists:keysort(1, [ {rand:uniform(), E} || E <- PeerList]), MaxPeerCount)),
libp2p_peer:encode_list(RandomNPeers)
end,
handle_send(Key, Fun, State);
handle_changed_peers(?GOSSIP_GROUP_PEER_REMOVE_KEY = Key, PeerList, MaxPeerCount, State)->
%% Gossip a random subset of the specified peers IDs to remove
%% PeerList here will be a list of peer IDs
Fun = fun() ->
{_, RandomNPeers} = lists:unzip(lists:sublist(lists:keysort(1, [ {rand:uniform(), E} || E <- PeerList]), MaxPeerCount)),
RandomNPeers
end,
handle_send(Key, Fun, State).

handle_send(Key, Fun, State)->
{_, Pids} = lists:unzip(connections(all, State)),
lager:debug("sending on connections ~p", [Pids]),
ok = lists:foreach(fun(Pid) ->
Data = Fun(),
%% Catch errors encoding the given arguments to avoid a bad key or
%% value taking down the gossip server
case (catch libp2p_gossip_stream:encode(Key, Data)) of
{'EXIT', Error} ->
lager:warning("Error encoding gossip data ~p", [Error]);
Msg ->
libp2p_group_worker:send(Pid, Key, Msg)
end
end, Pids).

-spec schedule_drop_timer(pos_integer()) -> reference().
schedule_drop_timer(DropTimeOut) ->
Expand Down
71 changes: 71 additions & 0 deletions src/group/libp2p_peer_add_gossip.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
-module(libp2p_peer_add_gossip).
-behavior(libp2p_peer_gossip_handler).
%% API
-export([install_handler/1, init_gossip_data/1, handle_gossip_data/3]).

-define(GOSSIP_GROUP_KEY, "peer_add").

-ifdef(TEST).
-define(GOSSIP_PEER_MIN_CONNS, 0).
-else.
-define(GOSSIP_PEER_MIN_CONNS, 5).
-endif.

-spec install_handler(ets:tid())-> ok | {error, any()}.
install_handler(TID)->
libp2p_group_gossip:add_handler(self(), ?GOSSIP_GROUP_KEY, {?MODULE, TID}),
ok.

%% init_gossip_data1 & handle_gossip_data/2 used to be in peerbook
-spec init_gossip_data(ets:tab()) -> libp2p_group_gossip_handler:init_result().
init_gossip_data(TID) ->
LocalAddr = libp2p_swarm:pubkey_bin(TID),
Peerbook = libp2p_swarm:peerbook(TID),
gossip_eligible_peer(Peerbook, LocalAddr).


-spec handle_gossip_data(pid(), binary(), any()) -> noreply.
handle_gossip_data(_StreamPid, Data, Handle) ->
lager:debug("~p got gossip data ~p",[Handle, Data]),
Peerbook = libp2p_swarm:peerbook(Handle),
{ok, GossipedPeerList} = libp2p_peer:decode_list(Data),
F = fun(Peer)->
lager:debug("~p putting peer: ~p", [Handle, Peer]),
case libp2p_peer_resolution:is_rfc1918_allowed(Handle) orelse
not libp2p_peer_resolution:has_private_ip(Peer) of
true ->
libp2p_peerbook:put(Peerbook, Peer);
false ->
lager:debug("not putting peer ~p",[Peer]),
ok
end
end,
ok = lists:foreach(F, GossipedPeerList),
noreply.

%%
%% Internal functions
%%
gossip_eligible_peer(Peerbook, LocalAddr)->
gossip_eligible_peer(Peerbook, LocalAddr, 10).
gossip_eligible_peer(_Peerbook, _LocalAddr, 0)->
lager:warning("failed to get random dialable peer", []),
ok;
gossip_eligible_peer(Peerbook, LocalAddr, Attempts)->
%% find a peer with a min of 5 connections
case libp2p_peerbook:random(Peerbook, [LocalAddr], ?GOSSIP_PEER_MIN_CONNS) of
{_Addr, Peer} ->
%% ok we got a peer with our required min num of connections
%% but check its dialable
case libp2p_peer_resolution:is_dialable(Peer) of
true->
Data = libp2p_peer:encode_list([Peer]),
{send, Data};
false ->
gossip_eligible_peer(Peerbook, LocalAddr, Attempts - 1)
end;
_ ->
gossip_eligible_peer(Peerbook, LocalAddr, Attempts - 1)
end.


7 changes: 7 additions & 0 deletions src/group/libp2p_peer_gossip_handler.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-module(libp2p_peer_gossip_handler).
-callback init_gossip_data(State::any()) -> init_result().
-callback handle_gossip_data(StreamPid::pid(), MsgOrData::[libp2p_crypto:pubkey_bin()] | binary(), State::any()) -> {reply, iodata()} | noreply.

-type init_result() :: ok | {send, binary()}.

-export_type([init_result/0]).
25 changes: 25 additions & 0 deletions src/group/libp2p_peer_remove_gossip.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-module(libp2p_peer_remove_gossip).
-behavior(libp2p_peer_gossip_handler).
%% API
-export([install_handler/1, init_gossip_data/1, handle_gossip_data/3]).

-define(GOSSIP_GROUP_KEY, "peer_remove").

-spec install_handler(ets:tid())-> ok | {error, any()}.
install_handler(TID)->
libp2p_group_gossip:add_handler(self(), ?GOSSIP_GROUP_KEY, {?MODULE, TID}),
ok.

-spec init_gossip_data(ets:tab()) -> libp2p_group_gossip_handler:init_result().
init_gossip_data(_TID) ->
ok.


-spec handle_gossip_data(pid(), [libp2p_crypto:pubkey_bin()], any()) -> noreply.
handle_gossip_data(_StreamPid, Data, Handle) ->
lager:debug("~p got gossip data ~p",[Handle, Data]),
Peerbook = libp2p_swarm:peerbook(Handle),
ok = lists:foreach(fun(PeerId)-> libp2p_peerbook:remove(Peerbook, PeerId) end, Data),
noreply.


Loading