Skip to content

Commit

Permalink
core/services: add MultiStart for staring multiple services together (#…
Browse files Browse the repository at this point in the history
…7501)

* core/services: add MultiStart for staring multiple services together

* core/services/chainlink: use MultiStart in ChainlinkApplication to clean up failed start

* sigscanner: attempt to fix auth

* Update .github/workflows/sigscanner.yml

Co-authored-by: chainchad <96362174+chainchad@users.noreply.github.com>

Co-authored-by: chainchad <96362174+chainchad@users.noreply.github.com>
  • Loading branch information
jmank88 and chainchad authored Sep 23, 2022
1 parent bb91c01 commit 54aafe5
Show file tree
Hide file tree
Showing 11 changed files with 149 additions and 34 deletions.
11 changes: 6 additions & 5 deletions .github/workflows/sigscanner.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ jobs:

runs-on: ubuntu-latest
steps:
- run: echo "🔎 Checking commit ${{ github.sha }} by ${{ github.actor }} in ${{ github.repository }} - ${{ github.event_name }}"
- run: |
- name: "SigScanner checking ${{ github.sha }} by ${{ github.actor }}"
env:
API_TOKEN: ${{ secrets.SIGSCANNER_API_TOKEN }}
API_URL: ${{ secrets.SIGSCANNER_API_URL }}
run: |
echo "🔎 Checking commit ${{ github.sha }} by ${{ github.actor }} in ${{ github.repository }} - ${{ github.event_name }}"
CODE=`curl --write-out '%{http_code}' -X POST -H "Content-Type: application/json" -H "Authorization: $API_TOKEN" --silent --output /dev/null --url "$API_URL" --data '{"commit":"${{ github.sha }}","repository":"${{ github.repository }}","author":"${{ github.actor }}"}'`
echo "Received $CODE"
if [[ "$CODE" == "200" ]]; then
Expand All @@ -20,6 +24,3 @@ jobs:
echo "❌ Commit is NOT verified"
exit 1
fi
env:
API_TOKEN: ${{ secrets.SIGSCANNER_API_TOKEN }}
API_URL: ${{ secrets.SIGSCANNER_API_URL }}
18 changes: 9 additions & 9 deletions core/chains/evm/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func newChain(ctx context.Context, dbchain types.DBChain, nodes []types.Node, op
}

func (c *chain) Start(ctx context.Context) error {
return c.StartOnce("Chain", func() (merr error) {
return c.StartOnce("Chain", func() error {
c.logger.Debugf("Chain: starting with ID %s", c.ID().String())
// Must ensure that EthClient is dialed first because subsequent
// services may make eth calls on startup
Expand All @@ -165,17 +165,17 @@ func (c *chain) Start(ctx context.Context) error {
}
// We do not start the log poller here, it gets
// started after the jobs so they have a chance to apply their filters.
merr = multierr.Combine(
c.txm.Start(ctx),
c.headBroadcaster.Start(ctx),
c.headTracker.Start(ctx),
c.logBroadcaster.Start(ctx),
)
var ms services.MultiStart
if err := ms.Start(ctx, c.txm, c.headBroadcaster, c.headTracker, c.logBroadcaster); err != nil {
return err
}
if c.balanceMonitor != nil {
merr = multierr.Combine(merr, c.balanceMonitor.Start(ctx))
if err := ms.Start(ctx, c.balanceMonitor); err != nil {
return err
}
}

return merr
return nil
})
}

Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/forwarders/forwarder_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (f *FwdMgr) collectAddresses() (addrs []common.Address) {
}

// Stop cancels all outgoings calls and stops internal ticker loop.
func (f *FwdMgr) Stop() error {
func (f *FwdMgr) Close() error {
return f.StopOnce("EVMForwarderManager", func() (err error) {
f.cancel()
f.wg.Wait()
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/forwarders/forwarder_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestFwdMgr_MaybeForwardTransaction(t *testing.T) {
addr, err := fwdMgr.GetForwarderForEOA(owner.From)
require.NoError(t, err)
require.Equal(t, addr, forwarderAddr)
err = fwdMgr.Stop()
err = fwdMgr.Close()
require.NoError(t, err)
}

Expand Down Expand Up @@ -114,6 +114,6 @@ func TestFwdMgr_AccountUnauthorizedToForward_SkipsForwarding(t *testing.T) {
addr, err := fwdMgr.GetForwarderForEOA(owner.From)
require.ErrorContains(t, err, "Cannot find forwarder for given EOA")
require.Equal(t, addr, common.Address{})
err = fwdMgr.Stop()
err = fwdMgr.Close()
require.NoError(t, err)
}
2 changes: 1 addition & 1 deletion core/chains/evm/txmgr/eth_confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func NewEthConfirmer(db *sqlx.DB, ethClient evmclient.Client, config Config, key
}

// Start is a comment to appease the linter
func (ec *EthConfirmer) Start() error {
func (ec *EthConfirmer) Start(_ context.Context) error {
return ec.StartOnce("EthConfirmer", func() error {
if ec.config.EvmGasBumpThreshold() == 0 {
ec.lggr.Infow("Gas bumping is disabled (ETH_GAS_BUMP_THRESHOLD set to 0)", "ethGasBumpThreshold", 0)
Expand Down
14 changes: 7 additions & 7 deletions core/chains/evm/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,17 +193,17 @@ func (b *Txm) Start(ctx context.Context) (merr error) {
return errors.Wrap(err, "Txm: failed to sync with on-chain nonce")
}
}

var ms services.MultiStart
eb := NewEthBroadcaster(b.db, b.ethClient, b.config, b.keyStore, b.eventBroadcaster, keyStates, b.gasEstimator, b.resumeCallback, b.logger, b.checkerFactory)
ec := NewEthConfirmer(b.db, b.ethClient, b.config, b.keyStore, keyStates, b.gasEstimator, b.resumeCallback, b.logger)
if err = eb.Start(ctx); err != nil {
if err = ms.Start(ctx, eb); err != nil {
return errors.Wrap(err, "Txm: EthBroadcaster failed to start")
}
if err = ec.Start(); err != nil {
if err = ms.Start(ctx, ec); err != nil {
return errors.Wrap(err, "Txm: EthConfirmer failed to start")
}

if err = b.gasEstimator.Start(ctx); err != nil {
if err = ms.Start(ctx, b.gasEstimator); err != nil {
return errors.Wrap(err, "Txm: Estimator failed to start")
}

Expand All @@ -220,7 +220,7 @@ func (b *Txm) Start(ctx context.Context) (merr error) {
}

if b.fwdMgr != nil {
if err = b.fwdMgr.Start(ctx); err != nil {
if err = ms.Start(ctx, b.fwdMgr); err != nil {
return errors.Wrap(err, "Txm: EVMForwarderManager failed to start")
}
}
Expand Down Expand Up @@ -269,7 +269,7 @@ func (b *Txm) Close() (merr error) {
b.ethResender.Stop()
}
if b.fwdMgr != nil {
if err := b.fwdMgr.Stop(); err != nil {
if err := b.fwdMgr.Close(); err != nil {
return errors.Wrap(err, "Txm: failed to stop EVMForwarderManager")
}
}
Expand Down Expand Up @@ -354,7 +354,7 @@ func (b *Txm) runLoop(eb *EthBroadcaster, ec *EthConfirmer, keyStates []ethkey.S
for {
select {
case <-time.After(backoff.Duration()):
if err := ec.Start(); err != nil {
if err := ec.Start(ctx); err != nil {
b.logger.Criticalw("Failed to start EthConfirmer", "err", err)
continue
}
Expand Down
5 changes: 2 additions & 3 deletions core/chains/solana/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,8 @@ func (c *chain) Start(ctx context.Context) error {
c.lggr.Debug("Starting")
c.lggr.Debug("Starting txm")
c.lggr.Debug("Starting balance monitor")
return multierr.Combine(
c.txm.Start(ctx),
c.balanceMonitor.Start(ctx))
var ms services.MultiStart
return ms.Start(ctx, c.txm, c.balanceMonitor)
})
}

Expand Down
8 changes: 4 additions & 4 deletions core/chains/terra/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import (

sdk "github.com/cosmos/cosmos-sdk/types"

"github.com/smartcontractkit/sqlx"

"github.com/smartcontractkit/chainlink-terra/pkg/terra"
terraclient "github.com/smartcontractkit/chainlink-terra/pkg/terra/client"
"github.com/smartcontractkit/chainlink-terra/pkg/terra/db"
"github.com/smartcontractkit/sqlx"

"github.com/smartcontractkit/chainlink/core/chains/terra/monitor"
"github.com/smartcontractkit/chainlink/core/chains/terra/terratxm"
Expand Down Expand Up @@ -137,9 +138,8 @@ func (c *chain) Start(ctx context.Context) error {

c.lggr.Debug("Starting txm")
c.lggr.Debug("Starting balance monitor")
return multierr.Combine(
c.txm.Start(ctx),
c.balanceMonitor.Start(ctx))
var ms services.MultiStart
return ms.Start(ctx, c.txm, c.balanceMonitor)
})
}

Expand Down
6 changes: 4 additions & 2 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,14 +505,16 @@ func (app *ChainlinkApplication) Start(ctx context.Context) error {
}
}

var ms services.MultiStart
for _, service := range app.srvcs {
if ctx.Err() != nil {
return errors.Wrap(ctx.Err(), "aborting start")
err := errors.Wrap(ctx.Err(), "aborting start")
return multierr.Combine(err, ms.Close())
}

app.logger.Debugw("Starting service...", "serviceType", reflect.TypeOf(service))

if err := service.Start(ctx); err != nil {
if err := ms.Start(ctx, service); err != nil {
return err
}
}
Expand Down
50 changes: 50 additions & 0 deletions core/services/must.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package services

import (
"context"

"go.uber.org/multierr"
)

// StartClose is a subset of the ServiceCtx interface.
type StartClose interface {
Start(context.Context) error
Close() error
}

// MultiStart is a utility for starting multiple services together.
// The set of started services is tracked internally, so that they can be closed if any single service fails to start.
type MultiStart struct {
started []StartClose
}

// Start attempts to Start all services. If any service fails to start, the previously started services will be
// Closed, and an error returned.
func (m *MultiStart) Start(ctx context.Context, srvcs ...StartClose) (err error) {
for _, s := range srvcs {
err = m.start(ctx, s)
if err != nil {
return err
}
}
return
}

func (m *MultiStart) start(ctx context.Context, s StartClose) (err error) {
err = s.Start(ctx)
if err != nil {
err = multierr.Append(err, m.Close())
} else {
m.started = append(m.started, s)
}
return
}

// Close closes all started services, in reverse order.
func (m *MultiStart) Close() (err error) {
for i := len(m.started) - 1; i >= 0; i-- {
s := m.started[i]
err = multierr.Append(err, s.Close())
}
return
}
63 changes: 63 additions & 0 deletions core/services/must_example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package services

import (
"context"
"fmt"
)

type Healthy string

func (h Healthy) Start(ctx context.Context) error {
fmt.Println(h, "started")
return nil
}

func (h Healthy) Close() error {
fmt.Println(h, "closed")
return nil
}

type CloseFailure string

func (c CloseFailure) Start(ctx context.Context) error {
fmt.Println(c, "started")
return nil
}

func (c CloseFailure) Close() error {
fmt.Println(c, "close failure")
return fmt.Errorf("failed to close: %s", c)
}

type WontStart string

func (f WontStart) Start(ctx context.Context) error {
fmt.Println(f, "start failure")
return fmt.Errorf("failed to start: %s", f)
}

func (f WontStart) Close() error {
fmt.Println(f, "close failure")
return fmt.Errorf("cannot call Close after failed Start: %s", f)
}

func ExampleMultiStart() {
ctx := context.Background()

a := Healthy("a")
b := CloseFailure("b")
c := WontStart("c")

var ms MultiStart
if err := ms.Start(ctx, a, b, c); err != nil {
fmt.Println(err)
}

// Output:
// a started
// b started
// c start failure
// b close failure
// a closed
// failed to start: c; failed to close: b
}

0 comments on commit 54aafe5

Please sign in to comment.