diff --git a/Cargo.lock b/Cargo.lock index 568e10b6629..88db047e438 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1477,6 +1477,7 @@ checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506" dependencies = [ "block-buffer 0.10.2", "crypto-common", + "subtle", ] [[package]] @@ -2107,6 +2108,24 @@ dependencies = [ "proc-macro-hack", ] +[[package]] +name = "hkdf" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "791a029f6b9fc27657f6f188ec6e5e43f6911f6f878e0dc5501396e09809d437" +dependencies = [ + "hmac", +] + +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest 0.10.3", +] + [[package]] name = "http" version = "0.2.7" @@ -3157,6 +3176,44 @@ dependencies = [ "serde_json", ] +[[package]] +name = "near-mirror" +version = "0.0.0" +dependencies = [ + "actix", + "anyhow", + "borsh", + "bs58", + "clap 3.1.18", + "ed25519-dalek", + "hex", + "hkdf", + "near-chain-configs", + "near-client", + "near-client-primitives", + "near-crypto", + "near-indexer", + "near-indexer-primitives", + "near-network", + "near-o11y", + "near-primitives", + "near-primitives-core", + "near-store", + "nearcore", + "once_cell", + "openssl-probe", + "rand_core 0.5.1", + "rocksdb", + "secp256k1", + "serde", + "serde_json", + "sha2 0.10.2", + "strum", + "thiserror", + "tokio", + "tracing", +] + [[package]] name = "near-network" version = "0.0.0" @@ -3615,6 +3672,7 @@ dependencies = [ "futures", "near-chain-configs", "near-jsonrpc-primitives", + "near-mirror", "near-o11y", "near-performance-metrics", "near-primitives", diff --git a/Cargo.toml b/Cargo.toml index 3f6c0ae6df9..60a4ac317f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ members = [ "tools/chainsync-loadtest", "tools/delay-detector", "tools/indexer/example", + "tools/mirror", "tools/mock-node", "tools/restaked", "tools/rpctypegen/core", @@ -111,6 +112,7 @@ fs2 = "0.4" futures = "0.3.5" futures-util = "0.3" hex = { version = "0.4.2", features = ["serde"] } +hkdf = "0.12.3" hyper = { version = "0.14", features = ["full"] } hyper-tls = "0.5.0" im = "15" @@ -148,6 +150,7 @@ protobuf-codegen = "3.0.1" quote = "1.0" rand = "0.8.5" rand_chacha = "0.3.1" +rand_core = "0.5" rand_hc = "0.3.1" rand_xorshift = "0.3" rayon = "1.5" diff --git a/core/chain-configs/src/genesis_config.rs b/core/chain-configs/src/genesis_config.rs index 41e86418aa5..3d1f9f8ad72 100644 --- a/core/chain-configs/src/genesis_config.rs +++ b/core/chain-configs/src/genesis_config.rs @@ -393,7 +393,7 @@ impl<'de, F: FnMut(StateRecord)> DeserializeSeed<'de> for RecordsProcessor<&'_ m } } -fn stream_records_from_file( +pub fn stream_records_from_file( reader: impl Read, mut callback: impl FnMut(StateRecord), ) -> serde_json::Result<()> { diff --git a/core/chain-configs/src/lib.rs b/core/chain-configs/src/lib.rs index c698b3bf881..ab2fcb53ca7 100644 --- a/core/chain-configs/src/lib.rs +++ b/core/chain-configs/src/lib.rs @@ -7,6 +7,6 @@ pub use client_config::{ MIN_GC_NUM_EPOCHS_TO_KEEP, TEST_STATE_SYNC_TIMEOUT, }; pub use genesis_config::{ - get_initial_supply, Genesis, GenesisChangeConfig, GenesisConfig, GenesisRecords, - GenesisValidationMode, ProtocolConfig, ProtocolConfigView, + get_initial_supply, stream_records_from_file, Genesis, GenesisChangeConfig, GenesisConfig, + GenesisRecords, GenesisValidationMode, ProtocolConfig, ProtocolConfigView, }; diff --git 
a/neard/Cargo.toml b/neard/Cargo.toml index 886461cb545..ffd66616724 100644 --- a/neard/Cargo.toml +++ b/neard/Cargo.toml @@ -34,6 +34,7 @@ tracing.workspace = true nearcore = { path = "../nearcore" } near-chain-configs = { path = "../core/chain-configs" } near-jsonrpc-primitives = { path = "../chain/jsonrpc-primitives" } +near-mirror = { path = "../tools/mirror" } near-primitives = { path = "../core/primitives" } near-performance-metrics = { path = "../utils/near-performance-metrics" } near-state-viewer = { path = "../tools/state-viewer", package = "state-viewer" } diff --git a/neard/src/cli.rs b/neard/src/cli.rs index 3e80ac999f6..3b6ff113433 100644 --- a/neard/src/cli.rs +++ b/neard/src/cli.rs @@ -3,6 +3,7 @@ use anyhow::Context; use clap::{Args, Parser}; use near_chain_configs::GenesisValidationMode; use near_jsonrpc_primitives::types::light_client::RpcLightClientExecutionProofResponse; +use near_mirror::MirrorCommand; use near_o11y::tracing_subscriber::EnvFilter; use near_o11y::{ default_subscriber, default_subscriber_with_opentelemetry, BuildEnvFilterError, @@ -93,6 +94,9 @@ impl NeardCmd { NeardSubCommand::VerifyProof(cmd) => { cmd.run(); } + NeardSubCommand::Mirror(cmd) => { + cmd.run()?; + } }; Ok(()) } @@ -104,6 +108,8 @@ pub(crate) enum RunError { EnvFilter(#[source] BuildEnvFilterError), #[error("could not install a rayon thread pool")] RayonInstall(#[source] rayon::ThreadPoolBuildError), + #[error(transparent)] + Other(#[from] anyhow::Error), } #[derive(Parser)] @@ -189,6 +195,10 @@ pub(super) enum NeardSubCommand { /// Verify proofs #[clap(alias = "verify_proof")] VerifyProof(VerifyProofSubCommand), + + /// Mirror transactions from a source chain to a test chain with state forked + /// from it, reproducing traffic and state as closely as possible. + Mirror(MirrorCommand), } #[derive(Parser)] diff --git a/pytest/lib/key.py b/pytest/lib/key.py index 9dbcc0d5698..9d99385de62 100644 --- a/pytest/lib/key.py +++ b/pytest/lib/key.py @@ -17,6 +17,13 @@ def __init__(self, account_id: str, pk: str, sk: str) -> None: self.pk = pk self.sk = sk + @classmethod + def from_random(cls, account_id: str) -> 'Key': + keys = ed25519.create_keypair(entropy=os.urandom) + sk = 'ed25519:' + base58.b58encode(keys[0].to_bytes()).decode('ascii') + pk = 'ed25519:' + base58.b58encode(keys[1].to_bytes()).decode('ascii') + return cls(account_id, pk, sk) + @classmethod def implicit_account(cls) -> 'Key': keys = ed25519.create_keypair(entropy=os.urandom) diff --git a/pytest/tools/mirror/test.py b/pytest/tools/mirror/test.py new file mode 100755 index 00000000000..b51f9186d97 --- /dev/null +++ b/pytest/tools/mirror/test.py @@ -0,0 +1,470 @@ +#!/usr/bin/env python3 + +import sys, time, base58, random +import atexit +import base58 +import json +import os +import pathlib +import shutil +import signal +import subprocess + +sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) + +from cluster import init_cluster, spin_up_node, load_config +from configured_logger import logger +from mocknet import create_genesis_file +import transaction +import utils +import key + +# This sets up an environment to test the tools/mirror process. It starts a localnet with a few validators +# and waits for some blocks to be produced. Then we fork the state and start a new chain from that, and +# start the mirror process that should mirror transactions from the source chain to the target chain. 
+# Halfway through we restart it to make sure that it still works properly when restarted + +TIMEOUT = 240 +NUM_VALIDATORS = 4 +TARGET_VALIDATORS = ['foo0', 'foo1', 'foo2'] +MIRROR_DIR = 'test-mirror' + + +def mkdir_clean(dirname): + try: + dirname.mkdir() + except FileExistsError: + shutil.rmtree(dirname) + dirname.mkdir() + + +def dot_near(): + return pathlib.Path.home() / '.near' + + +def ordinal_to_port(port, ordinal): + return f'0.0.0.0:{port + 10 + ordinal}' + + +def init_target_dir(neard, home, ordinal, validator_account=None): + mkdir_clean(home) + + try: + subprocess.check_output([neard, '--home', home, 'init'], + stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + sys.exit(f'"neard init" command failed: output: {e.stdout}') + shutil.copy(dot_near() / 'test0/config.json', home / 'config.json') + shutil.copy(dot_near() / 'test0/forked/genesis.json', home / 'genesis.json') + shutil.copy(dot_near() / 'test0/forked/records.json', home / 'records.json') + + with open(home / 'config.json', 'r') as f: + config = json.load(f) + config['genesis_records_file'] = 'records.json' + config['network']['addr'] = ordinal_to_port(24567, ordinal) + config['rpc']['addr'] = ordinal_to_port(3030, ordinal) + with open(home / 'config.json', 'w') as f: + json.dump(config, f) + + if validator_account is None: + os.remove(home / 'validator_key.json') + else: + # this key and the suffix -load-test.near are hardcoded in create_genesis_file() + with open(home / 'validator_key.json', 'w') as f: + json.dump( + { + 'account_id': + f'{validator_account + "-load-test.near"}', + 'public_key': + 'ed25519:76NVkDErhbP1LGrSAf5Db6BsFJ6LBw6YVA4BsfTBohmN', + 'secret_key': + 'ed25519:3cCk8KUWBySGCxBcn1syMoY5u73wx5eaPLRbQcMi23LwBA3aLsqEbA33Ww1bsJaFrchmDciGe9otdn45SrDSkow2' + }, f) + + +def init_target_dirs(neard): + ordinal = NUM_VALIDATORS + 1 + dirs = [] + + for account_id in TARGET_VALIDATORS: + home = dot_near() / f'test_target_{account_id}' + dirs.append(str(home)) + init_target_dir(neard, home, ordinal, validator_account=account_id) + ordinal += 1 + + observer = dot_near() / f'{MIRROR_DIR}/target' + init_target_dir(neard, observer, ordinal, validator_account=None) + shutil.copy(dot_near() / 'test0/output/mirror-secret.json', + observer / 'mirror-secret.json') + return dirs, observer + + +def create_forked_chain(config, near_root): + binary_name = config.get('binary_name', 'neard') + neard = os.path.join(near_root, binary_name) + try: + subprocess.check_output([ + neard, "--home", + dot_near() / 'test0', "view-state", "dump-state", "--stream" + ], + stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + sys.exit(f'"dump-state" command failed: output: {e.stdout}') + try: + subprocess.check_output([ + neard, + 'mirror', + 'prepare', + '--records-file-in', + dot_near() / 'test0/output/records.json', + '--records-file-out', + dot_near() / 'test0/output/mirror-records.json', + '--secret-file-out', + dot_near() / 'test0/output/mirror-secret.json', + ], + stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + sys.exit(f'"mirror prepare" command failed: output: {e.stdout}') + + os.mkdir(dot_near() / 'test0/forked') + genesis_filename_in = dot_near() / 'test0/output/genesis.json' + genesis_filename_out = dot_near() / 'test0/forked/genesis.json' + records_filename_in = dot_near() / 'test0/output/mirror-records.json' + records_filename_out = dot_near() / 'test0/forked/records.json' + create_genesis_file(TARGET_VALIDATORS, + genesis_filename_in=genesis_filename_in, + 
genesis_filename_out=genesis_filename_out, + records_filename_in=records_filename_in, + records_filename_out=records_filename_out, + rpc_node_names=[], + chain_id='foonet', + append=True, + epoch_length=20, + node_pks=None, + increasing_stakes=0.0, + num_seats=len(TARGET_VALIDATORS)) + return init_target_dirs(neard) + + +def init_mirror_dir(home, source_boot_node): + mkdir_clean(dot_near() / MIRROR_DIR) + os.rename(home, dot_near() / f'{MIRROR_DIR}/source') + ordinal = NUM_VALIDATORS + with open(dot_near() / f'{MIRROR_DIR}/source/config.json', 'r') as f: + config = json.load(f) + config['network']['boot_nodes'] = source_boot_node.addr_with_pk() + config['network']['addr'] = ordinal_to_port(24567, ordinal) + config['rpc']['addr'] = ordinal_to_port(3030, ordinal) + with open(dot_near() / f'{MIRROR_DIR}/source/config.json', 'w') as f: + json.dump(config, f) + + +def mirror_cleanup(process): + process.send_signal(signal.SIGINT) + try: + process.wait(5) + except: + process.kill() + logger.error('can\'t kill mirror process') + + +def start_mirror(near_root, source_home, target_home, boot_node): + env = os.environ.copy() + env["RUST_LOG"] = "actix_web=warn,mio=warn,tokio_util=warn,actix_server=warn,actix_http=warn," + env.get( + "RUST_LOG", "debug") + with open(dot_near() / f'{MIRROR_DIR}/stdout', 'ab') as stdout, \ + open(dot_near() / f'{MIRROR_DIR}/stderr', 'ab') as stderr: + process = subprocess.Popen([ + os.path.join(near_root, 'neard'), 'mirror', 'run', "--source-home", + source_home, "--target-home", target_home, '--secret-file', + target_home / 'mirror-secret.json' + ], + stdin=subprocess.DEVNULL, + stdout=stdout, + stderr=stderr, + env=env) + logger.info("Started mirror process") + atexit.register(mirror_cleanup, process) + with open(target_home / 'config.json', 'r') as f: + config = json.load(f) + config['network']['boot_nodes'] = boot_node.addr_with_pk() + with open(target_home / 'config.json', 'w') as f: + json.dump(config, f) + return process + + +# we'll test out adding an access key and then sending txs signed with it +# since that hits some codepaths we want to test +def send_add_access_key(node, creator_key, nonce, block_hash): + k = key.Key.from_random('test0') + action = transaction.create_full_access_key_action(k.decoded_pk()) + tx = transaction.sign_and_serialize_transaction('test0', nonce, [action], + block_hash, 'test0', + creator_key.decoded_pk(), + creator_key.decoded_sk()) + node.send_tx(tx) + return k + + +def create_subaccount(node, signer_key, nonce, block_hash): + k = key.Key.from_random('foo.' + signer_key.account_id) + actions = [] + actions.append(transaction.create_create_account_action()) + actions.append(transaction.create_full_access_key_action(k.decoded_pk())) + actions.append(transaction.create_payment_action(10**24)) + # add an extra one just to exercise some more corner cases + actions.append( + transaction.create_full_access_key_action( + key.Key.from_random(k.account_id).decoded_pk())) + + tx = transaction.sign_and_serialize_transaction(k.account_id, nonce, + actions, block_hash, + signer_key.account_id, + signer_key.decoded_pk(), + signer_key.decoded_sk()) + node.send_tx(tx) + return k + + +# a key that we added with an AddKey tx or implicit account transfer. 
+# just for nonce handling convenience +class AddedKey: + + def __init__(self, key): + self.nonce = None + self.key = key + + def send_if_inited(self, node, transfers, block_hash): + if self.nonce is None: + self.nonce = node.get_nonce_for_pk(self.key.account_id, self.key.pk) + + if self.nonce is not None: + for (receiver_id, amount) in transfers: + self.nonce += 1 + tx = transaction.sign_payment_tx(self.key, receiver_id, amount, + self.nonce, block_hash) + node.send_tx(tx) + + +class ImplicitAccount: + + def __init__(self): + self.key = AddedKey(key.Key.implicit_account()) + + def account_id(self): + return self.key.key.account_id + + def transfer(self, node, sender_key, amount, block_hash, nonce): + tx = transaction.sign_payment_tx(sender_key, self.account_id(), amount, + nonce, block_hash) + node.send_tx(tx) + logger.info( + f'sent {amount} to initialize implicit account {self.account_id()}') + + def send_if_inited(self, node, transfers, block_hash): + self.key.send_if_inited(node, transfers, block_hash) + + +def count_total_txs(node, min_height=0): + total = 0 + h = node.get_latest_block().hash + while True: + block = node.get_block(h)['result'] + height = int(block['header']['height']) + if height < min_height: + return total + + for c in block['chunks']: + if int(c['height_included']) == height: + chunk = node.get_chunk(c['chunk_hash'])['result'] + total += len(chunk['transactions']) + + h = block['header']['prev_hash'] + if h == '11111111111111111111111111111111': + return total + + +def check_num_txs(source_node, target_node, start_time, end_source_height): + with open(os.path.join(target_node.node_dir, 'genesis.json'), 'r') as f: + genesis_height = json.load(f)['genesis_height'] + with open(os.path.join(target_node.node_dir, 'config.json'), 'r') as f: + delay = json.load(f)['consensus']['min_block_production_delay'] + block_delay = 10**9 * int(delay['secs']) + int(delay['nanos']) + block_delay = block_delay / 10**9 + + total_source_txs = count_total_txs(source_node, min_height=genesis_height) + + # start_time is the time the mirror binary was started. Give it 20 seconds to + # sync and then 50% more than min_block_production_delay for each block between + # the start and end points of the source chain. 
Not ideal to be basing a test on time + # like this but there's no real strong guarantee on when the transactions should + # make it on chain, so this is some kind of reasonable timeout + + total_time_allowed = 20 + (end_source_height - + genesis_height) * block_delay * 1.5 + time_elapsed = time.time() - start_time + if time_elapsed < total_time_allowed: + time_left = total_time_allowed - time_elapsed + logger.info( + f'waiting for {int(time_left)} seconds to allow transactions to make it to the target chain' + ) + time.sleep(time_left) + + total_target_txs = count_total_txs(target_node) + assert total_source_txs == total_target_txs, (total_source_txs, + total_target_txs) + logger.info(f'all {total_source_txs} transactions mirrored') + + +def main(): + config_changes = {} + for i in range(NUM_VALIDATORS + 1): + config_changes[i] = {"tracked_shards": [0, 1, 2, 3], "archive": True} + + config = load_config() + near_root, node_dirs = init_cluster(num_nodes=NUM_VALIDATORS, + num_observers=1, + num_shards=4, + config=config, + genesis_config_changes=[ + ["epoch_length", 10], + ], + client_config_changes=config_changes) + + nodes = [spin_up_node(config, near_root, node_dirs[0], 0)] + + init_mirror_dir(node_dirs[NUM_VALIDATORS], nodes[0]) + + for i in range(1, NUM_VALIDATORS): + nodes.append( + spin_up_node(config, near_root, node_dirs[i], i, + boot_node=nodes[0])) + + ctx = utils.TxContext([i for i in range(len(nodes))], nodes) + + implicit_account1 = ImplicitAccount() + for height, block_hash in utils.poll_blocks(nodes[0], timeout=TIMEOUT): + implicit_account1.transfer(nodes[0], nodes[0].signer_key, 10**24, + base58.b58decode(block_hash.encode('utf8')), + ctx.next_nonce) + ctx.next_nonce += 1 + break + + for height, block_hash in utils.poll_blocks(nodes[0], timeout=TIMEOUT): + block_hash_bytes = base58.b58decode(block_hash.encode('utf8')) + + implicit_account1.send_if_inited(nodes[0], [('test2', height), + ('test3', height)], + block_hash_bytes) + ctx.send_moar_txs(block_hash, 10, use_routing=False) + + if height > 12: + break + + nodes[0].kill() + target_node_dirs, target_observer_dir = create_forked_chain( + config, near_root) + nodes[0].start(boot_node=nodes[1]) + + ordinal = NUM_VALIDATORS + 1 + target_nodes = [ + spin_up_node(config, near_root, target_node_dirs[0], ordinal) + ] + for i in range(1, len(target_node_dirs)): + ordinal += 1 + target_nodes.append( + spin_up_node(config, + near_root, + target_node_dirs[i], + ordinal, + boot_node=target_nodes[0])) + + p = start_mirror(near_root, + dot_near() / f'{MIRROR_DIR}/source/', target_observer_dir, + target_nodes[0]) + start_time = time.time() + start_source_height = nodes[0].get_latest_block().height + restarted = False + + subaccount_key = AddedKey( + create_subaccount(nodes[0], nodes[0].signer_key, ctx.next_nonce, + block_hash_bytes)) + ctx.next_nonce += 1 + + new_key = AddedKey( + send_add_access_key(nodes[0], nodes[0].signer_key, ctx.next_nonce, + block_hash_bytes)) + ctx.next_nonce += 1 + + implicit_account2 = ImplicitAccount() + # here we are gonna send a tiny amount (1 yoctoNEAR) to the implicit account and + # then wait a bit before properly initializing it. 
This hits a corner case where the + # mirror binary needs to properly look for the second tx's outcome to find the starting + # nonce because the first one failed + implicit_account2.transfer(nodes[0], nodes[0].signer_key, 1, + block_hash_bytes, ctx.next_nonce) + ctx.next_nonce += 1 + time.sleep(2) + implicit_account2.transfer(nodes[0], nodes[0].signer_key, 10**24, + block_hash_bytes, ctx.next_nonce) + ctx.next_nonce += 1 + + for height, block_hash in utils.poll_blocks(nodes[0], timeout=TIMEOUT): + code = p.poll() + if code is not None: + assert code == 0 + break + + block_hash_bytes = base58.b58decode(block_hash.encode('utf8')) + + ctx.send_moar_txs(block_hash, 10, use_routing=False) + + implicit_account1.send_if_inited( + nodes[0], [('test2', height), ('test1', height), + (implicit_account2.account_id(), height)], + block_hash_bytes) + implicit_account2.send_if_inited( + nodes[1], [('test2', height), ('test0', height), + (implicit_account1.account_id(), height)], + block_hash_bytes) + new_key.send_if_inited(nodes[2], + [('test1', height), ('test2', height), + (implicit_account1.account_id(), height), + (implicit_account2.account_id(), height)], + block_hash_bytes) + subaccount_key.send_if_inited( + nodes[3], [('test3', height), + (implicit_account2.account_id(), height)], + block_hash_bytes) + + if not restarted and height - start_source_height >= 50: + logger.info('stopping mirror process') + p.terminate() + p.wait() + with open(dot_near() / f'{MIRROR_DIR}/stderr', 'ab') as stderr: + stderr.write( + b'<><><><><><><><><><><><> restarting <><><><><><><><><><><><><><><><><><><><>\n' + ) + stderr.write( + b'<><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><>\n' + ) + stderr.write( + b'<><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><>\n' + ) + p = start_mirror(near_root, + dot_near() / f'{MIRROR_DIR}/source/', + target_observer_dir, target_nodes[0]) + restarted = True + + if height - start_source_height >= 100: + break + + time.sleep(5) + # we don't need these anymore + for node in nodes[1:]: + node.kill() + check_num_txs(nodes[0], target_nodes[0], start_time, height) + + +if __name__ == '__main__': + main() diff --git a/tools/mirror/Cargo.toml b/tools/mirror/Cargo.toml new file mode 100644 index 00000000000..7137dd87ecb --- /dev/null +++ b/tools/mirror/Cargo.toml @@ -0,0 +1,43 @@ +[package] +name = "near-mirror" +version = "0.0.0" +authors.workspace = true +publish = false +# Please update rust-toolchain.toml as well when changing version here: +rust-version.workspace = true +edition.workspace = true + +[dependencies] +actix.workspace = true +anyhow.workspace = true +borsh.workspace = true +bs58.workspace = true +clap.workspace = true +ed25519-dalek.workspace = true +hex.workspace = true +hkdf.workspace = true +once_cell.workspace = true +openssl-probe.workspace = true +rand_core.workspace = true +rocksdb.workspace = true +secp256k1.workspace = true +serde.workspace = true +serde_json.workspace = true +sha2.workspace = true +strum.workspace = true +thiserror.workspace = true +tokio.workspace = true +tracing.workspace = true + +nearcore = { path = "../../nearcore" } +near-chain-configs = { path = "../../core/chain-configs" } +near-client = { path = "../../chain/client" } +near-client-primitives = { path = "../../chain/client-primitives" } +near-indexer-primitives = { path = "../../chain/indexer-primitives" } +near-indexer = { path = "../../chain/indexer" } +near-network = { path = "../../chain/network" } +near-primitives = { path = 
"../../core/primitives" } +near-primitives-core = { path = "../../core/primitives-core" } +near-o11y = { path = "../../core/o11y" } +near-store = { path = "../../core/store" } +near-crypto = { path = "../../core/crypto" } \ No newline at end of file diff --git a/tools/mirror/README.md b/tools/mirror/README.md new file mode 100644 index 00000000000..d44b27dd2d5 --- /dev/null +++ b/tools/mirror/README.md @@ -0,0 +1,59 @@ +## Transaction Mirror + +This is some code that tries to help with the following: We have some +chain, let's call it the "source chain", producing blocks and chunks +with transactions as usual, and we have another chain, let's call it +the "target chain" that starts from state forked from the source +chain. Usually this would be done by using the `neard view-state +dump-state` command, and using the resulting genesis and records file +as the start of the target chain. What we want is to then periodically +send the transactions appearing in the source chain after the fork +point to the target chain. Ideally, the traffic we see in the target +chain will be very similar to the traffic in the source chain. + +The first approach we might try is to just send the source chain +transactions byte-for-byte unaltered to the target chain. This almost +works, but not quite, because the `block_hash` field in the +transactions will be rejected. This means we have no choice but to +replace the accounts' public keys in the original forked state, so +that we can sign transactions with a valid `block_hash` field. So the +way we'll use this is that we'll generate the forked state from the +source chain using the usual `dump-state` command, and then run: + +``` +$ mirror prepare --records-file-in "~/.near/output/records.json" --records-file-out "~/.near/output/mapped-records.json" +``` + +This command will output a records file where the keys have been +replaced. And then the logic we end up with when running the +transaction generator is something like this: + +``` +loop { + sleep(ONE_SECOND); + source_block = fetch_block(source_chain_view_client, height); + for chunk in block: + for tx in chunk: + private_key = map_key(tx.public_key) + block_hash = fetch_head_hash(target_chain_view_client) + new_tx = sign_tx(private_key, tx.actions, block_hash) + send_tx(target_chain_client, new_tx) +} +``` + +So then the question is what does `map_key()` do?. If we don't care +about the security of these accounts in the target chain (for example +if the target chain is just some throwaway test chain that nobody +would have any incentive to mess with), we can just use the bytes of +the public key directly as the private key. If we do care somewhat +about security, then we pass a `--secret-key-file` argument to the +`prepare` command, and pass it as an argument to `map_key()`. Using +that makes things a little bit more delicate, since if the generated +secret is ever lost, then it will no longer be possible to mirror any +traffic to the target chain. + +known problems: + +keys in the source chain added with the `promise_batch_action_add_key*` +host functions will not be mapped in the target chain. Maybe a solution +could be to replace those keys manually or something? 
diff --git a/tools/mirror/src/chain_tracker.rs b/tools/mirror/src/chain_tracker.rs
new file mode 100644
index 00000000000..40d7cfd8635
--- /dev/null
+++ b/tools/mirror/src/chain_tracker.rs
@@ -0,0 +1,430 @@
+use crate::MappedBlock;
+use near_crypto::PublicKey;
+use near_indexer::StreamerMessage;
+use near_indexer_primitives::IndexerTransactionWithOutcome;
+use near_primitives::hash::CryptoHash;
+use near_primitives::transaction::SignedTransaction;
+use near_primitives::types::{AccountId, BlockHeight};
+use near_primitives_core::types::{Nonce, ShardId};
+use std::cmp::Ordering;
+use std::collections::hash_map;
+use std::collections::HashMap;
+use std::collections::{BTreeSet, VecDeque};
+use std::pin::Pin;
+use std::time::{Duration, Instant};
+
+struct TxSendInfo {
+    sent_at: Instant,
+    source_height: BlockHeight,
+    target_height: BlockHeight,
+}
+
+#[derive(PartialEq, Eq, Debug)]
+struct TxId {
+    hash: CryptoHash,
+    nonce: Nonce,
+}
+
+impl PartialOrd for TxId {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for TxId {
+    fn cmp(&self, other: &Self) -> Ordering {
+        self.nonce.cmp(&other.nonce).then_with(|| self.hash.cmp(&other.hash))
+    }
+}
+
+// we want a reference to transactions in .queued_blocks that need to have nonces
+// set later. To avoid having the struct be self referential we keep this struct
+// with enough info to look it up later.
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub(crate) struct TxRef {
+    height: BlockHeight,
+    shard_id: ShardId,
+    tx_idx: usize,
+}
+
+struct TxAwaitingNonceCursor<'a> {
+    txs: &'a [TxRef],
+    idx: usize,
+}
+
+impl<'a> TxAwaitingNonceCursor<'a> {
+    fn new(txs: &'a [TxRef]) -> Self {
+        Self { txs, idx: 0 }
+    }
+}
+
+pub(crate) struct TxAwaitingNonceIter<'a> {
+    queued_blocks: &'a VecDeque<MappedBlock>,
+    iter: hash_map::Iter<'a, BlockHeight, Vec<TxRef>>,
+    cursor: Option<TxAwaitingNonceCursor<'a>>,
+}
+
+impl<'a> TxAwaitingNonceIter<'a> {
+    fn new(
+        queued_blocks: &'a VecDeque<MappedBlock>,
+        txs_awaiting_nonce: &'a HashMap<BlockHeight, Vec<TxRef>>,
+    ) -> Self {
+        let mut iter = txs_awaiting_nonce.iter();
+        let cursor = iter.next().map(|(_height, txs)| TxAwaitingNonceCursor::new(txs));
+        Self { queued_blocks, iter, cursor }
+    }
+}
+
+impl<'a> Iterator for TxAwaitingNonceIter<'a> {
+    type Item = (&'a TxRef, &'a crate::TxAwaitingNonce);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        match &mut self.cursor {
+            Some(c) => {
+                let tx_ref = &c.txs[c.idx];
+                c.idx += 1;
+                if c.idx == c.txs.len() {
+                    self.cursor =
+                        self.iter.next().map(|(_height, txs)| TxAwaitingNonceCursor::new(txs));
+                }
+                let block_idx = self
+                    .queued_blocks
+                    .binary_search_by(|b| b.source_height.cmp(&tx_ref.height))
+                    .unwrap();
+                let block = &self.queued_blocks[block_idx];
+                let chunk = block.chunks.iter().find(|c| c.shard_id == tx_ref.shard_id).unwrap();
+                match &chunk.txs[tx_ref.tx_idx] {
+                    crate::TargetChainTx::AwaitingNonce(tx) => Some((tx_ref, tx)),
+                    crate::TargetChainTx::Ready(_) => unreachable!(),
+                }
+            }
+            None => None,
+        }
+    }
+}
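A hypothetical sketch of how `TxAwaitingNonceIter` is meant to be consumed (the real driver lives in the `lib.rs` main loop, which this diff only partially shows; the `lookup` closure resolving an access key to a usable target-chain nonce is an assumption for illustration):

```rust
// Collect first: set_tx_nonce() needs &mut TxTracker, which we can't take
// while tx_awaiting_nonce_iter() still borrows the tracker immutably.
fn apply_known_nonces(
    tracker: &mut TxTracker,
    mut lookup: impl FnMut(&AccountId, &PublicKey) -> Option<Nonce>,
) {
    let ready: Vec<(TxRef, Nonce)> = tracker
        .tx_awaiting_nonce_iter()
        .filter_map(|(tx_ref, tx)| {
            lookup(&tx.source_signer_id, &tx.source_public).map(|nonce| (tx_ref.clone(), nonce))
        })
        .collect();
    for (tx_ref, nonce) in ready {
        // signs the transaction and flips it from AwaitingNonce to Ready
        tracker.set_tx_nonce(&tx_ref, nonce);
    }
}
```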
+// Keeps the queue of upcoming transactions and provides them in regular intervals via next_batch()
+// Also keeps track of txs we've sent so far and looks for them on chain, for metrics/logging purposes.
+#[derive(Default)]
+pub(crate) struct TxTracker {
+    sent_txs: HashMap<CryptoHash, TxSendInfo>,
+    txs_by_signer: HashMap<(AccountId, PublicKey), BTreeSet<TxId>>,
+    queued_blocks: VecDeque<MappedBlock>,
+    txs_awaiting_nonce: HashMap<BlockHeight, Vec<TxRef>>,
+    pending_access_keys: HashMap<(AccountId, PublicKey), usize>,
+    height_queued: Option<BlockHeight>,
+    send_time: Option<Pin<Box<tokio::time::Sleep>>>,
+    // Config value in the target chain, used to judge how long to wait before sending a new batch of txs
+    min_block_production_delay: Duration,
+    // timestamps in the target chain, used to judge how long to wait before sending a new batch of txs
+    recent_block_timestamps: VecDeque<u64>,
+}
+
+impl TxTracker {
+    pub(crate) fn new(min_block_production_delay: Duration) -> Self {
+        Self { min_block_production_delay, ..Default::default() }
+    }
+
+    pub(crate) fn height_queued(&self) -> Option<BlockHeight> {
+        self.height_queued
+    }
+
+    pub(crate) fn num_blocks_queued(&self) -> usize {
+        self.queued_blocks.len()
+    }
+
+    pub(crate) fn pending_access_keys_iter<'a>(
+        &'a self,
+    ) -> impl Iterator<Item = &'a (AccountId, PublicKey)> {
+        self.pending_access_keys.iter().map(|(x, _)| x)
+    }
+
+    pub(crate) fn tx_awaiting_nonce_iter<'a>(&'a self) -> TxAwaitingNonceIter<'a> {
+        TxAwaitingNonceIter::new(&self.queued_blocks, &self.txs_awaiting_nonce)
+    }
+
+    fn pending_access_keys_deref(
+        &mut self,
+        source_signer_id: AccountId,
+        source_public_key: PublicKey,
+    ) {
+        match self.pending_access_keys.entry((source_signer_id, source_public_key)) {
+            hash_map::Entry::Occupied(mut e) => {
+                let ref_count = e.get_mut();
+                if *ref_count == 1 {
+                    e.remove();
+                } else {
+                    *ref_count -= 1;
+                }
+            }
+            hash_map::Entry::Vacant(_) => unreachable!(),
+        }
+    }
+
+    // We now know of a valid nonce for the transaction referenced by tx_ref.
+    // Set the nonce and mark the tx as ready to be sent later.
+    pub(crate) fn set_tx_nonce(&mut self, tx_ref: &TxRef, nonce: Nonce) {
+        let block_idx =
+            self.queued_blocks.binary_search_by(|b| b.source_height.cmp(&tx_ref.height)).unwrap();
+        let block = &mut self.queued_blocks[block_idx];
+        let chunk = block.chunks.iter_mut().find(|c| c.shard_id == tx_ref.shard_id).unwrap();
+        let tx = &mut chunk.txs[tx_ref.tx_idx];
+
+        match self.txs_awaiting_nonce.entry(tx_ref.height) {
+            hash_map::Entry::Occupied(mut e) => {
+                let txs = e.get_mut();
+                if txs.len() == 1 {
+                    assert!(&txs[0] == tx_ref);
+                    e.remove();
+                } else {
+                    let idx = txs.iter().position(|t| t == tx_ref).unwrap();
+                    txs.swap_remove(idx);
+                }
+            }
+            hash_map::Entry::Vacant(_) => unreachable!(),
+        }
+        let (source_signer_id, source_public_key) = match &tx {
+            crate::TargetChainTx::AwaitingNonce(tx) => {
+                (tx.source_signer_id.clone(), tx.source_public.clone())
+            }
+            crate::TargetChainTx::Ready(_) => unreachable!(),
+        };
+
+        tx.set_nonce(nonce);
+        self.pending_access_keys_deref(source_signer_id, source_public_key);
+    }
+
+    pub(crate) fn queue_block(&mut self, block: MappedBlock) {
+        self.height_queued = Some(block.source_height);
+        let mut txs_awaiting_nonce = Vec::new();
+        for c in block.chunks.iter() {
+            for (tx_idx, tx) in c.txs.iter().enumerate() {
+                if let crate::TargetChainTx::AwaitingNonce(tx) = tx {
+                    txs_awaiting_nonce.push(TxRef {
+                        height: block.source_height,
+                        shard_id: c.shard_id,
+                        tx_idx,
+                    });
+                    *self
+                        .pending_access_keys
+                        .entry((tx.source_signer_id.clone(), tx.source_public.clone()))
+                        .or_default() += 1;
+                }
+            }
+        }
+        if !txs_awaiting_nonce.is_empty() {
+            self.txs_awaiting_nonce.insert(block.source_height, txs_awaiting_nonce);
+        }
+        self.queued_blocks.push_back(block);
+    }
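The methods just below (`next_batch_time()`/`next_batch()`) pace the sending side. A hypothetical sketch of the driver loop they are built for (the real one is in `lib.rs` and also refills the queue from the source chain, which is elided here):

```rust
// next_batch() awaits the tracker's internal send_time timer, so this loop
// naturally runs at roughly the target chain's block production rate.
async fn send_batches(tracker: &mut TxTracker, target_head_height: BlockHeight) {
    while let Some(block) = tracker.next_batch().await {
        let mut sent = Vec::new();
        for chunk in &block.chunks {
            for tx in &chunk.txs {
                if let crate::TargetChainTx::Ready(tx) = tx {
                    // ... hand tx off to the target chain's ClientActor here ...
                    sent.push(tx.clone());
                }
            }
        }
        // Records the hashes so on_target_block() can match them against
        // transactions appearing on chain, and resets the batch timer.
        tracker.on_txs_sent(&sent, block.source_height, target_head_height);
    }
}
```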
+    pub(crate) fn next_batch_time(&self) -> Instant {
+        match &self.send_time {
+            Some(t) => t.as_ref().deadline().into_std(),
+            None => Instant::now(),
+        }
+    }
+
+    pub(crate) async fn next_batch(&mut self) -> Option<MappedBlock> {
+        if let Some(sleep) = &mut self.send_time {
+            sleep.await;
+        }
+        let block = self.queued_blocks.pop_front();
+        if let Some(block) = &block {
+            self.txs_awaiting_nonce.remove(&block.source_height);
+            for chunk in block.chunks.iter() {
+                for tx in chunk.txs.iter() {
+                    match &tx {
+                        crate::TargetChainTx::AwaitingNonce(tx) => self.pending_access_keys_deref(
+                            tx.source_signer_id.clone(),
+                            tx.source_public.clone(),
+                        ),
+                        crate::TargetChainTx::Ready(_) => {}
+                    }
+                }
+            }
+        }
+        block
+    }
+
+    fn remove_tx(&mut self, tx: &IndexerTransactionWithOutcome) {
+        let k = (tx.transaction.signer_id.clone(), tx.transaction.public_key.clone());
+        match self.txs_by_signer.entry(k.clone()) {
+            hash_map::Entry::Occupied(mut e) => {
+                let txs = e.get_mut();
+                if !txs.remove(&TxId { hash: tx.transaction.hash, nonce: tx.transaction.nonce }) {
+                    tracing::warn!(target: "mirror", "tried to remove nonexistent tx {} from txs_by_signer", tx.transaction.hash);
+                }
+                // split off from hash: default() since that's the smallest hash, which will leave us with every tx with nonce
+                // greater than this one in txs_left.
+                let txs_left = txs.split_off(&TxId {
+                    hash: CryptoHash::default(),
+                    nonce: tx.transaction.nonce + 1,
+                });
+                if !txs.is_empty() {
+                    tracing::warn!(
+                        target: "mirror", "{} Transactions for {:?} skipped by inclusion of tx with nonce {}: {:?}. These will never make it on chain.",
+                        txs.len(), &k, tx.transaction.nonce, &txs
+                    );
+                    for t in txs.iter() {
+                        if self.sent_txs.remove(&t.hash).is_none() {
+                            tracing::warn!(
+                                target: "mirror", "tx with hash {} that we thought was skipped is not in the set of sent txs",
+                                &t.hash,
+                            );
+                        }
+                    }
+                }
+                *txs = txs_left;
+                if txs.is_empty() {
+                    self.txs_by_signer.remove(&k);
+                }
+            }
+            hash_map::Entry::Vacant(_) => {
+                tracing::warn!(
+                    target: "mirror", "recently removed tx {}, but ({:?}, {:?}) not in txs_by_signer",
+                    tx.transaction.hash, tx.transaction.signer_id, tx.transaction.public_key
+                );
+                return;
+            }
+        };
+    }
+
+    fn record_block_timestamp(&mut self, msg: &StreamerMessage) {
+        self.recent_block_timestamps.push_back(msg.block.header.timestamp_nanosec);
+        if self.recent_block_timestamps.len() > 10 {
+            self.recent_block_timestamps.pop_front();
+        }
+    }
+
+    pub(crate) fn on_target_block(&mut self, msg: &StreamerMessage) {
+        self.record_block_timestamp(msg);
+        for s in msg.shards.iter() {
+            if let Some(c) = &s.chunk {
+                for tx in c.transactions.iter() {
+                    if let Some(send_info) = self.sent_txs.remove(&tx.transaction.hash) {
+                        let latency = Instant::now() - send_info.sent_at;
+                        tracing::debug!(
+                            target: "mirror", "found my tx {} from source #{} in target #{} {:?} after sending @ target #{}",
+                            tx.transaction.hash, send_info.source_height, msg.block.header.height, latency, send_info.target_height
+                        );
+                        crate::metrics::TRANSACTIONS_INCLUDED.inc();
+
+                        self.remove_tx(tx);
+                    }
+                }
+            }
+        }
+    }
+
+    fn on_tx_sent(
+        &mut self,
+        tx: &SignedTransaction,
+        source_height: BlockHeight,
+        target_height: BlockHeight,
+    ) {
+        let hash = tx.get_hash();
+        if self.sent_txs.contains_key(&hash) {
+            tracing::warn!(target: "mirror", "transaction sent twice: {}", &hash);
+            return;
+        }
+
+        // TODO: don't keep adding txs if we're not ever finding them on chain, since we'll OOM eventually
+        // if that happens.
+        self.sent_txs
+            .insert(hash, TxSendInfo { sent_at: Instant::now(), source_height, target_height });
+        let txs = self
+            .txs_by_signer
+            .entry((tx.transaction.signer_id.clone(), tx.transaction.public_key.clone()))
+            .or_default();
+
+        if let Some(highest_nonce) = txs.iter().next_back() {
+            if highest_nonce.nonce > tx.transaction.nonce {
+                tracing::warn!(
+                    target: "mirror", "transaction sent with out of order nonce: {}: {}. Sent so far: {:?}",
+                    &hash, tx.transaction.nonce, txs
+                );
+            }
+        }
+        if !txs.insert(TxId { hash, nonce: tx.transaction.nonce }) {
+            tracing::warn!(target: "mirror", "inserted tx {} twice into txs_by_signer", &hash);
+        }
+    }
+
+    // among the last 10 blocks, what's the second longest time between their timestamps?
+    // probably there's a better heuristic to use than that but this will do for now.
+    fn second_longest_recent_block_delay(&self) -> Option<Duration> {
+        if self.recent_block_timestamps.len() < 5 {
+            return None;
+        }
+        let mut last = *self.recent_block_timestamps.front().unwrap();
+        let mut longest = None;
+        let mut second_longest = None;
+
+        for timestamp in self.recent_block_timestamps.iter().skip(1) {
+            let delay = timestamp - last;
+
+            match longest {
+                Some(l) => match second_longest {
+                    Some(s) => {
+                        if delay > l {
+                            second_longest = longest;
+                            longest = Some(delay);
+                        } else if delay > s {
+                            second_longest = Some(delay);
+                        }
+                    }
+                    None => {
+                        if delay > l {
+                            second_longest = longest;
+                            longest = Some(delay);
+                        } else {
+                            second_longest = Some(delay);
+                        }
+                    }
+                },
+                None => {
+                    longest = Some(delay);
+                }
+            }
+            last = *timestamp;
+        }
+        let delay = Duration::from_nanos(second_longest.unwrap());
+        if delay > 2 * self.min_block_production_delay {
+            tracing::warn!(
+                "Target chain blocks are taking longer than expected to be produced. Observing delays \
+                of {:?} and {:?} vs min_block_production_delay of {:?} ",
+                delay,
+                Duration::from_nanos(longest.unwrap()),
+                self.min_block_production_delay,
+            )
+        }
+        Some(delay)
+    }
+
+    // We just successfully sent some transactions. Remember them so we can see if they really show up on chain.
+    pub(crate) fn on_txs_sent(
+        &mut self,
+        txs: &[SignedTransaction],
+        source_height: BlockHeight,
+        target_height: BlockHeight,
+    ) {
+        tracing::info!(
+            target: "mirror", "Sent {} transactions from source #{} with target HEAD @ #{}",
+            txs.len(), source_height, target_height
+        );
+        for tx in txs.iter() {
+            self.on_tx_sent(tx, source_height, target_height);
+        }
+
+        let block_delay = self
+            .second_longest_recent_block_delay()
+            .unwrap_or(self.min_block_production_delay + Duration::from_millis(100));
+        match &mut self.send_time {
+            Some(t) => t.as_mut().reset(tokio::time::Instant::now() + block_delay),
+            None => {
+                self.send_time = Some(Box::pin(tokio::time::sleep(block_delay)));
+            }
+        }
+    }
+}
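To make the batching heuristic above concrete, a self-contained example with invented timestamps: gaps of 1.2s, 3.0s, 1.1s, and 2.0s give a longest delay of 3.0s and a second-longest of 2.0s, which is what the next batch gets paced by:

```rust
fn main() {
    // five consecutive target-chain block timestamps, in nanoseconds (made up)
    let timestamps: [u64; 5] = [0, 1_200_000_000, 4_200_000_000, 5_300_000_000, 7_300_000_000];
    let mut gaps: Vec<u64> = timestamps.windows(2).map(|w| w[1] - w[0]).collect();
    gaps.sort_unstable();
    // second longest gap; TxTracker computes this in one pass instead of sorting
    assert_eq!(gaps[gaps.len() - 2], 2_000_000_000);
}
```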
diff --git a/tools/mirror/src/cli.rs b/tools/mirror/src/cli.rs
new file mode 100644
index 00000000000..4f6ea0a606c
--- /dev/null
+++ b/tools/mirror/src/cli.rs
@@ -0,0 +1,133 @@
+use anyhow::Context;
+use clap::Parser;
+use std::cell::Cell;
+use std::path::PathBuf;
+
+#[derive(Parser)]
+pub struct MirrorCommand {
+    #[clap(subcommand)]
+    subcmd: SubCommand,
+}
+
+#[derive(Parser)]
+enum SubCommand {
+    Prepare(PrepareCmd),
+    Run(RunCmd),
+}
+
+/// Start two NEAR nodes, one for each chain, and try to mirror
+/// transactions from the source chain to the target chain.
+#[derive(Parser)]
+struct RunCmd {
+    /// source chain home dir
+    #[clap(long)]
+    source_home: PathBuf,
+    /// target chain home dir
+    #[clap(long)]
+    target_home: PathBuf,
+    /// file containing an optional secret as generated by the
+    /// `prepare` command. Must be provided unless --no-secret is given
+    #[clap(long)]
+    secret_file: Option<PathBuf>,
+    /// Equivalent to passing --secret-file <FILE> where <FILE> is a
+    /// config that indicates no secret should be used. If this is
+    /// given, and --secret-file is also given and points to a config
+    /// that does contain a secret, the mirror will refuse to start
+    #[clap(long)]
+    no_secret: bool,
+}
+
+impl RunCmd {
+    fn run(self) -> anyhow::Result<()> {
+        openssl_probe::init_ssl_cert_env_vars();
+        let runtime = tokio::runtime::Runtime::new().context("failed to start tokio runtime")?;
+
+        let secret = if let Some(secret_file) = &self.secret_file {
+            let secret = crate::secret::load(secret_file)
+                .with_context(|| format!("Failed to load secret from {:?}", secret_file))?;
+            if secret.is_some() && self.no_secret {
+                anyhow::bail!(
+                    "--no-secret given with --secret-file indicating that a secret should be used"
+                );
+            }
+            secret
+        } else {
+            if !self.no_secret {
+                anyhow::bail!("Please give either --secret-file or --no-secret");
+            }
+            None
+        };
+
+        let system = new_actix_system(runtime);
+        system
+            .block_on(async move {
+                actix::spawn(crate::run(self.source_home, self.target_home, secret)).await
+            })
+            .unwrap()
+    }
+}
+
+/// Write a new genesis records file where the public keys have been
+/// altered so that this binary can sign transactions when mirroring
+/// them from the source chain to the target chain
+#[derive(Parser)]
+struct PrepareCmd {
+    /// A genesis records file as output by `neard view-state
+    /// dump-state --stream`
+    #[clap(long)]
+    records_file_in: PathBuf,
+    /// Path to the new records file with updated public keys
+    #[clap(long)]
+    records_file_out: PathBuf,
+    /// If this is provided, don't use a secret when mapping public
+    /// keys to new source chain private keys. This means that anyone
+    /// will be able to sign transactions for the accounts in the
+    /// target chain corresponding to accounts in the source chain. If
+    /// that is okay, then --no-secret will make the code run slightly
+    /// faster, and you won't have to take care to not lose the
+    /// secret.
+    #[clap(long)]
+    no_secret: bool,
+    /// Path to the secret. Note that if you don't pass --no-secret,
+    /// this secret is required to sign transactions for the accounts
+    /// in the target chain corresponding to accounts in the source
+    /// chain. This means that if you lose this secret, you will no
+    /// longer be able to mirror any traffic.
+    #[clap(long)]
+    secret_file_out: PathBuf,
+}
+
+impl PrepareCmd {
+    fn run(self) -> anyhow::Result<()> {
+        crate::genesis::map_records(
+            &self.records_file_in,
+            &self.records_file_out,
+            self.no_secret,
+            &self.secret_file_out,
+        )
+    }
+}
+
+// copied from neard/src/cli.rs
+fn new_actix_system(runtime: tokio::runtime::Runtime) -> actix::SystemRunner {
+    // `with_tokio_rt()` accepts an `Fn()->Runtime`, however we know that this function is called exactly once.
+    // This makes it safe to move out of the captured variable `runtime`, which is done by a trick
+    // using a `swap` of `Cell<Option<Runtime>>`s.
+    let runtime_cell = Cell::new(Some(runtime));
+    actix::System::with_tokio_rt(|| {
+        let r = Cell::new(None);
+        runtime_cell.swap(&r);
+        r.into_inner().unwrap()
+    })
+}
+
+impl MirrorCommand {
+    pub fn run(self) -> anyhow::Result<()> {
+        tracing::warn!(target: "mirror", "the mirror command is not stable, and may be removed or changed arbitrarily at any time");
+
+        match self.subcmd {
+            SubCommand::Prepare(r) => r.run(),
+            SubCommand::Run(r) => r.run(),
+        }
+    }
+}
diff --git a/tools/mirror/src/genesis.rs b/tools/mirror/src/genesis.rs
new file mode 100644
index 00000000000..1fd24eb6a3b
--- /dev/null
+++ b/tools/mirror/src/genesis.rs
@@ -0,0 +1,83 @@
+use near_primitives::state_record::StateRecord;
+use serde::ser::{SerializeSeq, Serializer};
+use std::fs::File;
+use std::io::{BufReader, BufWriter};
+use std::path::Path;
+
+pub fn map_records<P: AsRef<Path>>(
+    records_file_in: P,
+    records_file_out: P,
+    no_secret: bool,
+    secret_file_out: P,
+) -> anyhow::Result<()> {
+    let secret = if !no_secret {
+        Some(crate::secret::generate(secret_file_out)?)
+    } else {
+        crate::secret::write_empty(secret_file_out)?;
+        None
+    };
+    let reader = BufReader::new(File::open(records_file_in)?);
+    let records_out = BufWriter::new(File::create(records_file_out)?);
+    let mut records_ser = serde_json::Serializer::new(records_out);
+    let mut records_seq = records_ser.serialize_seq(None).unwrap();
+
+    near_chain_configs::stream_records_from_file(reader, |mut r| {
+        match &mut r {
+            StateRecord::AccessKey { account_id, public_key, access_key } => {
+                let replacement = crate::key_mapping::map_key(&public_key, secret.as_ref());
+                let new_record = StateRecord::AccessKey {
+                    account_id: crate::key_mapping::map_account(&account_id, secret.as_ref()),
+                    public_key: replacement.public_key(),
+                    access_key: access_key.clone(),
+                };
+                // TODO: would be nice for stream_records_from_file() to let you return early on error so
+                // we dont have to unwrap here
+                records_seq.serialize_element(&new_record).unwrap();
+            }
+            StateRecord::Account { account_id, .. } => {
+                if account_id.is_implicit() {
+                    *account_id = crate::key_mapping::map_account(&account_id, secret.as_ref());
+                }
+                records_seq.serialize_element(&r).unwrap();
+            }
+            StateRecord::Data { account_id, .. } => {
+                if account_id.is_implicit() {
+                    *account_id = crate::key_mapping::map_account(&account_id, secret.as_ref());
+                }
+                records_seq.serialize_element(&r).unwrap();
+            }
+            StateRecord::Contract { account_id, .. } => {
+                if account_id.is_implicit() {
+                    *account_id = crate::key_mapping::map_account(&account_id, secret.as_ref());
+                }
+                records_seq.serialize_element(&r).unwrap();
+            }
+            StateRecord::PostponedReceipt(receipt) => {
+                if receipt.predecessor_id.is_implicit() || receipt.receiver_id.is_implicit() {
+                    receipt.predecessor_id =
+                        crate::key_mapping::map_account(&receipt.predecessor_id, secret.as_ref());
+                    receipt.receiver_id =
+                        crate::key_mapping::map_account(&receipt.receiver_id, secret.as_ref());
+                }
+                records_seq.serialize_element(&r).unwrap();
+            }
+            StateRecord::ReceivedData { account_id, .. } => {
+                if account_id.is_implicit() {
+                    *account_id = crate::key_mapping::map_account(&account_id, secret.as_ref());
+                }
+                records_seq.serialize_element(&r).unwrap();
+            }
+            StateRecord::DelayedReceipt(receipt) => {
+                if receipt.predecessor_id.is_implicit() || receipt.receiver_id.is_implicit() {
+                    receipt.predecessor_id =
+                        crate::key_mapping::map_account(&receipt.predecessor_id, secret.as_ref());
+                    receipt.receiver_id =
+                        crate::key_mapping::map_account(&receipt.receiver_id, secret.as_ref());
+                }
+                records_seq.serialize_element(&r).unwrap();
+            }
+        };
+    })?;
+    records_seq.end()?;
+    Ok(())
+}
diff --git a/tools/mirror/src/key_mapping.rs b/tools/mirror/src/key_mapping.rs
new file mode 100644
index 00000000000..01df970fd8c
--- /dev/null
+++ b/tools/mirror/src/key_mapping.rs
@@ -0,0 +1,111 @@
+use borsh::BorshDeserialize;
+use hkdf::Hkdf;
+use near_crypto::{ED25519PublicKey, ED25519SecretKey, PublicKey, Secp256K1PublicKey, SecretKey};
+use near_primitives::types::AccountId;
+use sha2::Sha256;
+
+fn ed25519_map_secret(
+    buf: &mut [u8],
+    public: &ED25519PublicKey,
+    secret: Option<&[u8; crate::secret::SECRET_LEN]>,
+) {
+    match secret {
+        Some(secret) => {
+            let hk = Hkdf::<Sha256>::new(None, secret);
+            hk.expand(&public.0, buf).unwrap();
+        }
+        None => {
+            buf.copy_from_slice(&public.0);
+        }
+    };
+}
+
+fn map_ed25519(
+    public: &ED25519PublicKey,
+    secret: Option<&[u8; crate::secret::SECRET_LEN]>,
+) -> ED25519SecretKey {
+    let mut buf = [0; ed25519_dalek::KEYPAIR_LENGTH];
+
+    ed25519_map_secret(&mut buf[..ed25519_dalek::SECRET_KEY_LENGTH], public, secret);
+
+    let secret_key =
+        ed25519_dalek::SecretKey::from_bytes(&buf[..ed25519_dalek::SECRET_KEY_LENGTH]).unwrap();
+    let public_key = ed25519_dalek::PublicKey::from(&secret_key);
+
+    buf[ed25519_dalek::SECRET_KEY_LENGTH..].copy_from_slice(public_key.as_bytes());
+    ED25519SecretKey(buf)
+}
+
+fn secp256k1_from_slice(buf: &mut [u8], public: &Secp256K1PublicKey) -> secp256k1::SecretKey {
+    match secp256k1::SecretKey::from_slice(buf) {
+        Ok(s) => s,
+        Err(_) => {
+            tracing::warn!(target: "mirror", "Something super unlikely occurred! SECP256K1 key mapped from {:?} is too large. Flipping most significant bit.", public);
+            // If we got an error, it means that either `buf` is all zeros, or that when interpreted as a 256-bit
+            // int, it is larger than the order of the secp256k1 curve. Since the order of the curve starts with 0xFF,
+            // in either case flipping the first bit should work, and we can unwrap() below.
+            buf[0] ^= 0x80;
+            secp256k1::SecretKey::from_slice(buf).unwrap()
+        }
+    }
+}
+
+fn map_secp256k1(
+    public: &Secp256K1PublicKey,
+    secret: Option<&[u8; crate::secret::SECRET_LEN]>,
+) -> secp256k1::SecretKey {
+    let mut buf = [0; secp256k1::constants::SECRET_KEY_SIZE];
+
+    match secret {
+        Some(secret) => {
+            let hk = Hkdf::<Sha256>::new(None, secret);
+            hk.expand(public.as_ref(), &mut buf).unwrap();
+        }
+        None => {
+            buf.copy_from_slice(&public.as_ref()[..secp256k1::constants::SECRET_KEY_SIZE]);
+        }
+    };
+
+    secp256k1_from_slice(&mut buf, public)
+}
+
+// This maps the public key to a secret key so that we can sign
+// transactions on the target chain. If secret is None, then we just
+// use the bytes of the public key directly, otherwise we feed the
+// public key to a key derivation function.
+pub(crate) fn map_key(
+    key: &PublicKey,
+    secret: Option<&[u8; crate::secret::SECRET_LEN]>,
+) -> SecretKey {
+    match key {
+        PublicKey::ED25519(k) => SecretKey::ED25519(map_ed25519(k, secret)),
+        PublicKey::SECP256K1(k) => SecretKey::SECP256K1(map_secp256k1(k, secret)),
+    }
+}
+
+// returns the public key encoded in this implicit account. panics if it's not
+// actually an implicit account
+// basically copy pasted from runtime/runtime/src/actions.rs
+pub(crate) fn implicit_account_key(account_id: &AccountId) -> PublicKey {
+    let mut public_key_data = Vec::with_capacity(33);
+    public_key_data.push(0u8);
+    public_key_data.extend(hex::decode(account_id.as_ref().as_bytes()).unwrap());
+    assert_eq!(public_key_data.len(), 33);
+    PublicKey::try_from_slice(&public_key_data).unwrap()
+}
+
+// If it's an implicit account, interprets it as an ed25519 public key, maps that and then returns
+// the resulting implicit account. Otherwise does nothing. We do this so that transactions creating
+// an implicit account by sending money will generate an account that we can control
+pub(crate) fn map_account(
+    account_id: &AccountId,
+    secret: Option<&[u8; crate::secret::SECRET_LEN]>,
+) -> AccountId {
+    if account_id.is_implicit() {
+        let public_key = implicit_account_key(account_id);
+        let mapped_key = map_key(&public_key, secret);
+        hex::encode(mapped_key.public_key().key_data()).parse().unwrap()
+    } else {
+        account_id.clone()
+    }
+}
diff --git a/tools/mirror/src/lib.rs b/tools/mirror/src/lib.rs
new file mode 100644
index 00000000000..b53f7c200a8
--- /dev/null
+++ b/tools/mirror/src/lib.rs
@@ -0,0 +1,1046 @@
+use actix::Addr;
+use anyhow::Context;
+use borsh::{BorshDeserialize, BorshSerialize};
+use near_chain_configs::GenesisValidationMode;
+use near_client::{ClientActor, ViewClientActor};
+use near_client_primitives::types::{
+    GetBlock, GetBlockError, GetChunk, GetChunkError, GetExecutionOutcome,
+    GetExecutionOutcomeError, GetExecutionOutcomeResponse, Query, QueryError,
+};
+use near_crypto::{PublicKey, SecretKey};
+use near_indexer::{Indexer, StreamerMessage};
+use near_network::types::{NetworkClientMessages, NetworkClientResponses};
+use near_o11y::WithSpanContextExt;
+use near_primitives::hash::CryptoHash;
+use near_primitives::transaction::{
+    Action, AddKeyAction, DeleteKeyAction, SignedTransaction, Transaction,
+};
+use near_primitives::types::{
+    AccountId, BlockHeight, BlockId, BlockReference, Finality, TransactionOrReceiptId,
+};
+use near_primitives::views::{
+    ExecutionStatusView, QueryRequest, QueryResponseKind, SignedTransactionView,
+};
+use near_primitives_core::types::{Nonce, ShardId};
+use nearcore::config::NearConfig;
+use rocksdb::DB;
+use std::collections::HashSet;
+use std::path::Path;
+use std::time::{Duration, Instant};
+use strum::IntoEnumIterator;
+use tokio::sync::mpsc;
+
+mod chain_tracker;
+pub mod cli;
+mod genesis;
+mod key_mapping;
+mod metrics;
+mod secret;
+
+pub use cli::MirrorCommand;
+
+#[derive(strum::EnumIter)]
+enum DBCol {
+    Misc,
+    // This tracks nonces for Access Keys added by AddKey transactions
+    // or transfers to implicit accounts (not present in the genesis state).
+    // For a given (account ID, public key), if we're preparing a transaction
+    // and there's no entry in the DB, then the key was present in the genesis
+    // state. Otherwise, we map tx nonces according to the values in this column.
+    Nonces,
+}
+
+impl DBCol {
+    fn name(&self) -> &'static str {
+        match self {
+            Self::Misc => "miscellaneous",
+            Self::Nonces => "nonces",
+        }
+    }
+}
+
+// returns bytes that serve as the key corresponding to this pair in the Nonces column
+fn nonce_col_key(account_id: &AccountId, public_key: &PublicKey) -> Vec<u8> {
+    (account_id.clone(), public_key.clone()).try_to_vec().unwrap()
+}
+
+#[derive(Clone, BorshDeserialize, BorshSerialize, Debug, PartialEq, Eq, PartialOrd, Hash)]
+struct TxIds {
+    tx_hash: CryptoHash,
+    signer_id: AccountId,
+    receiver_id: AccountId,
+}
+
+// For a given AddKey Action, records the starting nonces of the
+// resulting Access Keys. We need this because when an AddKey receipt
+// is processed, the nonce field of the AddKey action is actually
+// ignored, and it's set to block_height*1000000, so to generate
+// transactions with valid nonces, we need to map valid source chain
+// nonces to valid target chain nonces.
+#[derive(BorshDeserialize, BorshSerialize, Debug, Default)]
+struct NonceDiff {
+    source_start: Option<Nonce>,
+    target_start: Option<Nonce>,
+    pending_source_txs: HashSet<TxIds>,
+}
+
+#[derive(thiserror::Error, Debug)]
+pub(crate) enum MapNonceError {
+    #[error("Source chain access key not yet on chain")]
+    SourceKeyNotOnChain,
+    #[error("Target chain access key not yet on chain")]
+    TargetKeyNotOnChain,
+    #[error("Nonce arithmetic overflow: {0} + {1}")]
+    AddOverflow(Nonce, Nonce),
+    #[error("Nonce arithmetic overflow: {0} - {1}")]
+    SubOverflow(Nonce, Nonce),
+}
+
+impl NonceDiff {
+    fn set_source(&mut self, nonce: Nonce) {
+        self.source_start = Some(nonce);
+        self.pending_source_txs.clear();
+    }
+
+    fn map(&self, nonce: Nonce) -> Result<Nonce, MapNonceError> {
+        let source_start = self.source_start.ok_or(MapNonceError::SourceKeyNotOnChain)?;
+        let target_start = self.target_start.ok_or(MapNonceError::TargetKeyNotOnChain)?;
+        if target_start > source_start {
+            let diff = target_start - source_start;
+            nonce.checked_add(diff).ok_or_else(|| MapNonceError::AddOverflow(nonce, diff))
+        } else {
+            let diff = source_start - target_start;
+            nonce.checked_sub(diff).ok_or_else(|| MapNonceError::SubOverflow(nonce, diff))
+        }
+    }
+
+    fn known(&self) -> bool {
+        self.source_start.is_some() && self.target_start.is_some()
+    }
+}
+
+struct TxMirror {
+    target_stream: mpsc::Receiver<StreamerMessage>,
+    source_view_client: Addr<ViewClientActor>,
+    source_client: Addr<ClientActor>,
+    target_view_client: Addr<ViewClientActor>,
+    target_client: Addr<ClientActor>,
+    db: DB,
+    target_genesis_height: BlockHeight,
+    target_min_block_production_delay: Duration,
+    tracked_shards: Vec<ShardId>,
+    secret: Option<[u8; crate::secret::SECRET_LEN]>,
+    next_source_height: Option<BlockHeight>,
+}
+
+fn open_db<P: AsRef<Path>>(home: P, config: &NearConfig) -> anyhow::Result<DB> {
+    let db_path =
+        near_store::NodeStorage::opener(home.as_ref(), &config.config.store).path().join("mirror");
+    let mut options = rocksdb::Options::default();
+    options.create_missing_column_families(true);
+    options.create_if_missing(true);
+    let cf_descriptors = DBCol::iter()
+        .map(|col| rocksdb::ColumnFamilyDescriptor::new(col.name(), options.clone()))
+        .collect::<Vec<_>>();
+    Ok(DB::open_cf_descriptors(&options, db_path, cf_descriptors)?)
+}
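A small illustration of the nonce arithmetic `NonceDiff::map()` performs, with invented numbers:

```rust
#[test]
fn nonce_mapping_example() {
    // Suppose the source chain access key started at nonce 17, while the
    // corresponding target chain key was created later and started at
    // 42_000_000 (all numbers here are made up for illustration).
    let diff = NonceDiff {
        source_start: Some(17),
        target_start: Some(42_000_000),
        pending_source_txs: Default::default(),
    };
    // A source tx with nonce 20 is 3 past source_start, so it must be sent
    // with nonce 42_000_003 to be valid on the target chain.
    assert_eq!(diff.map(20).unwrap(), 42_000_003);
}
```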
+// a transaction that's almost prepared, except that we don't yet know
+// what nonce to use because the public key was added in an AddKey
+// action that we haven't seen on chain yet. The tx field is complete
+// except for the nonce field.
+#[derive(Debug)]
+struct TxAwaitingNonce {
+    source_public: PublicKey,
+    source_signer_id: AccountId,
+    target_private: SecretKey,
+    tx: Transaction,
+}
+
+#[derive(Debug)]
+enum TargetChainTx {
+    Ready(SignedTransaction),
+    AwaitingNonce(TxAwaitingNonce),
+}
+
+impl TargetChainTx {
+    // For an AwaitingNonce(_), set the nonce and sign the transaction, changing self into Ready(_).
+    // must not be called if self is Ready(_)
+    fn set_nonce(&mut self, nonce: Nonce) {
+        match self {
+            Self::AwaitingNonce(t) => {
+                t.tx.nonce = nonce;
+                let tx = SignedTransaction::new(
+                    t.target_private.sign(&t.tx.get_hash_and_size().0.as_ref()),
+                    t.tx.clone(),
+                );
+                tracing::debug!(
+                    target: "mirror", "prepared a transaction for ({:?}, {:?}) that was previously waiting for the access key to appear on chain",
+                    &tx.transaction.signer_id, &tx.transaction.public_key
+                );
+                *self = Self::Ready(tx);
+            }
+            Self::Ready(_) => unreachable!(),
+        }
+    }
+}
+
+#[derive(Debug)]
+struct MappedChunk {
+    txs: Vec<TargetChainTx>,
+    shard_id: ShardId,
+}
+
+#[derive(Debug)]
+struct MappedBlock {
+    source_height: BlockHeight,
+    chunks: Vec<MappedChunk>,
+}
+
+async fn account_exists(
+    view_client: &Addr<ViewClientActor>,
+    account_id: &AccountId,
+    prev_block: &CryptoHash,
+) -> anyhow::Result<bool> {
+    match view_client
+        .send(
+            Query::new(
+                BlockReference::BlockId(BlockId::Hash(prev_block.clone())),
+                QueryRequest::ViewAccount { account_id: account_id.clone() },
+            )
+            .with_span_context(),
+        )
+        .await?
+    {
+        Ok(res) => match res.kind {
+            QueryResponseKind::ViewAccount(_) => Ok(true),
+            other => {
+                panic!("Received unexpected QueryResponse after Querying Account: {:?}", other);
+            }
+        },
+        Err(e) => match &e {
+            QueryError::UnknownAccount { .. } => Ok(false),
+            _ => Err(e.into()),
+        },
+    }
+}
+
+async fn fetch_access_key_nonce(
+    view_client: &Addr<ViewClientActor>,
+    account_id: &AccountId,
+    public_key: &PublicKey,
+    block_hash: Option<&CryptoHash>,
+) -> anyhow::Result<Option<Nonce>> {
+    let block_ref = match block_hash {
+        Some(h) => BlockReference::BlockId(BlockId::Hash(h.clone())),
+        None => BlockReference::Finality(Finality::None),
+    };
+    match view_client
+        .send(
+            Query::new(
+                block_ref,
+                QueryRequest::ViewAccessKey {
+                    account_id: account_id.clone(),
+                    public_key: public_key.clone(),
+                },
+            )
+            .with_span_context(),
+        )
+        .await?
+    {
+        Ok(res) => match res.kind {
+            QueryResponseKind::AccessKey(access_key) => Ok(Some(access_key.nonce)),
+            other => {
+                panic!("Received unexpected QueryResponse after Querying Access Key: {:?}", other);
+            }
+        },
+        Err(_) => Ok(None),
+    }
+}
+
+#[derive(Clone, Debug)]
+enum TxOutcome {
+    Success(CryptoHash),
+    Pending,
+    Failure,
+}
+
+async fn fetch_tx_outcome(
+    view_client: &Addr<ViewClientActor>,
+    transaction_hash: CryptoHash,
+    signer_id: &AccountId,
+    receiver_id: &AccountId,
+) -> anyhow::Result<TxOutcome> {
+    let receipt_id = match view_client
+        .send(
+            GetExecutionOutcome {
+                id: TransactionOrReceiptId::Transaction {
+                    transaction_hash,
+                    sender_id: signer_id.clone(),
+                },
+            }
+            .with_span_context(),
+        )
+        .await
+        .unwrap()
+    {
+        Ok(GetExecutionOutcomeResponse { outcome_proof, .. }) => {
+            match outcome_proof.outcome.status {
+                ExecutionStatusView::SuccessReceiptId(id) => id,
+                ExecutionStatusView::SuccessValue(_) => unreachable!(),
+                ExecutionStatusView::Failure(_) | ExecutionStatusView::Unknown => {
+                    return Ok(TxOutcome::Failure)
+                }
+            }
+        }
+        Err(
+            GetExecutionOutcomeError::NotConfirmed { .. }
+            | GetExecutionOutcomeError::UnknownBlock { .. },
+        Err(
+            GetExecutionOutcomeError::NotConfirmed { .. }
+            | GetExecutionOutcomeError::UnknownBlock { .. },
+        ) => return Ok(TxOutcome::Pending),
+        Err(e) => {
+            return Err(e)
+                .with_context(|| format!("failed fetching outcome for tx {}", transaction_hash))
+        }
+    };
+    match view_client
+        .send(
+            GetExecutionOutcome {
+                id: TransactionOrReceiptId::Receipt {
+                    receipt_id,
+                    receiver_id: receiver_id.clone(),
+                },
+            }
+            .with_span_context(),
+        )
+        .await
+        .unwrap()
+    {
+        Ok(GetExecutionOutcomeResponse { outcome_proof, .. }) => {
+            match outcome_proof.outcome.status {
+                ExecutionStatusView::SuccessReceiptId(_) | ExecutionStatusView::SuccessValue(_) => {
+                    // the view client code actually modifies the outcome's block_hash field to be the
+                    // next block with a new chunk in the relevant shard, so go backwards one block,
+                    // since that's what we'll want to give in the query for AccessKeys
+                    let block = view_client
+                        .send(
+                            GetBlock(BlockReference::BlockId(BlockId::Hash(
+                                outcome_proof.block_hash,
+                            )))
+                            .with_span_context(),
+                        )
+                        .await
+                        .unwrap()
+                        .with_context(|| {
+                            format!("failed fetching block {}", &outcome_proof.block_hash)
+                        })?;
+                    Ok(TxOutcome::Success(block.header.prev_hash))
+                }
+                ExecutionStatusView::Failure(_) | ExecutionStatusView::Unknown => {
+                    Ok(TxOutcome::Failure)
+                }
+            }
+        }
+        Err(
+            GetExecutionOutcomeError::NotConfirmed { .. }
+            | GetExecutionOutcomeError::UnknownBlock { .. }
+            | GetExecutionOutcomeError::UnknownTransactionOrReceipt { .. },
+        ) => Ok(TxOutcome::Pending),
+        Err(e) => {
+            Err(e).with_context(|| format!("failed fetching outcome for receipt {}", &receipt_id))
+        }
+    }
+}
+
+async fn block_hash_to_height(
+    view_client: &Addr<ViewClientActor>,
+    hash: &CryptoHash,
+) -> anyhow::Result<BlockHeight> {
+    Ok(view_client
+        .send(GetBlock(BlockReference::BlockId(BlockId::Hash(hash.clone()))).with_span_context())
+        .await
+        .unwrap()?
+        .header
+        .height)
+}
+
+impl TxMirror {
+    fn new<P: AsRef<Path>>(
+        source_home: P,
+        target_home: P,
+        secret: Option<[u8; crate::secret::SECRET_LEN]>,
+    ) -> anyhow::Result<Self> {
+        let target_config =
+            nearcore::config::load_config(target_home.as_ref(), GenesisValidationMode::UnsafeFast)
+                .with_context(|| {
+                    format!("Error loading target config from {:?}", target_home.as_ref())
+                })?;
+        let db =
+            open_db(target_home.as_ref(), &target_config).context("failed to open mirror DB")?;
+        let source_config =
+            nearcore::config::load_config(source_home.as_ref(), GenesisValidationMode::UnsafeFast)
+                .with_context(|| {
+                    format!("Error loading source config from {:?}", source_home.as_ref())
+                })?;
+
+        let source_node = nearcore::start_with_config(source_home.as_ref(), source_config.clone())
+            .context("failed to start source chain NEAR node")?;
+
+        let target_indexer = Indexer::new(near_indexer::IndexerConfig {
+            home_dir: target_home.as_ref().to_path_buf(),
+            sync_mode: near_indexer::SyncModeEnum::LatestSynced,
+            await_for_node_synced: near_indexer::AwaitForNodeSyncedEnum::WaitForFullSync,
+        })
+        .context("failed to start target chain indexer")?;
+        let (target_view_client, target_client) = target_indexer.client_actors();
+        let target_stream = target_indexer.streamer();
+
+        Ok(Self {
+            source_view_client: source_node.view_client,
+            source_client: source_node.client,
+            target_client,
+            target_view_client,
+            target_stream,
+            db,
+            target_genesis_height: target_config.genesis.config.genesis_height,
+            target_min_block_production_delay: target_config
+                .client_config
+                .min_block_production_delay,
+            tracked_shards: target_config.config.tracked_shards.clone(),
+            secret,
+            next_source_height: None,
+        })
+    }
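`get_next_source_height` just below persists its state through the Misc column as a borsh-serialized `BlockHeight` (a `u64` alias in near-primitives). For reference, the round trip it relies on is nothing more than this (standalone sketch using the borsh crate directly):

```rust
use borsh::{BorshDeserialize, BorshSerialize};

fn main() {
    // borsh encodes a u64 as 8 little-endian bytes; this is exactly what
    // set_next_source_height() writes and get_next_source_height() reads back.
    let height: u64 = 12_345;
    let bytes = height.try_to_vec().unwrap();
    assert_eq!(bytes.len(), 8);
    assert_eq!(u64::try_from_slice(&bytes).unwrap(), height);
}
```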
+    fn get_next_source_height(&mut self) -> anyhow::Result<BlockHeight> {
+        if let Some(height) = self.next_source_height {
+            return Ok(height);
+        }
+        let height =
+            self.db.get_cf(self.db.cf_handle(DBCol::Misc.name()).unwrap(), "next_source_height")?;
+        match height {
+            Some(h) => {
+                let height = BlockHeight::try_from_slice(&h).unwrap();
+                self.next_source_height = Some(height);
+                Ok(height)
+            }
+            None => Ok(self.target_genesis_height),
+        }
+    }
+
+    async fn send_transactions(
+        &mut self,
+        block: &MappedBlock,
+    ) -> anyhow::Result<Vec<SignedTransaction>> {
+        let mut sent = vec![];
+        for chunk in block.chunks.iter() {
+            for tx in chunk.txs.iter() {
+                match tx {
+                    TargetChainTx::Ready(tx) => {
+                        match self
+                            .target_client
+                            .send(
+                                NetworkClientMessages::Transaction {
+                                    transaction: tx.clone(),
+                                    is_forwarded: false,
+                                    check_only: false,
+                                }
+                                .with_span_context(),
+                            )
+                            .await?
+                        {
+                            NetworkClientResponses::RequestRouted => {
+                                crate::metrics::TRANSACTIONS_SENT.with_label_values(&["ok"]).inc();
+                                sent.push(tx.clone());
+                            }
+                            NetworkClientResponses::InvalidTx(e) => {
+                                // TODO: here if we're getting an error because the tx was already included, it is possible
+                                // that some other instance of this code ran and made progress already. For now we can assume
+                                // only one instance of this code will run, but this is the place to detect if that's not the case.
+                                tracing::error!(
+                                    target: "mirror", "Tried to send an invalid tx from source #{} shard {}: {:?}",
+                                    block.source_height, chunk.shard_id, e
+                                );
+                                crate::metrics::TRANSACTIONS_SENT
+                                    .with_label_values(&["invalid"])
+                                    .inc();
+                            }
+                            r => {
+                                tracing::error!(
+                                    target: "mirror", "Unexpected response sending tx from source #{} shard {}: {:?}. The transaction was not sent",
+                                    block.source_height, chunk.shard_id, r
+                                );
+                                crate::metrics::TRANSACTIONS_SENT
+                                    .with_label_values(&["internal_error"])
+                                    .inc();
+                            }
+                        }
+                    }
+                    TargetChainTx::AwaitingNonce(tx) => {
+                        // TODO: here we should just save this transaction for later and send it when it's known
+                        tracing::warn!(target: "mirror", "skipped sending transaction with signer {} because valid target chain nonce not known", &tx.source_signer_id)
+                    }
+                }
+            }
+        }
+        Ok(sent)
+    }
+
+    fn read_nonce_diff(
+        &self,
+        account_id: &AccountId,
+        public_key: &PublicKey,
+    ) -> anyhow::Result<Option<NonceDiff>> {
+        let db_key = nonce_col_key(account_id, public_key);
+        // TODO: cache this?
+        Ok(self
+            .db
+            .get_cf(self.db.cf_handle(DBCol::Nonces.name()).unwrap(), &db_key)?
+            .map(|v| NonceDiff::try_from_slice(&v).unwrap()))
+    }
+
+    fn put_nonce_diff(
+        &self,
+        account_id: &AccountId,
+        public_key: &PublicKey,
+        diff: &NonceDiff,
+    ) -> anyhow::Result<()> {
+        tracing::debug!(target: "mirror", "storing {:?} in DB for ({:?}, {:?})", &diff, account_id, public_key);
+        let db_key = nonce_col_key(account_id, public_key);
+        self.db.put_cf(
+            self.db.cf_handle(DBCol::Nonces.name()).unwrap(),
+            &db_key,
+            &diff.try_to_vec().unwrap(),
+        )?;
+        Ok(())
+    }
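`read_nonce_diff`/`put_nonce_diff` above are a plain keyed get/put against a named rocksdb column family, with borsh on both the key and the value. A minimal sketch of that access pattern in isolation (the path and key bytes here are made up for illustration):

```rust
use rocksdb::{ColumnFamilyDescriptor, Options, DB};

fn main() -> Result<(), rocksdb::Error> {
    let mut options = Options::default();
    options.create_if_missing(true);
    options.create_missing_column_families(true);
    // same column family names as DBCol::name() above
    let cfs = vec![
        ColumnFamilyDescriptor::new("miscellaneous", Options::default()),
        ColumnFamilyDescriptor::new("nonces", Options::default()),
    ];
    let db = DB::open_cf_descriptors(&options, "/tmp/mirror-example", cfs)?;
    let cf = db.cf_handle("nonces").unwrap();
    // the real code uses a borsh-serialized (AccountId, PublicKey) key and a
    // borsh-serialized NonceDiff value in place of these byte strings
    db.put_cf(cf, b"some-key", b"some-value")?;
    assert_eq!(db.get_cf(cf, b"some-key")?.as_deref(), Some(&b"some-value"[..]));
    Ok(())
}
```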
+    // If the access key was present in the genesis records, just
+    // return the same nonce. Otherwise, we need to change the
+    // nonce. So check if we already know what the difference in
+    // nonces is, and if not, try to fetch that info and store it.
+    // `source_signer_id` and `target_signer_id` are the same unless
+    // it's an implicit account
+    async fn map_nonce(
+        &self,
+        source_signer_id: &AccountId,
+        target_signer_id: &AccountId,
+        source_public: &PublicKey,
+        target_public: &PublicKey,
+        nonce: Nonce,
+    ) -> anyhow::Result<Result<Nonce, MapNonceError>> {
+        let mut diff = match self.read_nonce_diff(source_signer_id, source_public)? {
+            Some(m) => m,
+            // If it's not stored in the database, it's an access key that was present in the genesis
+            // records, so we don't need to do anything to the nonce.
+            None => return Ok(Ok(nonce)),
+        };
+        if diff.known() {
+            return Ok(diff.map(nonce));
+        }
+
+        self.update_nonces(
+            source_signer_id,
+            target_signer_id,
+            source_public,
+            target_public,
+            &mut diff,
+        )
+        .await?;
+        Ok(diff.map(nonce))
+    }
+
+    async fn update_nonces(
+        &self,
+        source_signer_id: &AccountId,
+        target_signer_id: &AccountId,
+        source_public: &PublicKey,
+        target_public: &PublicKey,
+        diff: &mut NonceDiff,
+    ) -> anyhow::Result<()> {
+        let mut rewrite = false;
+        if diff.source_start.is_none() {
+            self.update_source_nonce(source_signer_id, source_public, diff).await?;
+            rewrite |= diff.source_start.is_some();
+        }
+        if diff.target_start.is_none() {
+            diff.target_start = fetch_access_key_nonce(
+                &self.target_view_client,
+                target_signer_id,
+                target_public,
+                None,
+            )
+            .await?;
+            rewrite |= diff.target_start.is_some();
+        }
+
+        if rewrite {
+            self.put_nonce_diff(source_signer_id, source_public, diff)?;
+        }
+        Ok(())
+    }
+
+    async fn update_source_nonce(
+        &self,
+        account_id: &AccountId,
+        public_key: &PublicKey,
+        diff: &mut NonceDiff,
+    ) -> anyhow::Result<()> {
+        let mut block_height = 0;
+        let mut block_hash = CryptoHash::default();
+        let mut failed_txs = Vec::new();
+
+        // first find the earliest block hash where the access key should exist
+        for tx in diff.pending_source_txs.iter() {
+            match fetch_tx_outcome(
+                &self.source_view_client,
+                tx.tx_hash.clone(),
+                &tx.signer_id,
+                &tx.receiver_id,
+            )
+            .await?
+            {
+                TxOutcome::Success(hash) => {
+                    let height =
+                        block_hash_to_height(&self.source_view_client, &hash).await.with_context(
+                            || format!("failed fetching block height of block {}", &hash),
+                        )?;
+                    if &block_hash == &CryptoHash::default() || block_height > height {
+                        block_height = height;
+                        block_hash = hash;
+                    }
+                }
+                TxOutcome::Failure => {
+                    failed_txs.push(tx.clone());
+                }
+                TxOutcome::Pending => {}
+            }
+        }
+        if &block_hash == &CryptoHash::default() {
+            // no need to do this if block_hash is set because set_source() below will clear it
+            for tx in failed_txs.iter() {
+                diff.pending_source_txs.remove(tx);
+            }
+            return Ok(());
+        }
+        let nonce = fetch_access_key_nonce(
+            &self.source_view_client,
+            account_id,
+            public_key,
+            Some(&block_hash),
+        )
+        .await?
+        .ok_or_else(|| {
+            anyhow::anyhow!(
+                "expected access key to exist for {}, {} after finding successful receipt in {}",
+                &account_id,
+                &public_key,
+                &block_hash
+            )
+        })?;
+        diff.set_source(nonce);
+        Ok(())
+    }
+    // we have a situation where nonces need to be mapped (AddKey actions
+    // or implicit account transfers). So store the initial nonce data in the DB.
+    async fn store_source_nonce(
+        &self,
+        tx: &SignedTransactionView,
+        public_key: &PublicKey,
+    ) -> anyhow::Result<()> {
+        // TODO: probably better to use a merge operator here. Not urgent, though.
+        let mut diff = self.read_nonce_diff(&tx.receiver_id, &public_key)?.unwrap_or_default();
+        if diff.source_start.is_some() {
+            return Ok(());
+        }
+        diff.pending_source_txs.insert(TxIds {
+            tx_hash: tx.hash.clone(),
+            signer_id: tx.signer_id.clone(),
+            receiver_id: tx.receiver_id.clone(),
+        });
+        self.update_source_nonce(&tx.receiver_id, &public_key, &mut diff).await?;
+        self.put_nonce_diff(&tx.receiver_id, &public_key, &diff)
+    }
+
+    async fn map_actions(
+        &self,
+        tx: &SignedTransactionView,
+        prev_block: &CryptoHash,
+    ) -> anyhow::Result<Vec<Action>> {
+        let mut actions = Vec::new();
+
+        for a in tx.actions.iter() {
+            // this try_from() won't fail since the ActionView was constructed from the Action
+            let action = Action::try_from(a.clone()).unwrap();
+
+            match &action {
+                Action::AddKey(add_key) => {
+                    self.store_source_nonce(tx, &add_key.public_key).await?;
+
+                    let replacement =
+                        crate::key_mapping::map_key(&add_key.public_key, self.secret.as_ref());
+
+                    actions.push(Action::AddKey(AddKeyAction {
+                        public_key: replacement.public_key(),
+                        access_key: add_key.access_key.clone(),
+                    }));
+                }
+                Action::DeleteKey(delete_key) => {
+                    let replacement =
+                        crate::key_mapping::map_key(&delete_key.public_key, self.secret.as_ref());
+                    let public_key = replacement.public_key();
+
+                    actions.push(Action::DeleteKey(DeleteKeyAction { public_key }));
+                }
+                Action::Transfer(_) => {
+                    if tx.receiver_id.is_implicit()
+                        && !account_exists(&self.source_view_client, &tx.receiver_id, prev_block)
+                            .await
+                            .with_context(|| {
+                                format!("failed checking existence for account {}", &tx.receiver_id)
+                            })?
+                    {
+                        let public_key = crate::key_mapping::implicit_account_key(&tx.receiver_id);
+                        self.store_source_nonce(tx, &public_key).await?;
+                    }
+                    actions.push(action);
+                }
+                // We don't want to mess with the set of validators in the target chain
+                Action::Stake(_) => {}
+                _ => actions.push(action),
+            };
+        }
+        Ok(actions)
+    }
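`map_actions` above leans on `crate::key_mapping`, which isn't part of this hunk. Conceptually it derives a deterministic replacement key from the source chain key, optionally mixed with the operator's secret so outsiders can't recompute the mapping. A hypothetical sketch of such a derivation using the `hkdf` and `sha2` dependencies this change adds (an illustration of the technique, not the actual `key_mapping` implementation):

```rust
use hkdf::Hkdf;
use sha2::Sha256;

// Hypothetical: derive a target chain ed25519 seed from a source public key,
// optionally keyed by a secret. Without the secret the mapping is public;
// with it, only the secret's holder can recompute target chain keys.
fn map_ed25519_seed(source_public_key: &[u8; 32], secret: Option<&[u8]>) -> [u8; 32] {
    let hk = Hkdf::<Sha256>::new(secret, source_public_key);
    let mut seed = [0u8; 32];
    // 32 bytes is well within HKDF-SHA256's output limit, so expand can't fail
    hk.expand(b"near-mirror-key-mapping", &mut seed).unwrap();
    seed
}

fn main() {
    let source_pk = [1u8; 32];
    let public_mapping = map_ed25519_seed(&source_pk, None);
    let secret_mapping = map_ed25519_seed(&source_pk, Some(b"operator secret"));
    assert_ne!(public_mapping, secret_mapping);
}
```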
+    // fetch the source chain block at `source_height`, and prepare a
+    // set of transactions that should be valid in the target chain
+    // from it.
+    async fn fetch_txs(
+        &self,
+        source_height: BlockHeight,
+        ref_hash: CryptoHash,
+    ) -> anyhow::Result<Option<MappedBlock>> {
+        let prev_hash = match self
+            .source_view_client
+            .send(
+                GetBlock(BlockReference::BlockId(BlockId::Height(source_height)))
+                    .with_span_context(),
+            )
+            .await
+            .unwrap()
+        {
+            Ok(b) => b.header.prev_hash,
+            Err(GetBlockError::UnknownBlock { .. }) => return Ok(None),
+            Err(e) => return Err(e.into()),
+        };
+        let mut chunks = Vec::new();
+        for shard_id in self.tracked_shards.iter() {
+            let mut txs = Vec::new();
+
+            let chunk = match self
+                .source_view_client
+                .send(GetChunk::Height(source_height, *shard_id).with_span_context())
+                .await?
+            {
+                Ok(c) => c,
+                Err(e) => match e {
+                    GetChunkError::UnknownBlock { .. } => return Ok(None),
+                    GetChunkError::UnknownChunk { .. } => {
+                        tracing::error!(
+                            "Can't fetch source chain shard {} chunk at height {}. Are we tracking all shards?",
+                            shard_id, source_height
+                        );
+                        continue;
+                    }
+                    _ => return Err(e.into()),
+                },
+            };
+            if chunk.header.height_included != source_height {
+                continue;
+            }
+
+            let mut num_not_ready = 0;
+            for t in chunk.transactions {
+                let actions = self.map_actions(&t, &prev_hash).await?;
+                if actions.is_empty() {
+                    // If this is a tx containing only stake actions, skip it.
+                    continue;
+                }
+                let mapped_key = crate::key_mapping::map_key(&t.public_key, self.secret.as_ref());
+                let public_key = mapped_key.public_key();
+
+                let target_signer_id =
+                    crate::key_mapping::map_account(&t.signer_id, self.secret.as_ref());
+                match self
+                    .map_nonce(&t.signer_id, &target_signer_id, &t.public_key, &public_key, t.nonce)
+                    .await?
+                {
+                    Ok(nonce) => {
+                        let mut tx = Transaction::new(
+                            target_signer_id,
+                            public_key,
+                            crate::key_mapping::map_account(&t.receiver_id, self.secret.as_ref()),
+                            nonce,
+                            ref_hash.clone(),
+                        );
+                        tx.actions = actions;
+                        let tx = SignedTransaction::new(
+                            mapped_key.sign(&tx.get_hash_and_size().0.as_ref()),
+                            tx,
+                        );
+                        txs.push(TargetChainTx::Ready(tx));
+                    }
+                    Err(e) => match e {
+                        MapNonceError::AddOverflow(..)
+                        | MapNonceError::SubOverflow(..)
+                        | MapNonceError::SourceKeyNotOnChain => {
+                            tracing::error!(target: "mirror", "error mapping nonce for ({:?}, {:?}): {:?}", &t.signer_id, &public_key, e);
+                            continue;
+                        }
+                        MapNonceError::TargetKeyNotOnChain => {
+                            let mut tx = Transaction::new(
+                                crate::key_mapping::map_account(&t.signer_id, self.secret.as_ref()),
+                                public_key,
+                                crate::key_mapping::map_account(
+                                    &t.receiver_id,
+                                    self.secret.as_ref(),
+                                ),
+                                t.nonce,
+                                ref_hash.clone(),
+                            );
+                            tx.actions = actions;
+                            txs.push(TargetChainTx::AwaitingNonce(TxAwaitingNonce {
+                                tx,
+                                source_public: t.public_key.clone(),
+                                source_signer_id: t.signer_id.clone(),
+                                target_private: mapped_key,
+                            }));
+                            num_not_ready += 1;
+                        }
+                    },
+                };
+            }
+            if num_not_ready == 0 {
+                tracing::debug!(
+                    target: "mirror", "prepared {} transactions for source chain #{} shard {}",
+                    txs.len(), source_height, shard_id
+                );
+            } else {
+                tracing::debug!(
+                    target: "mirror", "prepared {} transactions for source chain #{} shard {}, {} of which are \
+                    still waiting for the corresponding access keys to make it on chain",
+                    txs.len(), source_height, shard_id, num_not_ready,
+                );
+            }
+            chunks.push(MappedChunk { txs, shard_id: *shard_id });
+        }
+        Ok(Some(MappedBlock { source_height, chunks }))
+    }
+
+    // Up to a certain capacity, prepare and queue up batches of
+    // transactions that we want to send to the target chain.
+    async fn queue_txs(
+        &mut self,
+        tracker: &mut crate::chain_tracker::TxTracker,
+        ref_hash: CryptoHash,
+        check_send_time: bool,
+    ) -> anyhow::Result<()> {
+        if tracker.num_blocks_queued() > 100 {
+            return Ok(());
+        }
+
+        let next_batch_time = tracker.next_batch_time();
+        let source_head =
+            self.get_source_height().await.context("can't fetch source chain HEAD")?;
+        let start_height = match tracker.height_queued() {
+            Some(h) => h + 1,
+            None => self.get_next_source_height()?,
+        };
+
+        for height in start_height..=source_head {
+            if let Some(b) = self
+                .fetch_txs(height, ref_hash)
+                .await
+                .with_context(|| format!("Can't fetch source #{} transactions", height))?
+            {
+                tracker.queue_block(b);
+                if tracker.num_blocks_queued() > 100 {
+                    return Ok(());
+                }
+            };
+
+            if check_send_time
+                && tracker.num_blocks_queued() > 0
+                && Instant::now() > next_batch_time - Duration::from_millis(20)
+            {
+                return Ok(());
+            }
+        }
+        Ok(())
+    }
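Note the scheduling concern in `queue_txs` above: preparing blocks must never delay sending, so the loop bails out once it's within 20ms of the next batch deadline. The guard, pulled out as a standalone helper for clarity (the name is made up):

```rust
use std::time::{Duration, Instant};

// Keep preparing more blocks only while nothing is queued yet, or while
// there's still slack (with a 20ms margin) before the next batch goes out.
fn have_time_to_prepare(next_batch_time: Instant, blocks_queued: usize) -> bool {
    blocks_queued == 0 || Instant::now() <= next_batch_time - Duration::from_millis(20)
}

fn main() {
    let deadline = Instant::now() + Duration::from_secs(1);
    assert!(have_time_to_prepare(deadline, 5)); // a full second of slack left
}
```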
+    fn set_next_source_height(&mut self, height: BlockHeight) -> anyhow::Result<()> {
+        self.next_source_height = Some(height);
+        // TODO: we should instead save something like the
+        // (block_height, shard_id, idx_in_chunk) of the last
+        // transaction sent. Currently we set next_source_height after
+        // sending all of the transactions in that chunk, so if we get
+        // SIGTERM or something in the middle of sending a batch of
+        // txs, we'll send some that we already sent next time we
+        // start. Not a giant problem but kind of unclean.
+        self.db.put_cf(
+            self.db.cf_handle(DBCol::Misc.name()).unwrap(),
+            "next_source_height",
+            height.try_to_vec().unwrap(),
+        )?;
+        Ok(())
+    }
+
+    // Go through any upcoming batches of transactions that we haven't
+    // been able to set a valid nonce for yet, and see if we can now
+    // do that.
+    async fn set_nonces(
+        &self,
+        tracker: &mut crate::chain_tracker::TxTracker,
+    ) -> anyhow::Result<()> {
+        let next_batch_time = tracker.next_batch_time();
+        let mut txs_ready = Vec::new();
+        let mut keys_mapped = HashSet::new();
+
+        for (source_signer_id, source_public_key) in tracker.pending_access_keys_iter() {
+            let mut diff = self.read_nonce_diff(source_signer_id, source_public_key)?.unwrap();
+            let target_signer_id =
+                crate::key_mapping::map_account(source_signer_id, self.secret.as_ref());
+            let target_public_key =
+                crate::key_mapping::map_key(source_public_key, self.secret.as_ref()).public_key();
+            self.update_nonces(
+                &source_signer_id,
+                &target_signer_id,
+                &source_public_key,
+                &target_public_key,
+                &mut diff,
+            )
+            .await?;
+            if diff.known() {
+                keys_mapped.insert((source_signer_id.clone(), source_public_key.clone()));
+            }
+        }
+        for (tx_ref, tx) in tracker.tx_awaiting_nonce_iter() {
+            if keys_mapped.contains(&(tx.source_signer_id.clone(), tx.source_public.clone())) {
+                let nonce = self
+                    .map_nonce(
+                        &tx.source_signer_id,
+                        &tx.tx.signer_id,
+                        &tx.source_public,
+                        &tx.tx.public_key,
+                        tx.tx.nonce,
+                    )
+                    .await?
+                    .unwrap();
+                txs_ready.push((tx_ref.clone(), nonce));
+            }
+
+            if Instant::now() > next_batch_time - Duration::from_millis(20) {
+                break;
+            }
+        }
+        for (tx_ref, nonce) in txs_ready {
+            tracker.set_tx_nonce(&tx_ref, nonce);
+        }
+        Ok(())
+    }
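`main_loop` below multiplexes two event sources with `tokio::select!`: the tracker saying it's time to send a batch, and the target chain indexer delivering a new block, plus a polling fallback when nothing is queued. A stripped-down sketch of that control structure (an mpsc channel stands in for the indexer stream, counters stand in for real work):

```rust
use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // stand-in for the target chain indexer's block stream
    let (tx, mut target_stream) = mpsc::channel::<u64>(16);
    tokio::spawn(async move {
        for height in 0..3 {
            let _ = tx.send(height).await;
        }
    });
    let mut blocks_queued = 1usize;
    loop {
        tokio::select! {
            // "time to send a batch" arm, only enabled while work is queued
            _ = tokio::time::sleep(Duration::from_millis(100)), if blocks_queued > 0 => {
                blocks_queued -= 1; // in the real loop: send_transactions + queue_txs
            }
            // "new target chain block" arm
            msg = target_stream.recv() => {
                match msg {
                    Some(_height) => { /* real loop: on_target_block + set_nonces */ }
                    None => break, // stream closed
                }
            }
        }
    }
}
```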
+    async fn main_loop(
+        &mut self,
+        mut tracker: crate::chain_tracker::TxTracker,
+        mut target_height: BlockHeight,
+        mut target_head: CryptoHash,
+    ) -> anyhow::Result<()> {
+        loop {
+            tokio::select! {
+                // time to send a batch of transactions
+                mapped_block = tracker.next_batch(), if tracker.num_blocks_queued() > 0 => {
+                    let mapped_block = mapped_block.unwrap();
+                    let sent = self.send_transactions(&mapped_block).await?;
+                    tracker.on_txs_sent(&sent, mapped_block.source_height, target_height);
+
+                    // now we have one second left until we need to send more transactions. In the
+                    // meantime, we might as well prepare some more batches of transactions.
+                    // TODO: continue in best effort fashion on error
+                    self.set_next_source_height(mapped_block.source_height + 1)?;
+                    self.queue_txs(&mut tracker, target_head, true).await?;
+                }
+                msg = self.target_stream.recv() => {
+                    let msg = msg.unwrap();
+                    tracker.on_target_block(&msg);
+                    self.set_nonces(&mut tracker).await?;
+                    target_head = msg.block.header.hash;
+                    target_height = msg.block.header.height;
+                }
+                // If we don't have any upcoming sets of transactions to send already built, we
+                // probably fell behind in the source chain and can't fetch the transactions.
+                // Check if we have them now here.
+                _ = tokio::time::sleep(Duration::from_millis(200)), if tracker.num_blocks_queued() == 0 => {
+                    self.queue_txs(&mut tracker, target_head, true).await?;
+                }
+            };
+        }
+    }
+
+    async fn get_source_height(&self) -> Option<BlockHeight> {
+        self.source_client
+            .send(
+                near_client::Status { is_health_check: false, detailed: false }.with_span_context(),
+            )
+            .await
+            .unwrap()
+            .ok()
+            .map(|s| s.sync_info.latest_block_height)
+    }
+
+    // wait until HEAD moves. We don't really need it to be fully synced.
+    async fn wait_source_ready(&self) {
+        let mut first_height = None;
+        loop {
+            if let Some(head) = self.get_source_height().await {
+                match first_height {
+                    Some(h) => {
+                        if h != head {
+                            return;
+                        }
+                    }
+                    None => {
+                        first_height = Some(head);
+                    }
+                }
+            }
+
+            tokio::time::sleep(Duration::from_millis(500)).await;
+        }
+    }
+
+    async fn wait_target_synced(&mut self) -> (BlockHeight, CryptoHash) {
+        let msg = self.target_stream.recv().await.unwrap();
+        (msg.block.header.height, msg.block.header.hash)
+    }
+
+    async fn run(mut self) -> anyhow::Result<()> {
+        let mut tracker =
+            crate::chain_tracker::TxTracker::new(self.target_min_block_production_delay);
+        self.wait_source_ready().await;
+        let (target_height, target_head) = self.wait_target_synced().await;
+
+        self.queue_txs(&mut tracker, target_head, false).await?;
+
+        self.main_loop(tracker, target_height, target_head).await
+    }
+}
+
+async fn run<P: AsRef<Path>>(
+    source_home: P,
+    target_home: P,
+    secret: Option<[u8; crate::secret::SECRET_LEN]>,
+) -> anyhow::Result<()> {
+    let m = TxMirror::new(source_home, target_home, secret)?;
+    m.run().await
+}
diff --git a/tools/mirror/src/metrics.rs b/tools/mirror/src/metrics.rs
new file mode 100644
index 00000000000..c26c563cfaa
--- /dev/null
+++ b/tools/mirror/src/metrics.rs
@@ -0,0 +1,21 @@
+use near_o11y::metrics::{
+    try_create_int_counter, try_create_int_counter_vec, IntCounter, IntCounterVec,
+};
+use once_cell::sync::Lazy;
+
+pub static TRANSACTIONS_SENT: Lazy<IntCounterVec> = Lazy::new(|| {
+    try_create_int_counter_vec(
+        "near_mirror_transactions_sent",
+        "Total number of transactions sent",
+        &["status"],
+    )
+    .unwrap()
+});
+
+pub static TRANSACTIONS_INCLUDED: Lazy<IntCounter> = Lazy::new(|| {
+    try_create_int_counter(
+        "near_mirror_transactions_included",
+        "Total number of transactions sent that made it on-chain",
+    )
+    .unwrap()
+});
diff --git a/tools/mirror/src/secret.rs b/tools/mirror/src/secret.rs
new file mode 100644
index 00000000000..6107a831aa8
--- /dev/null
+++ b/tools/mirror/src/secret.rs
@@ -0,0 +1,85 @@
+use rand_core::{OsRng, RngCore};
+use serde::{Deserialize, Serialize};
+use std::fs::File;
+use std::io::Write;
+use std::path::Path;
+use std::str::FromStr;
+
+pub const SECRET_LEN: usize = 64;
+struct KeyMapSecret([u8; SECRET_LEN]);
+
+#[derive(Serialize, Deserialize)]
+struct MirrorSecretConfig {
+    pub key_map_secret: Option<KeyMapSecret>,
+}
+
+impl serde::Serialize for KeyMapSecret {
+    fn serialize<S>(
+        &self,
+        serializer: S,
+    ) -> Result<<S as serde::Serializer>::Ok, <S as serde::Serializer>::Error>
+    where
+        S: serde::Serializer,
+    {
+        let data = bs58::encode(&self.0[..]).into_string();
+        serializer.serialize_str(&data)
+    }
+}
+
+impl<'de> serde::Deserialize<'de> for KeyMapSecret {
+    fn deserialize<D>(deserializer: D) -> Result<Self, <D as serde::Deserializer<'de>>::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        let s = <String as serde::Deserialize>::deserialize(deserializer)?;
+        Self::from_str(&s).map_err(|err| serde::de::Error::custom(format!("{:?}", err)))
+    }
+}
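The secret is persisted as a base58 string via the `Serialize` impl above and parsed back by the `FromStr` impl below. The underlying bs58 round trip, in isolation:

```rust
// Round-trip a 64-byte secret through the same bs58 encoding used for the
// config file written by generate() / read by load() below.
fn main() {
    let secret = [7u8; 64];
    let encoded = bs58::encode(&secret[..]).into_string();
    let mut decoded = [0u8; 64];
    let len = bs58::decode(&encoded).into(&mut decoded[..]).unwrap();
    assert_eq!(len, 64);
    assert_eq!(decoded[..], secret[..]);
}
```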
+#[derive(thiserror::Error, Debug)]
+pub(crate) enum ParseSecretError {
+    #[error("Base58 decode failure: `{1}`")]
+    BS58(#[source] bs58::decode::Error, String),
+    #[error("invalid decoded length (expected: 64, got: {0}; input: `{1}`)")]
+    BadLength(usize, String),
+}
+
+impl FromStr for KeyMapSecret {
+    type Err = ParseSecretError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let mut array = [0; SECRET_LEN];
+        let length = bs58::decode(s)
+            .into(&mut array[..])
+            .map_err(|err| Self::Err::BS58(err, s.to_owned()))?;
+        if length != SECRET_LEN {
+            return Err(Self::Err::BadLength(length, s.to_owned()));
+        }
+        Ok(Self(array))
+    }
+}
+
+pub(crate) fn generate<P: AsRef<Path>>(secret_file_out: P) -> anyhow::Result<[u8; SECRET_LEN]> {
+    let mut secret = [0; SECRET_LEN];
+    let mut out = File::create(secret_file_out)?;
+
+    OsRng.fill_bytes(&mut secret);
+    let config = MirrorSecretConfig { key_map_secret: Some(KeyMapSecret(secret)) };
+    let str = serde_json::to_string_pretty(&config)?;
+    out.write_all(str.as_bytes())?;
+    Ok(secret)
+}
+
+pub(crate) fn write_empty<P: AsRef<Path>>(secret_file_out: P) -> anyhow::Result<()> {
+    let mut out = File::create(secret_file_out)?;
+    let config = MirrorSecretConfig { key_map_secret: None };
+    let str = serde_json::to_string_pretty(&config)?;
+    out.write_all(str.as_bytes())?;
+    Ok(())
+}
+
+pub fn load<P: AsRef<Path>>(secret_file: P) -> anyhow::Result<Option<[u8; SECRET_LEN]>> {
+    let s = std::fs::read_to_string(secret_file)?;
+    let config: MirrorSecretConfig = serde_json::from_str(&s)?;
+    Ok(config.key_map_secret.map(|s| s.0))
+}
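To close, a hypothetical round trip of the helpers above, as it might be exercised from inside this crate (`generate` and `write_empty` are `pub(crate)`; the file name is illustrative):

```rust
// Generate a secret once when forking the chain, then load it each time the
// mirror process starts; both calls assume we're inside the mirror crate.
fn main() -> anyhow::Result<()> {
    let secret = generate("mirror-secret.json")?;
    let loaded = load("mirror-secret.json")?;
    assert_eq!(loaded, Some(secret));
    Ok(())
}
```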