diff --git a/docs/onion-message-channels.md b/docs/onion-message-channels.md new file mode 100644 index 000000000..e883de5c2 --- /dev/null +++ b/docs/onion-message-channels.md @@ -0,0 +1,209 @@ +# HOW TO SETUP ONION MESSAGE CHANNELS IN JOINMARKET + +### Contents + +1. [Overview](#overview) + +2. [Testing, configuring for signet](#testing) + +4. [Directory nodes](#directory) + + + +## Overview + +This is a new way for Joinmarket bots to communicate, namely by serving and connecting to Tor onion services. This does not +introduce any new requirements to your Joinmarket installation, technically, because the use of Payjoin already required the need +to run such onion services, and connecting to IRC used a SOCKS5 proxy (used by almost all users) over Tor to +a remote onion service. + +(Note however that taker bots will *not* be required to serve onions; they will only make outbound SOCKS connections, as they currently do on IRC). + +The purpose of this new type of message channel is as follows: + +* less reliance on any service external to Joinmarket +* most of the transaction negotiation will be happening directly peer to peer, not passed over a central server ( +albeit it was and remains E2E encrypted data, in either case) +* the above can lead to better scalability at large numbers +* a substantial increase in the speed of transaction negotiation; this is mostly related to the throttling of high bursts of traffic on IRC + +The configuration for a user is simple; in their `joinmarket.cfg` they will get a new `[MESSAGING]` section like this, if they start from scratch: + +``` +[MESSAGING:onion] +# onion based message channels must have the exact type 'onion' +# (while the section name above can be MESSAGING:whatever), and there must +# be only ONE such message channel configured (note the directory servers +# can be multiple, below): +type = onion + +socks5_host = localhost +socks5_port = 9050 + +# the tor control configuration. +# for most people running the tor daemon +# on Linux, no changes are required here: +tor_control_host = localhost +# or, to use a UNIX socket +# tor_control_host = unix:/var/run/tor/control +tor_control_port = 9051 + +# the host/port actually serving the hidden service +# (note the *virtual port*, that the client uses, +# is hardcoded to 80): +onion_serving_host = 127.0.0.1 +onion_serving_port = 8080 + +# directory node configuration +# +# This is mandatory for directory nodes (who must also set their +# own *.onion:port as the only directory in directory_nodes, below), +# but NOT TO BE USED by non-directory nodes (which is you, unless +# you know otherwise!), as it will greatly degrade your privacy. +# (note the default is no value, don't replace it with ""). +hidden_service_dir = +# +# This is a comma separated list (comma can be omitted if only one item). +# Each item has format host:port ; both are required, though port will +# be 80 if created in this code. +# for MAINNET: +directory_nodes = 3kxw6lf5vf6y26emzwgibzhrzhmhqiw6ekrek3nqfjjmhwznb2moonad.onion,qqd22cwgygaxcy6vdw6mzwkyaxg5urb4ptbc5d74nrj25phspajxjbqd.onion + +# for SIGNET (testing network): +# directory_nodes = rr6f6qtleiiwic45bby4zwmiwjrj3jsbmcvutwpqxjziaydjydkk5iad.onion:80,k74oyetjqgcamsyhlym2vgbjtvhcrbxr4iowd4nv4zk5sehw4v665jad.onion:80 + +# This setting is ONLY for developer regtest setups, +# running multiple bots at once. Don't alter it otherwise +regtest_count = 0,0 +``` + +All of these can be left as default for most users - but most importantly, pay attention to: + +* The list of `directory_nodes`, which will be comma separated if multiple directory nodes are configured (we expect there will be 2 or 3 as a normal situation). Make sure to choose the ones for your network (mainnet by default, or signet or otherwise); if it's wrong your bot will just get auto-disconnected. +* The `onion_serving_port` is the port on the local machine on which the onion service is served; you won't usually need to use it, but it mustn't conflict with some other usage (so if you have something running on port 8080, change it). +The `type` field must always be `onion` in this case, and distinguishes it from IRC message channels and others. + +### Can/should I still run IRC message channels? + +In short, yes, at least for now, though you are free to disable any message channel you like. + +### Do I need to configure Tor, and if so, how? + +To make outbound Tor connections to other onions in the network, you will need to configure the +SOCKS5 proxy settings (so, only directory nodes may *not* need this; everyone else does). +This is identical to what we already do for IRC, except that in this case, we disallow clearnet connections. + +#### Running/testing as a maker + +A maker will additionally allow *inbound* connections to an onion service. +This onion service will be ephemeral, that is, it will have a different onion address every time +you restart. This should work automatically, using your existing Tor daemon (here, we are using +the same code as we use when running the `receive-payjoin` script, essentially). + +#### Running/testing as other bots (taker, ob-watcher) + +A taker will not attempt to serve an onion; it will only use outbound connections, first to directory +nodes and then, as according to need, to individual makers, also. + +As previously mentioned, both of these features - inbound and outbound, to onion, Tor connections - were already in use in Joinmarket. If you want to run/test as a maker bot, but never served an onion service before, it should work fine as long as you have the Tor service running in the background, +and the default control port 9051 (if not, change that value in the `joinmarket.cfg`, see above). + +#### Why not use Lightning based onions? + +(*Feel free to skip this section if you don't know what "Lightning based onions" refers to!*). The reason this architecture is +proposed as an alternative to the previously suggested Lightning-node-based network (see +[this PR](https://github.com/JoinMarket-Org/joinmarket-clientserver/pull/1000)), is mostly that: + +* the latter has a bunch of extra installation and maintenance dependencies (just one example: pyln-client requires coincurve, which we just +removed) +* the latter requires establishing a new node "identity" which can be refreshed, but that creates more concern +* longer term ideas to integrate Lightning payments to the coinjoin workflow (and vice versa!) are not realizable yet +* using multi-hop onion messaging in the LN network itself is also a way off, and a bit problematic + +So the short version is: the Lightning based alternative is certainly feasible, but has a lot more baggage that can't really be justified +unless we're actually using it for something. + + + + +## Testing, and configuring for signet. + +This testing section focuses on signet since that will be the less troublesome way of getting involved in tests for +the non-hardcore JM developer :) + +(For the latter, please use the regtest setup by running `test/e2e-coinjoin-test.py` under `pytest`, +and pay attention to the settings in `regtest_joinmarket.cfg`.) + +There is no separate/special configuration for signet other than the configuration that is already needed for running +Joinmarket against a signet backend (so e.g. RPC port of 38332). + +You can just uncomment the `directory_nodes` entry listed as SIGNET, and comment out the one for MAINNET. + +Then just make sure your bot has some signet coins and try running as maker or taker or both. + + + +## Directory nodes + +**This last section is for people with a lot of technical knowledge in this area, +who would like to help by running a directory node. You can ignore it if that does not apply.**. + +This requires a long running bot. It should be on a server you can keep running permanently, so perhaps a VPS, +but in any case, very high uptime. For reliability it also makes sense to configure to run as a systemd service. + +The currently suggested way to run a directory node is to use the script found [here](https://github.com/JoinMarket-Org/custom-scripts/blob/0eda6154265e012b907c43e2ecdacb895aa9e3ab/start-dn.py); you can place it in your `joinmarket-clientserver/scripts` directory and run it *without* arguments, but with one option flag: `--datadir=/your/chosen/datadir` (as you'll see below). + +This slightly unobvious approach is based on the following ideas: we run a Joinmarket script, with a Joinmarket python virtualenv, so that we are able to parse messages; this means that the directory node *can* be a bot, e.g. a maker bot, but need not be - and here it is basically a "crippled" maker bot that cannot do anything. This 'crippling' is actually very useful because (a) we use the `no-blockchain` argument (it is forced in-code; you don't need to set it) so we don't need a running Bitcoin node (of whatever flavour), and (b) we don't need a wallet either. + +#### Joinmarket-specific configuration + +Add a non-empty `hidden_service_dir` entry to your `[MESSAGING:onion]` with a directory accessible to your user. You may want to lock this down +a bit, but be careful changing permissions from what is created by the script, because Tor is very finicky about this. + +The hostname for your onion service will not change and will be stored permanently in that directory. + +The point to understand is: Joinmarket's `jmbase.JMHiddenService` will, if configured with a non-empty `hidden_service_dir` +field, actually start an *independent* instance of Tor specifically for serving this, under the current user. +(our Tor interface library `txtorcon` needs read access to the Tor HS dir, so it's troublesome to do this another way). + +##### Question: How to configure the `directory-nodes` list in our `joinmarket.cfg` for this directory node bot? + +Answer: **you must only enter your own node in this list!**. This way your bot will recognize that it is a directory node and it avoids weird edge case behaviour (so don't add *other* known directory nodes; you won't be talking to them). + +A natural retort is: but I don't know my own node's onion service hostname before I start it the first time. Indeed. So, just run it once with the default `directory_nodes` entries, then note down the new onion service hostname you created, and insert that as the only entry in the list. + + +#### Suggested setup of a systemd service: + +The most basic bare-bones service seems to work fine here: + +``` +[Unit] +Description=My JM signet directory node +Requires=network-online.target +After=network-online.target + +[Service] +Type=simple +ExecStart=/bin/bash -c 'cd /path/to/joinmarket-clientserver && source jmvenv/bin/activate && cd scripts && python start-dn.py --datadir=/path/to/chosen/datadir' +User=user + +[Install] +WantedBy=multi-user.target +``` + +... however, you need to kind of 'bootstrap' it the first time. For example: + +* run once with systemctl start + +* look at log with `journalctl`, service fails due to default `joinmarket.cfg` and quit. +* go to that cfg file. Remove the IRC settings, they serve no purpose here. Change the `hidden_service_dir` to `/yourlocation/hidserv` (the actual directory need not exist, it's better if it doesn't, this first time). Edit the `network` field in `BLOCKCHAIN` to whatever network (mainnet, signet) you intend to support - it can be only one for one directory node, for now. + +* `systemctl start` again, now note the onion hostname created from the log or the directory + +* set that hostname in `directory_nodes` in `joinmarket.cfg` + +* now the service should start correctly + +TODO: add some material on network hardening/firewalls here, I guess. + diff --git a/jmbase/jmbase/commands.py b/jmbase/jmbase/commands.py index 498b96ee0..d75721c91 100644 --- a/jmbase/jmbase/commands.py +++ b/jmbase/jmbase/commands.py @@ -27,11 +27,11 @@ class JMInit(JMCommand): """Communicates the client's required setup configuration. Blockchain source is communicated only as a naming - tag for messagechannels (currently IRC 'realname' field). + tag for messagechannels (for IRC, 'realname' field). """ arguments = [(b'bcsource', Unicode()), (b'network', Unicode()), - (b'irc_configs', JsonEncodable()), + (b'chan_configs', JsonEncodable()), (b'minmakers', Integer()), (b'maker_timeout_sec', Integer()), (b'dust_threshold', Integer()), diff --git a/jmbase/jmbase/twisted_utils.py b/jmbase/jmbase/twisted_utils.py index f7e2f287b..b7594d181 100644 --- a/jmbase/jmbase/twisted_utils.py +++ b/jmbase/jmbase/twisted_utils.py @@ -128,16 +128,23 @@ def config_to_hs_ports(virtual_port, host, port): class JMHiddenService(object): """ Wrapper class around the actions needed to create and serve on a hidden service; an object of - type Resource must be provided in the constructor, - which does the HTTP serving actions (GET, POST serving). + type either Resource or server.ProtocolFactory must + be provided in the constructor, which does the HTTP + (GET, POST) or other protocol serving actions. """ - def __init__(self, resource, info_callback, error_callback, - onion_hostname_callback, tor_control_host, + def __init__(self, proto_factory_or_resource, info_callback, + error_callback, onion_hostname_callback, tor_control_host, tor_control_port, serving_host, serving_port, - virtual_port = None, - shutdown_callback = None): - self.site = Site(resource) - self.site.displayTracebacks = False + virtual_port=None, + shutdown_callback=None, + hidden_service_dir=""): + if isinstance(proto_factory_or_resource, Resource): + # TODO bad naming, in this case it doesn't start + # out as a protocol factory; a Site is one, a Resource isn't. + self.proto_factory = Site(proto_factory_or_resource) + self.proto_factory.displayTracebacks = False + else: + self.proto_factory = proto_factory_or_resource self.info_callback = info_callback self.error_callback = error_callback # this has a separate callback for convenience, it should @@ -155,6 +162,13 @@ def __init__(self, resource, info_callback, error_callback, # config object, so no default here: self.serving_host = serving_host self.serving_port = serving_port + # this is used to serve an onion from the filesystem, + # NB: Because of how txtorcon is set up, this option + # uses a *separate tor instance* owned by the owner of + # this script (because txtorcon needs to read the + # HS dir), whereas if this option is "", we set up + # an ephemeral HS on the global or pre-existing tor. + self.hidden_service_dir = hidden_service_dir def start_tor(self): """ This function executes the workflow @@ -162,19 +176,31 @@ def start_tor(self): """ self.info_callback("Attempting to start onion service on port: {} " "...".format(self.virtual_port)) - if str(self.tor_control_host).startswith('unix:'): - control_endpoint = UNIXClientEndpoint(reactor, - self.tor_control_host[5:]) + if self.hidden_service_dir == "": + if str(self.tor_control_host).startswith('unix:'): + control_endpoint = UNIXClientEndpoint(reactor, + self.tor_control_host[5:]) + else: + control_endpoint = TCP4ClientEndpoint(reactor, + self.tor_control_host, self.tor_control_port) + d = txtorcon.connect(reactor, control_endpoint) + d.addCallback(self.create_onion_ep) + d.addErrback(self.setup_failed) + # TODO: add errbacks to the next two calls in + # the chain: + d.addCallback(self.onion_listen) + d.addCallback(self.print_host) else: - control_endpoint = TCP4ClientEndpoint(reactor, - self.tor_control_host, self.tor_control_port) - d = txtorcon.connect(reactor, control_endpoint) - d.addCallback(self.create_onion_ep) - d.addErrback(self.setup_failed) - # TODO: add errbacks to the next two calls in - # the chain: - d.addCallback(self.onion_listen) - d.addCallback(self.print_host) + ep = "onion:" + str(self.virtual_port) + ":localPort=" + ep += str(self.serving_port) + # endpoints.TCPHiddenServiceEndpoint creates version 2 by + # default for backwards compat (err, txtorcon needs to update that ...) + ep += ":version=3" + ep += ":hiddenServiceDir="+self.hidden_service_dir + onion_endpoint = serverFromString(reactor, ep) + d = onion_endpoint.listen(self.proto_factory) + d.addCallback(self.print_host_filesystem) + def setup_failed(self, arg): # Note that actions based on this failure are deferred to callers: @@ -195,7 +221,8 @@ def onion_listen(self, onion): serverstring = "tcp:{}:interface={}".format(self.serving_port, self.serving_host) onion_endpoint = serverFromString(reactor, serverstring) - return onion_endpoint.listen(self.site) + print("created the onion endpoint, now calling listen") + return onion_endpoint.listen(self.proto_factory) def print_host(self, ep): """ Callback fired once the HS is available @@ -204,6 +231,14 @@ def print_host(self, ep): """ self.onion_hostname_callback(self.onion.hostname) + def print_host_filesystem(self, port): + """ As above but needed to respect slightly different + callback chain for this case (where we start our own tor + instance for the filesystem-based onion). + """ + self.onion = port.onion_service + self.onion_hostname_callback(self.onion.hostname) + def shutdown(self): self.tor_connection.protocol.transport.loseConnection() self.info_callback("Hidden service shutdown complete") diff --git a/jmbase/test/test_commands.py b/jmbase/test/test_commands.py index c6e4100b6..0c05e9582 100644 --- a/jmbase/test/test_commands.py +++ b/jmbase/test/test_commands.py @@ -43,9 +43,9 @@ def end_test(): class JMTestServerProtocol(JMBaseProtocol): @JMInit.responder - def on_JM_INIT(self, bcsource, network, irc_configs, minmakers, + def on_JM_INIT(self, bcsource, network, chan_configs, minmakers, maker_timeout_sec, dust_threshold, blacklist_location): - show_receipt("JMINIT", bcsource, network, irc_configs, minmakers, + show_receipt("JMINIT", bcsource, network, chan_configs, minmakers, maker_timeout_sec, dust_threshold, blacklist_location) d = self.callRemote(JMInitProto, nick_hash_length=1, @@ -137,7 +137,7 @@ def clientStart(self): d = self.callRemote(JMInit, bcsource="dummyblockchain", network="dummynetwork", - irc_configs=['dummy', 'irc', 'config'], + chan_configs=['dummy', 'irc', 'config'], minmakers=7, maker_timeout_sec=8, dust_threshold=1500, diff --git a/jmclient/jmclient/__init__.py b/jmclient/jmclient/__init__.py index df2ed38e6..fe5bad251 100644 --- a/jmclient/jmclient/__init__.py +++ b/jmclient/jmclient/__init__.py @@ -24,7 +24,7 @@ TYPE_P2PKH, TYPE_P2SH_P2WPKH, TYPE_P2WPKH, detect_script_type) from .configure import (load_test_config, process_shutdown, load_program_config, jm_single, get_network, update_persist_config, - validate_address, is_burn_destination, get_irc_mchannels, + validate_address, is_burn_destination, get_mchannels, get_blockchain_interface_instance, set_config, is_segwit_mode, is_native_segwit_mode, JMPluginService, get_interest_rate, get_bondless_makers_allowance, check_and_start_tor) @@ -33,7 +33,7 @@ from .snicker_receiver import SNICKERError, SNICKERReceiver from .client_protocol import (JMTakerClientProtocol, JMClientProtocolFactory, start_reactor, SNICKERClientProtocolFactory, - BIP78ClientProtocolFactory, + BIP78ClientProtocolFactory, JMMakerClientProtocol, get_daemon_serving_params) from .podle import (set_commitment_file, get_commitment_file, add_external_commitments, diff --git a/jmclient/jmclient/client_protocol.py b/jmclient/jmclient/client_protocol.py index 68ea865ac..180932d5b 100644 --- a/jmclient/jmclient/client_protocol.py +++ b/jmclient/jmclient/client_protocol.py @@ -15,7 +15,7 @@ import sys from jmbase import (get_log, EXIT_FAILURE, hextobin, bintohex, utxo_to_utxostr, bdict_sdict_convert) -from jmclient import (jm_single, get_irc_mchannels, +from jmclient import (jm_single, get_mchannels, RegtestBitcoinCoreInterface, SNICKERReceiver, process_shutdown) import jmbitcoin as btc @@ -434,7 +434,7 @@ def clientStart(self): "blockchain_source") #needed only for channel naming convention network = jm_single().config.get("BLOCKCHAIN", "network") - irc_configs = get_irc_mchannels() + chan_configs = self.factory.get_mchannels(mode="MAKER") #only here because Init message uses this field; not used by makers TODO minmakers = jm_single().config.getint("POLICY", "minimum_makers") maker_timeout_sec = jm_single().maker_timeout_sec @@ -442,7 +442,7 @@ def clientStart(self): d = self.callRemote(commands.JMInit, bcsource=blockchain_source, network=network, - irc_configs=irc_configs, + chan_configs=chan_configs, minmakers=minmakers, maker_timeout_sec=maker_timeout_sec, dust_threshold=jm_single().DUST_THRESHOLD, @@ -601,7 +601,7 @@ def clientStart(self): "blockchain_source") #needed only for channel naming convention network = jm_single().config.get("BLOCKCHAIN", "network") - irc_configs = get_irc_mchannels() + chan_configs = self.factory.get_mchannels(mode="TAKER") minmakers = jm_single().config.getint("POLICY", "minimum_makers") maker_timeout_sec = jm_single().maker_timeout_sec @@ -614,7 +614,7 @@ def clientStart(self): d = self.callRemote(commands.JMInit, bcsource=blockchain_source, network=network, - irc_configs=irc_configs, + chan_configs=chan_configs, minmakers=minmakers, maker_timeout_sec=maker_timeout_sec, dust_threshold=jm_single().DUST_THRESHOLD, @@ -789,12 +789,21 @@ def __init__(self, client, proto_type="TAKER"): def setClient(self, client): self.proto_client = client + def getClient(self): return self.proto_client def buildProtocol(self, addr): return self.protocol(self, self.client) + def get_mchannels(self, mode): + """ A transparent wrapper that allows override, + so that a script can return a customised set of + message channel configs; currently used for testing + multiple bots on regtest. + """ + return get_mchannels(mode) + def start_reactor(host, port, factory=None, snickerfactory=None, bip78=False, jm_coinjoin=True, ish=True, daemon=False, rs=True, gui=False): #pragma: no cover diff --git a/jmclient/jmclient/configure.py b/jmclient/jmclient/configure.py index b4498616b..2899af6c7 100644 --- a/jmclient/jmclient/configure.py +++ b/jmclient/jmclient/configure.py @@ -141,53 +141,99 @@ def jm_single(): # information. rpc_wallet_file = -## SERVER 1/3) Darkscience IRC (Tor, IP) +[MESSAGING:onion] +# onion based message channels must have the exact type 'onion' +# (while the section name above can be MESSAGING:whatever), and there must +# be only ONE such message channel configured (note the directory servers +# can be multiple, below): +type = onion + +socks5_host = localhost +socks5_port = 9050 + +# the tor control configuration. +# for most people running the tor daemon +# on Linux, no changes are required here: +tor_control_host = localhost +# or, to use a UNIX socket +# tor_control_host = unix:/var/run/tor/control +tor_control_port = 9051 + +# the host/port actually serving the hidden service +# (note the *virtual port*, that the client uses, +# is hardcoded to 80): +onion_serving_host = 127.0.0.1 +onion_serving_port = 8080 + +# directory node configuration +# +# This is mandatory for directory nodes (who must also set their +# own *.onion:port as the only directory in directory_nodes, below), +# but NOT TO BE USED by non-directory nodes (which is you, unless +# you know otherwise!), as it will greatly degrade your privacy. +# (note the default is no value, don't replace it with ""). +hidden_service_dir = +# +# This is a comma separated list (comma can be omitted if only one item). +# Each item has format host:port ; both are required, though port will +# be 80 if created in this code. +# for MAINNET: +directory_nodes = 3kxw6lf5vf6y26emzwgibzhrzhmhqiw6ekrek3nqfjjmhwznb2moonad.onion:80,qqd22cwgygaxcy6vdw6mzwkyaxg5urb4ptbc5d74nrj25phspajxjbqd.onion:80 + +# for SIGNET (testing network): +# directory_nodes = rr6f6qtleiiwic45bby4zwmiwjrj3jsbmcvutwpqxjziaydjydkk5iad.onion:80,k74oyetjqgcamsyhlym2vgbjtvhcrbxr4iowd4nv4zk5sehw4v665jad.onion:80 + +# This setting is ONLY for developer regtest setups, +# running multiple bots at once. Don't alter it otherwise +regtest_count = 0,0 + +## IRC SERVER 1: Darkscience IRC (Tor, IP) ################################################################################ [MESSAGING:server1] +# by default the legacy format without a `type` field is +# understood to be IRC, but you can, optionally, add it: +# type = irc channel = joinmarket-pit port = 6697 usessl = true -# For traditional IP (default): -host = irc.darkscience.net -socks5 = false +# For traditional IP: +#host = irc.darkscience.net +#socks5 = false # For Tor (recommended as clearnet alternative): -#host = darkirc6tqgpnwd3blln3yfv5ckl47eg7llfxkmtovrv7c7iwohhb6ad.onion -#socks5 = true -#socks5_host = localhost -#socks5_port = 9050 +host = darkirc6tqgpnwd3blln3yfv5ckl47eg7llfxkmtovrv7c7iwohhb6ad.onion +socks5 = true +socks5_host = localhost +socks5_port = 9050 -## SERVER 2/3) hackint IRC (Tor, IP) +## IRC SERVER 2: ILITA IRC (optional IRC alternate, Tor only) ################################################################################ [MESSAGING:server2] channel = joinmarket-pit +port = 6667 +usessl = false +socks5 = true +socks5_host = localhost -# For traditional IP (default): -host = irc.hackint.org -port = 6697 -usessl = true -socks5 = false - -# For Tor (recommended as clearnet alternative): -#host = ncwkrwxpq2ikcngxq3dy2xctuheniggtqeibvgofixpzvrwpa77tozqd.onion -#port = 6667 -#usessl = false -#socks5 = true -#socks5_host = localhost -#socks5_port = 9050 +host = ilitafrzzgxymv6umx2ux7kbz3imyeko6cnqkvy4nisjjj4qpqkrptid.onion +socks5_port = 9050 -## SERVER 3/3) ILITA IRC (Tor - disabled by default) +## IRC SERVER 3: (backup) hackint IRC (Tor, IP) ################################################################################ #[MESSAGING:server3] -#channel = joinmarket-pit +# channel = joinmarket-pit +# For traditional IP: +## host = irc.hackint.org +## port = 6697 +## usessl = true +## socks5 = false +# For Tor (default): +#host = ncwkrwxpq2ikcngxq3dy2xctuheniggtqeibvgofixpzvrwpa77tozqd.onion #port = 6667 #usessl = false #socks5 = true #socks5_host = localhost - -# For Tor (recommended): -#host = ilitafrzzgxymv6umx2ux7kbz3imyeko6cnqkvy4nisjjj4qpqkrptid.onion #socks5_port = 9050 [LOGGING] @@ -488,7 +534,7 @@ def set_config(cfg, bcint=None): global_singleton.bc_interface = bcint -def get_irc_mchannels(): +def get_mchannels(mode="TAKER"): SECTION_NAME = 'MESSAGING' # FIXME: remove in future release if jm_single().config.has_section(SECTION_NAME): @@ -499,34 +545,64 @@ def get_irc_mchannels(): return _get_irc_mchannels_old() SECTION_NAME += ':' - irc_sections = [] - for s in jm_single().config.sections(): - if s.startswith(SECTION_NAME): - irc_sections.append(s) - assert irc_sections - req_fields = [("host", str), ("port", int), ("channel", str), ("usessl", str)] + irc_fields = [("host", str), ("port", int), ("channel", str), ("usessl", str), + ("socks5", str), ("socks5_host", str), ("socks5_port", int)] + onion_fields = [("type", str), ("directory_nodes", str), ("regtest_count", str), + ("socks5_host", str), ("socks5_port", int), + ("tor_control_host", str), ("tor_control_port", int), + ("onion_serving_host", str), ("onion_serving_port", int), + ("hidden_service_dir", str)] - configs = [] - for section in irc_sections: + def get_irc_section(s): server_data = {} - # check if socks5 is enabled for tor and load relevant config if so try: - server_data["socks5"] = jm_single().config.get(section, "socks5") + server_data["socks5"] = jm_single().config.get(s, "socks5") except NoOptionError: server_data["socks5"] = "false" if server_data["socks5"].lower() == 'true': - server_data["socks5_host"] = jm_single().config.get(section, "socks5_host") - server_data["socks5_port"] = jm_single().config.get(section, "socks5_port") + server_data["socks5_host"] = jm_single().config.get(s, "socks5_host") + server_data["socks5_port"] = jm_single().config.get(s, "socks5_port") - for option, otype in req_fields: - val = jm_single().config.get(section, option) + for option, otype in irc_fields: + val = jm_single().config.get(s, option) server_data[option] = otype(val) server_data['btcnet'] = get_network() - configs.append(server_data) - return configs - + return server_data + + def get_onion_section(s): + onion_data = {} + for option, otype in onion_fields: + try: + val = jm_single().config.get(s, option) + except NoOptionError: + continue + onion_data[option] = otype(val) + # the onion messaging section must specify whether + # to serve an onion: + onion_data["serving"] = mode == "MAKER" + onion_data['btcnet'] = get_network() + # Just to allow a dynamic set of var: + onion_data["section-name"] = s + return onion_data + + onion_sections = [] + irc_sections = [] + for section in jm_single().config.sections(): + if not section.startswith(SECTION_NAME): + continue + if jm_single().config.has_option(section, "type"): + channel_type = jm_single().config.get(section, "type").lower() + if channel_type == "onion": + onion_sections.append(get_onion_section(section)) + elif channel_type == "irc": + irc_sections.append(get_irc_section(section)) + else: + irc_sections.append(get_irc_section(section)) + assert irc_sections or onion_sections + assert len(onion_sections) < 2 + return irc_sections + onion_sections def _get_irc_mchannels_old(): fields = [("host", str), ("port", int), ("channel", str), ("usessl", str), @@ -655,28 +731,6 @@ def load_program_config(config_path="", bs=None, plugin_services=[]): "settings and restart joinmarket.", "info") sys.exit(EXIT_FAILURE) - #These are left as sanity checks but currently impossible - #since any edits are overlays to the default, these sections/options will - #always exist. - # FIXME: This check is a best-effort attempt. Certain incorrect section - # names can pass and so can non-first invalid sections. - for s in required_options: #pragma: no cover - # check for sections - avail = None - if not global_singleton.config.has_section(s): - for avail in global_singleton.config.sections(): - if avail.startswith(s): - break - else: - raise Exception( - "Config file does not contain the required section: " + s) - # then check for specific options - k = avail or s - for o in required_options[s]: - if not global_singleton.config.has_option(k, o): - raise Exception("Config file does not contain the required " - "option '{}' in section '{}'.".format(o, k)) - loglevel = global_singleton.config.get("LOGGING", "console_log_level") try: set_logging_level(loglevel) diff --git a/jmclient/jmclient/wallet_rpc.py b/jmclient/jmclient/wallet_rpc.py index 4597cf475..dd50b72b0 100644 --- a/jmclient/jmclient/wallet_rpc.py +++ b/jmclient/jmclient/wallet_rpc.py @@ -159,6 +159,9 @@ def __init__(self, port, wss_port, tls=True): # can be shut down cleanly: self.coinjoin_connection = None + def get_client_factory(self): + return JMClientProtocolFactory(self.taker) + def activate_coinjoin_state(self, state): """ To be set when a maker or taker operation is initialized; they cannot @@ -1003,13 +1006,13 @@ def dummy_user_callback(rel, abs): self.taker = Taker(self.services["wallet"], schedule, max_cj_fee = max_cj_fee, callbacks=(self.filter_orders_callback, - None, self.taker_finished)) + None, self.taker_finished)) # TODO ; this makes use of a pre-existing hack to allow # selectively disabling the stallMonitor function that checks # if transactions went through or not; here we want to cleanly # destroy the Taker after an attempt is made, successful or not. self.taker.testflag = True - self.clientfactory = JMClientProtocolFactory(self.taker) + self.clientfactory = self.get_client_factory() dhost, dport = self.check_daemon_ready() diff --git a/jmclient/test/test_client_protocol.py b/jmclient/test/test_client_protocol.py index d60adcbff..d1f07f9f5 100644 --- a/jmclient/test/test_client_protocol.py +++ b/jmclient/test/test_client_protocol.py @@ -167,9 +167,9 @@ def end_test(): class JMTestServerProtocol(JMBaseProtocol): @JMInit.responder - def on_JM_INIT(self, bcsource, network, irc_configs, minmakers, + def on_JM_INIT(self, bcsource, network, chan_configs, minmakers, maker_timeout_sec, dust_threshold, blacklist_location): - show_receipt("JMINIT", bcsource, network, irc_configs, minmakers, + show_receipt("JMINIT", bcsource, network, chan_configs, minmakers, maker_timeout_sec, dust_threshold, blacklist_location) d = self.callRemote(JMInitProto, nick_hash_length=1, diff --git a/jmdaemon/jmdaemon/__init__.py b/jmdaemon/jmdaemon/__init__.py index 384b5f720..fc1c4070b 100644 --- a/jmdaemon/jmdaemon/__init__.py +++ b/jmdaemon/jmdaemon/__init__.py @@ -4,6 +4,7 @@ from .enc_wrapper import as_init_encryption, decode_decrypt, \ encrypt_encode, init_keypair, init_pubkey, get_pubkey, NaclError from .irc import IRCMessageChannel +from .onionmc import OnionMessageChannel from jmbase.support import get_log from .message_channel import MessageChannel, MessageChannelCollection from .orderbookwatch import OrderbookWatch diff --git a/jmdaemon/jmdaemon/daemon_protocol.py b/jmdaemon/jmdaemon/daemon_protocol.py index b20a55107..9fdd641d6 100644 --- a/jmdaemon/jmdaemon/daemon_protocol.py +++ b/jmdaemon/jmdaemon/daemon_protocol.py @@ -7,8 +7,9 @@ from .protocol import (COMMAND_PREFIX, ORDER_KEYS, NICK_HASH_LENGTH, NICK_MAX_ENCODED, JM_VERSION, JOINMARKET_NICK_HEADER, COMMITMENT_PREFIXES) -from .irc import IRCMessageChannel +from .irc import IRCMessageChannel +from .onionmc import OnionMessageChannel from jmbase import (is_hs_uri, get_tor_agent, JMHiddenService, get_nontor_agent, BytesProducer, wrapped_urlparse, bdict_sdict_convert, JMHTTPResource) @@ -474,7 +475,7 @@ def __init__(self, factory): self.factory = factory self.jm_state = 0 self.restart_mc_required = False - self.irc_configs = None + self.chan_configs = None self.mcc = None #Default role is TAKER; must be overriden to MAKER in JMSetup message. self.role = "TAKER" @@ -503,7 +504,7 @@ def defaultCallbacks(self, d): d.addErrback(self.defaultErrback) @JMInit.responder - def on_JM_INIT(self, bcsource, network, irc_configs, minmakers, + def on_JM_INIT(self, bcsource, network, chan_configs, minmakers, maker_timeout_sec, dust_threshold, blacklist_location): """Reads in required configuration from client for a new session; feeds back joinmarket messaging protocol constants @@ -517,20 +518,25 @@ def on_JM_INIT(self, bcsource, network, irc_configs, minmakers, self.dust_threshold = int(dust_threshold) #(bitcoin) network only referenced in channel name construction self.network = network - if irc_configs == self.irc_configs: + if chan_configs == self.chan_configs: self.restart_mc_required = False log.msg("New init received did not require a new message channel" " setup.") else: - if self.irc_configs: + if self.chan_configs: #close the existing connections self.mc_shutdown() - self.irc_configs = irc_configs + self.chan_configs = chan_configs self.restart_mc_required = True - mcs = [IRCMessageChannel(c, - daemon=self, - realname='btcint=' + bcsource) - for c in self.irc_configs] + mcs = [] + for c in self.chan_configs: + if "type" in c and c["type"] == "onion": + mcs.append(OnionMessageChannel(c, daemon=self)) + else: + # default is IRC; TODO allow others + mcs.append(IRCMessageChannel(c, + daemon=self, + realname='btcint=' + bcsource)) self.mcc = MessageChannelCollection(mcs) OrderbookWatch.set_msgchan(self, self.mcc) #register taker-specific msgchan callbacks here @@ -946,7 +952,8 @@ def init_connections(self, nick): for a new transaction; effectively means any previous incomplete transaction is wiped. """ - self.jm_state = 0 #uninited + self.jm_state = 0 + self.mcc.set_nick(nick) if self.restart_mc_required: self.mcc.run() self.restart_mc_required = False @@ -954,7 +961,6 @@ def init_connections(self, nick): #if we are not restarting the MC, #we must simulate the on_welcome message: self.on_welcome() - self.mcc.set_nick(nick) def transfer_commitment(self, commit): """Send this commitment via privmsg to one (random) diff --git a/jmdaemon/jmdaemon/message_channel.py b/jmdaemon/jmdaemon/message_channel.py index 96be37ec6..4b46d817e 100644 --- a/jmdaemon/jmdaemon/message_channel.py +++ b/jmdaemon/jmdaemon/message_channel.py @@ -259,13 +259,13 @@ def privmsg(self, nick, cmd, message, mc=None): for x in self.available_channels() if mc == x.hostid] if len(matching_channels) != 1: #pragma: no cover - #this can happen if an IRC goes down shortly before a message + #this can happen if a m-channel goes down shortly before a message #is supposed to be sent. There used to be an exception raise. #to prevent a crash (especially in makers), we just inform #the user about it for now - log.error("Tried to communicate on this IRC server but " + log.error("Tried to communicate on this message channel but " "failed: " + str(mc)) - log.error("You might have to comment out this IRC server " + log.error("You might have to comment out this message channel" "in joinmarket.cfg and restart.") log.error("No action needed for makers / yield generators!") # todo: add logic to continue on other available mc @@ -444,7 +444,7 @@ def on_welcome_trigger(self, mc): if (not self.on_welcome_announce_id) and self.on_welcome: self.on_welcome_announce_id = reactor.callLater(60, self.on_welcome_setup_finished,) else: - log.info("All IRC servers connected, starting execution.") + log.info("All message channels connected, starting execution.") if self.on_welcome_announce_id: self.on_welcome_announce_id.cancel() self.on_welcome_setup_finished() diff --git a/jmdaemon/jmdaemon/onionmc.py b/jmdaemon/jmdaemon/onionmc.py new file mode 100644 index 000000000..400053fed --- /dev/null +++ b/jmdaemon/jmdaemon/onionmc.py @@ -0,0 +1,1358 @@ +from jmdaemon.message_channel import MessageChannel +from jmdaemon.protocol import COMMAND_PREFIX, JM_VERSION +from jmbase import get_log, JM_APP_NAME, JMHiddenService, stop_reactor +import json +import copy +from typing import Callable, Union, Tuple, List +from twisted.internet import reactor, task, protocol +from twisted.protocols import basic +from twisted.application.internet import ClientService +from twisted.internet.endpoints import TCP4ClientEndpoint +from twisted.internet.address import IPv4Address, IPv6Address +from txtorcon.socks import TorSocksEndpoint, HostUnreachableError + +log = get_log() + + +NOT_SERVING_ONION_HOSTNAME = "NOT-SERVING-ONION" + +# How many seconds to wait before treating an onion +# as unreachable +CONNECT_TO_ONION_TIMEOUT = 10 + +def location_tuple_to_str(t: Tuple[str, int]) -> str: + return f"{t[0]}:{t[1]}" + +def network_addr_to_string(location: Union[IPv4Address, IPv4Address]) -> str: + if isinstance(location, (IPv4Address, IPv6Address)): + host = location.host + port = location.port + else: + # TODO handle other addr types + assert False + return location_tuple_to_str((host, port)) + +# module-level var to control whether we use Tor or not +# (specifically for tests) +testing_mode = False +def set_testing_mode(configdata: dict) -> None: + """ Toggles testing mode which enables non-Tor + network setup: + """ + global testing_mode + if "regtest_count" not in configdata: + testing_mode = False + return + try: + s, e = [int(x) for x in configdata["regtest_count"].split(",")] + except Exception as e: + log.info("Failed to get regtest count settings, error: {}".format(repr(e))) + testing_mode = False + return + if s == e == 0: + testing_mode = False + return + testing_mode = True + +""" +Messaging protocol (which wraps the underlying Joinmarket +messaging protocol) used here is documented in: +Joinmarket-Docs/onion-messaging.md +""" + +LOCAL_CONTROL_MESSAGE_TYPES = {"connect": 785, "disconnect": 787, "connect-in": 797} +CONTROL_MESSAGE_TYPES = {"peerlist": 789, "getpeerlist": 791, + "handshake": 793, "dn-handshake": 795, + "ping": 797, "pong": 799, "disconnect": 801} +JM_MESSAGE_TYPES = {"privmsg": 685, "pubmsg": 687} + +# Used for some control message construction, as detailed below. +NICK_PEERLOCATOR_SEPARATOR = ";" + +# location_string, nick and network must be set before sending, +# otherwise invalid: +client_handshake_json = {"app-name": JM_APP_NAME, + "directory": False, + "location-string": "", + "proto-ver": JM_VERSION, + "features": {}, + "nick": "", + "network": "" +} + +# default acceptance false; code must switch it on: +server_handshake_json = {"app-name": JM_APP_NAME, + "directory": True, + "proto-ver-min": JM_VERSION, + "proto-ver-max": JM_VERSION, + "features": {}, + "accepted": False, + "nick": "", + "network": "", + "motd": "Default MOTD, replace with information for the directory." + } + +# states that keep track of relationship to a peer +PEER_STATUS_UNCONNECTED, PEER_STATUS_CONNECTED, PEER_STATUS_HANDSHAKED, \ + PEER_STATUS_DISCONNECTED = range(4) + + +class OnionPeerError(Exception): + pass + +class OnionPeerDirectoryWithoutHostError(OnionPeerError): + pass + +class OnionPeerConnectionError(OnionPeerError): + pass + +class OnionCustomMessageDecodingError(Exception): + pass + +class InvalidLocationStringError(Exception): + pass + +class OnionCustomMessage(object): + """ Encapsulates the messages passed over the wire + to and from other onion peers + """ + def __init__(self, text: str, msgtype: int): + self.text = text + self.msgtype = msgtype + + def encode(self) -> bytes: + self.encoded = json.dumps({"type": self.msgtype, + "line": self.text}).encode("utf-8") + return self.encoded + + @classmethod + def from_string_decode(cls, msg: bytes) -> 'OnionCustomMessage': + """ Build a custom message from a json-ified string. + """ + try: + msg_obj = json.loads(msg) + text = msg_obj["line"] + msgtype = msg_obj["type"] + # we insist on integer but not a valid msgtype, + # crudely 'syntax, not semantics': + # semantics is the job of the OnionMessageChannel object. + assert isinstance(msgtype, int) + assert isinstance(text, str) + except: + # this blanket catch and re-raise: + # we must handle untrusted input bytes without + # crashing under any circumstance. + raise OnionCustomMessageDecodingError + return cls(text, msgtype) + +class OnionLineProtocol(basic.LineReceiver): + def connectionMade(self): + self.factory.register_connection(self) + basic.LineReceiver.connectionMade(self) + + def connectionLost(self, reason): + self.factory.register_disconnection(self) + basic.LineReceiver.connectionLost(self, reason) + + def lineReceived(self, line: bytes) -> None: + try: + msg = OnionCustomMessage.from_string_decode(line) + except OnionCustomMessageDecodingError: + log.debug("Received invalid message: {}, " + "dropping connection.".format(line)) + self.transport.loseConnection() + return + self.factory.receive_message(msg, self) + + def message(self, message: OnionCustomMessage) -> None: + self.sendLine(message.encode()) + +class OnionLineProtocolFactory(protocol.ServerFactory): + """ This factory allows us to start up instances + of the LineReceiver protocol that are instantiated + towards us. + """ + protocol = OnionLineProtocol + + def __init__(self, client: 'OnionMessageChannel'): + self.client = client + self.peers = {} + + def register_connection(self, p: OnionLineProtocol) -> None: + # make a local control message registering + # the new connection + peer_location = network_addr_to_string(p.transport.getPeer()) + self.peers[peer_location] = p + self.client.register_connection(peer_location, direction=0) + + def register_disconnection(self, p: OnionLineProtocol) -> None: + # make a local control message registering + # the disconnection + peer_location = network_addr_to_string(p.transport.getPeer()) + self.client.register_disconnection(peer_location) + if peer_location not in self.peers: + log.warn("Disconnection event registered for non-existent peer.") + return + del self.peers[peer_location] + + def disconnect_inbound_peer(self, inbound_peer_str: str) -> None: + if inbound_peer_str not in self.peers: + log.warn("cannot disconnect peer at {}, not found".format( + inbound_peer_str)) + proto = self.peers[inbound_peer_str] + proto.transport.loseConnection() + + def receive_message(self, message: OnionCustomMessage, + p: OnionLineProtocol) -> None: + self.client.receive_msg(message, network_addr_to_string( + p.transport.getPeer())) + + def send(self, message: OnionCustomMessage, destination: str) -> bool: + if destination not in self.peers: + log.warn("sending message {}, destination {} was not in peers {}".format( + message.encode(), destination, self.peers)) + return False + proto = self.peers[destination] + proto.message(message) + return True + +class OnionClientFactory(protocol.ReconnectingClientFactory): + """ We define a distinct protocol factory for outbound connections. + Notably, this factory supports only *one* protocol instance at a time. + """ + protocol = OnionLineProtocol + + def __init__(self, message_receive_callback: Callable, + connection_callback: Callable, + disconnection_callback: Callable, + message_not_sendable_callback: Callable, + directory: bool, + mc: 'OnionMessageChannel'): + self.proto_client = None + # callback takes OnionCustomMessage as arg and returns None + self.message_receive_callback = message_receive_callback + # connection callback, no args, returns None + self.connection_callback = connection_callback + # disconnection the same + self.disconnection_callback = disconnection_callback + # a callback that can be fired if we are not able to send messages, + # no args, returns None + self.message_not_sendable_callback = message_not_sendable_callback + # is this connection to a directory? + self.directory = directory + # to keep track of state of overall messagechannel + self.mc = mc + + def clientConnectionLost(self, connector, reason): + log.debug('Onion client connection lost: ' + str(reason)) + # persistent reconnection is reserved for directories; + # for makers, it isn't logical to keep trying; they may + # well have just shut down the onion permanently, and we can + # reach them via directory anyway. + if self.directory and not self.mc.give_up: + if reactor.running: + log.info('Attempting to reconnect...') + protocol.ReconnectingClientFactory.clientConnectionLost(self, + connector, reason) + + def clientConnectionFailed(self, connector, reason): + log.info('Onion client connection failed: ' + str(reason)) + # reasoning here exactly as for clientConnectionLost + if self.directory and not self.mc.give_up: + if reactor.running: + log.info('Attempting to reconnect...') + protocol.ReconnectingClientFactory.clientConnectionFailed(self, + connector, reason) + def register_connection(self, p: OnionLineProtocol) -> None: + self.proto_client = p + self.connection_callback() + + def register_disconnection(self, p: OnionLineProtocol) -> None: + self.proto_client = None + self.disconnection_callback() + + def send(self, msg: OnionCustomMessage) -> bool: + # we may be sending at the time the counterparty + # disconnected + if not self.proto_client: + self.message_not_sendable_callback() + return False + self.proto_client.message(msg) + # Unlike the serving protocol, the client protocol + # is never in a condition of not knowing the counterparty + return True + + def receive_message(self, message: OnionCustomMessage, + p: OnionLineProtocol) -> None: + self.message_receive_callback(message) + +class OnionPeer(object): + + def __init__(self, messagechannel: 'OnionMessageChannel', + socks5_host: str, socks5_port: int, + location_tuple: Tuple[str, int], + directory: bool=False, nick: str="", + handshake_callback: Callable=None): + # reference to the managing OnionMessageChannel instance is + # needed so that we know where to send the messages received + # from this peer: + self.messagechannel = messagechannel + self.nick = nick + # client side net config: + self.socks5_host = socks5_host + self.socks5_port = socks5_port + # remote net config: + self.hostname = location_tuple[0] + self.port = location_tuple[1] + # alternate location strings are used for inbound + # connections for this peer (these will be used by + # directories and onion-serving peers, sending + # messages backwards on a connection created towards them). + self.alternate_location = "" + if self.hostname != NOT_SERVING_ONION_HOSTNAME: + # There is no harm in always setting it by default; + # it only gets used if we don't have an outbound. + self.set_alternate_location(location_tuple_to_str( + location_tuple)) + if directory and not self.hostname: + raise OnionPeerDirectoryWithoutHostError() + self.directory = directory + self._status = PEER_STATUS_UNCONNECTED + #A function to be called to initiate a handshake; + # it should take a single argument, an OnionPeer object, + #and return None. + self.handshake_callback = handshake_callback + # Keep track of the protocol factory used to connect + # to the remote peer. Note that this won't always be used, + # if we have an inbound connection from this peer: + self.factory = None + # the reconnecting service allows auto-reconnection to + # some peers: + self.reconnecting_service = None + + def set_alternate_location(self, location_string: str) -> None: + self.alternate_location = location_string + + def update_status(self, destn_status: int) -> None: + """ Wrapping state updates to enforce: + (a) that the handshake is triggered by connection + outwards, and (b) to ensure no illegal state transitions. + """ + assert destn_status in range(4) + ignored_updates = [] + if self._status == PEER_STATUS_UNCONNECTED: + allowed_updates = [PEER_STATUS_CONNECTED] + elif self._status == PEER_STATUS_CONNECTED: + # updates from connected->connected are harmless + allowed_updates = [PEER_STATUS_CONNECTED, + PEER_STATUS_DISCONNECTED, + PEER_STATUS_HANDSHAKED] + elif self._status == PEER_STATUS_HANDSHAKED: + allowed_updates = [PEER_STATUS_DISCONNECTED] + ignored_updates = [PEER_STATUS_CONNECTED] + elif self._status == PEER_STATUS_DISCONNECTED: + allowed_updates = [PEER_STATUS_CONNECTED] + ignored_updates = [PEER_STATUS_DISCONNECTED] + if destn_status in ignored_updates: + log.debug("Attempt to update status of peer from {} " + "to {} ignored.".format(self._status, destn_status)) + return + assert destn_status in allowed_updates, ("couldn't update state " + "from {} to {}".format(self._status, destn_status)) + self._status = destn_status + # the handshakes are always initiated by a client: + if destn_status == PEER_STATUS_CONNECTED: + log.info("We, {}, are calling the handshake callback as client.".format( + self.messagechannel.self_as_peer.peer_location())) + self.handshake_callback(self) + + def status(self) -> int: + """ Simple getter function for the wrapped _status: + """ + return self._status + + def set_nick(self, nick: str) -> None: + self.nick = nick + + def get_nick_peerlocation_ser(self) -> str: + if not self.nick: + raise OnionPeerError("Cannot serialize " + "identifier string without nick.") + return self.nick + NICK_PEERLOCATOR_SEPARATOR + \ + self.peer_location() + + @classmethod + def from_location_string(cls, mc: 'OnionMessageChannel', + location: str, + socks5_host: str, + socks5_port: int, + directory: bool=False, + handshake_callback: Callable=None) -> 'OnionPeer': + """ Allows construction of an OnionPeer from the + connection information given by the network interface. + TODO: special handling for inbound is needed. + """ + try: + host, port = location.split(":") + portint = int(port) + except: + raise InvalidLocationStringError(location) + return cls(mc, socks5_host, socks5_port, + (host, portint), directory=directory, + handshake_callback=handshake_callback) + + def set_location(self, location_string: str) -> bool: + """ Allows setting location from an unchecked + input string argument. + If the location is specified as the 'no serving' case, + we put the currently existing inbound connection as the alternate + location, and the NOT_SERVING const as the 'location', returning True. + If the string does not have the required format, will return False, + otherwise self.hostname, self.port are + updated for future `peer_location` calls, and True is returned. + """ + if location_string == NOT_SERVING_ONION_HOSTNAME: + self.set_alternate_location(location_tuple_to_str( + (self.hostname, self.port))) + self.hostname = NOT_SERVING_ONION_HOSTNAME + self.port = -1 + return True + try: + host, port = location_string.split(":") + portint = int(port) + assert portint > 0 + except Exception as e: + log.debug("Failed to update host and port of this peer, " + "error: {}".format(repr(e))) + return False + self.hostname = host + self.port = portint + return True + + def peer_location(self) -> str: + if self.hostname == NOT_SERVING_ONION_HOSTNAME: + # special case for non-reachable peers, which can include + # self_as_peer: we just return this string constant + return NOT_SERVING_ONION_HOSTNAME + # in every other case we need a sensible port/host combo: + assert (self.port > 0 and self.hostname) + return location_tuple_to_str((self.hostname, self.port)) + + def send(self, message: OnionCustomMessage) -> bool: + """ If the message can be sent on either an inbound or + outbound connection, True is returned, else False. + """ + if not self.factory: + # we try to send via the overall message channel serving + # protocol, i.e. we assume the connection was made inbound: + return self.messagechannel.proto_factory.send(message, + self.alternate_location) + return self.factory.send(message) + + def receive_message(self, message: OnionCustomMessage) -> None: + self.messagechannel.receive_msg(message, self.peer_location()) + + def notify_message_unsendable(self): + """ Triggered by a failure to send a message on the network, + by the encapsulated ClientFactory. Just used to notify calling + code; no action is triggered. + """ + name = "directory" if self.directory else "peer" + log.warn("Failure to send message to {}: {}.".format( + name, self.peer_location())) + + def connect(self) -> None: + """ This method is called to connect, over Tor, to the remote + peer at the given onion host/port. + """ + if self._status in [PEER_STATUS_HANDSHAKED, PEER_STATUS_CONNECTED]: + return + if not (self.hostname and self.port > 0): + raise OnionPeerConnectionError( + "Cannot connect without host, port info") + + self.factory = OnionClientFactory(self.receive_message, + self.register_connection, self.register_disconnection, + self.notify_message_unsendable, self.directory, self.messagechannel) + if testing_mode: + log.debug("{} is making a tcp connection to {}, {}, {},".format( + self.messagechannel.self_as_peer.peer_location(), self.hostname, + self.port, self.factory)) + self.tcp_connector = reactor.connectTCP(self.hostname, self.port, + self.factory) + else: + # non-default timeout; needs to be much lower than our + # 'wait at least a minute for the IRC connections to come up', + # which is used for *all* message channels, together. + torEndpoint = TCP4ClientEndpoint(reactor, self.socks5_host, + self.socks5_port, + timeout=CONNECT_TO_ONION_TIMEOUT) + onionEndpoint = TorSocksEndpoint(torEndpoint, self.hostname, + self.port) + self.reconnecting_service = ClientService(onionEndpoint, self.factory) + # if we want to actually do something about an unreachable host, + # we have to force t.a.i.ClientService to give up after the timeout: + d = self.reconnecting_service.whenConnected(failAfterFailures=1) + d.addErrback(self.respond_to_connection_failure) + self.reconnecting_service.startService() + + def respond_to_connection_failure(self, failure): + # the error should be of this type specifically, if the onion + # is down, or was configured wrong: + failure.trap(HostUnreachableError) + # if this is a non-dir reachable peer, we just record + # the failure and explicitly give up: + if not self.directory: + log.info("We failed to connect to peer {}; giving up".format( + self.peer_location())) + self.reconnecting_service.stopService() + else: + # in this case, the still-running ClientService will + # just keep trying: + log.warn("We failed to connect to directory {}; trying " + "again.".format(self.peer_location())) + + def register_connection(self) -> None: + self.messagechannel.register_connection(self.peer_location(), + direction=1) + + def register_disconnection(self) -> None: + self.messagechannel.register_disconnection(self.peer_location()) + + def try_to_connect(self) -> None: + """ This method wraps OnionPeer.connect and accepts + any error if that fails. + """ + try: + self.connect() + except OnionPeerConnectionError as e: + # Note that this will happen naturally for non-serving peers. + # TODO remove message or change it. + log.debug("Tried to connect but failed: {}".format(repr(e))) + except Exception as e: + log.warn("Got unexpected exception in connect attempt: {}".format( + repr(e))) + + def disconnect(self) -> None: + if self._status in [PEER_STATUS_UNCONNECTED, PEER_STATUS_DISCONNECTED]: + return + if not (self.hostname and self.port > 0): + raise OnionPeerConnectionError( + "Cannot disconnect without host, port info") + if self.factory: + d = self.reconnecting_service.stopService() + d.addCallback(self.complete_disconnection) + else: + self.messagechannel.proto_factory.disconnect_inbound_peer( + self.alternate_location) + + def complete_disconnection(self, r) -> None: + log.debug("Disconnected from peer: {}".format(self.peer_location())) + self.update_status(PEER_STATUS_DISCONNECTED) + self.factory = None + +class OnionDirectoryPeer(OnionPeer): + delay = 4.0 + def try_to_connect(self) -> None: + # Delay deliberately expands out to very + # long times as yg-s tend to be very long + # running bots: + self.delay *= 1.5 + if self.delay > 10000: + log.warn("Cannot connect to directory node peer: {} " + "after 20 attempts, giving up.".format(self.peer_location())) + return + try: + self.connect() + except OnionPeerConnectionError: + reactor.callLater(self.delay, self.try_to_connect) + + +class OnionMessageChannel(MessageChannel): + """ Sends messages to other nodes of the same type over Tor + via SOCKS5. + *Optionally*: Receives messages via a Torv3 hidden/onion service. + If no onion service, it means we only have connections outbound + to other onion services (directory nodes first, others if and + when they send us a privmsg.). + Uses one or more configured "directory nodes" (which could be us) + to access a list of current active nodes, and updates + dynamically from messages seen. + """ + + def __init__(self, + configdata, + daemon=None): + MessageChannel.__init__(self, daemon=daemon) + # hostid is a feature to avoid replay attacks across message channels; + # TODO investigate, but for now, treat onion-based as one "server". + self.hostid = "onion-network" + self.btc_network = configdata["btcnet"] + # receives notification that we are shutting down + self.give_up = False + # for backwards compat: make sure MessageChannel log can refer to + # this in dynamic switch message: + self.serverport = self.hostid + self.tor_control_host = configdata["tor_control_host"] + self.tor_control_port = configdata["tor_control_port"] + self.onion_serving_host=configdata["onion_serving_host"] + self.onion_serving = configdata["serving"] + if self.onion_serving: + self.onion_serving_port = configdata["onion_serving_port"] + self.hidden_service_dir = configdata["hidden_service_dir"] + # client side config: + self.socks5_host = configdata["socks5_host"] + self.socks5_port = configdata["socks5_port"] + # we use the setting in the config sent over from + # the client, to decide whether to set up our connections + # over localhost (if testing), without Tor: + set_testing_mode(configdata) + # keep track of peers. the list will be instances + # of OnionPeer: + self.peers = set() + for dn in [x.strip() for x in configdata["directory_nodes"].split(",")]: + # note we don't use a nick for directories: + try: + self.peers.add(OnionDirectoryPeer.from_location_string( + self, dn, self.socks5_host, self.socks5_port, + directory=True, handshake_callback=self.handshake_as_client)) + except InvalidLocationStringError as e: + log.error("Failed to load directory nodes: {}".format(repr(e))) + stop_reactor() + return + # we can direct messages via the protocol factory, which + # will index protocol connections by peer location: + self.proto_factory = OnionLineProtocolFactory(self) + if self.onion_serving: + if testing_mode: + # we serve over TCP: + self.testing_serverconn = reactor.listenTCP(self.onion_serving_port, + self.proto_factory, interface="localhost") + self.onion_hostname = "127.0.0.1" + else: + self.hs = JMHiddenService(self.proto_factory, + self.info_callback, + self.setup_error_callback, + self.onion_hostname_callback, + self.tor_control_host, + self.tor_control_port, + self.onion_serving_host, + self.onion_serving_port, + shutdown_callback=self.shutdown_callback, + hidden_service_dir=self.hidden_service_dir) + # this call will start bringing up the HS; when it's finished, + # it will fire the `onion_hostname_callback`, or if it fails, + # it'll fire the `setup_error_callback`. + self.hs.start_tor() + + # This will serve as our unique identifier, indicating + # that we are ready to communicate (in both directions) over Tor. + self.onion_hostname = None + else: + # dummy 'hostname' to indicate we can start running immediately: + self.onion_hostname = NOT_SERVING_ONION_HOSTNAME + + # intended to represent the special case of 'we are the + # only directory node known', however for now dns don't interact + # so this has no role. TODO probably remove it. + self.genesis_node = False + + # waiting loop for all directories to have + # connected (note we could use a deferred but + # the rpc connection calls are not using twisted) + self.wait_for_directories_loop = None + + def info_callback(self, msg: str) -> None: + log.info(msg) + + def setup_error_callback(self, msg: str) -> None: + log.error(msg) + + def shutdown_callback(self, msg: str) -> None: + log.info("in shutdown callback: {}".format(msg)) + + def onion_hostname_callback(self, hostname: str) -> None: + """ This entrypoint marks the start of the OnionMessageChannel + running, since we need this unique identifier as our name + before we can start working (we need to compare it with the + configured directory nodes). + """ + log.info("setting onion hostname to : {}".format(hostname)) + self.onion_hostname = hostname + +# ABC implementation section + def run(self) -> None: + self.hs_up_loop = task.LoopingCall(self.check_onion_hostname) + self.hs_up_loop.start(0.5) + + def shutdown(self) -> None: + self.give_up = True + for p in self.peers: + if p.reconnecting_service: + p.reconnecting_service.stopService() + + def get_pubmsg(self, msg:str, source_nick:str ="") -> str: + """ Converts a message into the known format for + pubmsgs; if we are not sending this (because we + are a directory, forwarding it), `source_nick` must be set. + Note that pubmsg does NOT prefix the *message* with COMMAND_PREFIX. + """ + nick = source_nick if source_nick else self.nick + return nick + COMMAND_PREFIX + "PUBLIC" + msg + + def get_privmsg(self, nick: str, cmd: str, message: str, + source_nick=None) -> str: + """ See `get_pubmsg` for comment on `source_nick`. + """ + from_nick = source_nick if source_nick else self.nick + return from_nick + COMMAND_PREFIX + nick + COMMAND_PREFIX + \ + cmd + " " + message + + def _pubmsg(self, msg:str) -> None: + """ Best effort broadcast of message `msg`: + send the message to every known directory node, + with the PUBLIC message type and nick. + """ + dps = self.get_directory_peers() + msg = OnionCustomMessage(self.get_pubmsg(msg), + JM_MESSAGE_TYPES["pubmsg"]) + for dp in dps: + # currently a directory node can send its own + # pubmsgs (act as maker or taker); this will + # probably be removed but is useful in testing: + if dp == self.self_as_peer: + self.receive_msg(msg, "00") + else: + self._send(dp, msg) + + def _privmsg(self, nick: str, cmd: str, msg:str) -> None: + # in certain test scenarios the directory may try to transfer + # commitments to itself: + if nick == self.nick: + log.debug("Not sending message to ourselves: {}, {}, {}".format( + nick, cmd, msg)) + return + encoded_privmsg = OnionCustomMessage(self.get_privmsg(nick, cmd, msg), + JM_MESSAGE_TYPES["privmsg"]) + peer = self.get_peer_by_nick(nick) + if not peer or peer.status() != PEER_STATUS_HANDSHAKED: + # If we are trying to message a peer via their nick, we + # may not yet have a connection; then we just + # forward via directory nodes. + log.debug("Privmsg peer: {} but don't have peerid; " + "sending via directory.".format(nick)) + try: + # TODO change this to redundant or switching + peer = self.get_connected_directory_peers()[0] + except Exception as e: + log.warn("Failed to send privmsg because no " + "directory peer is connected. Error: {}".format(repr(e))) + return + self._send(peer, encoded_privmsg) + + def _announce_orders(self, offerlist: list) -> None: + for offer in offerlist: + self._pubmsg(offer) + +# End ABC implementation section + + def check_onion_hostname(self) -> None: + if not self.onion_hostname: + return + self.hs_up_loop.stop() + # now our hidden service is up, we must check our peer status + # then set up directories. + self.get_our_peer_info() + # at this point the only peers added are directory + # nodes from config; we try to connect to all. + # We will get other peers to add to our list once they + # start sending us messages. + reactor.callLater(0.0, self.connect_to_directories) + + def get_my_location_tuple(self) -> Tuple[str, int]: + if self.onion_hostname == NOT_SERVING_ONION_HOSTNAME: + return (self.onion_hostname, -1) + elif testing_mode: + return (self.onion_hostname, self.onion_serving_port) + else: + return (self.onion_hostname, 80) + + def get_our_peer_info(self) -> None: + """ Create a special OnionPeer object, + outside of our peerlist, to refer to ourselves. + """ + dps = self.get_directory_peers() + self_dir = False + # only for publicly exposed onion does the 'virtual port' exist; + # for local tests we always connect to an actual machine port: + my_location_tuple = self.get_my_location_tuple() + my_location_str = location_tuple_to_str(my_location_tuple) + if [my_location_str] == [d.peer_location() for d in dps]: + log.info("This is the genesis node: {}".format(self.onion_hostname)) + self.genesis_node = True + self_dir = True + elif my_location_str in dps: + # Here we are just one of many directory nodes, + # which should be fine, we should just be careful + # to not query ourselves. + self_dir = True + self.self_as_peer = OnionPeer(self, self.socks5_host, self.socks5_port, + my_location_tuple, + self_dir, nick=self.nick, + handshake_callback=None) + + def connect_to_directories(self) -> None: + if self.genesis_node: + # we are a directory and we have no directory peers; + # just start. + self.on_welcome(self) + return + # the remaining code is only executed by non-directories: + for p in self.peers: + log.info("Trying to connect to node: {}".format(p.peer_location())) + try: + p.connect() + except OnionPeerConnectionError: + pass + # do not trigger on_welcome event until all directories + # configured are ready: + self.on_welcome_sent = False + self.directory_wait_counter = 0 + self.wait_for_directories_loop = task.LoopingCall( + self.wait_for_directories) + self.wait_for_directories_loop.start(2.0) + + def handshake_as_client(self, peer: OnionPeer) -> None: + assert peer.status() == PEER_STATUS_CONNECTED + if self.self_as_peer.directory: + log.debug("Not sending client handshake to {} because we " + "are directory.".format(peer.peer_location())) + return + our_hs = copy.deepcopy(client_handshake_json) + our_hs["location-string"] = self.self_as_peer.peer_location() + our_hs["nick"] = self.nick + our_hs["network"] = self.btc_network + our_hs_json = json.dumps(our_hs) + log.info("Sending this handshake: {} to peer {}".format( + our_hs_json, peer.peer_location())) + self._send(peer, OnionCustomMessage(our_hs_json, + CONTROL_MESSAGE_TYPES["handshake"])) + + def handshake_as_directory(self, peer: OnionPeer, our_hs: dict) -> None: + assert peer.status() == PEER_STATUS_CONNECTED + our_hs["network"] = self.btc_network + our_hs_json = json.dumps(our_hs) + log.info("Sending this handshake as directory: {}".format( + our_hs_json)) + self._send(peer, OnionCustomMessage(our_hs_json, + CONTROL_MESSAGE_TYPES["dn-handshake"])) + + def get_directory_peers(self) -> list: + return [p for p in self.peers if p.directory is True] + + def get_peer_by_nick(self, nick:str) -> Union[OnionPeer, None]: + for p in self.get_all_connected_peers(): + if p.nick == nick: + return p + + def _send(self, peer: OnionPeer, message: OnionCustomMessage) -> bool: + try: + return peer.send(message) + except Exception as e: + # This can happen when a peer disconnects, depending + # on the timing: + log.warn("Failed to send message to: {}, error: {}".format( + peer.peer_location(), repr(e))) + return False + + def receive_msg(self, message: OnionCustomMessage, peer_location: str) -> None: + """ Messages from peers and also connection related control + messages. These messages either come via OnionPeer or via + the main OnionLineProtocolFactory instance that handles all + inbound connections. + """ + if self.self_as_peer.directory: + # TODO remove, useful while testing + log.debug("received message as directory: {}".format(message.encode())) + peer = self.get_peer_by_id(peer_location) + if not peer: + log.warn("Received message but could not find peer: {}".format(peer_location)) + return + msgtype = message.msgtype + msgval = message.text + if msgtype in LOCAL_CONTROL_MESSAGE_TYPES.values(): + self.process_control_message(peer_location, msgtype, msgval) + # local control messages are processed first. + # TODO this is a historical artifact, we can simplify. + return + + if self.process_control_message(peer_location, msgtype, msgval): + # will return True if it is, elsewise, a control message. + return + + # ignore non-JM messages: + if msgtype not in JM_MESSAGE_TYPES.values(): + log.debug("Invalid message type, ignoring: {}".format(msgtype)) + return + + # real JM message; should be: from_nick, to_nick, cmd, message + try: + nicks_msgs = msgval.split(COMMAND_PREFIX) + from_nick, to_nick = nicks_msgs[:2] + msg = COMMAND_PREFIX + COMMAND_PREFIX.join(nicks_msgs[2:]) + if to_nick == "PUBLIC": + self.on_pubmsg(from_nick, msg) + if self.self_as_peer.directory: + self.forward_pubmsg_to_peers(msg, from_nick) + elif to_nick != self.nick: + if not self.self_as_peer.directory: + log.debug("Ignoring message, not for us: {}".format(msg)) + else: + self.forward_privmsg_to_peer(to_nick, msg, from_nick) + else: + self.on_privmsg(from_nick, msg) + except Exception as e: + log.debug("Invalid Joinmarket message: {}, error was: {}".format( + msgval, repr(e))) + + def forward_pubmsg_to_peers(self, msg: str, from_nick: str) -> None: + """ Used by directory nodes currently. Takes a received + message that was PUBLIC and broadcasts it to the non-directory + peers. + """ + assert self.self_as_peer.directory + pubmsg = self.get_pubmsg(msg, source_nick=from_nick) + msgtype = JM_MESSAGE_TYPES["pubmsg"] + # NOTE!: Specifically with forwarding/broadcasting, + # we introduce the danger of infinite re-broadcast, + # if there is more than one party forwarding. + # For now we are having directory nodes not talk to + # each other (i.e. they are *required* to only configure + # themselves, not other dns). But this could happen by + # accident. + encoded_msg = OnionCustomMessage(pubmsg, msgtype) + for peer in self.get_connected_nondirectory_peers(): + # don't loop back to the sender: + if peer.nick == from_nick: + continue + log.debug("Sending {}:{} to nondir peer {}".format( + msgtype, pubmsg, peer.peer_location())) + self._send(peer, encoded_msg) + + def forward_privmsg_to_peer(self, nick: str, message: str, + from_nick: str) -> None: + assert self.self_as_peer.directory + peer = self.get_peer_by_nick(nick) + if not peer: + log.debug("We were asked to send a message from {} to {}, " + "but {} is not connected.".format(from_nick, nick, nick)) + return + # The `message` passed in has format COMMAND_PREFIX||command||" "||msg + # we need to parse out cmd, message for sending. + # second argument for split means only one split allowed. + cmdsmsgs = message.split(COMMAND_PREFIX, 1)[1] + cmdmsglist = cmdsmsgs.split(" ") + cmd = cmdmsglist[0] + msg = " ".join(cmdmsglist[1:]) + privmsg = self.get_privmsg(nick, cmd, msg, source_nick=from_nick) + encoded_msg = OnionCustomMessage(privmsg, + JM_MESSAGE_TYPES["privmsg"]) + self._send(peer, encoded_msg) + # If possible, we forward the from-nick's network location + # to the to-nick peer, so they can just talk directly next time. + peer_from = self.get_peer_by_nick(from_nick) + if not peer_from: + return + self.send_peers(peer, peer_filter=[peer_from]) + + def process_control_message(self, peerid: str, msgtype: int, + msgval: str) -> bool: + """ Triggered by a directory node feeding us + peers, or by a connect/disconnect hook; this is our housekeeping + to try to create, and keep track of, useful connections. + The returned boolean indicates whether we succeeded in processing + the message or whether it must be analyzed again (note e.g. that + we return True for a rejected message!) + """ + all_ctrl = list(LOCAL_CONTROL_MESSAGE_TYPES.values( + )) + list(CONTROL_MESSAGE_TYPES.values()) + if msgtype not in all_ctrl: + return False + # this is too noisy, but TODO, investigate allowing + # some kind of control message monitoring e.g. default-off + # log-to-file (we don't currently have a 'TRACE' level debug). + #log.debug("received control message: {},{}".format(msgtype, msgval)) + if msgtype == CONTROL_MESSAGE_TYPES["peerlist"]: + # This is the base method of seeding connections; + # a directory node can send this any time. + # These messages can only be accepted from directory peers + # (which we have configured ourselves): + peer = self.get_peer_by_id(peerid) + if not peer or not peer.directory: + return True + try: + peerlist = msgval.split(",") + for peer in peerlist: + # defaults mean we just add the peer, not + # add or alter its connection status: + self.add_peer(peer, with_nick=True) + except Exception as e: + log.debug("Incorrectly formatted peer list: {}, " + "ignoring, {}".format(msgval, e)) + # returning True whether raised or not - see docstring + return True + elif msgtype == CONTROL_MESSAGE_TYPES["getpeerlist"]: + log.warn("getpeerlist request received, currently not supported.") + return True + elif msgtype == CONTROL_MESSAGE_TYPES["handshake"]: + # sent by non-directory peers on startup, also to + # other non-dn peers during tx flow + self.process_handshake(peerid, msgval) + return True + elif msgtype == CONTROL_MESSAGE_TYPES["dn-handshake"]: + self.process_handshake(peerid, msgval, dn=True) + return True + elif msgtype == LOCAL_CONTROL_MESSAGE_TYPES["connect"]: + self.add_peer(msgval, connection=True, + overwrite_connection=True) + elif msgtype == LOCAL_CONTROL_MESSAGE_TYPES["connect-in"]: + self.add_peer(msgval, connection=True, + overwrite_connection=True) + elif msgtype == LOCAL_CONTROL_MESSAGE_TYPES["disconnect"]: + log.debug("We got a disconnect event: {}".format(msgval)) + if msgval in [x.peer_location() for x in self.get_connected_directory_peers()]: + # we need to use the full peer locator string, so that + # add_peer knows it can try to reconnect: + msgval = self.get_peer_by_id(msgval).peer_location() + self.add_peer(msgval, connection=False, + overwrite_connection=True) + # bubble up the disconnection event to the abstract + # message channel logic: + if self.on_nick_leave: + p = self.get_peer_by_id(msgval) + if p and p.nick: + reactor.callLater(0.0, self.on_nick_leave, p.nick, self) + else: + assert False + # If we got here it is *not* a non-local control message; + # so we must process it as a Joinmarket message. + return False + + + def process_handshake(self, peerid: str, message: str, + dn: bool=False) -> None: + peer = self.get_peer_by_id(peerid) + if not peer: + # rando sent us a handshake? + log.warn("Unexpected handshake from unknown peer: {}, " + "ignoring.".format(peerid)) + return + assert isinstance(peer, OnionPeer) + if not peer.status() == PEER_STATUS_CONNECTED: + # we were not waiting for it: + log.warn("Unexpected handshake from peer: {}, " + "ignoring. Peer's current status is: {}".format( + peerid, peer.status())) + return + if dn: + # it means, we are a non-dn and we are expecting + # a returned `dn-handshake` message: + # (currently dns don't talk to other dns): + assert not self.self_as_peer.directory + if not peer.directory: + # got dn-handshake from non-dn: + log.warn("Unexpected dn-handshake from non-dn " + "node: {}, ignoring.".format(peerid)) + return + # we got the right message from the right peer; + # check it is formatted correctly and represents + # acceptance of the connection + try: + handshake_json = json.loads(message) + app_name = handshake_json["app-name"] + is_directory = handshake_json["directory"] + proto_min = handshake_json["proto-ver-min"] + proto_max = handshake_json["proto-ver-max"] + features = handshake_json["features"] + accepted = handshake_json["accepted"] + nick = handshake_json["nick"] + net = handshake_json["network"] + assert isinstance(proto_max, int) + assert isinstance(proto_min, int) + assert isinstance(features, dict) + assert isinstance(nick, str) + assert isinstance(net, str) + except Exception as e: + log.warn("Invalid handshake message from: {}," + " exception: {}, message: {},ignoring".format( + peerid, repr(e), message)) + return + # currently we are not using any features, but the intention + # is forwards compatibility, so we don't check its contents + # at all. + if not accepted: + log.warn("Directory: {} rejected our handshake.".format(peerid)) + # explicitly choose to disconnect (if other side already did, + # this is no-op). + peer.disconnect() + return + if not (app_name == JM_APP_NAME and is_directory and JM_VERSION \ + <= proto_max and JM_VERSION >= proto_min and accepted): + log.warn("Handshake from directory is incompatible or " + "rejected: {}".format(handshake_json)) + peer.disconnect() + return + if not net == self.btc_network: + log.warn("Handshake from directory is on an incompatible " + "network: {}".format(net)) + return + # We received a valid, accepting dn-handshake. Update the peer. + peer.update_status(PEER_STATUS_HANDSHAKED) + peer.set_nick(nick) + else: + # it means, we are receiving an initial handshake + # message from a 'client' (non-dn) peer. + # dns don't talk to each other: + assert not peer.directory + accepted = True + try: + handshake_json = json.loads(message) + app_name = handshake_json["app-name"] + is_directory = handshake_json["directory"] + proto_ver = handshake_json["proto-ver"] + features = handshake_json["features"] + full_location_string = handshake_json["location-string"] + nick = handshake_json["nick"] + net = handshake_json["network"] + assert isinstance(proto_ver, int) + assert isinstance(features, dict) + assert isinstance(nick, str) + assert isinstance(net, str) + except Exception as e: + log.warn("(not dn) Invalid handshake message from: {}, " + "exception: {}, message: {}, ignoring".format( + peerid, repr(e), message)) + # just ignore, since a syntax failure could lead to a crash + return + if not (app_name == JM_APP_NAME and proto_ver == JM_VERSION \ + and not is_directory): + log.warn("Invalid handshake name/version data: {}, from peer: " + "{}, rejecting.".format(message, peerid)) + accepted = False + if not net == self.btc_network: + log.warn("Handshake from peer is on an incompatible " + "network: {}".format(net)) + accepted = False + # If accepted, we should update the peer to have the full + # location which in general will not yet be present, so as to + # allow publishing their location via `getpeerlist`. Note + # that if the peer declares itself as not serving, we do + # nothing here: + if not peer.set_location(full_location_string): + accepted = False + if peerid != full_location_string: + peer.set_alternate_location(peerid) + peer.set_nick(nick) + # client peer's handshake message was valid; send ours, and + # then mark this peer as successfully handshaked: + our_hs = copy.deepcopy(server_handshake_json) + our_hs["nick"] = self.nick + our_hs["accepted"] = accepted + if self.self_as_peer.directory: + self.handshake_as_directory(peer, our_hs) + if accepted: + peer.update_status(PEER_STATUS_HANDSHAKED) + + def get_peer_by_id(self, p: str) -> Union[OnionPeer, bool]: + """ Returns the OnionPeer with peer location p, + if it is in self.peers, otherwise returns False. + """ + if p == "00": + return self.self_as_peer + for x in self.peers: + if x.peer_location() == p and p != NOT_SERVING_ONION_HOSTNAME: + return x + # non-reachable peers can only match on their inbound + # connection port + if x.alternate_location == p: + return x + return False + + def register_connection(self, peer_location: str, direction: int) -> None: + """ We send ourselves a local control message indicating + the new connection. + If the connection is inbound, direction == 0, else 1. + """ + assert direction in range(2) + if direction == 1: + msgtype = LOCAL_CONTROL_MESSAGE_TYPES["connect"] + else: + msgtype = LOCAL_CONTROL_MESSAGE_TYPES["connect-in"] + msg = OnionCustomMessage(peer_location, msgtype) + self.receive_msg(msg, "00") + + def register_disconnection(self, peer_location: str) -> None: + """ We send ourselves a local control message indicating + the disconnection. + """ + msg = OnionCustomMessage(peer_location, + LOCAL_CONTROL_MESSAGE_TYPES["disconnect"]) + self.receive_msg(msg, "00") + + def add_peer(self, peerdata: str, connection: bool=False, + overwrite_connection: bool=False, with_nick=False) -> Union[OnionPeer, None]: + """ add non-directory peer from (nick, peer) serialization `peerdata`, + where "peer" is host:port; + return the created OnionPeer object. Or, with_nick=False means + that `peerdata` has only the peer location. + If the peer is already in our peerlist it can be updated in + one of these ways: + * the nick can be added + * it can be marked as 'connected' if it was previously unconnected, + with this conditional on whether the flag `overwrite_connection` is + set. Note that this peer removal, unlike the peer addition above, + can also occur for directory nodes, if we lose connection (and then + we persistently try to reconnect; see OnionDirectoryPeer). + """ + if with_nick: + try: + nick, peer = peerdata.split(NICK_PEERLOCATOR_SEPARATOR) + except Exception as e: + # TODO: as of now, this is not an error, but expected. + # Don't log? Do something else? + log.debug("Received invalid peer identifier string: {}, {}".format( + peerdata, e)) + return + else: + peer = peerdata + + # assumed that it's passing a full string + try: + temp_p = OnionPeer.from_location_string(self, peer, + self.socks5_host, self.socks5_port, + handshake_callback=self.handshake_as_client) + except Exception as e: + # There are currently a few ways the location + # parsing and Peer object construction can fail; + # TODO specify exception types. + log.warn("Failed to add peer: {}, exception: {}".format(peer, repr(e))) + return + if not self.get_peer_by_id(temp_p.peer_location()): + self.peers.add(temp_p) + if connection: + log.info("Updating status of peer: {} to connected.".format(temp_p.peer_location())) + temp_p.update_status(PEER_STATUS_CONNECTED) + else: + if overwrite_connection: + temp_p.update_status(PEER_STATUS_DISCONNECTED) + if with_nick: + temp_p.set_nick(nick) + if not connection: + # Here, we are not currently connected. We + # try to connect asynchronously. We don't pay attention + # to any return. This attempt is one-shot and opportunistic, + # for non-dns, but will retry with exp-backoff for dns. + # Notice this is only possible for non-dns to other non-dns, + # since dns will never reach this point without an active + # connection. + reactor.callLater(0.0, temp_p.try_to_connect) + return temp_p + else: + p = self.get_peer_by_id(temp_p.peer_location()) + if overwrite_connection: + if connection: + log.info("Updating status to connected for peer {}.".format(temp_p.peer_location())) + p.update_status(PEER_STATUS_CONNECTED) + else: + p.update_status(PEER_STATUS_DISCONNECTED) + if with_nick: + p.set_nick(nick) + return p + + def get_all_connected_peers(self) -> list: + return self.get_connected_directory_peers() + \ + self.get_connected_nondirectory_peers() + + def get_connected_directory_peers(self) -> list: + return [p for p in self.peers if p.directory and p.status() == \ + PEER_STATUS_HANDSHAKED] + + def get_connected_nondirectory_peers(self) -> list: + return [p for p in self.peers if (not p.directory) and p.status() == \ + PEER_STATUS_HANDSHAKED] + + def wait_for_directories(self) -> None: + # Notice this is checking for *handshaked* dps; + # the handshake will have been initiated once a + # connection was seen. + # Note also that this is *only* called on startup, + # so we are guaranteed to have only directory peers. + if len(self.get_connected_directory_peers()) < len(self.peers): + self.directory_wait_counter += 1 + # < 2*11 = 22 seconds; compare with CONNECT_TO_ONION_TIMEOUT; + # with current vals, we get to try twice before entirely + # giving up. + if self.directory_wait_counter < 11: + return + if len(self.get_connected_directory_peers()) == 0: + # at least one handshake must have succeeded, for us + # to continue. + log.error("We failed to connect and handshake with " + "ANY directories; onion messaging is not functioning.") + self.wait_for_directories_loop.stop() + return + # This is what triggers the start of taker/maker workflows. + # Note that even if the preceding (max) 50 seconds failed to + # connect all our configured dps, we will keep trying and they + # can still be used. + if not self.on_welcome_sent: + self.on_welcome(self) + self.on_welcome_sent = True + self.wait_for_directories_loop.stop() + + """ CONTROL MESSAGES SENT BY US + """ + def send_peers(self, requesting_peer: OnionPeer, + peer_filter: List[OnionPeer]) -> None: + """ This message is sent by directory peers, currently + only when a privmsg has to be forwarded to them. It + could also be sent by directories to non-directory peers + according to some other algorithm. + If peer_filter is specified, only those peers will be sent. + The peerlist message should have this format: + (1) entries comma separated + (2) each entry is serialized nick then the NICK_PEERLOCATOR_SEPARATOR + then host:port + (3) Peers that do not have a reachable location are not sent. + """ + if not requesting_peer.status() == PEER_STATUS_HANDSHAKED: + raise OnionPeerConnectionError( + "Cannot send peer list to unhandshaked peer") + peerlist = set() + peer_filter_exists = len(peer_filter) > 0 + for p in self.get_connected_nondirectory_peers(): + # don't send a peer to itself + if p == requesting_peer: + continue + if peer_filter_exists and p not in peer_filter: + continue + if p.status() != PEER_STATUS_HANDSHAKED: + # don't advertise what is not online. + continue + # peers that haven't sent their nick yet are not + # privmsg-reachable; don't send them + if p.nick == "": + continue + if p.peer_location() == NOT_SERVING_ONION_HOSTNAME: + # if a connection has no reachable destination, + # don't forward it + continue + peerlist.add(p.get_nick_peerlocation_ser()) + # For testing: dns won't usually participate: + peerlist.add(self.self_as_peer.get_nick_peerlocation_ser()) + # don't send an empty set (will not be possible unless + # above dn add is removed). + if len(peerlist) == 0: + return + self._send(requesting_peer, OnionCustomMessage(",".join( + peerlist), CONTROL_MESSAGE_TYPES["peerlist"])) diff --git a/jmdaemon/test/test_daemon_protocol.py b/jmdaemon/test/test_daemon_protocol.py index 71beba734..f8e9a6d14 100644 --- a/jmdaemon/test/test_daemon_protocol.py +++ b/jmdaemon/test/test_daemon_protocol.py @@ -7,7 +7,7 @@ from jmdaemon.protocol import NICK_HASH_LENGTH, NICK_MAX_ENCODED, JM_VERSION,\ JOINMARKET_NICK_HEADER from jmbase import get_log -from jmclient import (load_test_config, jm_single, get_irc_mchannels) +from jmclient import (load_test_config, jm_single, get_mchannels) from twisted.python.log import msg as tmsg from twisted.python.log import startLogging from twisted.internet import protocol, reactor, task @@ -59,11 +59,11 @@ def connectionMade(self): def clientStart(self): self.sigs_received = 0 - irc = get_irc_mchannels() + chan_configs = [get_mchannels()[0]] d = self.callRemote(JMInit, bcsource="dummyblockchain", network="dummynetwork", - irc_configs=irc, + chan_configs=chan_configs, minmakers=2, maker_timeout_sec=3, dust_threshold=27300, @@ -212,7 +212,7 @@ def on_JM_REQUEST_OFFERS(self): return super().on_JM_REQUEST_OFFERS() @JMInit.responder - def on_JM_INIT(self, bcsource, network, irc_configs, minmakers, + def on_JM_INIT(self, bcsource, network, chan_configs, minmakers, maker_timeout_sec, dust_threshold, blacklist_location): self.maker_timeout_sec = maker_timeout_sec self.dust_threshold = int(dust_threshold) diff --git a/jmdaemon/test/test_irc_messaging.py b/jmdaemon/test/test_irc_messaging.py index 755a20c69..0e9812fd7 100644 --- a/jmdaemon/test/test_irc_messaging.py +++ b/jmdaemon/test/test_irc_messaging.py @@ -6,7 +6,7 @@ from twisted.internet import reactor, task from jmdaemon import IRCMessageChannel, MessageChannelCollection #needed for test framework -from jmclient import (load_test_config, get_irc_mchannels, jm_single) +from jmclient import (load_test_config, get_mchannels, jm_single) si = 1 class DummyDaemon(object): @@ -95,7 +95,7 @@ def junk_fill(mc): def getmc(nick): dm = DummyDaemon() - mc = DummyMC(get_irc_mchannels()[0], nick, dm) + mc = DummyMC(get_mchannels()[0], nick, dm) mc.register_orderbookwatch_callbacks(on_order_seen=on_order_seen) mc.register_taker_callbacks(on_pubkey=on_pubkey) mc.on_connect = on_connect @@ -108,7 +108,7 @@ class TrialIRC(unittest.TestCase): def setUp(self): load_test_config() - print(get_irc_mchannels()[0]) + print(get_mchannels()[0]) jm_single().maker_timeout_sec = 1 dm, mc, mcc = getmc("irc_publisher") dm2, mc2, mcc2 = getmc("irc_receiver") diff --git a/jmdaemon/test/test_orderbookwatch.py b/jmdaemon/test/test_orderbookwatch.py index 39d4de791..17797a635 100644 --- a/jmdaemon/test/test_orderbookwatch.py +++ b/jmdaemon/test/test_orderbookwatch.py @@ -2,7 +2,7 @@ from jmdaemon.orderbookwatch import OrderbookWatch from jmdaemon import IRCMessageChannel, fidelity_bond_cmd_list -from jmclient import get_irc_mchannels, load_test_config +from jmclient import get_mchannels, load_test_config from jmdaemon.protocol import JM_VERSION, ORDER_KEYS from jmbase.support import hextobin from jmclient.fidelity_bond import FidelityBondProof @@ -24,7 +24,7 @@ def on_welcome(x): def get_ob(): load_test_config() dm = DummyDaemon() - mc = DummyMC(get_irc_mchannels()[0], "test", dm) + mc = DummyMC(get_mchannels()[0], "test", dm) ob = OrderbookWatch() ob.on_welcome = on_welcome ob.set_msgchan(mc) diff --git a/scripts/joinmarket-qt.py b/scripts/joinmarket-qt.py index 0110807d1..0e3a21925 100755 --- a/scripts/joinmarket-qt.py +++ b/scripts/joinmarket-qt.py @@ -912,10 +912,10 @@ def startJoin(self): daemon=daemon, gui=True) else: - #This will re-use IRC connections in background (daemon), no restart + #This will re-use message channels in background (daemon), no restart self.clientfactory.getClient().client = self.taker self.clientfactory.getClient().clientStart() - mainWindow.statusBar().showMessage("Connecting to IRC ...") + mainWindow.statusBar().showMessage("Connecting to message channels ...") def takerInfo(self, infotype, infomsg): if infotype == "INFO": diff --git a/scripts/obwatch/ob-watcher.py b/scripts/obwatch/ob-watcher.py index 661eda94d..f94629b26 100755 --- a/scripts/obwatch/ob-watcher.py +++ b/scripts/obwatch/ob-watcher.py @@ -44,8 +44,9 @@ import matplotlib.pyplot as plt from jmclient import jm_single, load_program_config, calc_cj_fee, \ - get_irc_mchannels, add_base_options -from jmdaemon import OrderbookWatch, MessageChannelCollection, IRCMessageChannel + get_mchannels, add_base_options +from jmdaemon import (OrderbookWatch, MessageChannelCollection, + OnionMessageChannel, IRCMessageChannel) #TODO this is only for base58, find a solution for a client without jmbitcoin import jmbitcoin as btc from jmdaemon.protocol import * @@ -737,32 +738,32 @@ def on_welcome(self): def request_orderbook(self): self.msgchan.request_orderbook() -class ObIRCMessageChannel(IRCMessageChannel): - """A customisation of the message channel - to allow receipt of privmsgs without the - verification hooks in client-daemon communication.""" - def on_privmsg(self, nick, message): - if len(message) < 2: - return - - if message[0] != COMMAND_PREFIX: - log.debug('message not a cmd') - return - cmd_string = message[1:].split(' ')[0] - if cmd_string not in offername_list: - log.debug('non-offer ignored') - return - #Ignore sigs (TODO better to include check) - sig = message[1:].split(' ')[-2:] - #reconstruct original message without cmd pref - rawmessage = ' '.join(message[1:].split(' ')[:-2]) - for command in rawmessage.split(COMMAND_PREFIX): - _chunks = command.split(" ") - try: - self.check_for_orders(nick, _chunks) - self.check_for_fidelity_bond(nick, _chunks) - except: - pass + +"""An override for MessageChannel classes, +to allow receipt of privmsgs without the +verification hooks in client-daemon communication.""" +def on_privmsg(inst, nick, message): + if len(message) < 2: + return + + if message[0] != COMMAND_PREFIX: + log.debug('message not a cmd') + return + cmd_string = message[1:].split(' ')[0] + if cmd_string not in offername_list: + log.debug('non-offer ignored') + return + #Ignore sigs (TODO better to include check) + sig = message[1:].split(' ')[-2:] + #reconstruct original message without cmd pref + rawmessage = ' '.join(message[1:].split(' ')[:-2]) + for command in rawmessage.split(COMMAND_PREFIX): + _chunks = command.split(" ") + try: + inst.check_for_orders(nick, _chunks) + inst.check_for_fidelity_bond(nick, _chunks) + except: + pass def get_dummy_nick(): @@ -805,7 +806,16 @@ def main(): load_program_config(config_path=options.datadir) check_and_start_tor() hostport = (options.host, options.port) - mcs = [ObIRCMessageChannel(c) for c in get_irc_mchannels()] + mcs = [] + chan_configs = get_mchannels() + for c in chan_configs: + if "type" in c and c["type"] == "onion": + mcs.append(OnionMessageChannel(c)) + else: + # default is IRC; TODO allow others + mcs.append(IRCMessageChannel(c)) + IRCMessageChannel.on_privmsg = on_privmsg + OnionMessageChannel.on_privmsg = on_privmsg mcc = MessageChannelCollection(mcs) mcc.set_nick(get_dummy_nick()) taker = ObBasic(mcc, hostport) diff --git a/test/e2e-coinjoin-test.py b/test/e2e-coinjoin-test.py new file mode 100644 index 000000000..d5e9818c3 --- /dev/null +++ b/test/e2e-coinjoin-test.py @@ -0,0 +1,340 @@ +#! /usr/bin/env python +'''Creates wallets and yield generators in regtest, + then runs both them and a JMWalletDaemon instance + for the taker, injecting the newly created taker + wallet into it and running sendpayment once. + Number of ygs is configured in the joinmarket.cfg + with `regtest-count` in the `ln-onion` type MESSAGING + section. + See notes below for more detail on config. + Run it like: + pytest \ + --btcroot=/path/to/bitcoin/bin/ \ + --btcpwd=123456abcdef --btcconf=/blah/bitcoin.conf \ + -s test/e2e-coinjoin-test.py + ''' +from twisted.internet import reactor, defer +from twisted.web.client import readBody, Headers +from common import make_wallets +import pytest +import random +import json +from datetime import datetime +from jmbase import (get_nontor_agent, BytesProducer, jmprint, + get_log, stop_reactor) +from jmclient import (YieldGeneratorBasic, load_test_config, jm_single, + JMClientProtocolFactory, start_reactor, SegwitWallet, get_mchannels, + SegwitLegacyWallet, JMWalletDaemon) +from jmclient.wallet_utils import wallet_gettimelockaddress +from jmclient.wallet_rpc import api_version_string + +log = get_log() + +# For quicker testing, restrict the range of timelock +# addresses to avoid slow load of multiple bots. +# Note: no need to revert this change as test runs +# in isolation. +from jmclient import FidelityBondMixin +FidelityBondMixin.TIMELOCK_ERA_YEARS = 2 +FidelityBondMixin.TIMELOCK_EPOCH_YEAR = datetime.now().year +FidelityBondMixin.TIMENUMBERS_PER_PUBKEY = 12 + +wallet_name = "test-onion-yg-runner.jmdat" + +mean_amt = 2.0 + +directory_node_indices = [1] + +def get_onion_messaging_config_regtest(run_num: int, dns=[1], hsd="", mode="TAKER"): + """ Sets a onion messaging channel section for a regtest instance + indexed by `run_num`. The indices to be used as directory nodes + should be passed as `dns`, as a list of ints. + """ + def location_string(directory_node_run_num): + return "127.0.0.1:" + str( + 8080 + directory_node_run_num) + if run_num in dns: + # means *we* are a dn, and dns currently + # do not use other dns: + dns_to_use = [location_string(run_num)] + else: + dns_to_use = [location_string(a) for a in dns] + dn_nodes_list = ",".join(dns_to_use) + log.info("For node: {}, set dn list to: {}".format(run_num, dn_nodes_list)) + cf = {"type": "onion", + "btcnet": "testnet", + "socks5_host": "127.0.0.1", + "socks5_port": 9050, + "tor_control_host": "127.0.0.1", + "tor_control_port": 9051, + "onion_serving_host": "127.0.0.1", + "onion_serving_port": 8080 + run_num, + "hidden_service_dir": "", + "directory_nodes": dn_nodes_list, + "regtest_count": "1, 1"} + if mode == "MAKER": + cf["serving"] = True + else: + cf["serving"] = False + if run_num in dns: + # only directories need to use fixed hidden service directories: + cf["hidden_service_dir"] = hsd + return cf + + +class RegtestJMClientProtocolFactory(JMClientProtocolFactory): + i = 1 + def set_directory_nodes(self, dns): + # a list of integers representing the directory nodes + # for this test: + self.dns = dns + + def get_mchannels(self, mode="TAKER"): + # swaps out any existing onionmc configs + # in the config settings on startup, for one + # that's indexed to the regtest counter var: + default_chans = get_mchannels(mode=mode) + new_chans = [] + onion_found = False + hsd = "" + for c in default_chans: + if "type" in c and c["type"] == "onion": + onion_found = True + if c["hidden_service_dir"] != "": + hsd = c["hidden_service_dir"] + continue + else: + new_chans.append(c) + if onion_found: + new_chans.append(get_onion_messaging_config_regtest( + self.i, self.dns, hsd, mode=mode)) + return new_chans + +class JMWalletDaemonT(JMWalletDaemon): + def check_cookie(self, request): + if self.auth_disabled: + return True + return super().check_cookie(request) + +class TWalletRPCManager(object): + """ Base class for set up of tests of the + Wallet RPC calls using the wallet_rpc.JMWalletDaemon service. + """ + # the port for the jmwallet daemon + dport = 28183 + # the port for the ws + wss_port = 28283 + + def __init__(self): + # a client connnection object which is often but not always + # instantiated: + self.client_connector = None + self.daemon = JMWalletDaemonT(self.dport, self.wss_port, tls=False) + self.daemon.auth_disabled = True + # because we sync and start the wallet service manually here + # (and don't use wallet files yet), we won't have set a wallet name, + # so we set it here: + self.daemon.wallet_name = wallet_name + + def start(self): + r, s = self.daemon.startService() + self.listener_rpc = r + self.listener_ws = s + + def get_route_root(self): + addr = "http://127.0.0.1:" + str(self.dport) + addr += api_version_string + return addr + + def stop(self): + for dc in reactor.getDelayedCalls(): + dc.cancel() + d1 = defer.maybeDeferred(self.listener_ws.stopListening) + d2 = defer.maybeDeferred(self.listener_rpc.stopListening) + if self.client_connector: + self.client_connector.disconnect() + # only fire if everything is finished: + return defer.gatherResults([d1, d2]) + + @defer.inlineCallbacks + def do_request(self, agent, method, addr, body, handler, token=None): + if token: + headers = Headers({"Authorization": ["Bearer " + self.jwt_token]}) + else: + headers = None + response = yield agent.request(method, addr, headers, bodyProducer=body) + yield self.response_handler(response, handler) + + @defer.inlineCallbacks + def response_handler(self, response, handler): + body = yield readBody(response) + # these responses should always be 200 OK. + #assert response.code == 200 + # handlers check the body is as expected; no return. + yield handler(body) + return True + +def test_start_yg_and_taker_setup(setup_onion_ygrunner): + """Set up some wallets, for the ygs and 1 taker. + Then start LN and the ygs in the background, then fire + a startup of a wallet daemon for the taker who then + makes a coinjoin payment. + """ + if jm_single().config.get("POLICY", "native") == "true": + walletclass = SegwitWallet + else: + # TODO add Legacy + walletclass = SegwitLegacyWallet + + start_bot_num, end_bot_num = [int(x) for x in jm_single().config.get( + "MESSAGING:onion", "regtest_count").split(",")] + num_ygs = end_bot_num - start_bot_num + # specify the number of wallets and bots of each type: + wallet_services = make_wallets(num_ygs + 1, + wallet_structures=[[1, 3, 0, 0, 0]] * (num_ygs + 1), + mean_amt=2.0, + walletclass=walletclass) + #the sendpayment bot uses the last wallet in the list + wallet_service = wallet_services[end_bot_num - 1]['wallet'] + jmprint("\n\nTaker wallet seed : " + wallet_services[end_bot_num - 1]['seed']) + # for manual audit if necessary, show the maker's wallet seeds + # also (note this audit should be automated in future, see + # test_full_coinjoin.py in this directory) + jmprint("\n\nMaker wallet seeds: ") + for i in range(start_bot_num, end_bot_num): + jmprint("Maker seed: " + wallet_services[i - 1]['seed']) + jmprint("\n") + wallet_service.sync_wallet(fast=True) + ygclass = YieldGeneratorBasic + + # As per previous note, override non-default command line settings: + options = {} + for x in ["ordertype", "txfee_contribution", "txfee_contribution_factor", + "cjfee_a", "cjfee_r", "cjfee_factor", "minsize", "size_factor"]: + options[x] = jm_single().config.get("YIELDGENERATOR", x) + ordertype = options["ordertype"] + txfee_contribution = int(options["txfee_contribution"]) + txfee_contribution_factor = float(options["txfee_contribution_factor"]) + cjfee_factor = float(options["cjfee_factor"]) + size_factor = float(options["size_factor"]) + if ordertype == 'reloffer': + cjfee_r = options["cjfee_r"] + # minimum size is such that you always net profit at least 20% + #of the miner fee + minsize = max(int(1.2 * txfee_contribution / float(cjfee_r)), + int(options["minsize"])) + cjfee_a = None + elif ordertype == 'absoffer': + cjfee_a = int(options["cjfee_a"]) + minsize = int(options["minsize"]) + cjfee_r = None + else: + assert False, "incorrect offertype config for yieldgenerator." + + txtype = wallet_service.get_txtype() + if txtype == "p2wpkh": + prefix = "sw0" + elif txtype == "p2sh-p2wpkh": + prefix = "sw" + elif txtype == "p2pkh": + prefix = "" + else: + assert False, "Unsupported wallet type for yieldgenerator: " + txtype + + ordertype = prefix + ordertype + + for i in range(start_bot_num, end_bot_num): + cfg = [txfee_contribution, cjfee_a, cjfee_r, ordertype, minsize, + txfee_contribution_factor, cjfee_factor, size_factor] + wallet_service_yg = wallet_services[i - 1]["wallet"] + + wallet_service_yg.startService() + + yg = ygclass(wallet_service_yg, cfg) + clientfactory = RegtestJMClientProtocolFactory(yg, proto_type="MAKER") + # This ensures that the right rpc/port config is passed into the daemon, + # for this specific bot: + clientfactory.i = i + # This ensures that this bot knows which other bots are directory nodes: + clientfactory.set_directory_nodes(directory_node_indices) + nodaemon = jm_single().config.getint("DAEMON", "no_daemon") + daemon = bool(nodaemon) + #rs = True if i == num_ygs - 1 else False + start_reactor(jm_single().config.get("DAEMON", "daemon_host"), + jm_single().config.getint("DAEMON", "daemon_port"), + clientfactory, daemon=daemon, rs=False) + reactor.callLater(1.0, start_test_taker, wallet_services[end_bot_num - 1]['wallet'], end_bot_num, num_ygs) + reactor.run() + +@defer.inlineCallbacks +def start_test_taker(wallet_service, i, num_ygs): + # this rpc manager has auth disabled, + # and the wallet_service is set manually, + # so no unlock etc. + mgr = TWalletRPCManager() + mgr.daemon.services["wallet"] = wallet_service + # because we are manually setting the wallet_service + # of the JMWalletDaemon instance, we do not follow the + # usual flow of `initialize_wallet_service`, we do not set + # the auth token or start the websocket; so we must manually + # sync the wallet, including bypassing any restart callback: + def dummy_restart_callback(msg): + log.warn("Ignoring rescan request from backend wallet service: " + msg) + mgr.daemon.services["wallet"].add_restart_callback(dummy_restart_callback) + mgr.daemon.wallet_name = wallet_name + mgr.daemon.services["wallet"].startService() + def get_client_factory(): + clientfactory = RegtestJMClientProtocolFactory(mgr.daemon.taker, + proto_type="TAKER") + clientfactory.i = i + clientfactory.set_directory_nodes(directory_node_indices) + return clientfactory + + mgr.daemon.get_client_factory = get_client_factory + # before preparing the RPC call to the wallet daemon, + # we decide a coinjoin destination, counterparty count and amount. + # Choosing a destination in the wallet is a bit easier because + # we can query the mixdepth balance at the end. + coinjoin_destination = mgr.daemon.services["wallet"].get_internal_addr(4) + cj_amount = 22000000 + def n_cps_from_n_ygs(n): + if n > 4: + return n - 2 + if n > 2: + return 2 + assert False, "Need at least 3 yield generators to test" + n_cps = n_cps_from_n_ygs(num_ygs) + # once the taker is finished we sanity check before + # shutting down: + def dummy_taker_finished(res, fromtx=False, + waittime=0.0, txdetails=None): + jmprint("Taker is finished") + # check that the funds have arrived. + mbal = mgr.daemon.services["wallet"].get_balance_by_mixdepth()[4] + assert mbal == cj_amount + jmprint("Funds: {} sats successfully arrived into mixdepth 4.".format(cj_amount)) + stop_reactor() + mgr.daemon.taker_finished = dummy_taker_finished + mgr.start() + agent = get_nontor_agent() + addr = mgr.get_route_root() + addr += "/wallet/" + addr += mgr.daemon.wallet_name + addr += "/taker/coinjoin" + addr = addr.encode() + body = BytesProducer(json.dumps({"mixdepth": "1", + "amount_sats": cj_amount, + "counterparties": str(n_cps), + "destination": coinjoin_destination}).encode()) + yield mgr.do_request(agent, b"POST", addr, body, + process_coinjoin_response) + +def process_coinjoin_response(response): + json_body = json.loads(response.decode("utf-8")) + print("coinjoin response: {}".format(json_body)) + +@pytest.fixture(scope="module") +def setup_onion_ygrunner(): + load_test_config() + jm_single().bc_interface.tick_forward_chain_interval = 10 + jm_single().bc_interface.simulate_blocks() diff --git a/test/regtest_joinmarket.cfg b/test/regtest_joinmarket.cfg index 4d3c211cf..ab0742aac 100644 --- a/test/regtest_joinmarket.cfg +++ b/test/regtest_joinmarket.cfg @@ -16,6 +16,7 @@ network = testnet rpc_wallet_file = jm-test-wallet [MESSAGING:server1] +type = irc host = localhost hostid = localhost1 channel = joinmarket-pit @@ -26,6 +27,7 @@ socks5_host = localhost socks5_port = 9150 [MESSAGING:server2] +type = irc host = localhost hostid = localhost2 channel = joinmarket-pit @@ -35,8 +37,46 @@ socks5 = false socks5_host = localhost socks5_port = 9150 +[MESSAGING:onion] +# onion based message channels must have the exact type 'onion' +# (while the section name above can be MESSAGING:whatever), and there must +# be only ONE such message channel configured (note the directory servers +# can be multiple, below): +type = onion +socks5_host = localhost +socks5_port = 9050 +# the tor control configuration: +tor_control_host = localhost +# or, to use a UNIX socket +# control_host = unix:/var/run/tor/control +tor_control_port = 9051 +# the host/port actually serving the hidden service +# (note the *virtual port*, that the client uses, +# is hardcoded to 80): +onion_serving_host = 127.0.0.1 +onion_serving_port = 8080 +# This is mandatory for directory nodes (who must also set their +# own .onion:port as the only directory in directory_nodes, below), +# but NOT TO BE USED by non-directory nodes (which is you, unless +# you know otherwise!), as it will greatly degrade your privacy. +# +# Special handling on regtest, so just ignore and let the code handle it: +hidden_service_dir = "" +# This is a comma separated list (comma can be omitted if only one item). +# Each item has format host:port +# On regtest we are going to increment the port numbers served from, with +# the value used here as the starting value: +directory_nodes = localhost:8081 +# this is not present in default real config +# and is specifically used to flag tests: +# means we use indices 1,2,3,4,5: +regtest_count=1,5 + [TIMEOUT] -maker_timeout_sec = 15 +maker_timeout_sec = 10 + +[LOGGING] +console_log_level = DEBUG [POLICY] # for dust sweeping, try merge_algorithm = gradual diff --git a/test/ygrunner.py b/test/ygrunner.py index 88ef65b97..d657179d5 100644 --- a/test/ygrunner.py +++ b/test/ygrunner.py @@ -96,7 +96,7 @@ def on_tx_received(self, nick, tx, offerinfo): "num_ygs, wallet_structures, fb_indices, mean_amt, malicious, deterministic", [ # 1sp 3yg, honest makers, one maker has FB: - (3, [[1, 3, 0, 0, 0]] * 4, [1, 2], 2, 0, False), + (3, [[1, 3, 0, 0, 0]] * 4, [], 2, 0, False), # 1sp 3yg, malicious makers reject on auth and on tx 30% of time #(3, [[1, 3, 0, 0, 0]] * 4, 2, 30, False), # 1 sp 9 ygs, deterministically malicious 50% of time @@ -173,6 +173,7 @@ def test_start_ygs(setup_ygrunner, num_ygs, wallet_structures, fb_indices, ygclass = DeterministicMaliciousYieldGenerator else: ygclass = MaliciousYieldGenerator + for i in range(num_ygs): cfg = [txfee_contribution, cjfee_a, cjfee_r, ordertype, minsize, txfee_contribution_factor, cjfee_factor, size_factor]