Skip to content
This repository was archived by the owner on Jun 12, 2023. It is now read-only.

stream related grpc client fixes #1705

Merged
merged 5 commits into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion src/miner_query.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
txns/2, txns/3,
blocks/2,
lookup_txns_by_hash/2,
lookup_txns_by_type/2]).
lookup_txns_by_type/2,
lookup_txns_by_onionhash/2
]).

poc_analyze(_Start, _End) ->
ok.
Expand Down Expand Up @@ -90,3 +92,25 @@ lookup_txns_by_hash(LastXBlocks, TxnHash) ->
end,
Current - LastXBlocks, Current, C,
[])).

lookup_txns_by_onionhash(LastXBlocks, OnionHash) ->
C = blockchain_worker:blockchain(),
{ok, Current} = blockchain:height(C),
lists:reverse(
fold_blocks(
fun(B, Acc) ->
I = blockchain_block:height(B),
case lists:filter(fun(T) ->
case blockchain_txn:type(T) == 'blockchain_txn_poc_receipts_v2' of
true ->
blockchain_txn_poc_receipts_v2:onion_key_hash(T) == OnionHash;
_ ->
false
end
end, blockchain_block:transactions(B)) of
[] -> Acc;
R -> [{I, R} | Acc]
end
end,
Current - LastXBlocks, Current, C,
[])).
27 changes: 17 additions & 10 deletions src/poc/grpc_client_stream_custom.erl
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@
buffer := binary(),
handler_callback := undefined,
handler_state := undefined,
type := unary | streaming | undefined}.
type := unary | streaming | undefined,
conn_monitor_ref := reference()
}.

-export_type([stream/0]).

-spec new(Connection::grpc_client_custom:connection(),
Service::atom(),
Expand Down Expand Up @@ -121,14 +125,19 @@ call_rpc(Pid, Message, Timeout) ->

%% gen_server implementation
%% @private
init({Connection, Service, Rpc, Encoder, Options, HandlerMod}) ->
init({#{http_connection := ConnPid} = Connection, Service, Rpc, Encoder, Options, HandlerMod}) ->
try
StreamType = proplists:get_value(type, Options, undefined),
lager:info("init stream for RPC ~p and type ~p", [Rpc, StreamType]),
Stream = new_stream(Connection, Service, Rpc, Encoder, Options),
lager:info("init stream success with state ~p, handle_mod: ~p", [Stream, HandlerMod]),
HandlerState = HandlerMod:init(),
{ok, Stream#{handler_state => HandlerState, handler_callback => HandlerMod, type => StreamType}}
%% monitor our connection, so we can tear down stream if conn dies
Ref = monitor(process, ConnPid),
{ok, Stream#{handler_state => HandlerState,
handler_callback => HandlerMod,
type => StreamType,
conn_monitor_ref => Ref}}
catch
_Class:_Error:_Stack ->
lager:warning("failed to create stream, ~p ~p ~p", [_Class, _Error, _Stack]),
Expand Down Expand Up @@ -239,15 +248,16 @@ handle_info(timeout, #{response_pending := true,
client := Client} = Stream) ->
gen_server:reply(Client, {error, timeout}),
{noreply, Stream#{response_pending => false}};
handle_info({'DOWN', Ref, process, _, _Reason}, #{conn_mon := Ref} = C) ->
%% our connection is down, nothing more stream can do other then terminate
{stop, connection_down, C};
handle_info(Msg, #{handler_callback := HandlerCB} = Stream) ->
NewState =
case erlang:function_exported(HandlerCB, handle_info, 2) of
true -> HandlerCB:handle_info(Msg, Stream);
false -> Stream
end,
{noreply, NewState}.
%%handle_info(_InfoMessage, Stream) ->
%% {noreply, Stream}.

%% @private
terminate(_Reason, _State) ->
Expand Down Expand Up @@ -355,13 +365,10 @@ info_response(Response, #{response_pending := true,
info_response(Response, #{queue := Queue, type := unary} = Stream) ->
NewQueue = queue:in(Response, Queue),
{noreply, Stream#{queue => NewQueue}};
%%info_response(Response, #{queue := Queue} = Stream) ->
%% NewQueue = queue:in(Response, Queue),
%% {noreply, Stream#{queue => NewQueue}}.

info_response(eof = Response, #{type := Type} = Stream) ->
info_response(eof = Response, #{type := Type, state := closed} = Stream) ->
lager:info("info_response ~p, stream type: ~p", [Response, Type]),
{stop, normal, Stream};
%% pass any unmatched info msg to our handler
info_response(Response, #{handler_callback := CB, handler_state := CBState} = Stream) ->
lager:info("info_response ~p, CB: ~p", [Response, CB]),
NewCBState = CB:handle_msg(Response, CBState),
Expand Down
9 changes: 6 additions & 3 deletions src/poc/miner_poc_grpc_client_statem.erl
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,8 @@ send_grpc_unary_req(Connection, Req, RPC) ->
'helium.gateway',
RPC,
gateway_miner_client_pb,
[{callback_mod, miner_poc_grpc_client_handler}]
[{callback_mod, miner_poc_grpc_client_handler},
{timeout, 10000}]
),
lager:info("send unary result: ~p", [Res]),
process_unary_response(Res)
Expand All @@ -723,7 +724,8 @@ send_grpc_unary_req(PeerIP, GRPCPort, Req, RPC) ->
'helium.gateway',
RPC,
gateway_miner_client_pb,
[{callback_mod, miner_poc_grpc_client_handler}]
[{callback_mod, miner_poc_grpc_client_handler},
{timeout, 10000}]
),
lager:info("New Connection, send unary result: ~p", [Res]),
%% we dont need the connection to hang around, so close it out
Expand Down Expand Up @@ -1234,7 +1236,8 @@ send_block_age_req(Connection) ->
'helium.gateway',
config,
gateway_miner_client_pb,
[{callback_mod, miner_poc_grpc_client_handler}]
[{callback_mod, miner_poc_grpc_client_handler},
{timeout, 10000}]
) of
{ok, #{http_status := 200, result := #gateway_resp_v1_pb{block_age = BlockAge}}} when is_integer(BlockAge) ->
{ok, BlockAge};
Expand Down
9 changes: 7 additions & 2 deletions src/poc/miner_poc_mgr.erl
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ save_local_poc_keys(CurHeight, KeyList) ->
OnionKeyHash = crypto:hash(sha256, libp2p_crypto:pubkey_to_bin(PubKey)),
POCKeyRec = #poc_local_key_data{receive_height = CurHeight,
keys = Keys, onion_key_hash = OnionKeyHash},
lager:debug("saving local poc key ~p at height ~p", [OnionKeyHash, CurHeight]),
catch write_local_poc_keys(POCKeyRec, DB, CF)
end
|| Keys <- KeyList
Expand Down Expand Up @@ -519,6 +520,7 @@ initialize_poc(BlockHash, POCStartHeight, Keys, Vars, Ledger, #state{pub_key = C
{ok, LastChallenge} = blockchain_ledger_v1:current_height(Ledger),
{ok, B} = blockchain_ledger_v1:get_block(LastChallenge, Ledger),
Time = blockchain_block:time(B),
BlockHeight = blockchain_block:height(B),
Path = blockchain_poc_path_v4:build(TargetPubkeybin, TargetRandState, Ledger, Time, Vars),
N = erlang:length(Path),
[<<IV:16/integer-unsigned-little, _/binary>> | LayerData] = blockchain_txn_poc_receipts_v2:create_secret_hash(
Expand All @@ -544,8 +546,8 @@ initialize_poc(BlockHash, POCStartHeight, Keys, Vars, Ledger, #state{pub_key = C
start_height = POCStartHeight
},
ok = write_local_poc(LocalPOC, State),
lager:info("started poc for challengeraddr ~p, target: ~p, onionhash ~p",
[?TO_ANIMAL_NAME(Challenger), OnionKeyHash]),
lager:info("started poc at blockheight ~p for challengeraddr ~p, onionhash ~p, target: ~p",
[BlockHeight, ?TO_ANIMAL_NAME(Challenger), OnionKeyHash, ?TO_ANIMAL_NAME(TargetPubkeybin)]),
ok
end
end.
Expand Down Expand Up @@ -582,6 +584,7 @@ process_block_pocs(
{ok, _} ->
spawn(fun() -> initialize_poc(BlockHash, BlockHeight, Keys, Vars, Ledger, State) end);
_ ->
lager:warning("found a local poc key but public data missing, onionkeyhash: ~p", [OnionKeyHash]),
ok
end;
_ ->
Expand Down Expand Up @@ -672,6 +675,8 @@ purge_local_poc_keys(
case BlockHeight > (ReceiveHeight + POCEphemeralKeyTimeout + POCTimeout) of
true ->
%% the lifespan of any POC for this key has passed, we can GC
lager:debug("GCing local poc key ~p, blockheight: ~p, receive height: ~p",
[Key, BlockHeight, ReceiveHeight]),
ok = delete_local_poc_keys(Key, DB, CF);
_ ->
ok
Expand Down
10 changes: 5 additions & 5 deletions test/miner_poc_grpc_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -584,11 +584,11 @@ extra_vars(grpc, TargetVersion) ->
?poc_activity_filter_enabled => true,
?validator_hb_reactivation_limit => 100,
?poc_validator_ct_scale => 0.8,
%% ?h3dex_gc_width => 10,
%% ?poc_target_hex_parent_res => 5,
%% ?poc_target_hex_collection_res => 5,
%% ?poc_target_pool_size => 2,
%% ?poc_hexing_type => hex_h3dex,
?h3dex_gc_width => 10,
?poc_target_hex_parent_res => 5,
?poc_target_hex_collection_res => 7,
?poc_target_pool_size => 2,
?poc_hexing_type => hex_h3dex,
?hip17_interactivity_blocks => 20
},
maps:merge(extra_vars(poc_v11), GrpcVars).
Expand Down