Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into ws_jwt_client
Browse files Browse the repository at this point in the history
  • Loading branch information
tsahee committed Feb 25, 2023
2 parents 2688566 + 01cc043 commit c3e683a
Show file tree
Hide file tree
Showing 16 changed files with 426 additions and 95 deletions.
44 changes: 35 additions & 9 deletions arbitrum/apibackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ func CreateFallbackClient(fallbackClientUrl string, fallbackClientTimeout time.D

type SyncProgressBackend interface {
SyncProgressMap() map[string]interface{}
SafeBlockNumber(ctx context.Context) (uint64, error)
FinalizedBlockNumber(ctx context.Context) (uint64, error)
}

func createRegisterAPIBackend(backend *Backend, sync SyncProgressBackend, filterConfig filters.Config, fallbackClientUrl string, fallbackClientTimeout time.Duration) (*filters.FilterSystem, error) {
Expand Down Expand Up @@ -310,34 +312,54 @@ func (a *APIBackend) SetHead(number uint64) {
}

func (a *APIBackend) HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error) {
return HeaderByNumber(a.blockChain(), number), nil
return a.headerByNumberImpl(ctx, number)
}

func (a *APIBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) {
return a.blockChain().GetHeaderByHash(hash), nil
}

func HeaderByNumber(blockchain *core.BlockChain, number rpc.BlockNumber) *types.Header {
func (a *APIBackend) blockNumberToUint(ctx context.Context, number rpc.BlockNumber) (uint64, error) {
if number == rpc.LatestBlockNumber || number == rpc.PendingBlockNumber {
return blockchain.CurrentBlock().Header()
return a.blockChain().CurrentBlock().Number().Uint64(), nil
}
return blockchain.GetHeaderByNumber(uint64(number.Int64()))
if number == rpc.SafeBlockNumber {
return a.sync.SafeBlockNumber(ctx)
}
if number == rpc.FinalizedBlockNumber {
return a.sync.FinalizedBlockNumber(ctx)
}
if number < 0 {
return 0, errors.New("block number not supported")
}
return uint64(number.Int64()), nil
}

func HeaderByNumberOrHash(blockchain *core.BlockChain, blockNrOrHash rpc.BlockNumberOrHash) (*types.Header, error) {
func (a *APIBackend) headerByNumberImpl(ctx context.Context, number rpc.BlockNumber) (*types.Header, error) {
if number == rpc.LatestBlockNumber || number == rpc.PendingBlockNumber {
return a.blockChain().CurrentBlock().Header(), nil
}
numUint, err := a.blockNumberToUint(ctx, number)
if err != nil {
return nil, err
}
return a.blockChain().GetHeaderByNumber(numUint), nil
}

func (a *APIBackend) headerByNumberOrHashImpl(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*types.Header, error) {
number, isnum := blockNrOrHash.Number()
if isnum {
return HeaderByNumber(blockchain, number), nil
return a.headerByNumberImpl(ctx, number)
}
hash, ishash := blockNrOrHash.Hash()
if ishash {
return blockchain.GetHeaderByHash(hash), nil
return a.blockChain().GetHeaderByHash(hash), nil
}
return nil, errors.New("invalid arguments; neither block nor hash specified")
}

func (a *APIBackend) HeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*types.Header, error) {
return HeaderByNumberOrHash(a.blockChain(), blockNrOrHash)
return a.headerByNumberOrHashImpl(ctx, blockNrOrHash)
}

func (a *APIBackend) CurrentHeader() *types.Header {
Expand All @@ -352,7 +374,11 @@ func (a *APIBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumber)
if number == rpc.LatestBlockNumber || number == rpc.PendingBlockNumber {
return a.blockChain().CurrentBlock(), nil
}
return a.blockChain().GetBlockByNumber(uint64(number.Int64())), nil
numUint, err := a.blockNumberToUint(ctx, number)
if err != nil {
return nil, err
}
return a.blockChain().GetBlockByNumber(numUint), nil
}

func (a *APIBackend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) {
Expand Down
2 changes: 1 addition & 1 deletion arbitrum/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func NewBackend(stack *node.Node, config *Config, chainDb ethdb.Database, publis
if err != nil {
return nil, nil, err
}
backend.shutdownTracker.MarkStartup()
return backend, filterSystem, nil
}

Expand Down Expand Up @@ -86,6 +85,7 @@ func (b *Backend) ArbInterface() ArbInterface {
// TODO: this is used when registering backend as lifecycle in stack
func (b *Backend) Start() error {
b.startBloomHandlers(b.config.BloomBitsBlocks)
b.shutdownTracker.MarkStartup()
b.shutdownTracker.Start()

return nil
Expand Down
3 changes: 2 additions & 1 deletion arbitrum/recordingdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,10 @@ func (r *RecordingDatabase) GetOrRecreateState(ctx context.Context, header *type
if currentHeader.Number.Uint64() <= genesis {
return nil, fmt.Errorf("moved beyond genesis looking for state looking for %d, genesis %d, err %w", returnedBlockNumber, genesis, err)
}
lastHeader := currentHeader
currentHeader = r.bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1)
if currentHeader == nil {
return nil, fmt.Errorf("chain doesn't contain parent of block %d hash %v", currentHeader.Number, currentHeader.Hash())
return nil, fmt.Errorf("chain doesn't contain parent of block %d hash %v (expected parent hash %v)", lastHeader.Number, lastHeader.Hash(), lastHeader.ParentHash)
}
stateDb, err = r.StateFor(currentHeader)
if err == nil {
Expand Down
10 changes: 5 additions & 5 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type Filter struct {
addresses []common.Address
topics [][]common.Hash

block common.Hash // Block hash if filtering a single block
begin, end int64 // Range interval if filtering multiple blocks
block *common.Hash // Block hash if filtering a single block
begin, end int64 // Range interval if filtering multiple blocks

matcher *bloombits.Matcher
}
Expand Down Expand Up @@ -78,7 +78,7 @@ func (sys *FilterSystem) NewRangeFilter(begin, end int64, addresses []common.Add
func (sys *FilterSystem) NewBlockFilter(block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter {
// Create a generic filter and convert it into a block filter
filter := newFilter(sys, addresses, topics)
filter.block = block
filter.block = &block
return filter
}

Expand All @@ -96,8 +96,8 @@ func newFilter(sys *FilterSystem, addresses []common.Address, topics [][]common.
// first block that contains matches, updating the start of the filter accordingly.
func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
// If we're doing singleton block filtering, execute and return
if f.block != (common.Hash{}) {
header, err := f.sys.backend.HeaderByHash(ctx, f.block)
if f.block != nil {
header, err := f.sys.backend.HeaderByHash(ctx, *f.block)
if err != nil {
return nil, err
}
Expand Down
18 changes: 14 additions & 4 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,11 +391,16 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
if h.checkpointHash != (common.Hash{}) {
// Request the peer's checkpoint header for chain height/weight validation
resCh := make(chan *eth.Response)
if _, err := peer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false, resCh); err != nil {

req, err := peer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false, resCh)
if err != nil {
return err
}
// Start a timer to disconnect if the peer doesn't reply in time
go func() {
// Ensure the request gets cancelled in case of error/drop
defer req.Close()

timeout := time.NewTimer(syncChallengeTimeout)
defer timeout.Stop()

Expand Down Expand Up @@ -437,10 +442,15 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
// If we have any explicit peer required block hashes, request them
for number, hash := range h.requiredBlocks {
resCh := make(chan *eth.Response)
if _, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh); err != nil {

req, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh)
if err != nil {
return err
}
go func(number uint64, hash common.Hash) {
go func(number uint64, hash common.Hash, req *eth.Request) {
// Ensure the request gets cancelled in case of error/drop
defer req.Close()

timeout := time.NewTimer(syncChallengeTimeout)
defer timeout.Stop()

Expand Down Expand Up @@ -469,7 +479,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
peer.Log().Warn("Required block challenge timed out, dropping", "addr", peer.RemoteAddr(), "type", peer.Name())
h.removePeer(peer.ID())
}
}(number, hash)
}(number, hash, req)
}
// Handle incoming messages until the connection is torn down
return handler(peer)
Expand Down
Loading

0 comments on commit c3e683a

Please sign in to comment.