From bd45c11c5f2e78234f0e35b36128a11bec1f28e7 Mon Sep 17 00:00:00 2001 From: Marcelo Diop-Gonzalez Date: Fri, 21 Oct 2022 11:42:06 -0400 Subject: [PATCH] feat: add a transaction mirror binary (#7183) This adds code that mirrors traffic from a source chain (e.g. mainnet or testnet) to a test chain with genesis state forked from the source chain. The goal is to produce traffic that looks like source chain traffic. So in a mocknet test where we fork mainnet state for example, we can then actually observe what happens when we subsequently get traffic equivalent to mainnet traffic after the fork point. For more info, see the README in this commit. --- Cargo.lock | 58 ++ Cargo.toml | 3 + core/chain-configs/src/genesis_config.rs | 2 +- core/chain-configs/src/lib.rs | 4 +- neard/Cargo.toml | 1 + neard/src/cli.rs | 10 + pytest/lib/key.py | 7 + pytest/tools/mirror/test.py | 470 ++++++++++ tools/mirror/Cargo.toml | 43 + tools/mirror/README.md | 59 ++ tools/mirror/src/chain_tracker.rs | 430 +++++++++ tools/mirror/src/cli.rs | 133 +++ tools/mirror/src/genesis.rs | 83 ++ tools/mirror/src/key_mapping.rs | 111 +++ tools/mirror/src/lib.rs | 1046 ++++++++++++++++++++++ tools/mirror/src/metrics.rs | 21 + tools/mirror/src/secret.rs | 85 ++ 17 files changed, 2563 insertions(+), 3 deletions(-) create mode 100755 pytest/tools/mirror/test.py create mode 100644 tools/mirror/Cargo.toml create mode 100644 tools/mirror/README.md create mode 100644 tools/mirror/src/chain_tracker.rs create mode 100644 tools/mirror/src/cli.rs create mode 100644 tools/mirror/src/genesis.rs create mode 100644 tools/mirror/src/key_mapping.rs create mode 100644 tools/mirror/src/lib.rs create mode 100644 tools/mirror/src/metrics.rs create mode 100644 tools/mirror/src/secret.rs diff --git a/Cargo.lock b/Cargo.lock index 568e10b6629..88db047e438 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1477,6 +1477,7 @@ checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506" dependencies = [ "block-buffer 0.10.2", "crypto-common", + "subtle", ] [[package]] @@ -2107,6 +2108,24 @@ dependencies = [ "proc-macro-hack", ] +[[package]] +name = "hkdf" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "791a029f6b9fc27657f6f188ec6e5e43f6911f6f878e0dc5501396e09809d437" +dependencies = [ + "hmac", +] + +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest 0.10.3", +] + [[package]] name = "http" version = "0.2.7" @@ -3157,6 +3176,44 @@ dependencies = [ "serde_json", ] +[[package]] +name = "near-mirror" +version = "0.0.0" +dependencies = [ + "actix", + "anyhow", + "borsh", + "bs58", + "clap 3.1.18", + "ed25519-dalek", + "hex", + "hkdf", + "near-chain-configs", + "near-client", + "near-client-primitives", + "near-crypto", + "near-indexer", + "near-indexer-primitives", + "near-network", + "near-o11y", + "near-primitives", + "near-primitives-core", + "near-store", + "nearcore", + "once_cell", + "openssl-probe", + "rand_core 0.5.1", + "rocksdb", + "secp256k1", + "serde", + "serde_json", + "sha2 0.10.2", + "strum", + "thiserror", + "tokio", + "tracing", +] + [[package]] name = "near-network" version = "0.0.0" @@ -3615,6 +3672,7 @@ dependencies = [ "futures", "near-chain-configs", "near-jsonrpc-primitives", + "near-mirror", "near-o11y", "near-performance-metrics", "near-primitives", diff --git a/Cargo.toml b/Cargo.toml index 
3f6c0ae6df9..60a4ac317f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ members = [ "tools/chainsync-loadtest", "tools/delay-detector", "tools/indexer/example", + "tools/mirror", "tools/mock-node", "tools/restaked", "tools/rpctypegen/core", @@ -111,6 +112,7 @@ fs2 = "0.4" futures = "0.3.5" futures-util = "0.3" hex = { version = "0.4.2", features = ["serde"] } +hkdf = "0.12.3" hyper = { version = "0.14", features = ["full"] } hyper-tls = "0.5.0" im = "15" @@ -148,6 +150,7 @@ protobuf-codegen = "3.0.1" quote = "1.0" rand = "0.8.5" rand_chacha = "0.3.1" +rand_core = "0.5" rand_hc = "0.3.1" rand_xorshift = "0.3" rayon = "1.5" diff --git a/core/chain-configs/src/genesis_config.rs b/core/chain-configs/src/genesis_config.rs index 41e86418aa5..3d1f9f8ad72 100644 --- a/core/chain-configs/src/genesis_config.rs +++ b/core/chain-configs/src/genesis_config.rs @@ -393,7 +393,7 @@ impl<'de, F: FnMut(StateRecord)> DeserializeSeed<'de> for RecordsProcessor<&'_ m } } -fn stream_records_from_file( +pub fn stream_records_from_file( reader: impl Read, mut callback: impl FnMut(StateRecord), ) -> serde_json::Result<()> { diff --git a/core/chain-configs/src/lib.rs b/core/chain-configs/src/lib.rs index c698b3bf881..ab2fcb53ca7 100644 --- a/core/chain-configs/src/lib.rs +++ b/core/chain-configs/src/lib.rs @@ -7,6 +7,6 @@ pub use client_config::{ MIN_GC_NUM_EPOCHS_TO_KEEP, TEST_STATE_SYNC_TIMEOUT, }; pub use genesis_config::{ - get_initial_supply, Genesis, GenesisChangeConfig, GenesisConfig, GenesisRecords, - GenesisValidationMode, ProtocolConfig, ProtocolConfigView, + get_initial_supply, stream_records_from_file, Genesis, GenesisChangeConfig, GenesisConfig, + GenesisRecords, GenesisValidationMode, ProtocolConfig, ProtocolConfigView, }; diff --git a/neard/Cargo.toml b/neard/Cargo.toml index 886461cb545..ffd66616724 100644 --- a/neard/Cargo.toml +++ b/neard/Cargo.toml @@ -34,6 +34,7 @@ tracing.workspace = true nearcore = { path = "../nearcore" } near-chain-configs = { path = "../core/chain-configs" } near-jsonrpc-primitives = { path = "../chain/jsonrpc-primitives" } +near-mirror = { path = "../tools/mirror" } near-primitives = { path = "../core/primitives" } near-performance-metrics = { path = "../utils/near-performance-metrics" } near-state-viewer = { path = "../tools/state-viewer", package = "state-viewer" } diff --git a/neard/src/cli.rs b/neard/src/cli.rs index 3e80ac999f6..3b6ff113433 100644 --- a/neard/src/cli.rs +++ b/neard/src/cli.rs @@ -3,6 +3,7 @@ use anyhow::Context; use clap::{Args, Parser}; use near_chain_configs::GenesisValidationMode; use near_jsonrpc_primitives::types::light_client::RpcLightClientExecutionProofResponse; +use near_mirror::MirrorCommand; use near_o11y::tracing_subscriber::EnvFilter; use near_o11y::{ default_subscriber, default_subscriber_with_opentelemetry, BuildEnvFilterError, @@ -93,6 +94,9 @@ impl NeardCmd { NeardSubCommand::VerifyProof(cmd) => { cmd.run(); } + NeardSubCommand::Mirror(cmd) => { + cmd.run()?; + } }; Ok(()) } @@ -104,6 +108,8 @@ pub(crate) enum RunError { EnvFilter(#[source] BuildEnvFilterError), #[error("could not install a rayon thread pool")] RayonInstall(#[source] rayon::ThreadPoolBuildError), + #[error(transparent)] + Other(#[from] anyhow::Error), } #[derive(Parser)] @@ -189,6 +195,10 @@ pub(super) enum NeardSubCommand { /// Verify proofs #[clap(alias = "verify_proof")] VerifyProof(VerifyProofSubCommand), + + /// Mirror transactions from a source chain to a test chain with state forked + /// from it, reproducing traffic and state as closely as possible. 
+ Mirror(MirrorCommand), } #[derive(Parser)] diff --git a/pytest/lib/key.py b/pytest/lib/key.py index 9dbcc0d5698..9d99385de62 100644 --- a/pytest/lib/key.py +++ b/pytest/lib/key.py @@ -17,6 +17,13 @@ def __init__(self, account_id: str, pk: str, sk: str) -> None: self.pk = pk self.sk = sk + @classmethod + def from_random(cls, account_id: str) -> 'Key': + keys = ed25519.create_keypair(entropy=os.urandom) + sk = 'ed25519:' + base58.b58encode(keys[0].to_bytes()).decode('ascii') + pk = 'ed25519:' + base58.b58encode(keys[1].to_bytes()).decode('ascii') + return cls(account_id, pk, sk) + @classmethod def implicit_account(cls) -> 'Key': keys = ed25519.create_keypair(entropy=os.urandom) diff --git a/pytest/tools/mirror/test.py b/pytest/tools/mirror/test.py new file mode 100755 index 00000000000..b51f9186d97 --- /dev/null +++ b/pytest/tools/mirror/test.py @@ -0,0 +1,470 @@ +#!/usr/bin/env python3 + +import sys, time, base58, random +import atexit +import base58 +import json +import os +import pathlib +import shutil +import signal +import subprocess + +sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) + +from cluster import init_cluster, spin_up_node, load_config +from configured_logger import logger +from mocknet import create_genesis_file +import transaction +import utils +import key + +# This sets up an environment to test the tools/mirror process. It starts a localnet with a few validators +# and waits for some blocks to be produced. Then we fork the state and start a new chain from that, and +# start the mirror process that should mirror transactions from the source chain to the target chain. +# Halfway through we restart it to make sure that it still works properly when restarted + +TIMEOUT = 240 +NUM_VALIDATORS = 4 +TARGET_VALIDATORS = ['foo0', 'foo1', 'foo2'] +MIRROR_DIR = 'test-mirror' + + +def mkdir_clean(dirname): + try: + dirname.mkdir() + except FileExistsError: + shutil.rmtree(dirname) + dirname.mkdir() + + +def dot_near(): + return pathlib.Path.home() / '.near' + + +def ordinal_to_port(port, ordinal): + return f'0.0.0.0:{port + 10 + ordinal}' + + +def init_target_dir(neard, home, ordinal, validator_account=None): + mkdir_clean(home) + + try: + subprocess.check_output([neard, '--home', home, 'init'], + stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + sys.exit(f'"neard init" command failed: output: {e.stdout}') + shutil.copy(dot_near() / 'test0/config.json', home / 'config.json') + shutil.copy(dot_near() / 'test0/forked/genesis.json', home / 'genesis.json') + shutil.copy(dot_near() / 'test0/forked/records.json', home / 'records.json') + + with open(home / 'config.json', 'r') as f: + config = json.load(f) + config['genesis_records_file'] = 'records.json' + config['network']['addr'] = ordinal_to_port(24567, ordinal) + config['rpc']['addr'] = ordinal_to_port(3030, ordinal) + with open(home / 'config.json', 'w') as f: + json.dump(config, f) + + if validator_account is None: + os.remove(home / 'validator_key.json') + else: + # this key and the suffix -load-test.near are hardcoded in create_genesis_file() + with open(home / 'validator_key.json', 'w') as f: + json.dump( + { + 'account_id': + f'{validator_account + "-load-test.near"}', + 'public_key': + 'ed25519:76NVkDErhbP1LGrSAf5Db6BsFJ6LBw6YVA4BsfTBohmN', + 'secret_key': + 'ed25519:3cCk8KUWBySGCxBcn1syMoY5u73wx5eaPLRbQcMi23LwBA3aLsqEbA33Ww1bsJaFrchmDciGe9otdn45SrDSkow2' + }, f) + + +def init_target_dirs(neard): + ordinal = NUM_VALIDATORS + 1 + dirs = [] + + for account_id in TARGET_VALIDATORS: + 
home = dot_near() / f'test_target_{account_id}' + dirs.append(str(home)) + init_target_dir(neard, home, ordinal, validator_account=account_id) + ordinal += 1 + + observer = dot_near() / f'{MIRROR_DIR}/target' + init_target_dir(neard, observer, ordinal, validator_account=None) + shutil.copy(dot_near() / 'test0/output/mirror-secret.json', + observer / 'mirror-secret.json') + return dirs, observer + + +def create_forked_chain(config, near_root): + binary_name = config.get('binary_name', 'neard') + neard = os.path.join(near_root, binary_name) + try: + subprocess.check_output([ + neard, "--home", + dot_near() / 'test0', "view-state", "dump-state", "--stream" + ], + stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + sys.exit(f'"dump-state" command failed: output: {e.stdout}') + try: + subprocess.check_output([ + neard, + 'mirror', + 'prepare', + '--records-file-in', + dot_near() / 'test0/output/records.json', + '--records-file-out', + dot_near() / 'test0/output/mirror-records.json', + '--secret-file-out', + dot_near() / 'test0/output/mirror-secret.json', + ], + stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + sys.exit(f'"mirror prepare" command failed: output: {e.stdout}') + + os.mkdir(dot_near() / 'test0/forked') + genesis_filename_in = dot_near() / 'test0/output/genesis.json' + genesis_filename_out = dot_near() / 'test0/forked/genesis.json' + records_filename_in = dot_near() / 'test0/output/mirror-records.json' + records_filename_out = dot_near() / 'test0/forked/records.json' + create_genesis_file(TARGET_VALIDATORS, + genesis_filename_in=genesis_filename_in, + genesis_filename_out=genesis_filename_out, + records_filename_in=records_filename_in, + records_filename_out=records_filename_out, + rpc_node_names=[], + chain_id='foonet', + append=True, + epoch_length=20, + node_pks=None, + increasing_stakes=0.0, + num_seats=len(TARGET_VALIDATORS)) + return init_target_dirs(neard) + + +def init_mirror_dir(home, source_boot_node): + mkdir_clean(dot_near() / MIRROR_DIR) + os.rename(home, dot_near() / f'{MIRROR_DIR}/source') + ordinal = NUM_VALIDATORS + with open(dot_near() / f'{MIRROR_DIR}/source/config.json', 'r') as f: + config = json.load(f) + config['network']['boot_nodes'] = source_boot_node.addr_with_pk() + config['network']['addr'] = ordinal_to_port(24567, ordinal) + config['rpc']['addr'] = ordinal_to_port(3030, ordinal) + with open(dot_near() / f'{MIRROR_DIR}/source/config.json', 'w') as f: + json.dump(config, f) + + +def mirror_cleanup(process): + process.send_signal(signal.SIGINT) + try: + process.wait(5) + except: + process.kill() + logger.error('can\'t kill mirror process') + + +def start_mirror(near_root, source_home, target_home, boot_node): + env = os.environ.copy() + env["RUST_LOG"] = "actix_web=warn,mio=warn,tokio_util=warn,actix_server=warn,actix_http=warn," + env.get( + "RUST_LOG", "debug") + with open(dot_near() / f'{MIRROR_DIR}/stdout', 'ab') as stdout, \ + open(dot_near() / f'{MIRROR_DIR}/stderr', 'ab') as stderr: + process = subprocess.Popen([ + os.path.join(near_root, 'neard'), 'mirror', 'run', "--source-home", + source_home, "--target-home", target_home, '--secret-file', + target_home / 'mirror-secret.json' + ], + stdin=subprocess.DEVNULL, + stdout=stdout, + stderr=stderr, + env=env) + logger.info("Started mirror process") + atexit.register(mirror_cleanup, process) + with open(target_home / 'config.json', 'r') as f: + config = json.load(f) + config['network']['boot_nodes'] = boot_node.addr_with_pk() + with open(target_home / 
'config.json', 'w') as f: + json.dump(config, f) + return process + + +# we'll test out adding an access key and then sending txs signed with it +# since that hits some codepaths we want to test +def send_add_access_key(node, creator_key, nonce, block_hash): + k = key.Key.from_random('test0') + action = transaction.create_full_access_key_action(k.decoded_pk()) + tx = transaction.sign_and_serialize_transaction('test0', nonce, [action], + block_hash, 'test0', + creator_key.decoded_pk(), + creator_key.decoded_sk()) + node.send_tx(tx) + return k + + +def create_subaccount(node, signer_key, nonce, block_hash): + k = key.Key.from_random('foo.' + signer_key.account_id) + actions = [] + actions.append(transaction.create_create_account_action()) + actions.append(transaction.create_full_access_key_action(k.decoded_pk())) + actions.append(transaction.create_payment_action(10**24)) + # add an extra one just to exercise some more corner cases + actions.append( + transaction.create_full_access_key_action( + key.Key.from_random(k.account_id).decoded_pk())) + + tx = transaction.sign_and_serialize_transaction(k.account_id, nonce, + actions, block_hash, + signer_key.account_id, + signer_key.decoded_pk(), + signer_key.decoded_sk()) + node.send_tx(tx) + return k + + +# a key that we added with an AddKey tx or implicit account transfer. +# just for nonce handling convenience +class AddedKey: + + def __init__(self, key): + self.nonce = None + self.key = key + + def send_if_inited(self, node, transfers, block_hash): + if self.nonce is None: + self.nonce = node.get_nonce_for_pk(self.key.account_id, self.key.pk) + + if self.nonce is not None: + for (receiver_id, amount) in transfers: + self.nonce += 1 + tx = transaction.sign_payment_tx(self.key, receiver_id, amount, + self.nonce, block_hash) + node.send_tx(tx) + + +class ImplicitAccount: + + def __init__(self): + self.key = AddedKey(key.Key.implicit_account()) + + def account_id(self): + return self.key.key.account_id + + def transfer(self, node, sender_key, amount, block_hash, nonce): + tx = transaction.sign_payment_tx(sender_key, self.account_id(), amount, + nonce, block_hash) + node.send_tx(tx) + logger.info( + f'sent {amount} to initialize implicit account {self.account_id()}') + + def send_if_inited(self, node, transfers, block_hash): + self.key.send_if_inited(node, transfers, block_hash) + + +def count_total_txs(node, min_height=0): + total = 0 + h = node.get_latest_block().hash + while True: + block = node.get_block(h)['result'] + height = int(block['header']['height']) + if height < min_height: + return total + + for c in block['chunks']: + if int(c['height_included']) == height: + chunk = node.get_chunk(c['chunk_hash'])['result'] + total += len(chunk['transactions']) + + h = block['header']['prev_hash'] + if h == '11111111111111111111111111111111': + return total + + +def check_num_txs(source_node, target_node, start_time, end_source_height): + with open(os.path.join(target_node.node_dir, 'genesis.json'), 'r') as f: + genesis_height = json.load(f)['genesis_height'] + with open(os.path.join(target_node.node_dir, 'config.json'), 'r') as f: + delay = json.load(f)['consensus']['min_block_production_delay'] + block_delay = 10**9 * int(delay['secs']) + int(delay['nanos']) + block_delay = block_delay / 10**9 + + total_source_txs = count_total_txs(source_node, min_height=genesis_height) + + # start_time is the time the mirror binary was started. 
Give it 20 seconds to + # sync and then 50% more than min_block_production_delay for each block between + # the start and end points of the source chain. Not ideal to be basing a test on time + # like this but there's no real strong guarantee on when the transactions should + # make it on chain, so this is some kind of reasonable timeout + + total_time_allowed = 20 + (end_source_height - + genesis_height) * block_delay * 1.5 + time_elapsed = time.time() - start_time + if time_elapsed < total_time_allowed: + time_left = total_time_allowed - time_elapsed + logger.info( + f'waiting for {int(time_left)} seconds to allow transactions to make it to the target chain' + ) + time.sleep(time_left) + + total_target_txs = count_total_txs(target_node) + assert total_source_txs == total_target_txs, (total_source_txs, + total_target_txs) + logger.info(f'all {total_source_txs} transactions mirrored') + + +def main(): + config_changes = {} + for i in range(NUM_VALIDATORS + 1): + config_changes[i] = {"tracked_shards": [0, 1, 2, 3], "archive": True} + + config = load_config() + near_root, node_dirs = init_cluster(num_nodes=NUM_VALIDATORS, + num_observers=1, + num_shards=4, + config=config, + genesis_config_changes=[ + ["epoch_length", 10], + ], + client_config_changes=config_changes) + + nodes = [spin_up_node(config, near_root, node_dirs[0], 0)] + + init_mirror_dir(node_dirs[NUM_VALIDATORS], nodes[0]) + + for i in range(1, NUM_VALIDATORS): + nodes.append( + spin_up_node(config, near_root, node_dirs[i], i, + boot_node=nodes[0])) + + ctx = utils.TxContext([i for i in range(len(nodes))], nodes) + + implicit_account1 = ImplicitAccount() + for height, block_hash in utils.poll_blocks(nodes[0], timeout=TIMEOUT): + implicit_account1.transfer(nodes[0], nodes[0].signer_key, 10**24, + base58.b58decode(block_hash.encode('utf8')), + ctx.next_nonce) + ctx.next_nonce += 1 + break + + for height, block_hash in utils.poll_blocks(nodes[0], timeout=TIMEOUT): + block_hash_bytes = base58.b58decode(block_hash.encode('utf8')) + + implicit_account1.send_if_inited(nodes[0], [('test2', height), + ('test3', height)], + block_hash_bytes) + ctx.send_moar_txs(block_hash, 10, use_routing=False) + + if height > 12: + break + + nodes[0].kill() + target_node_dirs, target_observer_dir = create_forked_chain( + config, near_root) + nodes[0].start(boot_node=nodes[1]) + + ordinal = NUM_VALIDATORS + 1 + target_nodes = [ + spin_up_node(config, near_root, target_node_dirs[0], ordinal) + ] + for i in range(1, len(target_node_dirs)): + ordinal += 1 + target_nodes.append( + spin_up_node(config, + near_root, + target_node_dirs[i], + ordinal, + boot_node=target_nodes[0])) + + p = start_mirror(near_root, + dot_near() / f'{MIRROR_DIR}/source/', target_observer_dir, + target_nodes[0]) + start_time = time.time() + start_source_height = nodes[0].get_latest_block().height + restarted = False + + subaccount_key = AddedKey( + create_subaccount(nodes[0], nodes[0].signer_key, ctx.next_nonce, + block_hash_bytes)) + ctx.next_nonce += 1 + + new_key = AddedKey( + send_add_access_key(nodes[0], nodes[0].signer_key, ctx.next_nonce, + block_hash_bytes)) + ctx.next_nonce += 1 + + implicit_account2 = ImplicitAccount() + # here we are gonna send a tiny amount (1 yoctoNEAR) to the implicit account and + # then wait a bit before properly initializing it. 
This hits a corner case where the + # mirror binary needs to properly look for the second tx's outcome to find the starting + # nonce because the first one failed + implicit_account2.transfer(nodes[0], nodes[0].signer_key, 1, + block_hash_bytes, ctx.next_nonce) + ctx.next_nonce += 1 + time.sleep(2) + implicit_account2.transfer(nodes[0], nodes[0].signer_key, 10**24, + block_hash_bytes, ctx.next_nonce) + ctx.next_nonce += 1 + + for height, block_hash in utils.poll_blocks(nodes[0], timeout=TIMEOUT): + code = p.poll() + if code is not None: + assert code == 0 + break + + block_hash_bytes = base58.b58decode(block_hash.encode('utf8')) + + ctx.send_moar_txs(block_hash, 10, use_routing=False) + + implicit_account1.send_if_inited( + nodes[0], [('test2', height), ('test1', height), + (implicit_account2.account_id(), height)], + block_hash_bytes) + implicit_account2.send_if_inited( + nodes[1], [('test2', height), ('test0', height), + (implicit_account1.account_id(), height)], + block_hash_bytes) + new_key.send_if_inited(nodes[2], + [('test1', height), ('test2', height), + (implicit_account1.account_id(), height), + (implicit_account2.account_id(), height)], + block_hash_bytes) + subaccount_key.send_if_inited( + nodes[3], [('test3', height), + (implicit_account2.account_id(), height)], + block_hash_bytes) + + if not restarted and height - start_source_height >= 50: + logger.info('stopping mirror process') + p.terminate() + p.wait() + with open(dot_near() / f'{MIRROR_DIR}/stderr', 'ab') as stderr: + stderr.write( + b'<><><><><><><><><><><><> restarting <><><><><><><><><><><><><><><><><><><><>\n' + ) + stderr.write( + b'<><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><>\n' + ) + stderr.write( + b'<><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><>\n' + ) + p = start_mirror(near_root, + dot_near() / f'{MIRROR_DIR}/source/', + target_observer_dir, target_nodes[0]) + restarted = True + + if height - start_source_height >= 100: + break + + time.sleep(5) + # we don't need these anymore + for node in nodes[1:]: + node.kill() + check_num_txs(nodes[0], target_nodes[0], start_time, height) + + +if __name__ == '__main__': + main() diff --git a/tools/mirror/Cargo.toml b/tools/mirror/Cargo.toml new file mode 100644 index 00000000000..7137dd87ecb --- /dev/null +++ b/tools/mirror/Cargo.toml @@ -0,0 +1,43 @@ +[package] +name = "near-mirror" +version = "0.0.0" +authors.workspace = true +publish = false +# Please update rust-toolchain.toml as well when changing version here: +rust-version.workspace = true +edition.workspace = true + +[dependencies] +actix.workspace = true +anyhow.workspace = true +borsh.workspace = true +bs58.workspace = true +clap.workspace = true +ed25519-dalek.workspace = true +hex.workspace = true +hkdf.workspace = true +once_cell.workspace = true +openssl-probe.workspace = true +rand_core.workspace = true +rocksdb.workspace = true +secp256k1.workspace = true +serde.workspace = true +serde_json.workspace = true +sha2.workspace = true +strum.workspace = true +thiserror.workspace = true +tokio.workspace = true +tracing.workspace = true + +nearcore = { path = "../../nearcore" } +near-chain-configs = { path = "../../core/chain-configs" } +near-client = { path = "../../chain/client" } +near-client-primitives = { path = "../../chain/client-primitives" } +near-indexer-primitives = { path = "../../chain/indexer-primitives" } +near-indexer = { path = "../../chain/indexer" } +near-network = { path = "../../chain/network" } +near-primitives = { path = 
"../../core/primitives" } +near-primitives-core = { path = "../../core/primitives-core" } +near-o11y = { path = "../../core/o11y" } +near-store = { path = "../../core/store" } +near-crypto = { path = "../../core/crypto" } \ No newline at end of file diff --git a/tools/mirror/README.md b/tools/mirror/README.md new file mode 100644 index 00000000000..d44b27dd2d5 --- /dev/null +++ b/tools/mirror/README.md @@ -0,0 +1,59 @@ +## Transaction Mirror + +This is some code that tries to help with the following: We have some +chain, let's call it the "source chain", producing blocks and chunks +with transactions as usual, and we have another chain, let's call it +the "target chain" that starts from state forked from the source +chain. Usually this would be done by using the `neard view-state +dump-state` command, and using the resulting genesis and records file +as the start of the target chain. What we want is to then periodically +send the transactions appearing in the source chain after the fork +point to the target chain. Ideally, the traffic we see in the target +chain will be very similar to the traffic in the source chain. + +The first approach we might try is to just send the source chain +transactions byte-for-byte unaltered to the target chain. This almost +works, but not quite, because the `block_hash` field in the +transactions will be rejected. This means we have no choice but to +replace the accounts' public keys in the original forked state, so +that we can sign transactions with a valid `block_hash` field. So the +way we'll use this is that we'll generate the forked state from the +source chain using the usual `dump-state` command, and then run: + +``` +$ mirror prepare --records-file-in "~/.near/output/records.json" --records-file-out "~/.near/output/mapped-records.json" +``` + +This command will output a records file where the keys have been +replaced. And then the logic we end up with when running the +transaction generator is something like this: + +``` +loop { + sleep(ONE_SECOND); + source_block = fetch_block(source_chain_view_client, height); + for chunk in block: + for tx in chunk: + private_key = map_key(tx.public_key) + block_hash = fetch_head_hash(target_chain_view_client) + new_tx = sign_tx(private_key, tx.actions, block_hash) + send_tx(target_chain_client, new_tx) +} +``` + +So then the question is what does `map_key()` do?. If we don't care +about the security of these accounts in the target chain (for example +if the target chain is just some throwaway test chain that nobody +would have any incentive to mess with), we can just use the bytes of +the public key directly as the private key. If we do care somewhat +about security, then we pass a `--secret-key-file` argument to the +`prepare` command, and pass it as an argument to `map_key()`. Using +that makes things a little bit more delicate, since if the generated +secret is ever lost, then it will no longer be possible to mirror any +traffic to the target chain. + +known problems: + +keys in the source chain added with the `promise_batch_action_add_key*` +host functions will not be mapped in the target chain. Maybe a solution +could be to replace those keys manually or something? 
diff --git a/tools/mirror/src/chain_tracker.rs b/tools/mirror/src/chain_tracker.rs new file mode 100644 index 00000000000..40d7cfd8635 --- /dev/null +++ b/tools/mirror/src/chain_tracker.rs @@ -0,0 +1,430 @@ +use crate::MappedBlock; +use near_crypto::PublicKey; +use near_indexer::StreamerMessage; +use near_indexer_primitives::IndexerTransactionWithOutcome; +use near_primitives::hash::CryptoHash; +use near_primitives::transaction::SignedTransaction; +use near_primitives::types::{AccountId, BlockHeight}; +use near_primitives_core::types::{Nonce, ShardId}; +use std::cmp::Ordering; +use std::collections::hash_map; +use std::collections::HashMap; +use std::collections::{BTreeSet, VecDeque}; +use std::pin::Pin; +use std::time::{Duration, Instant}; + +struct TxSendInfo { + sent_at: Instant, + source_height: BlockHeight, + target_height: BlockHeight, +} + +#[derive(PartialEq, Eq, Debug)] +struct TxId { + hash: CryptoHash, + nonce: Nonce, +} + +impl PartialOrd for TxId { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for TxId { + fn cmp(&self, other: &Self) -> Ordering { + self.nonce.cmp(&other.nonce).then_with(|| self.hash.cmp(&other.hash)) + } +} + +// we want a reference to transactions in .queued_blocks that need to have nonces +// set later. To avoid having the struct be self referential we keep this struct +// with enough info to look it up later. +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct TxRef { + height: BlockHeight, + shard_id: ShardId, + tx_idx: usize, +} + +struct TxAwaitingNonceCursor<'a> { + txs: &'a [TxRef], + idx: usize, +} + +impl<'a> TxAwaitingNonceCursor<'a> { + fn new(txs: &'a [TxRef]) -> Self { + Self { txs, idx: 0 } + } +} + +pub(crate) struct TxAwaitingNonceIter<'a> { + queued_blocks: &'a VecDeque, + iter: hash_map::Iter<'a, BlockHeight, Vec>, + cursor: Option>, +} + +impl<'a> TxAwaitingNonceIter<'a> { + fn new( + queued_blocks: &'a VecDeque, + txs_awaiting_nonce: &'a HashMap>, + ) -> Self { + let mut iter = txs_awaiting_nonce.iter(); + let cursor = iter.next().map(|(_height, txs)| TxAwaitingNonceCursor::new(txs)); + Self { queued_blocks, iter, cursor } + } +} + +impl<'a> Iterator for TxAwaitingNonceIter<'a> { + type Item = (&'a TxRef, &'a crate::TxAwaitingNonce); + + fn next(&mut self) -> Option { + match &mut self.cursor { + Some(c) => { + let tx_ref = &c.txs[c.idx]; + c.idx += 1; + if c.idx == c.txs.len() { + self.cursor = + self.iter.next().map(|(_height, txs)| TxAwaitingNonceCursor::new(txs)); + } + let block_idx = self + .queued_blocks + .binary_search_by(|b| b.source_height.cmp(&tx_ref.height)) + .unwrap(); + let block = &self.queued_blocks[block_idx]; + let chunk = block.chunks.iter().find(|c| c.shard_id == tx_ref.shard_id).unwrap(); + match &chunk.txs[tx_ref.tx_idx] { + crate::TargetChainTx::AwaitingNonce(tx) => Some((tx_ref, tx)), + crate::TargetChainTx::Ready(_) => unreachable!(), + } + } + None => None, + } + } +} + +// Keeps the queue of upcoming transactions and provides them in regular intervals via next_batch() +// Also keeps track of txs we've sent so far and looks for them on chain, for metrics/logging purposes. 
+#[derive(Default)] +pub(crate) struct TxTracker { + sent_txs: HashMap, + txs_by_signer: HashMap<(AccountId, PublicKey), BTreeSet>, + queued_blocks: VecDeque, + txs_awaiting_nonce: HashMap>, + pending_access_keys: HashMap<(AccountId, PublicKey), usize>, + height_queued: Option, + send_time: Option>>, + // Config value in the target chain, used to judge how long to wait before sending a new batch of txs + min_block_production_delay: Duration, + // timestamps in the target chain, used to judge how long to wait before sending a new batch of txs + recent_block_timestamps: VecDeque, +} + +impl TxTracker { + pub(crate) fn new(min_block_production_delay: Duration) -> Self { + Self { min_block_production_delay, ..Default::default() } + } + + pub(crate) fn height_queued(&self) -> Option { + self.height_queued + } + + pub(crate) fn num_blocks_queued(&self) -> usize { + self.queued_blocks.len() + } + + pub(crate) fn pending_access_keys_iter<'a>( + &'a self, + ) -> impl Iterator { + self.pending_access_keys.iter().map(|(x, _)| x) + } + + pub(crate) fn tx_awaiting_nonce_iter<'a>(&'a self) -> TxAwaitingNonceIter<'a> { + TxAwaitingNonceIter::new(&self.queued_blocks, &self.txs_awaiting_nonce) + } + + fn pending_access_keys_deref( + &mut self, + source_signer_id: AccountId, + source_public_key: PublicKey, + ) { + match self.pending_access_keys.entry((source_signer_id, source_public_key)) { + hash_map::Entry::Occupied(mut e) => { + let ref_count = e.get_mut(); + if *ref_count == 1 { + e.remove(); + } else { + *ref_count -= 1; + } + } + hash_map::Entry::Vacant(_) => unreachable!(), + } + } + + // We now know of a valid nonce for the transaction referenced by tx_ref. + // Set the nonce and mark the tx as ready to be sent later. + pub(crate) fn set_tx_nonce(&mut self, tx_ref: &TxRef, nonce: Nonce) { + let block_idx = + self.queued_blocks.binary_search_by(|b| b.source_height.cmp(&tx_ref.height)).unwrap(); + let block = &mut self.queued_blocks[block_idx]; + let chunk = block.chunks.iter_mut().find(|c| c.shard_id == tx_ref.shard_id).unwrap(); + let tx = &mut chunk.txs[tx_ref.tx_idx]; + + match self.txs_awaiting_nonce.entry(tx_ref.height) { + hash_map::Entry::Occupied(mut e) => { + let txs = e.get_mut(); + if txs.len() == 1 { + assert!(&txs[0] == tx_ref); + e.remove(); + } else { + let idx = txs.iter().position(|t| t == tx_ref).unwrap(); + txs.swap_remove(idx); + } + } + hash_map::Entry::Vacant(_) => unreachable!(), + } + let (source_signer_id, source_public_key) = match &tx { + crate::TargetChainTx::AwaitingNonce(tx) => { + (tx.source_signer_id.clone(), tx.source_public.clone()) + } + crate::TargetChainTx::Ready(_) => unreachable!(), + }; + + tx.set_nonce(nonce); + self.pending_access_keys_deref(source_signer_id, source_public_key); + } + + pub(crate) fn queue_block(&mut self, block: MappedBlock) { + self.height_queued = Some(block.source_height); + let mut txs_awaiting_nonce = Vec::new(); + for c in block.chunks.iter() { + for (tx_idx, tx) in c.txs.iter().enumerate() { + if let crate::TargetChainTx::AwaitingNonce(tx) = tx { + txs_awaiting_nonce.push(TxRef { + height: block.source_height, + shard_id: c.shard_id, + tx_idx, + }); + *self + .pending_access_keys + .entry((tx.source_signer_id.clone(), tx.source_public.clone())) + .or_default() += 1; + } + } + } + if !txs_awaiting_nonce.is_empty() { + self.txs_awaiting_nonce.insert(block.source_height, txs_awaiting_nonce); + } + self.queued_blocks.push_back(block); + } + + pub(crate) fn next_batch_time(&self) -> Instant { + match &self.send_time { + Some(t) => 
t.as_ref().deadline().into_std(), + None => Instant::now(), + } + } + + pub(crate) async fn next_batch(&mut self) -> Option { + if let Some(sleep) = &mut self.send_time { + sleep.await; + } + let block = self.queued_blocks.pop_front(); + if let Some(block) = &block { + self.txs_awaiting_nonce.remove(&block.source_height); + for chunk in block.chunks.iter() { + for tx in chunk.txs.iter() { + match &tx { + crate::TargetChainTx::AwaitingNonce(tx) => self.pending_access_keys_deref( + tx.source_signer_id.clone(), + tx.source_public.clone(), + ), + crate::TargetChainTx::Ready(_) => {} + } + } + } + } + block + } + + fn remove_tx(&mut self, tx: &IndexerTransactionWithOutcome) { + let k = (tx.transaction.signer_id.clone(), tx.transaction.public_key.clone()); + match self.txs_by_signer.entry(k.clone()) { + hash_map::Entry::Occupied(mut e) => { + let txs = e.get_mut(); + if !txs.remove(&TxId { hash: tx.transaction.hash, nonce: tx.transaction.nonce }) { + tracing::warn!(target: "mirror", "tried to remove nonexistent tx {} from txs_by_signer", tx.transaction.hash); + } + // split off from hash: default() since that's the smallest hash, which will leave us with every tx with nonce + // greater than this one in txs_left. + let txs_left = txs.split_off(&TxId { + hash: CryptoHash::default(), + nonce: tx.transaction.nonce + 1, + }); + if !txs.is_empty() { + tracing::warn!( + target: "mirror", "{} Transactions for {:?} skipped by inclusion of tx with nonce {}: {:?}. These will never make it on chain.", + txs.len(), &k, tx.transaction.nonce, &txs + ); + for t in txs.iter() { + if self.sent_txs.remove(&t.hash).is_none() { + tracing::warn!( + target: "mirror", "tx with hash {} that we thought was skipped is not in the set of sent txs", + &t.hash, + ); + } + } + } + *txs = txs_left; + if txs.is_empty() { + self.txs_by_signer.remove(&k); + } + } + hash_map::Entry::Vacant(_) => { + tracing::warn!( + target: "mirror", "recently removed tx {}, but ({:?}, {:?}) not in txs_by_signer", + tx.transaction.hash, tx.transaction.signer_id, tx.transaction.public_key + ); + return; + } + }; + } + + fn record_block_timestamp(&mut self, msg: &StreamerMessage) { + self.recent_block_timestamps.push_back(msg.block.header.timestamp_nanosec); + if self.recent_block_timestamps.len() > 10 { + self.recent_block_timestamps.pop_front(); + } + } + + pub(crate) fn on_target_block(&mut self, msg: &StreamerMessage) { + self.record_block_timestamp(msg); + for s in msg.shards.iter() { + if let Some(c) = &s.chunk { + for tx in c.transactions.iter() { + if let Some(send_info) = self.sent_txs.remove(&tx.transaction.hash) { + let latency = Instant::now() - send_info.sent_at; + tracing::debug!( + target: "mirror", "found my tx {} from source #{} in target #{} {:?} after sending @ target #{}", + tx.transaction.hash, send_info.source_height, msg.block.header.height, latency, send_info.target_height + ); + crate::metrics::TRANSACTIONS_INCLUDED.inc(); + + self.remove_tx(tx); + } + } + } + } + } + + fn on_tx_sent( + &mut self, + tx: &SignedTransaction, + source_height: BlockHeight, + target_height: BlockHeight, + ) { + let hash = tx.get_hash(); + if self.sent_txs.contains_key(&hash) { + tracing::warn!(target: "mirror", "transaction sent twice: {}", &hash); + return; + } + + // TODO: don't keep adding txs if we're not ever finding them on chain, since we'll OOM eventually + // if that happens. 
+ self.sent_txs + .insert(hash, TxSendInfo { sent_at: Instant::now(), source_height, target_height }); + let txs = self + .txs_by_signer + .entry((tx.transaction.signer_id.clone(), tx.transaction.public_key.clone())) + .or_default(); + + if let Some(highest_nonce) = txs.iter().next_back() { + if highest_nonce.nonce > tx.transaction.nonce { + tracing::warn!( + target: "mirror", "transaction sent with out of order nonce: {}: {}. Sent so far: {:?}", + &hash, tx.transaction.nonce, txs + ); + } + } + if !txs.insert(TxId { hash, nonce: tx.transaction.nonce }) { + tracing::warn!(target: "mirror", "inserted tx {} twice into txs_by_signer", &hash); + } + } + + // among the last 10 blocks, what's the second longest time between their timestamps? + // probably there's a better heuristic to use than that but this will do for now. + fn second_longest_recent_block_delay(&self) -> Option { + if self.recent_block_timestamps.len() < 5 { + return None; + } + let mut last = *self.recent_block_timestamps.front().unwrap(); + let mut longest = None; + let mut second_longest = None; + + for timestamp in self.recent_block_timestamps.iter().skip(1) { + let delay = timestamp - last; + + match longest { + Some(l) => match second_longest { + Some(s) => { + if delay > l { + second_longest = longest; + longest = Some(delay); + } else if delay > s { + second_longest = Some(delay); + } + } + None => { + if delay > l { + second_longest = longest; + longest = Some(delay); + } else { + second_longest = Some(delay); + } + } + }, + None => { + longest = Some(delay); + } + } + last = *timestamp; + } + let delay = Duration::from_nanos(second_longest.unwrap()); + if delay > 2 * self.min_block_production_delay { + tracing::warn!( + "Target chain blocks are taking longer than expected to be produced. Observing delays \ + of {:?} and {:?} vs min_block_production_delay of {:?} ", + delay, + Duration::from_nanos(longest.unwrap()), + self.min_block_production_delay, + ) + } + Some(delay) + } + + // We just successfully sent some transactions. Remember them so we can see if they really show up on chain. + pub(crate) fn on_txs_sent( + &mut self, + txs: &[SignedTransaction], + source_height: BlockHeight, + target_height: BlockHeight, + ) { + tracing::info!( + target: "mirror", "Sent {} transactions from source #{} with target HEAD @ #{}", + txs.len(), source_height, target_height + ); + for tx in txs.iter() { + self.on_tx_sent(tx, source_height, target_height); + } + + let block_delay = self + .second_longest_recent_block_delay() + .unwrap_or(self.min_block_production_delay + Duration::from_millis(100)); + match &mut self.send_time { + Some(t) => t.as_mut().reset(tokio::time::Instant::now() + block_delay), + None => { + self.send_time = Some(Box::pin(tokio::time::sleep(block_delay))); + } + } + } +} diff --git a/tools/mirror/src/cli.rs b/tools/mirror/src/cli.rs new file mode 100644 index 00000000000..4f6ea0a606c --- /dev/null +++ b/tools/mirror/src/cli.rs @@ -0,0 +1,133 @@ +use anyhow::Context; +use clap::Parser; +use std::cell::Cell; +use std::path::PathBuf; + +#[derive(Parser)] +pub struct MirrorCommand { + #[clap(subcommand)] + subcmd: SubCommand, +} + +#[derive(Parser)] +enum SubCommand { + Prepare(PrepareCmd), + Run(RunCmd), +} + +/// Start two NEAR nodes, one for each chain, and try to mirror +/// transactions from the source chain to the target chain. 
+#[derive(Parser)] +struct RunCmd { + /// source chain home dir + #[clap(long)] + source_home: PathBuf, + /// target chain home dir + #[clap(long)] + target_home: PathBuf, + /// file containing an optional secret as generated by the + /// `prepare` command. Must be provided unless --no-secret is given + #[clap(long)] + secret_file: Option, + /// Equivalent to passing --secret-file where is a + /// config that indicates no secret should be used. If this is + /// given, and --secret-file is also given and points to a config + /// that does contain a secret, the mirror will refuse to start + #[clap(long)] + no_secret: bool, +} + +impl RunCmd { + fn run(self) -> anyhow::Result<()> { + openssl_probe::init_ssl_cert_env_vars(); + let runtime = tokio::runtime::Runtime::new().context("failed to start tokio runtime")?; + + let secret = if let Some(secret_file) = &self.secret_file { + let secret = crate::secret::load(secret_file) + .with_context(|| format!("Failed to load secret from {:?}", secret_file))?; + if secret.is_some() && self.no_secret { + anyhow::bail!( + "--no-secret given with --secret-file indicating that a secret should be used" + ); + } + secret + } else { + if !self.no_secret { + anyhow::bail!("Please give either --secret-file or --no-secret"); + } + None + }; + + let system = new_actix_system(runtime); + system + .block_on(async move { + actix::spawn(crate::run(self.source_home, self.target_home, secret)).await + }) + .unwrap() + } +} + +/// Write a new genesis records file where the public keys have been +/// altered so that this binary can sign transactions when mirroring +/// them from the source chain to the target chain +#[derive(Parser)] +struct PrepareCmd { + /// A genesis records file as output by `neard view-state + /// dump-state --stream` + #[clap(long)] + records_file_in: PathBuf, + /// Path to the new records file with updated public keys + #[clap(long)] + records_file_out: PathBuf, + /// If this is provided, don't use a secret when mapping public + /// keys to new source chain private keys. This means that anyone + /// will be able to sign transactions for the accounts in the + /// target chain corresponding to accounts in the source chain. If + /// that is okay, then --no-secret will make the code run slightly + /// faster, and you won't have to take care to not lose the + /// secret. + #[clap(long)] + no_secret: bool, + /// Path to the secret. Note that if you don't pass --no-secret, + /// this secret is required to sign transactions for the accounts + /// in the target chain corresponding to accounts in the source + /// chain. This means that if you lose this secret, you will no + /// longer be able to mirror any traffic. + #[clap(long)] + secret_file_out: PathBuf, +} + +impl PrepareCmd { + fn run(self) -> anyhow::Result<()> { + crate::genesis::map_records( + &self.records_file_in, + &self.records_file_out, + self.no_secret, + &self.secret_file_out, + ) + } +} + +// copied from neard/src/cli.rs +fn new_actix_system(runtime: tokio::runtime::Runtime) -> actix::SystemRunner { + // `with_tokio_rt()` accepts an `Fn()->Runtime`, however we know that this function is called exactly once. + // This makes it safe to move out of the captured variable `runtime`, which is done by a trick + // using a `swap` of `Cell>`s. 
+ let runtime_cell = Cell::new(Some(runtime)); + actix::System::with_tokio_rt(|| { + let r = Cell::new(None); + runtime_cell.swap(&r); + r.into_inner().unwrap() + }) +} + +impl MirrorCommand { + pub fn run(self) -> anyhow::Result<()> { + tracing::warn!(target: "mirror", "the mirror command is not stable, and may be removed or changed arbitrarily at any time"); + + match self.subcmd { + SubCommand::Prepare(r) => r.run(), + SubCommand::Run(r) => r.run(), + } + } +} diff --git a/tools/mirror/src/genesis.rs b/tools/mirror/src/genesis.rs new file mode 100644 index 00000000000..1fd24eb6a3b --- /dev/null +++ b/tools/mirror/src/genesis.rs @@ -0,0 +1,83 @@ +use near_primitives::state_record::StateRecord; +use serde::ser::{SerializeSeq, Serializer}; +use std::fs::File; +use std::io::{BufReader, BufWriter}; +use std::path::Path; + +pub fn map_records>( + records_file_in: P, + records_file_out: P, + no_secret: bool, + secret_file_out: P, +) -> anyhow::Result<()> { + let secret = if !no_secret { + Some(crate::secret::generate(secret_file_out)?) + } else { + crate::secret::write_empty(secret_file_out)?; + None + }; + let reader = BufReader::new(File::open(records_file_in)?); + let records_out = BufWriter::new(File::create(records_file_out)?); + let mut records_ser = serde_json::Serializer::new(records_out); + let mut records_seq = records_ser.serialize_seq(None).unwrap(); + + near_chain_configs::stream_records_from_file(reader, |mut r| { + match &mut r { + StateRecord::AccessKey { account_id, public_key, access_key } => { + let replacement = crate::key_mapping::map_key(&public_key, secret.as_ref()); + let new_record = StateRecord::AccessKey { + account_id: crate::key_mapping::map_account(&account_id, secret.as_ref()), + public_key: replacement.public_key(), + access_key: access_key.clone(), + }; + // TODO: would be nice for stream_records_from_file() to let you return early on error so + // we dont have to unwrap here + records_seq.serialize_element(&new_record).unwrap(); + } + StateRecord::Account { account_id, .. } => { + if account_id.is_implicit() { + *account_id = crate::key_mapping::map_account(&account_id, secret.as_ref()); + } + records_seq.serialize_element(&r).unwrap(); + } + StateRecord::Data { account_id, .. } => { + if account_id.is_implicit() { + *account_id = crate::key_mapping::map_account(&account_id, secret.as_ref()); + } + records_seq.serialize_element(&r).unwrap(); + } + StateRecord::Contract { account_id, .. } => { + if account_id.is_implicit() { + *account_id = crate::key_mapping::map_account(&account_id, secret.as_ref()); + } + records_seq.serialize_element(&r).unwrap(); + } + StateRecord::PostponedReceipt(receipt) => { + if receipt.predecessor_id.is_implicit() || receipt.receiver_id.is_implicit() { + receipt.predecessor_id = + crate::key_mapping::map_account(&receipt.predecessor_id, secret.as_ref()); + receipt.receiver_id = + crate::key_mapping::map_account(&receipt.receiver_id, secret.as_ref()); + } + records_seq.serialize_element(&r).unwrap(); + } + StateRecord::ReceivedData { account_id, .. 
} => { + if account_id.is_implicit() { + *account_id = crate::key_mapping::map_account(&account_id, secret.as_ref()); + } + records_seq.serialize_element(&r).unwrap(); + } + StateRecord::DelayedReceipt(receipt) => { + if receipt.predecessor_id.is_implicit() || receipt.receiver_id.is_implicit() { + receipt.predecessor_id = + crate::key_mapping::map_account(&receipt.predecessor_id, secret.as_ref()); + receipt.receiver_id = + crate::key_mapping::map_account(&receipt.receiver_id, secret.as_ref()); + } + records_seq.serialize_element(&r).unwrap(); + } + }; + })?; + records_seq.end()?; + Ok(()) +} diff --git a/tools/mirror/src/key_mapping.rs b/tools/mirror/src/key_mapping.rs new file mode 100644 index 00000000000..01df970fd8c --- /dev/null +++ b/tools/mirror/src/key_mapping.rs @@ -0,0 +1,111 @@ +use borsh::BorshDeserialize; +use hkdf::Hkdf; +use near_crypto::{ED25519PublicKey, ED25519SecretKey, PublicKey, Secp256K1PublicKey, SecretKey}; +use near_primitives::types::AccountId; +use sha2::Sha256; + +fn ed25519_map_secret( + buf: &mut [u8], + public: &ED25519PublicKey, + secret: Option<&[u8; crate::secret::SECRET_LEN]>, +) { + match secret { + Some(secret) => { + let hk = Hkdf::::new(None, secret); + hk.expand(&public.0, buf).unwrap(); + } + None => { + buf.copy_from_slice(&public.0); + } + }; +} + +fn map_ed25519( + public: &ED25519PublicKey, + secret: Option<&[u8; crate::secret::SECRET_LEN]>, +) -> ED25519SecretKey { + let mut buf = [0; ed25519_dalek::KEYPAIR_LENGTH]; + + ed25519_map_secret(&mut buf[..ed25519_dalek::SECRET_KEY_LENGTH], public, secret); + + let secret_key = + ed25519_dalek::SecretKey::from_bytes(&buf[..ed25519_dalek::SECRET_KEY_LENGTH]).unwrap(); + let public_key = ed25519_dalek::PublicKey::from(&secret_key); + + buf[ed25519_dalek::SECRET_KEY_LENGTH..].copy_from_slice(public_key.as_bytes()); + ED25519SecretKey(buf) +} + +fn secp256k1_from_slice(buf: &mut [u8], public: &Secp256K1PublicKey) -> secp256k1::SecretKey { + match secp256k1::SecretKey::from_slice(buf) { + Ok(s) => s, + Err(_) => { + tracing::warn!(target: "mirror", "Something super unlikely occurred! SECP256K1 key mapped from {:?} is too large. Flipping most significant bit.", public); + // If we got an error, it means that either `buf` is all zeros, or that when interpreted as a 256-bit + // int, it is larger than the order of the secp256k1 curve. Since the order of the curve starts with 0xFF, + // in either case flipping the first bit should work, and we can unwrap() below. + buf[0] ^= 0x80; + secp256k1::SecretKey::from_slice(buf).unwrap() + } + } +} + +fn map_secp256k1( + public: &Secp256K1PublicKey, + secret: Option<&[u8; crate::secret::SECRET_LEN]>, +) -> secp256k1::SecretKey { + let mut buf = [0; secp256k1::constants::SECRET_KEY_SIZE]; + + match secret { + Some(secret) => { + let hk = Hkdf::::new(None, secret); + hk.expand(public.as_ref(), &mut buf).unwrap(); + } + None => { + buf.copy_from_slice(&public.as_ref()[..secp256k1::constants::SECRET_KEY_SIZE]); + } + }; + + secp256k1_from_slice(&mut buf, public) +} + +// This maps the public key to a secret key so that we can sign +// transactions on the target chain. If secret is None, then we just +// use the bytes of the public key directly, otherwise we feed the +// public key to a key derivation function. 
+pub(crate) fn map_key( + key: &PublicKey, + secret: Option<&[u8; crate::secret::SECRET_LEN]>, +) -> SecretKey { + match key { + PublicKey::ED25519(k) => SecretKey::ED25519(map_ed25519(k, secret)), + PublicKey::SECP256K1(k) => SecretKey::SECP256K1(map_secp256k1(k, secret)), + } +} + +// returns the public key encoded in this implicit account. panics if it's not +// actually an implicit account +// basically copy pasted from runtime/runtime/src/actions.rs +pub(crate) fn implicit_account_key(account_id: &AccountId) -> PublicKey { + let mut public_key_data = Vec::with_capacity(33); + public_key_data.push(0u8); + public_key_data.extend(hex::decode(account_id.as_ref().as_bytes()).unwrap()); + assert_eq!(public_key_data.len(), 33); + PublicKey::try_from_slice(&public_key_data).unwrap() +} + +// If it's an implicit account, interprets it as an ed25519 public key, maps that and then returns +// the resulting implicit account. Otherwise does nothing. We do this so that transactions creating +// an implicit account by sending money will generate an account that we can control +pub(crate) fn map_account( + account_id: &AccountId, + secret: Option<&[u8; crate::secret::SECRET_LEN]>, +) -> AccountId { + if account_id.is_implicit() { + let public_key = implicit_account_key(account_id); + let mapped_key = map_key(&public_key, secret); + hex::encode(mapped_key.public_key().key_data()).parse().unwrap() + } else { + account_id.clone() + } +} diff --git a/tools/mirror/src/lib.rs b/tools/mirror/src/lib.rs new file mode 100644 index 00000000000..b53f7c200a8 --- /dev/null +++ b/tools/mirror/src/lib.rs @@ -0,0 +1,1046 @@ +use actix::Addr; +use anyhow::Context; +use borsh::{BorshDeserialize, BorshSerialize}; +use near_chain_configs::GenesisValidationMode; +use near_client::{ClientActor, ViewClientActor}; +use near_client_primitives::types::{ + GetBlock, GetBlockError, GetChunk, GetChunkError, GetExecutionOutcome, + GetExecutionOutcomeError, GetExecutionOutcomeResponse, Query, QueryError, +}; +use near_crypto::{PublicKey, SecretKey}; +use near_indexer::{Indexer, StreamerMessage}; +use near_network::types::{NetworkClientMessages, NetworkClientResponses}; +use near_o11y::WithSpanContextExt; +use near_primitives::hash::CryptoHash; +use near_primitives::transaction::{ + Action, AddKeyAction, DeleteKeyAction, SignedTransaction, Transaction, +}; +use near_primitives::types::{ + AccountId, BlockHeight, BlockId, BlockReference, Finality, TransactionOrReceiptId, +}; +use near_primitives::views::{ + ExecutionStatusView, QueryRequest, QueryResponseKind, SignedTransactionView, +}; +use near_primitives_core::types::{Nonce, ShardId}; +use nearcore::config::NearConfig; +use rocksdb::DB; +use std::collections::HashSet; +use std::path::Path; +use std::time::{Duration, Instant}; +use strum::IntoEnumIterator; +use tokio::sync::mpsc; + +mod chain_tracker; +pub mod cli; +mod genesis; +mod key_mapping; +mod metrics; +mod secret; + +pub use cli::MirrorCommand; + +#[derive(strum::EnumIter)] +enum DBCol { + Misc, + // This tracks nonces for Access Keys added by AddKey transactions + // or transfers to implicit accounts (not present in the genesis state). + // For a given (account ID, public key), if we're preparing a transaction + // and there's no entry in the DB, then the key was present in the genesis + // state. Otherwise, we map tx nonces according to the values in this column. 
+ Nonces, +} + +impl DBCol { + fn name(&self) -> &'static str { + match self { + Self::Misc => "miscellaneous", + Self::Nonces => "nonces", + } + } +} + +// returns bytes that serve as the key corresponding to this pair in the Nonces column +fn nonce_col_key(account_id: &AccountId, public_key: &PublicKey) -> Vec { + (account_id.clone(), public_key.clone()).try_to_vec().unwrap() +} + +#[derive(Clone, BorshDeserialize, BorshSerialize, Debug, PartialEq, Eq, PartialOrd, Hash)] +struct TxIds { + tx_hash: CryptoHash, + signer_id: AccountId, + receiver_id: AccountId, +} + +// For a given AddKey Action, records the starting nonces of the +// resulting Access Keys. We need this because when an AddKey receipt +// is processed, the nonce field of the AddKey action is actually +// ignored, and it's set to block_height*1000000, so to generate +// transactions with valid nonces, we need to map valid source chain +// nonces to valid target chain nonces. +#[derive(BorshDeserialize, BorshSerialize, Debug, Default)] +struct NonceDiff { + source_start: Option, + target_start: Option, + pending_source_txs: HashSet, +} + +#[derive(thiserror::Error, Debug)] +pub(crate) enum MapNonceError { + #[error("Source chain access key not yet on chain")] + SourceKeyNotOnChain, + #[error("Target chain access key not yet on chain")] + TargetKeyNotOnChain, + #[error("Nonce arithmetic overflow: {0} + {1}")] + AddOverflow(Nonce, Nonce), + #[error("Nonce arithmetic overflow: {0} - {1}")] + SubOverflow(Nonce, Nonce), +} + +impl NonceDiff { + fn set_source(&mut self, nonce: Nonce) { + self.source_start = Some(nonce); + self.pending_source_txs.clear(); + } + + fn map(&self, nonce: Nonce) -> Result { + let source_start = self.source_start.ok_or(MapNonceError::SourceKeyNotOnChain)?; + let target_start = self.target_start.ok_or(MapNonceError::TargetKeyNotOnChain)?; + if target_start > source_start { + let diff = target_start - source_start; + nonce.checked_add(diff).ok_or_else(|| MapNonceError::AddOverflow(nonce, diff)) + } else { + let diff = source_start - target_start; + nonce.checked_sub(diff).ok_or_else(|| MapNonceError::SubOverflow(nonce, diff)) + } + } + + fn known(&self) -> bool { + self.source_start.is_some() && self.target_start.is_some() + } +} + +struct TxMirror { + target_stream: mpsc::Receiver, + source_view_client: Addr, + source_client: Addr, + target_view_client: Addr, + target_client: Addr, + db: DB, + target_genesis_height: BlockHeight, + target_min_block_production_delay: Duration, + tracked_shards: Vec, + secret: Option<[u8; crate::secret::SECRET_LEN]>, + next_source_height: Option, +} + +fn open_db>(home: P, config: &NearConfig) -> anyhow::Result { + let db_path = + near_store::NodeStorage::opener(home.as_ref(), &config.config.store).path().join("mirror"); + let mut options = rocksdb::Options::default(); + options.create_missing_column_families(true); + options.create_if_missing(true); + let cf_descriptors = DBCol::iter() + .map(|col| rocksdb::ColumnFamilyDescriptor::new(col.name(), options.clone())) + .collect::>(); + Ok(DB::open_cf_descriptors(&options, db_path, cf_descriptors)?) +} + +// a transaction that's almost prepared, except that we don't yet know +// what nonce to use because the public key was added in an AddKey +// action that we haven't seen on chain yet. The tx field is complete +// except for the nonce field. 
+#[derive(Debug)]
+struct TxAwaitingNonce {
+    source_public: PublicKey,
+    source_signer_id: AccountId,
+    target_private: SecretKey,
+    tx: Transaction,
+}
+
+#[derive(Debug)]
+enum TargetChainTx {
+    Ready(SignedTransaction),
+    AwaitingNonce(TxAwaitingNonce),
+}
+
+impl TargetChainTx {
+    // For an AwaitingNonce(_), set the nonce and sign the transaction, changing self into Ready(_).
+    // must not be called if self is Ready(_)
+    fn set_nonce(&mut self, nonce: Nonce) {
+        match self {
+            Self::AwaitingNonce(t) => {
+                t.tx.nonce = nonce;
+                let tx = SignedTransaction::new(
+                    t.target_private.sign(&t.tx.get_hash_and_size().0.as_ref()),
+                    t.tx.clone(),
+                );
+                tracing::debug!(
+                    target: "mirror", "prepared a transaction for ({:?}, {:?}) that was previously waiting for the access key to appear on chain",
+                    &tx.transaction.signer_id, &tx.transaction.public_key
+                );
+                *self = Self::Ready(tx);
+            }
+            Self::Ready(_) => unreachable!(),
+        }
+    }
+}
+
+#[derive(Debug)]
+struct MappedChunk {
+    txs: Vec<TargetChainTx>,
+    shard_id: ShardId,
+}
+
+#[derive(Debug)]
+struct MappedBlock {
+    source_height: BlockHeight,
+    chunks: Vec<MappedChunk>,
+}
+
+async fn account_exists(
+    view_client: &Addr<ViewClientActor>,
+    account_id: &AccountId,
+    prev_block: &CryptoHash,
+) -> anyhow::Result<bool> {
+    match view_client
+        .send(
+            Query::new(
+                BlockReference::BlockId(BlockId::Hash(prev_block.clone())),
+                QueryRequest::ViewAccount { account_id: account_id.clone() },
+            )
+            .with_span_context(),
+        )
+        .await?
+    {
+        Ok(res) => match res.kind {
+            QueryResponseKind::ViewAccount(_) => Ok(true),
+            other => {
+                panic!("Received unexpected QueryResponse after Querying Account: {:?}", other);
+            }
+        },
+        Err(e) => match &e {
+            QueryError::UnknownAccount { .. } => Ok(false),
+            _ => Err(e.into()),
+        },
+    }
+}
+
+async fn fetch_access_key_nonce(
+    view_client: &Addr<ViewClientActor>,
+    account_id: &AccountId,
+    public_key: &PublicKey,
+    block_hash: Option<&CryptoHash>,
+) -> anyhow::Result<Option<Nonce>> {
+    let block_ref = match block_hash {
+        Some(h) => BlockReference::BlockId(BlockId::Hash(h.clone())),
+        None => BlockReference::Finality(Finality::None),
+    };
+    match view_client
+        .send(
+            Query::new(
+                block_ref,
+                QueryRequest::ViewAccessKey {
+                    account_id: account_id.clone(),
+                    public_key: public_key.clone(),
+                },
+            )
+            .with_span_context(),
+        )
+        .await?
+    {
+        Ok(res) => match res.kind {
+            QueryResponseKind::AccessKey(access_key) => Ok(Some(access_key.nonce)),
+            other => {
+                panic!("Received unexpected QueryResponse after Querying Access Key: {:?}", other);
+            }
+        },
+        Err(_) => Ok(None),
+    }
+}
+
+#[derive(Clone, Debug)]
+enum TxOutcome {
+    Success(CryptoHash),
+    Pending,
+    Failure,
+}
+
+async fn fetch_tx_outcome(
+    view_client: &Addr<ViewClientActor>,
+    transaction_hash: CryptoHash,
+    signer_id: &AccountId,
+    receiver_id: &AccountId,
+) -> anyhow::Result<TxOutcome> {
+    let receipt_id = match view_client
+        .send(
+            GetExecutionOutcome {
+                id: TransactionOrReceiptId::Transaction {
+                    transaction_hash,
+                    sender_id: signer_id.clone(),
+                },
+            }
+            .with_span_context(),
+        )
+        .await
+        .unwrap()
+    {
+        Ok(GetExecutionOutcomeResponse { outcome_proof, .. }) => {
+            match outcome_proof.outcome.status {
+                ExecutionStatusView::SuccessReceiptId(id) => id,
+                ExecutionStatusView::SuccessValue(_) => unreachable!(),
+                ExecutionStatusView::Failure(_) | ExecutionStatusView::Unknown => {
+                    return Ok(TxOutcome::Failure)
+                }
+            }
+        }
+        Err(
+            GetExecutionOutcomeError::NotConfirmed { .. }
+            | GetExecutionOutcomeError::UnknownBlock { .. },
+        ) => return Ok(TxOutcome::Pending),
+        Err(e) => {
+            return Err(e)
+                .with_context(|| format!("failed fetching outcome for tx {}", transaction_hash))
+        }
+    };
+    match view_client
+        .send(
+            GetExecutionOutcome {
+                id: TransactionOrReceiptId::Receipt {
+                    receipt_id,
+                    receiver_id: receiver_id.clone(),
+                },
+            }
+            .with_span_context(),
+        )
+        .await
+        .unwrap()
+    {
+        Ok(GetExecutionOutcomeResponse { outcome_proof, .. }) => {
+            match outcome_proof.outcome.status {
+                ExecutionStatusView::SuccessReceiptId(_) | ExecutionStatusView::SuccessValue(_) => {
+                    // the view client code actually modifies the outcome's block_hash field to be the
+                    // next block with a new chunk in the relevant shard, so go backwards one block,
+                    // since that's what we'll want to give in the query for AccessKeys
+                    let block = view_client
+                        .send(
+                            GetBlock(BlockReference::BlockId(BlockId::Hash(
+                                outcome_proof.block_hash,
+                            )))
+                            .with_span_context(),
+                        )
+                        .await
+                        .unwrap()
+                        .with_context(|| {
+                            format!("failed fetching block {}", &outcome_proof.block_hash)
+                        })?;
+                    Ok(TxOutcome::Success(block.header.prev_hash))
+                }
+                ExecutionStatusView::Failure(_) | ExecutionStatusView::Unknown => {
+                    Ok(TxOutcome::Failure)
+                }
+            }
+        }
+        Err(
+            GetExecutionOutcomeError::NotConfirmed { .. }
+            | GetExecutionOutcomeError::UnknownBlock { .. }
+            | GetExecutionOutcomeError::UnknownTransactionOrReceipt { .. },
+        ) => Ok(TxOutcome::Pending),
+        Err(e) => {
+            Err(e).with_context(|| format!("failed fetching outcome for receipt {}", &receipt_id))
+        }
+    }
+}
+
+async fn block_hash_to_height(
+    view_client: &Addr<ViewClientActor>,
+    hash: &CryptoHash,
+) -> anyhow::Result<BlockHeight> {
+    Ok(view_client
+        .send(GetBlock(BlockReference::BlockId(BlockId::Hash(hash.clone()))).with_span_context())
+        .await
+        .unwrap()?
+        .header
+        .height)
+}
+
+impl TxMirror {
+    fn new<P: AsRef<Path>>(
+        source_home: P,
+        target_home: P,
+        secret: Option<[u8; crate::secret::SECRET_LEN]>,
+    ) -> anyhow::Result<Self> {
+        let target_config =
+            nearcore::config::load_config(target_home.as_ref(), GenesisValidationMode::UnsafeFast)
+                .with_context(|| {
+                    format!("Error loading target config from {:?}", target_home.as_ref())
+                })?;
+        let db =
+            open_db(target_home.as_ref(), &target_config).context("failed to open mirror DB")?;
+        let source_config =
+            nearcore::config::load_config(source_home.as_ref(), GenesisValidationMode::UnsafeFast)
+                .with_context(|| {
+                    format!("Error loading source config from {:?}", source_home.as_ref())
+                })?;
+
+        let source_node = nearcore::start_with_config(source_home.as_ref(), source_config.clone())
+            .context("failed to start source chain NEAR node")?;
+
+        let target_indexer = Indexer::new(near_indexer::IndexerConfig {
+            home_dir: target_home.as_ref().to_path_buf(),
+            sync_mode: near_indexer::SyncModeEnum::LatestSynced,
+            await_for_node_synced: near_indexer::AwaitForNodeSyncedEnum::WaitForFullSync,
+        })
+        .context("failed to start target chain indexer")?;
+        let (target_view_client, target_client) = target_indexer.client_actors();
+        let target_stream = target_indexer.streamer();
+
+        Ok(Self {
+            source_view_client: source_node.view_client,
+            source_client: source_node.client,
+            target_client,
+            target_view_client,
+            target_stream,
+            db,
+            target_genesis_height: target_config.genesis.config.genesis_height,
+            target_min_block_production_delay: target_config
+                .client_config
+                .min_block_production_delay,
+            tracked_shards: target_config.config.tracked_shards.clone(),
+            secret,
+            next_source_height: None,
+        })
+    }
+
+    fn get_next_source_height(&mut self) -> anyhow::Result<BlockHeight> {
+        if let Some(height) = self.next_source_height {
+            return Ok(height);
+        }
+        let height =
+            self.db.get_cf(self.db.cf_handle(DBCol::Misc.name()).unwrap(), "next_source_height")?;
+        match height {
+            Some(h) => {
+                let height = BlockHeight::try_from_slice(&h).unwrap();
+                self.next_source_height = Some(height);
+                Ok(height)
+            }
+            None => Ok(self.target_genesis_height),
+        }
+    }
+
+    async fn send_transactions(
+        &mut self,
+        block: &MappedBlock,
+    ) -> anyhow::Result<Vec<SignedTransaction>> {
+        let mut sent = vec![];
+        for chunk in block.chunks.iter() {
+            for tx in chunk.txs.iter() {
+                match tx {
+                    TargetChainTx::Ready(tx) => {
+                        match self
+                            .target_client
+                            .send(
+                                NetworkClientMessages::Transaction {
+                                    transaction: tx.clone(),
+                                    is_forwarded: false,
+                                    check_only: false,
+                                }
+                                .with_span_context(),
+                            )
+                            .await?
+                        {
+                            NetworkClientResponses::RequestRouted => {
+                                crate::metrics::TRANSACTIONS_SENT.with_label_values(&["ok"]).inc();
+                                sent.push(tx.clone());
+                            }
+                            NetworkClientResponses::InvalidTx(e) => {
+                                // TODO: here if we're getting an error because the tx was already included, it is possible
+                                // that some other instance of this code ran and made progress already. For now we can assume
+                                // only one instance of this code will run, but this is the place to detect if that's not the case.
+                                tracing::error!(
+                                    target: "mirror", "Tried to send an invalid tx from source #{} shard {}: {:?}",
+                                    block.source_height, chunk.shard_id, e
+                                );
+                                crate::metrics::TRANSACTIONS_SENT
+                                    .with_label_values(&["invalid"])
+                                    .inc();
+                            }
+                            r => {
+                                tracing::error!(
+                                    target: "mirror", "Unexpected response sending tx from source #{} shard {}: {:?}. The transaction was not sent",
+                                    block.source_height, chunk.shard_id, r
+                                );
+                                crate::metrics::TRANSACTIONS_SENT
+                                    .with_label_values(&["internal_error"])
+                                    .inc();
+                            }
+                        }
+                    }
+                    TargetChainTx::AwaitingNonce(tx) => {
+                        // TODO: here we should just save this transaction for later and send it when it's known
+                        tracing::warn!(target: "mirror", "skipped sending transaction with signer {} because valid target chain nonce not known", &tx.source_signer_id)
+                    }
+                }
+            }
+        }
+        Ok(sent)
+    }
+
+    fn read_nonce_diff(
+        &self,
+        account_id: &AccountId,
+        public_key: &PublicKey,
+    ) -> anyhow::Result<Option<NonceDiff>> {
+        let db_key = nonce_col_key(account_id, public_key);
+        // TODO: cache this?
+        Ok(self
+            .db
+            .get_cf(self.db.cf_handle(DBCol::Nonces.name()).unwrap(), &db_key)?
+            .map(|v| NonceDiff::try_from_slice(&v).unwrap()))
+    }
+
+    fn put_nonce_diff(
+        &self,
+        account_id: &AccountId,
+        public_key: &PublicKey,
+        diff: &NonceDiff,
+    ) -> anyhow::Result<()> {
+        tracing::debug!(target: "mirror", "storing {:?} in DB for ({:?}, {:?})", &diff, account_id, public_key);
+        let db_key = nonce_col_key(account_id, public_key);
+        self.db.put_cf(
+            self.db.cf_handle(DBCol::Nonces.name()).unwrap(),
+            &db_key,
+            &diff.try_to_vec().unwrap(),
+        )?;
+        Ok(())
+    }
+
+    // If the access key was present in the genesis records, just
+    // return the same nonce. Otherwise, we need to change the
+    // nonce. So check if we already know what the difference in
+    // nonces is, and if not, try to fetch that info and store it.
+    // `source_signer_id` and `target_signer_id` are the same unless
+    // it's an implicit account
+    async fn map_nonce(
+        &self,
+        source_signer_id: &AccountId,
+        target_signer_id: &AccountId,
+        source_public: &PublicKey,
+        target_public: &PublicKey,
+        nonce: Nonce,
+    ) -> anyhow::Result<Result<Nonce, MapNonceError>> {
+        let mut diff = match self.read_nonce_diff(source_signer_id, source_public)?
{ + Some(m) => m, + // If it's not stored in the database, it's an access key that was present in the genesis + // records, so we don't need to do anything to the nonce. + None => return Ok(Ok(nonce)), + }; + if diff.known() { + return Ok(diff.map(nonce)); + } + + self.update_nonces( + source_signer_id, + target_signer_id, + source_public, + target_public, + &mut diff, + ) + .await?; + Ok(diff.map(nonce)) + } + + async fn update_nonces( + &self, + source_signer_id: &AccountId, + target_signer_id: &AccountId, + source_public: &PublicKey, + target_public: &PublicKey, + diff: &mut NonceDiff, + ) -> anyhow::Result<()> { + let mut rewrite = false; + if diff.source_start.is_none() { + self.update_source_nonce(source_signer_id, source_public, diff).await?; + rewrite |= diff.source_start.is_some(); + } + if diff.target_start.is_none() { + diff.target_start = fetch_access_key_nonce( + &self.target_view_client, + target_signer_id, + target_public, + None, + ) + .await?; + rewrite |= diff.target_start.is_some(); + } + + if rewrite { + self.put_nonce_diff(source_signer_id, source_public, diff)?; + } + Ok(()) + } + + async fn update_source_nonce( + &self, + account_id: &AccountId, + public_key: &PublicKey, + diff: &mut NonceDiff, + ) -> anyhow::Result<()> { + let mut block_height = 0; + let mut block_hash = CryptoHash::default(); + let mut failed_txs = Vec::new(); + + // first find the earliest block hash where the access key should exist + for tx in diff.pending_source_txs.iter() { + match fetch_tx_outcome( + &self.source_view_client, + tx.tx_hash.clone(), + &tx.signer_id, + &tx.receiver_id, + ) + .await? + { + TxOutcome::Success(hash) => { + let height = + block_hash_to_height(&self.source_view_client, &hash).await.with_context( + || format!("failed fetching block height of block {}", &hash), + )?; + if &block_hash == &CryptoHash::default() || block_height > height { + block_height = height; + block_hash = hash; + } + } + TxOutcome::Failure => { + failed_txs.push(tx.clone()); + } + TxOutcome::Pending => {} + } + } + if &block_hash == &CryptoHash::default() { + // no need to do this if block_hash is set because set_source() below will clear it + for tx in failed_txs.iter() { + diff.pending_source_txs.remove(tx); + } + return Ok(()); + } + let nonce = fetch_access_key_nonce( + &self.source_view_client, + account_id, + public_key, + Some(&block_hash), + ) + .await? + .ok_or_else(|| { + anyhow::anyhow!( + "expected access key to exist for {}, {} after finding successful receipt in {}", + &account_id, + &public_key, + &block_hash + ) + })?; + diff.set_source(nonce); + Ok(()) + } + + // we have a situation where nonces need to be mapped (AddKey actions + // or implicit account transfers). So store the initial nonce data in the DB. + async fn store_source_nonce( + &self, + tx: &SignedTransactionView, + public_key: &PublicKey, + ) -> anyhow::Result<()> { + // TODO: probably better to use a merge operator here. Not urgent, though. 
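+        // (store_source_nonce() currently does a read-modify-write of the NonceDiff entry;
+        // a RocksDB merge operator could fold the pending tx IDs in without the read.)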
+        let mut diff = self.read_nonce_diff(&tx.receiver_id, &public_key)?.unwrap_or_default();
+        if diff.source_start.is_some() {
+            return Ok(());
+        }
+        diff.pending_source_txs.insert(TxIds {
+            tx_hash: tx.hash.clone(),
+            signer_id: tx.signer_id.clone(),
+            receiver_id: tx.receiver_id.clone(),
+        });
+        self.update_source_nonce(&tx.receiver_id, &public_key, &mut diff).await?;
+        self.put_nonce_diff(&tx.receiver_id, &public_key, &diff)
+    }
+
+    async fn map_actions(
+        &self,
+        tx: &SignedTransactionView,
+        prev_block: &CryptoHash,
+    ) -> anyhow::Result<Vec<Action>> {
+        let mut actions = Vec::new();
+
+        for a in tx.actions.iter() {
+            // this try_from() won't fail since the ActionView was constructed from the Action
+            let action = Action::try_from(a.clone()).unwrap();
+
+            match &action {
+                Action::AddKey(add_key) => {
+                    self.store_source_nonce(tx, &add_key.public_key).await?;
+
+                    let replacement =
+                        crate::key_mapping::map_key(&add_key.public_key, self.secret.as_ref());
+
+                    actions.push(Action::AddKey(AddKeyAction {
+                        public_key: replacement.public_key(),
+                        access_key: add_key.access_key.clone(),
+                    }));
+                }
+                Action::DeleteKey(delete_key) => {
+                    let replacement =
+                        crate::key_mapping::map_key(&delete_key.public_key, self.secret.as_ref());
+                    let public_key = replacement.public_key();
+
+                    actions.push(Action::DeleteKey(DeleteKeyAction { public_key }));
+                }
+                Action::Transfer(_) => {
+                    if tx.receiver_id.is_implicit()
+                        && !account_exists(&self.source_view_client, &tx.receiver_id, prev_block)
+                            .await
+                            .with_context(|| {
+                                format!("failed checking existence for account {}", &tx.receiver_id)
+                            })?
+                    {
+                        let public_key = crate::key_mapping::implicit_account_key(&tx.receiver_id);
+                        self.store_source_nonce(tx, &public_key).await?;
+                    }
+                    actions.push(action);
+                }
+                // We don't want to mess with the set of validators in the target chain
+                Action::Stake(_) => {}
+                _ => actions.push(action),
+            };
+        }
+        Ok(actions)
+    }
+
+    // fetch the source chain block at `source_height`, and prepare a
+    // set of transactions that should be valid in the target chain
+    // from it.
+    async fn fetch_txs(
+        &self,
+        source_height: BlockHeight,
+        ref_hash: CryptoHash,
+    ) -> anyhow::Result<Option<MappedBlock>> {
+        let prev_hash = match self
+            .source_view_client
+            .send(
+                GetBlock(BlockReference::BlockId(BlockId::Height(source_height)))
+                    .with_span_context(),
+            )
+            .await
+            .unwrap()
+        {
+            Ok(b) => b.header.prev_hash,
+            Err(GetBlockError::UnknownBlock { .. }) => return Ok(None),
+            Err(e) => return Err(e.into()),
+        };
+        let mut chunks = Vec::new();
+        for shard_id in self.tracked_shards.iter() {
+            let mut txs = Vec::new();
+
+            let chunk = match self
+                .source_view_client
+                .send(GetChunk::Height(source_height, *shard_id).with_span_context())
+                .await?
+            {
+                Ok(c) => c,
+                Err(e) => match e {
+                    GetChunkError::UnknownBlock { .. } => return Ok(None),
+                    GetChunkError::UnknownChunk { .. } => {
+                        tracing::error!(
+                            "Can't fetch source chain shard {} chunk at height {}. Are we tracking all shards?",
+                            shard_id, source_height
+                        );
+                        continue;
+                    }
+                    _ => return Err(e.into()),
+                },
+            };
+            if chunk.header.height_included != source_height {
+                continue;
+            }
+
+            let mut num_not_ready = 0;
+            for t in chunk.transactions {
+                let actions = self.map_actions(&t, &prev_hash).await?;
+                if actions.is_empty() {
+                    // If this is a tx containing only stake actions, skip it.
+                    continue;
+                }
+                let mapped_key = crate::key_mapping::map_key(&t.public_key, self.secret.as_ref());
+                let public_key = mapped_key.public_key();
+
+                let target_signer_id =
+                    crate::key_mapping::map_account(&t.signer_id, self.secret.as_ref());
+                match self
+                    .map_nonce(&t.signer_id, &target_signer_id, &t.public_key, &public_key, t.nonce)
+                    .await?
+                {
+                    Ok(nonce) => {
+                        let mut tx = Transaction::new(
+                            target_signer_id,
+                            public_key,
+                            crate::key_mapping::map_account(&t.receiver_id, self.secret.as_ref()),
+                            nonce,
+                            ref_hash.clone(),
+                        );
+                        tx.actions = actions;
+                        let tx = SignedTransaction::new(
+                            mapped_key.sign(&tx.get_hash_and_size().0.as_ref()),
+                            tx,
+                        );
+                        txs.push(TargetChainTx::Ready(tx));
+                    }
+                    Err(e) => match e {
+                        MapNonceError::AddOverflow(..)
+                        | MapNonceError::SubOverflow(..)
+                        | MapNonceError::SourceKeyNotOnChain => {
+                            tracing::error!(target: "mirror", "error mapping nonce for ({:?}, {:?}): {:?}", &t.signer_id, &public_key, e);
+                            continue;
+                        }
+                        MapNonceError::TargetKeyNotOnChain => {
+                            let mut tx = Transaction::new(
+                                crate::key_mapping::map_account(&t.signer_id, self.secret.as_ref()),
+                                public_key,
+                                crate::key_mapping::map_account(
+                                    &t.receiver_id,
+                                    self.secret.as_ref(),
+                                ),
+                                t.nonce,
+                                ref_hash.clone(),
+                            );
+                            tx.actions = actions;
+                            txs.push(TargetChainTx::AwaitingNonce(TxAwaitingNonce {
+                                tx,
+                                source_public: t.public_key.clone(),
+                                source_signer_id: t.signer_id.clone(),
+                                target_private: mapped_key,
+                            }));
+                            num_not_ready += 1;
+                        }
+                    },
+                };
+            }
+            if num_not_ready == 0 {
+                tracing::debug!(
+                    target: "mirror", "prepared {} transactions for source chain #{} shard {}",
+                    txs.len(), source_height, shard_id
+                );
+            } else {
+                tracing::debug!(
+                    target: "mirror", "prepared {} transactions for source chain #{} shard {} {} of which are \
+                    still waiting for the corresponding access keys to make it on chain",
+                    txs.len(), source_height, shard_id, num_not_ready,
+                );
+            }
+            chunks.push(MappedChunk { txs, shard_id: *shard_id });
+        }
+        Ok(Some(MappedBlock { source_height, chunks }))
+    }
+
+    // Up to a certain capacity, prepare and queue up batches of
+    // transactions that we want to send to the target chain.
+    async fn queue_txs(
+        &mut self,
+        tracker: &mut crate::chain_tracker::TxTracker,
+        ref_hash: CryptoHash,
+        check_send_time: bool,
+    ) -> anyhow::Result<()> {
+        if tracker.num_blocks_queued() > 100 {
+            return Ok(());
+        }
+
+        let next_batch_time = tracker.next_batch_time();
+        let source_head =
+            self.get_source_height().await.context("can't fetch source chain HEAD")?;
+        let start_height = match tracker.height_queued() {
+            Some(h) => h + 1,
+            None => self.get_next_source_height()?,
+        };
+
+        for height in start_height..=source_head {
+            if let Some(b) = self
+                .fetch_txs(height, ref_hash)
+                .await
+                .with_context(|| format!("Can't fetch source #{} transactions", height))?
+            {
+                tracker.queue_block(b);
+                if tracker.num_blocks_queued() > 100 {
+                    return Ok(());
+                }
+            };
+
+            if check_send_time
+                && tracker.num_blocks_queued() > 0
+                && Instant::now() > next_batch_time - Duration::from_millis(20)
+            {
+                return Ok(());
+            }
+        }
+        Ok(())
+    }
+
+    fn set_next_source_height(&mut self, height: BlockHeight) -> anyhow::Result<()> {
+        self.next_source_height = Some(height);
+        // TODO: we should instead save something like the
+        // (block_height, shard_id, idx_in_chunk) of the last
+        // transaction sent.
Currently we set next_source_height after + // sending all of the transactions in that chunk, so if we get + // SIGTERM or something in the middle of sending a batch of + // txs, we'll send some that we already sent next time we + // start. Not a giant problem but kind of unclean. + self.db.put_cf( + self.db.cf_handle(DBCol::Misc.name()).unwrap(), + "next_source_height", + height.try_to_vec().unwrap(), + )?; + Ok(()) + } + + // Go through any upcoming batches of transactions that we haven't + // been able to set a valid nonce for yet, and see if we can now + // do that. + async fn set_nonces( + &self, + tracker: &mut crate::chain_tracker::TxTracker, + ) -> anyhow::Result<()> { + let next_batch_time = tracker.next_batch_time(); + let mut txs_ready = Vec::new(); + let mut keys_mapped = HashSet::new(); + + for (source_signer_id, source_public_key) in tracker.pending_access_keys_iter() { + let mut diff = self.read_nonce_diff(source_signer_id, source_public_key)?.unwrap(); + let target_signer_id = + crate::key_mapping::map_account(source_signer_id, self.secret.as_ref()); + let target_public_key = + crate::key_mapping::map_key(source_public_key, self.secret.as_ref()).public_key(); + self.update_nonces( + &source_signer_id, + &target_signer_id, + &source_public_key, + &target_public_key, + &mut diff, + ) + .await?; + if diff.known() { + keys_mapped.insert((source_signer_id.clone(), source_public_key.clone())); + } + } + for (tx_ref, tx) in tracker.tx_awaiting_nonce_iter() { + if keys_mapped.contains(&(tx.source_signer_id.clone(), tx.source_public.clone())) { + let nonce = self + .map_nonce( + &tx.source_signer_id, + &tx.tx.signer_id, + &tx.source_public, + &tx.tx.public_key, + tx.tx.nonce, + ) + .await? + .unwrap(); + txs_ready.push((tx_ref.clone(), nonce)); + } + + if Instant::now() > next_batch_time - Duration::from_millis(20) { + break; + } + } + for (tx_ref, nonce) in txs_ready { + tracker.set_tx_nonce(&tx_ref, nonce); + } + Ok(()) + } + + async fn main_loop( + &mut self, + mut tracker: crate::chain_tracker::TxTracker, + mut target_height: BlockHeight, + mut target_head: CryptoHash, + ) -> anyhow::Result<()> { + loop { + tokio::select! { + // time to send a batch of transactions + mapped_block = tracker.next_batch(), if tracker.num_blocks_queued() > 0 => { + let mapped_block = mapped_block.unwrap(); + let sent = self.send_transactions(&mapped_block).await?; + tracker.on_txs_sent(&sent, mapped_block.source_height, target_height); + + // now we have one second left until we need to send more transactions. In the + // meantime, we might as well prepare some more batches of transactions. + // TODO: continue in best effort fashion on error + self.set_next_source_height(mapped_block.source_height+1)?; + self.queue_txs(&mut tracker, target_head, true).await?; + } + msg = self.target_stream.recv() => { + let msg = msg.unwrap(); + tracker.on_target_block(&msg); + self.set_nonces(&mut tracker).await?; + target_head = msg.block.header.hash; + target_height = msg.block.header.height; + } + // If we don't have any upcoming sets of transactions to send already built, we probably fell behind in the source + // chain and can't fetch the transactions. Check if we have them now here. 
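+                // (the `if tracker.num_blocks_queued() == 0` guard below keeps this branch
+                // from firing while batches are already prepared; when idle we only poll
+                // the source chain every 200ms.)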
+                _ = tokio::time::sleep(Duration::from_millis(200)), if tracker.num_blocks_queued() == 0 => {
+                    self.queue_txs(&mut tracker, target_head, true).await?;
+                }
+            };
+        }
+    }
+
+    async fn get_source_height(&self) -> Option<BlockHeight> {
+        self.source_client
+            .send(
+                near_client::Status { is_health_check: false, detailed: false }.with_span_context(),
+            )
+            .await
+            .unwrap()
+            .ok()
+            .map(|s| s.sync_info.latest_block_height)
+    }
+
+    // wait until HEAD moves. We don't really need it to be fully synced.
+    async fn wait_source_ready(&self) {
+        let mut first_height = None;
+        loop {
+            if let Some(head) = self.get_source_height().await {
+                match first_height {
+                    Some(h) => {
+                        if h != head {
+                            return;
+                        }
+                    }
+                    None => {
+                        first_height = Some(head);
+                    }
+                }
+            }
+
+            tokio::time::sleep(Duration::from_millis(500)).await;
+        }
+    }
+
+    async fn wait_target_synced(&mut self) -> (BlockHeight, CryptoHash) {
+        let msg = self.target_stream.recv().await.unwrap();
+        (msg.block.header.height, msg.block.header.hash)
+    }
+
+    async fn run(mut self) -> anyhow::Result<()> {
+        let mut tracker =
+            crate::chain_tracker::TxTracker::new(self.target_min_block_production_delay);
+        self.wait_source_ready().await;
+        let (target_height, target_head) = self.wait_target_synced().await;
+
+        self.queue_txs(&mut tracker, target_head, false).await?;
+
+        self.main_loop(tracker, target_height, target_head).await
+    }
+}
+
+async fn run<P: AsRef<Path>>(
+    source_home: P,
+    target_home: P,
+    secret: Option<[u8; crate::secret::SECRET_LEN]>,
+) -> anyhow::Result<()> {
+    let m = TxMirror::new(source_home, target_home, secret)?;
+    m.run().await
+}
diff --git a/tools/mirror/src/metrics.rs b/tools/mirror/src/metrics.rs
new file mode 100644
index 00000000000..c26c563cfaa
--- /dev/null
+++ b/tools/mirror/src/metrics.rs
@@ -0,0 +1,21 @@
+use near_o11y::metrics::{
+    try_create_int_counter, try_create_int_counter_vec, IntCounter, IntCounterVec,
+};
+use once_cell::sync::Lazy;
+
+pub static TRANSACTIONS_SENT: Lazy<IntCounterVec> = Lazy::new(|| {
+    try_create_int_counter_vec(
+        "near_mirror_transactions_sent",
+        "Total number of transactions sent",
+        &["status"],
+    )
+    .unwrap()
+});
+
+pub static TRANSACTIONS_INCLUDED: Lazy<IntCounter> = Lazy::new(|| {
+    try_create_int_counter(
+        "near_mirror_transactions_included",
+        "Total number of transactions sent that made it on-chain",
+    )
+    .unwrap()
+});
diff --git a/tools/mirror/src/secret.rs b/tools/mirror/src/secret.rs
new file mode 100644
index 00000000000..6107a831aa8
--- /dev/null
+++ b/tools/mirror/src/secret.rs
@@ -0,0 +1,85 @@
+use rand_core::{OsRng, RngCore};
+use serde::{Deserialize, Serialize};
+use std::fs::File;
+use std::io::Write;
+use std::path::Path;
+use std::str::FromStr;
+
+pub const SECRET_LEN: usize = 64;
+struct KeyMapSecret([u8; SECRET_LEN]);
+
+#[derive(Serialize, Deserialize)]
+struct MirrorSecretConfig {
+    pub key_map_secret: Option<KeyMapSecret>,
+}
+
+impl serde::Serialize for KeyMapSecret {
+    fn serialize<S>(
+        &self,
+        serializer: S,
+    ) -> Result<<S as serde::Serializer>::Ok, <S as serde::Serializer>::Error>
+    where
+        S: serde::Serializer,
+    {
+        let data = bs58::encode(&self.0[..]).into_string();
+        serializer.serialize_str(&data)
+    }
+}
+
+impl<'de> serde::Deserialize<'de> for KeyMapSecret {
+    fn deserialize<D>(deserializer: D) -> Result<Self, <D as serde::Deserializer<'de>>::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        let s = <String as serde::Deserialize>::deserialize(deserializer)?;
+        Self::from_str(&s).map_err(|err| serde::de::Error::custom(format!("{:?}", err)))
+    }
+}
+
+#[derive(thiserror::Error, Debug)]
+pub(crate) enum ParseSecretError {
+    #[error("Base58 decode failure: `{1}`")]
+    BS58(#[source] bs58::decode::Error, String),
#[error("invalid decoded length (expected: 64, got: {0}: input: `{1}`)")] + BadLength(usize, String), +} + +impl FromStr for KeyMapSecret { + type Err = ParseSecretError; + + fn from_str(s: &str) -> Result { + let mut array = [0; SECRET_LEN]; + let length = bs58::decode(s) + .into(&mut array[..]) + .map_err(|err| Self::Err::BS58(err, s.to_owned()))?; + if length != SECRET_LEN { + return Err(Self::Err::BadLength(length, s.to_owned())); + } + Ok(Self(array)) + } +} + +pub(crate) fn generate>(secret_file_out: P) -> anyhow::Result<[u8; SECRET_LEN]> { + let mut secret = [0; SECRET_LEN]; + let mut out = File::create(secret_file_out)?; + + OsRng.fill_bytes(&mut secret); + let config = MirrorSecretConfig { key_map_secret: Some(KeyMapSecret(secret)) }; + let str = serde_json::to_string_pretty(&config)?; + out.write_all(str.as_bytes())?; + Ok(secret) +} + +pub(crate) fn write_empty>(secret_file_out: P) -> anyhow::Result<()> { + let mut out = File::create(secret_file_out)?; + let config = MirrorSecretConfig { key_map_secret: None }; + let str = serde_json::to_string_pretty(&config)?; + out.write_all(str.as_bytes())?; + Ok(()) +} + +pub fn load>(secret_file: P) -> anyhow::Result> { + let s = std::fs::read_to_string(secret_file)?; + let config: MirrorSecretConfig = serde_json::from_str(&s)?; + Ok(config.key_map_secret.map(|s| s.0)) +}