Ingesting pending txns with a separate RW pool
madninja committed Feb 9, 2020
1 parent 0c10ef4 commit 51f270f
Showing 12 changed files with 154 additions and 65 deletions.
3 changes: 2 additions & 1 deletion .env.template
@@ -1 +1,2 @@
DATABASE_URL=postgresql://user:password@hostname/database
DATABASE_RO_URL=postgresql://localhost/ledger2
DATABASE_RW_URL=postgresql://localhost/ledger2
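
Note: the single DATABASE_URL is replaced by separate read-only and read-write connection URLs. A minimal sketch of reading them at startup, assuming envloader has already loaded the .env entries into the process environment (the module and function names here are illustrative, not part of this commit):

    -module(bh_env_example).
    -export([db_urls/0]).

    %% Returns the read-only and read-write connection strings;
    %% os:getenv/1 returns false for any variable that is not set.
    db_urls() ->
        {os:getenv("DATABASE_RO_URL"), os:getenv("DATABASE_RW_URL")}.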
9 changes: 1 addition & 8 deletions README.md
@@ -13,10 +13,7 @@ work

* Clone this repository
* Create `.env` file by copying `.env.template` and editing it to
reflect your postgres and other keys and credentials

**Note:** In order for resets to work the postgres user specified in
the `.env` file needs to exist and have `CREATEDB` permissions.
reflect your postgres read-only and read-write access URLs

* Run `make release` in the top level folder

@@ -33,7 +30,3 @@ This application does NOT serve up over TLS, and does NOT rate
control or access control clients. Please run this service behind a
load balancer that terminates SSL and does some rate and access
control.

## IN PROGRESS

* Ingest of new transactions (aka pending transactions)
13 changes: 11 additions & 2 deletions config/sys.config
@@ -3,17 +3,26 @@
{blockchain_http,
[
{port, 8080},
{db_pool,
{db_ro_pool,
[
{size, 10},
{max_overflow, 20}
]},
{db_handlers,
{db_rw_pool,
[
{size, 2},
{max_overflow, 5}
]},
{db_ro_handlers,
[
bh_route_blocks,
bh_route_accounts,
bh_route_hotspots,
bh_route_pending_txns
]},
{db_rw_handlers,
[
bh_route_pending_txns
]}
]},
{lager,
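
Note: the single db_pool becomes a read-only pool and a much smaller read-write pool, and handlers are now split by access mode, with bh_route_pending_txns listed in both because it reads and writes. A minimal sketch, not part of this commit, of turning the two config entries into poolboy children (the supervisor module is hypothetical and bh_db_worker is assumed to be the poolboy worker module):

    -module(bh_pool_sup_sketch).
    -behaviour(supervisor).
    -export([start_link/0, init/1]).

    start_link() ->
        supervisor:start_link({local, ?MODULE}, ?MODULE, []).

    init([]) ->
        {ok, ROArgs} = application:get_env(blockchain_http, db_ro_pool),
        {ok, RWArgs} = application:get_env(blockchain_http, db_rw_pool),
        %% Register each pool under the name the handlers use and forward
        %% the size/max_overflow settings from sys.config. Worker args
        %% (the database connection settings) are omitted in this sketch.
        MkPool = fun(Name, PoolArgs) ->
                     poolboy:child_spec(Name,
                                        [{name, {local, Name}},
                                         {worker_module, bh_db_worker} | PoolArgs],
                                        [])
                 end,
        {ok, {{one_for_one, 10, 10},
              [MkPool(db_ro_pool, ROArgs), MkPool(db_rw_pool, RWArgs)]}}.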
2 changes: 2 additions & 0 deletions rebar.config
@@ -19,12 +19,14 @@
recon,
h3,
iso8601,
libp2p_crypto,
{poolboy, "1.5.2"},
{elli, "3.2.0"},
{epgsql, "4.3.0"},
{observer_cli, "1.5.0"},
{telemetry, "0.4.1"},
{psql_migration, {git, "https://github.com/helium/psql-migration.git", {branch, "master"}}},
{helium_proto, {git, "https://github.com/helium/proto.git", {branch, "master"}}},
{envloader, {git, "https://github.com/nuex/envloader.git", {branch, "master"}}}
]}.

22 changes: 20 additions & 2 deletions rebar.lock
@@ -1,5 +1,8 @@
{"1.1.0",
[{<<"elli">>,{pkg,<<"elli">>,<<"3.2.0">>},0},
[{<<"base32">>,{pkg,<<"base32">>,<<"0.1.0">>},2},
{<<"ecc_compact">>,{pkg,<<"ecc_compact">>,<<"1.0.4">>},1},
{<<"elli">>,{pkg,<<"elli">>,<<"3.2.0">>},0},
{<<"enacl">>,{pkg,<<"enacl">>,<<"0.17.2">>},1},
{<<"envloader">>,
{git,"https://github.com/nuex/envloader.git",
{ref,"27a97e04f35c554995467b9236d8ae0188d468c7"}},
@@ -9,32 +12,47 @@
{git,"git://github.com/artemeff/eql.git",
{ref,"a6727a3f878bfdd06648b7a886cbd2c0630db172"}},
1},
{<<"erl_base58">>,{pkg,<<"erl_base58">>,<<"0.0.1">>},1},
{<<"getopt">>,{pkg,<<"getopt">>,<<"1.0.1">>},1},
{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1},
{<<"h3">>,{pkg,<<"h3">>,<<"3.1.1">>},0},
{<<"helium_proto">>,
{git,"https://github.com/helium/proto.git",
{ref,"6a68672f4d052e818931748628c1700ea29927bf"}},
0},
{<<"iso8601">>,{pkg,<<"iso8601">>,<<"1.3.1">>},0},
{<<"jsone">>,{pkg,<<"jsone">>,<<"1.5.2">>},0},
{<<"lager">>,{pkg,<<"lager">>,<<"3.8.0">>},0},
{<<"libp2p_crypto">>,{pkg,<<"libp2p_crypto">>,<<"1.0.1">>},0},
{<<"multiaddr">>,{pkg,<<"multiaddr">>,<<"1.1.3">>},1},
{<<"observer_cli">>,{pkg,<<"observer_cli">>,<<"1.5.0">>},0},
{<<"poolboy">>,{pkg,<<"poolboy">>,<<"1.5.2">>},0},
{<<"psql_migration">>,
{git,"https://github.com/helium/psql-migration.git",
{ref,"a8d8b9ac2d014270d60bdaa34035b022f1bae429"}},
{ref,"2236b5dbcea1208594c4107569cb80db88154dd7"}},
0},
{<<"recon">>,{pkg,<<"recon">>,<<"2.5.0">>},0},
{<<"small_ints">>,{pkg,<<"small_ints">>,<<"0.1.0">>},2},
{<<"telemetry">>,{pkg,<<"telemetry">>,<<"0.4.1">>},0}]}.
[
{pkg_hash,[
{<<"base32">>, <<"044F6DC95709727CA2176F3E97A41DDAA76B5BC690D3536908618C0CB32616A2">>},
{<<"ecc_compact">>, <<"B2DDA47A6810B59A24331F3C864001A2FDE1C85A7BC31E52E7B6B2EE532F5627">>},
{<<"elli">>, <<"7842861819869EBBFF7230BC77ECF2DF551AE3EAEF5FDE6B01A7561CACCB811E">>},
{<<"enacl">>, <<"4AD59142943E72D72C56E33C30DEDEF28ADD8EBEE79C51033562B0CB4B93EDE0">>},
{<<"epgsql">>, <<"26D9CF04D74773D1DC4DA24AD39E926B34E107232591FE1866EFDFBC0A098396">>},
{<<"erl_base58">>, <<"37710854461D71DF338E73C65776302DB41C4BAB4674D2EC134ED7BCFC7B5552">>},
{<<"getopt">>, <<"C73A9FA687B217F2FF79F68A3B637711BB1936E712B521D8CE466B29CBF7808A">>},
{<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>},
{<<"h3">>, <<"C848A0E4963E96EB43647D31937DD7F5FE169A4E7B499A222E88C2C3AC2D80B5">>},
{<<"iso8601">>, <<"D1CEE73F56D71C35590C6B2DB2074873BF410BABAAB768F6EA566366D8CA4810">>},
{<<"jsone">>, <<"87ADEA283C9CF24767B4DEED44602989A5331156DF5D60A2660E9C9114D54046">>},
{<<"lager">>, <<"3402B9A7E473680CA179FC2F1D827CAB88DD37DD1E6113090C6F45EF05228A1C">>},
{<<"libp2p_crypto">>, <<"0D7071A07CBBA012C049C06067874969A6A2FE765CCD33B9DBB582580E62929F">>},
{<<"multiaddr">>, <<"978E58E28F6FACAF428C87AF933612B1E2F3F2775F1794EDA5E831A4EACD2984">>},
{<<"observer_cli">>, <<"9944882B71F55B2503663D9CB54D3F1C7BBDF7CC6DD01CC40EA8EF51207601EC">>},
{<<"poolboy">>, <<"392B007A1693A64540CEAD79830443ABF5762F5D30CF50BC95CB2C1AAAFA006B">>},
{<<"recon">>, <<"2F7FCBEC2C35034BADE2F9717F77059DC54EB4E929A3049CA7BA6775C0BD66CD">>},
{<<"small_ints">>, <<"82A824C8794A2DDC73CB5CD00EAD11331DB296521AD16A619C13D668572B868A">>},
{<<"telemetry">>, <<"AE2718484892448A24470E6AA341BC847C3277BFB8D4E9289F7474D752C09C7F">>}]}
].
29 changes: 16 additions & 13 deletions src/bh_db_worker.erl
@@ -11,32 +11,35 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).

-export([squery/1, equery/2, prepared_query/2]).
-export([squery/2, equery/3, prepared_query/3]).

-record(state,
{
db_conn :: epgsql:connection()
}).


squery(Sql) ->
poolboy:transaction(?DB_POOL,
-spec squery(Pool::atom(), Stmt::string()) -> epgsql_cmd_squery:response().
squery(Pool, Sql) ->
poolboy:transaction(Pool,
fun(Worker) ->
gen_server:call(Worker, {squery, Sql})
gen_server:call(Worker, {squery, Sql}, infinity)
end).

-spec equery(Stmt::string(), Params::[epgsql:bind_param()]) -> epgsql_cmd_equery:response().
equery(Stmt, Params) ->
poolboy:transaction(?DB_POOL,
-spec equery(Pool::atom(), Stmt::string(), Params::[epgsql:bind_param()]) -> epgsql_cmd_equery:response().
equery(Pool, Stmt, Params) ->
poolboy:transaction(Pool,
fun(Worker) ->
gen_server:call(Worker, {equery, Stmt, Params})
gen_server:call(Worker, {equery, Stmt, Params}, infinity)
end).

-spec prepared_query(Pool::atom(), Name::string(), Params::[epgsql:bind_param()]) -> epgsql_cmd_prepared_query:response().
prepared_query(Pool, Name, Params) ->
poolboy:transaction(Pool,
fun(Worker) ->
gen_server:call(Worker, {prepared_query, Name, Params}, infinity)
end).

-spec prepared_query(Name::string(), Params::[epgsql:bind_param()]) -> epgsql_cmd_prepared_query:response().
prepared_query(Name, Params) ->
poolboy:transaction(?DB_POOL, fun(Worker) ->
gen_server:call(Worker, {prepared_query, Name, Params})
end).

start_link(Args) ->
gen_server:start_link(?MODULE, Args, []).
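
Note: every query entry point now takes the pool as its first argument, and the worker calls use an infinity timeout instead of gen_server:call's default 5 seconds so slow statements are not cut off. A brief usage sketch, assuming the pools are registered under the names from sys.config (the SQL is illustrative only):

    %% Read through the read-only pool, write through the read-write pool.
    {ok, _Cols, _Rows} = bh_db_worker:squery(db_ro_pool, "select 1"),
    {ok, _Count} = bh_db_worker:equery(db_rw_pool,
                                       "update pending_transactions set status = $1 where hash = $2",
                                       [<<"pending">>, <<"some-hash">>]).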
13 changes: 9 additions & 4 deletions src/bh_db_worker.hrl
@@ -1,5 +1,10 @@
-define(DB_POOL, db_pool).
-define(DB_RO_POOL, db_ro_pool).
-define(DB_RW_POOL, db_rw_pool).

-define(SQUERY(S), bh_db_worker:squery((S))).
-define(EQUERY(S, P), bh_db_worker:equery((S), (P))).
-define(PREPARED_QUERY(S, P), bh_db_worker:prepared_query((S), (P))).
-define(SQUERY(S), ?SQUERY(?DB_RO_POOL, (S))).
-define(EQUERY(S, A), ?EQUERY(?DB_RO_POOL, (S), (A))).
-define(PREPARED_QUERY(S, A), ?PREPARED_QUERY(?DB_RO_POOL, (S), (A))).

-define(SQUERY(P, S), bh_db_worker:squery((P),(S))).
-define(EQUERY(P, S, A), bh_db_worker:equery((P), (S), (A))).
-define(PREPARED_QUERY(P, S, A), bh_db_worker:prepared_query((P), (S), (A))).
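
Note: the short macro forms keep existing read paths unchanged by defaulting to ?DB_RO_POOL, while the explicit-pool forms let a handler target the read-write pool. For example (names taken from bh_route_accounts and bh_route_pending_txns in this diff):

    ?PREPARED_QUERY(?S_ACCOUNT, [Address]),                        %% read, defaults to ?DB_RO_POOL
    ?PREPARED_QUERY(?DB_RW_POOL, ?S_INSERT_PENDING_TXN, Params).   %% write, explicit pool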
16 changes: 13 additions & 3 deletions src/bh_route_accounts.erl
@@ -14,7 +14,9 @@
-define(S_ACCOUNT_LIST, "account_list").
-define(S_ACCOUNT, "account").

-define(SELECT_ACCOUNT_BASE, "select l.address, l.dc_balance, l.dc_nonce, l.security_balance, l.security_nonce, l.balance, l.nonce from account_ledger l ").
-define(SELECT_ACCOUNT_BASE(A), "select l.address, l.dc_balance, l.dc_nonce, l.security_balance, l.security_nonce, l.balance, l.nonce" A " from account_ledger l ").
-define(SELECT_ACCOUNT_BASE, ?SELECT_ACCOUNT_BASE("")).


prepare_conn(Conn) ->
{ok, _} = epgsql:parse(Conn, ?S_ACCOUNT_LIST_BEFORE,
@@ -24,7 +26,10 @@ prepare_conn(Conn) ->
?SELECT_ACCOUNT_BASE "order by block desc, address limit $1", []),

{ok, _} = epgsql:parse(Conn, ?S_ACCOUNT,
?SELECT_ACCOUNT_BASE "where l.address = $1", []),
?SELECT_ACCOUNT_BASE(
", (select coalesce(max(nonce), l.nonce) from pending_transactions p where p.address = l.address and nonce_type='balance' and status != 'failed') as speculative_nonce"
)
"where l.address = $1", []),

ok.

@@ -70,4 +75,9 @@ account_to_json({Address, DCBalance, DCNonce, SecBalance, SecNonce, Balance, Non
<<"data_credit_nonce">> => DCNonce,
<<"security_balance">> => SecBalance,
<<"security_nonce">> => SecNonce
}.
};
account_to_json({Address, DCBalance, DCNonce, SecBalance, SecNonce, Balance, Nonce, SpecNonce}) ->
Base = account_to_json({Address, DCBalance, DCNonce, SecBalance, SecNonce, Balance, Nonce}),
Base#{
<<"speculative_nonce">> => SpecNonce
}.
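
Note: only the single-account query selects the extra speculative_nonce column (the highest non-failed pending balance nonce, falling back to the ledger nonce), so account_to_json/1 gains a clause for the eight-element row that delegates to the seven-element clause and merges in the extra field. A minimal illustration:

    Json = account_to_json({Address, DCBalance, DCNonce, SecBalance,
                            SecNonce, Balance, Nonce, SpecNonce}),
    SpecNonce = maps:get(<<"speculative_nonce">>, Json).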
3 changes: 3 additions & 0 deletions src/bh_route_handler.hrl
@@ -13,3 +13,6 @@
-define(MK_RESPONSE(R), bh_route_handler:mk_response((R))).
-define(INSERT_LAT_LON(L, N, F), bh_route_handler:lat_lon((L), (N), (F))).
-define(INSERT_LAT_LON(L, F), bh_route_handler:lat_lon((L), (F))).

-define (BIN_TO_B64(B), list_to_binary(base64:encode_to_string((B)))).
-define (BIN_TO_B58(B), list_to_binary(libp2p_crypto:bin_to_b58((B)))).
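
Note: these two macros wrap the base64 and base58 encodings used below when ingesting transactions (libp2p_crypto, added to rebar.config above, supplies bin_to_b58/1). A small check of the base64 form:

    %% ?BIN_TO_B64(B) expands to list_to_binary(base64:encode_to_string(B)).
    <<"3q2+7w==">> = ?BIN_TO_B64(<<16#DE, 16#AD, 16#BE, 16#EF>>).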
52 changes: 47 additions & 5 deletions src/bh_route_pending_txns.erl
@@ -4,19 +4,21 @@
-behavior(bh_db_worker).

-include("bh_route_handler.hrl").
-include_lib("helium_proto/src/pb/blockchain_txn_pb.hrl").

-export([prepare_conn/1, handle/3]).
%% Utilities
-export([get_pending_txn_list/0,
get_pending_txn/1,
get_pending_txn_list/3]).
get_pending_txn_list/3,
insert_pending_txn/2]).


-define(S_PENDING_TXN_LIST_BEFORE, "pending_txn_list_before").
-define(S_PENDING_TXN, "pending_txn").
-define(S_INSERT_PENDING_TXN, "insert_pending_txn").

-define(SELECT_PENDING_TXN_BASE, "select t.created_at, t.updated_at, t.created_block, t.hash, t.status, t.failed_reason from pending_transactions t ").
-define(SELECT_PENDING_TXN_BASE, "select t.created_at, t.updated_at, t.hash, t.status, t.failed_reason from pending_transactions t ").

prepare_conn(Conn) ->
{ok, _} = epgsql:parse(Conn, ?S_PENDING_TXN_LIST_BEFORE,
@@ -26,16 +28,40 @@ prepare_conn(Conn) ->
?SELECT_PENDING_TXN_BASE "where hash = $1", []),

{ok, _} = epgsql:parse(Conn, ?S_INSERT_PENDING_TXN,
"insert into pending_transactions (hash, created_block, type, nonce, status, fields) values ($1, $2, $3, $4, $5, $6)", []),
"insert into pending_transactions (hash, type, address, nonce, nonce_type, status, data) values ($1, $2, $3, $4, $5, $6, $7)", []),

ok.

handle('GET', [TxnHash], _Req) ->
?MK_RESPONSE(get_pending_txn(elli_request:uri_decode(TxnHash)));
handle('POST', [], Req) ->
#{ <<"txn">> := EncodedTxn } = jsone:decode(elli_request:body(Req)),
BinTxn = base64:decode(EncodedTxn),
Txn = txn_unwrap(blockchain_txn_pb:decode_msg(BinTxn, blockchain_txn_pb)),
Result = insert_pending_txn(Txn, BinTxn),
?MK_RESPONSE(Result);

handle(_, _, _Req) ->
?RESPONSE_404.

-spec insert_pending_txn(#blockchain_txn_payment_v1_pb{}, binary()) -> {ok, jsone:json_object()}.
insert_pending_txn(#blockchain_txn_payment_v1_pb{
payer=Payer,
nonce=Nonce
}=Txn, Bin) ->
TxnHash = ?BIN_TO_B64(txn_hash(Txn)),
Params = [
TxnHash,
txn_type(Txn),
?BIN_TO_B58(Payer),
Nonce,
<<"balance">>,
<<"received">>,
Bin
],
{ok, _} = ?PREPARED_QUERY(?DB_RW_POOL, ?S_INSERT_PENDING_TXN, Params),
{ok, #{ <<"hash">> => TxnHash}}.

%% @equiv get_pending_txn_list(Status, Before, Limit)
get_pending_txn_list() ->
get_pending_txn_list(pending, calendar:universal_time(), ?MAX_LIMIT).
@@ -61,12 +87,28 @@ get_pending_txn(Key) ->
pending_txn_list_to_json(Results) ->
lists:map(fun pending_txn_to_json/1, Results).

pending_txn_to_json({CreatedAt, UpdatedAt, CreatedBlock, Hash, Status, FailedReason}) ->
pending_txn_to_json({CreatedAt, UpdatedAt, Hash, Status, FailedReason}) ->
#{
<<"created_at">> => iso8601:format(CreatedAt),
<<"updated_at">> => iso8601:format(UpdatedAt),
<<"created_block">> => CreatedBlock,
<<"hash">> => Hash,
<<"status">> => Status,
<<"failed_reason">> => FailedReason
}.

%%
%% txn decoders
%%

txn_unwrap(#blockchain_txn_pb{txn={bundle, #blockchain_txn_bundle_v1_pb{transactions=Txns} = Bundle}}) ->
Bundle#blockchain_txn_bundle_v1_pb{transactions=lists:map(fun txn_unwrap/1, Txns)};
txn_unwrap(#blockchain_txn_pb{txn={_, Txn}}) ->
Txn.

txn_hash(#blockchain_txn_payment_v1_pb{}=Txn) ->
BaseTxn = Txn#blockchain_txn_payment_v1_pb{signature = <<>>},
EncodedTxn = blockchain_txn_payment_v1_pb:encode_msg(BaseTxn),
crypto:hash(sha256, EncodedTxn).

txn_type(#blockchain_txn_payment_v1_pb{}) ->
<<"payment_v1">>.