Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reorganization the devnet subscription services #6331

Merged
merged 8 commits into from
Jan 10, 2023
Merged
Prev Previous commit
Next Next commit
finished setup for subscription to new heads being separate
  • Loading branch information
leonardchinonso committed Dec 15, 2022
commit 3f848c67f20ef399f21be93caf3c5e1b40d43d81
31 changes: 3 additions & 28 deletions cmd/devnet/commands/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,26 @@ package commands

import (
"fmt"
"github.com/ledgerwatch/erigon/cmd/devnet/services"

"github.com/ledgerwatch/erigon/cmd/devnet/models"
"github.com/ledgerwatch/erigon/cmd/devnet/services"
)

// ExecuteAllMethods runs all the simulation tests for erigon devnet
func ExecuteAllMethods() {
// unsubscribe from all the subscriptions made
defer services.UnsubscribeAll()

// test connection to JSON RPC
fmt.Printf("\nPINGING JSON RPC...\n")
if err := pingErigonRpc(); err != nil {
return
}
fmt.Println()

fmt.Printf("CONNECTING TO WEBSOCKETS AND SUBSCRIBING TO METHODS...\n")
methods := []models.SubMethod{models.ETHNewHeads}
if err := services.SubscribeAll(methods); err != nil {
fmt.Printf("failed to subscribe to all methods: %v\n", err)
return
}

// get balance of the receiver's account
callGetBalance(addr, models.Latest, 0)
fmt.Println()

// confirm that the txpool is empty
fmt.Println("CONFIRMING TXPOOL IS EMPTY BEFORE SENDING TRANSACTION...")
checkTxPoolContent(0, 0)
services.CheckTxPoolContent(0, 0)
fmt.Println()

/*
Expand All @@ -42,23 +31,13 @@ func ExecuteAllMethods() {
*/

// send a token from the dev address to the recipient address
//nonContractHash, err := callSendTx(sendValue, recipientAddress, models.DevAddress)
//_, err := callSendTx(sendValue, recipientAddress, models.DevAddress)
//if err != nil {
// fmt.Printf("callSendTx error: %v\n", err)
// return
//}
//fmt.Println()

//// confirm that the txpool has this transaction in the pending queue
//fmt.Println("CONFIRMING TXPOOL HAS THE LATEST TRANSACTION...")
//checkTxPoolContent(1, 0)
//fmt.Println()
//
//// look for the transaction hash in the newly mined block
//fmt.Println("LOOKING FOR TRANSACTION IN THE LATEST BLOCK...")
//callSubscribeToNewHeads(*nonContractHash)
//fmt.Println()

// initiate a contract transaction
fmt.Println("INITIATING A CONTRACT TRANSACTION...")
_, err := callContractTx()
Expand All @@ -67,8 +46,4 @@ func ExecuteAllMethods() {
return
}
fmt.Println()

// confirm that the transaction has been moved from the pending queue and the txpool is empty once again
fmt.Println("CONFIRMING TXPOOL IS EMPTY ONCE AGAIN...")
checkTxPoolContent(0, 0)
}
25 changes: 19 additions & 6 deletions cmd/devnet/commands/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,18 @@ func callSendTx(value uint64, toAddr, fromAddr string) (*common.Hash, error) {

fmt.Printf("SUCCESS => Tx submitted, adding tx with hash %q to txpool\n", hash)

hashes := map[common.Hash]bool{*hash: true}
if err := services.SearchReservesForTransactionHash(hashes); err != nil {
return nil, fmt.Errorf("failed to call contract tx: %v", err)
}

return hash, nil
}

func callContractTx() (*common.Hash, error) {
// hashset to hold hashes for search after mining
hashes := make(map[common.Hash]bool)

// get the latest nonce for the next transaction
nonce, err := services.GetNonce(models.ReqId, common.HexToAddress(models.DevAddress))
if err != nil {
Expand All @@ -64,19 +72,24 @@ func callContractTx() (*common.Hash, error) {
fmt.Printf("failed to send transaction: %v\n", err)
return nil, err
}
hashes[*hash] = true
fmt.Printf("SUCCESS => Tx submitted, adding tx with hash %q to txpool\n", hash)
fmt.Println()

_, err = services.SearchBlockForTransactionHash(*hash)
eventHash, err := services.EmitFallbackEvent(models.ReqId, subscriptionContract, transactOpts, address)
if err != nil {
return nil, fmt.Errorf("failed to find tx in block: %v", err)
}
fmt.Println()

if err := services.EmitFallbackEvent(models.ReqId, subscriptionContract, transactOpts, address); err != nil {
fmt.Printf("failed to emit events: %v\n", err)
return nil, err
}
hashes[*eventHash] = true

if err := services.SearchReservesForTransactionHash(hashes); err != nil {
return nil, fmt.Errorf("failed to call contract tx: %v", err)
}

//if err = requests.GetLogs(reqId, blockN, blockN, address, false); err != nil {
// return nil, fmt.Errorf("failed to get logs: %v", err)
//}

return hash, nil
}
16 changes: 0 additions & 16 deletions cmd/devnet/commands/event.go

This file was deleted.

17 changes: 14 additions & 3 deletions cmd/devnet/main.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,40 @@
package main

import (
"github.com/ledgerwatch/erigon/cmd/devnet/commands"
"github.com/ledgerwatch/erigon/cmd/devnet/models"
"sync"
"time"

"github.com/ledgerwatch/erigon/cmd/devnet/commands"
"github.com/ledgerwatch/erigon/cmd/devnet/devnetutils"
"github.com/ledgerwatch/erigon/cmd/devnet/node"
"github.com/ledgerwatch/erigon/cmd/devnet/services"
)

func main() {
defer func() {
// unsubscribe from all the subscriptions made
defer services.UnsubscribeAll()

// clear all the dev files
devnetutils.ClearDevDB()
}()

// wait group variable to prevent main function from terminating until routines are finished
var wg sync.WaitGroup

// remove the old logs from previous runs
devnetutils.DeleteLogs()

defer devnetutils.ClearDevDB()

// start the first erigon node in a go routine
node.Start(&wg)

// sleep for seconds to allow the nodes fully start up
time.Sleep(time.Second * 10)

// start up the subscription services for the different sub methods
services.InitSubscriptions([]models.SubMethod{models.ETHNewHeads})

// execute all rpc methods amongst the two nodes
commands.ExecuteAllMethods()

Expand Down
8 changes: 6 additions & 2 deletions cmd/devnet/models/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package models

import (
"fmt"
"github.com/ledgerwatch/erigon/rpc"

"github.com/ledgerwatch/erigon/accounts/abi/bind/backends"
"github.com/ledgerwatch/erigon/cmd/rpctest/rpctest"
Expand All @@ -11,6 +10,7 @@ import (
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/crypto"
"github.com/ledgerwatch/erigon/p2p"
"github.com/ledgerwatch/erigon/rpc"
)

type (
Expand Down Expand Up @@ -126,9 +126,13 @@ var (

// MethodSubscriptionMap is a container for all the subscription methods
MethodSubscriptionMap *map[SubMethod]*MethodSubscription

// NewHeadsChan is the block cache the eth_NewHeads
NewHeadsChan chan interface{}
// OldHeads holds a list of visited blocks to recheck transactions
//OldHeads []string
)

// Responses for the rpc calls
type (
// AdminNodeInfoResponse is the response for calls made to admin_nodeInfo
AdminNodeInfoResponse struct {
Expand Down
56 changes: 36 additions & 20 deletions cmd/devnet/services/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,57 +97,73 @@ func initializeTransactOps(nonce uint64) (*bind.TransactOpts, error) {
}

// txHashInBlock checks if the block with block number has the transaction hash in its list of transactions
func txHashInBlock(client *rpc.Client, hash common.Hash, blockNumber string) (uint64, bool, error) {
func txHashInBlock(client *rpc.Client, hashmap map[common.Hash]bool, blockNumber string) (uint64, int, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() // releases the resources held by the context

var currBlock models.Block
var (
currBlock models.Block
numFound int
)
err := client.CallContext(ctx, &currBlock, string(models.ETHGetBlockByNumber), blockNumber, false)
if err != nil {
return uint64(0), false, fmt.Errorf("failed to get block by number: %v", err)
return uint64(0), 0, fmt.Errorf("failed to get block by number: %v", err)
}

for _, txnHash := range currBlock.Transactions {
if txnHash == hash {
fmt.Printf("SUCCESS => Tx with hash %q is in mined block with number %q\n", hash, blockNumber)
return devnetutils.HexToInt(blockNumber), true, nil
// check if tx is in the hash set and remove it from the set if it is present
if _, ok := hashmap[txnHash]; ok {
numFound++
fmt.Printf("SUCCESS => Tx with hash %q is in mined block with number %q\n", txnHash, blockNumber)
delete(hashmap, txnHash)
if len(hashmap) == 0 {
return devnetutils.HexToInt(blockNumber), numFound, nil
}
}
}

return uint64(0), false, nil
return uint64(0), 0, nil
}

//func checkOldHeads(client *rpc.Client, hashmap map[common.Hash]bool) (uint64, int, error) {
//
// for _, blockNumber := range models.OldHeads {
// num, numFound, foundErr := txHashInBlock(client, hashmap, blockNumber)
// if foundErr != nil {
// return uint64(0), 0, foundErr
// }
// if numFound > 0 {
// models.OldHeads = []string{}
// return num, numFound, nil
// }
// }
// return uint64(0), 0, nil
//}

// EmitFallbackEvent emits an event from the contract using the fallback method
func EmitFallbackEvent(reqId int, subContract *contracts.Subscription, opts *bind.TransactOpts, address common.Address) error {
func EmitFallbackEvent(reqId int, subContract *contracts.Subscription, opts *bind.TransactOpts, address common.Address) (*common.Hash, error) {
fmt.Println("EMITTING EVENT FROM FALLBACK...")

// adding one to the nonce before initiating another transaction
opts.Nonce.Add(opts.Nonce, big.NewInt(1))

tx, err := subContract.Fallback(opts, []byte{})
if err != nil {
return fmt.Errorf("failed to emit event from fallback: %v", err)
return nil, fmt.Errorf("failed to emit event from fallback: %v", err)
}

signedTx, err := types.SignTx(tx, *signer, models.DevSignedPrivateKey)
if err != nil {
return fmt.Errorf("failed to sign fallback transaction: %v", err)
return nil, fmt.Errorf("failed to sign fallback transaction: %v", err)
}

hash, err := requests.SendTransaction(models.ReqId, &signedTx)
if err != nil {
return fmt.Errorf("failed to send fallback transaction: %v", err)
return nil, fmt.Errorf("failed to send fallback transaction: %v", err)
}
fmt.Printf("Tx submitted, adding tx with hash %q to txpool\n", hash)

blockN, err := SearchBlockForTransactionHash(*hash)
if err != nil {
return fmt.Errorf("failed to find tx in block: %v", err)
}

if err = requests.GetLogs(reqId, blockN, blockN, address, false); err != nil {
return fmt.Errorf("failed to get logs: %v", err)
}
// TODO: Get all the logs across the blocks that mined the transactions and check that they are logged

return nil
return hash, nil
}
Loading