Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions lnd_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ type LndServicesConfig struct {
// block download is still in progress.
BlockUntilChainSynced bool

// BlockUntilChainNotifier indicates that the client should wait until
// the ChainNotifier RPC is accepting subscriptions. This requires lnd
// to be built with the "chainrpc" tag.
BlockUntilChainNotifier bool

// BlockUntilUnlocked denotes that the NewLndServices function should
// block until lnd is unlocked.
BlockUntilUnlocked bool
Expand Down Expand Up @@ -453,6 +458,33 @@ func NewLndServices(cfg *LndServicesConfig) (*GrpcLndServices, error) {
log.Infof("lnd is now fully synced to its chain backend")
}

// If requested, wait until the chain notifier RPC is ready before we
// return. This ensures sub-servers relying on the notifier don't fail
// during startup.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if cfg.BlockUntilChainNotifier {
if !hasBuildTag(services.Version, "chainrpc") {
cleanup()

return nil, fmt.Errorf("chain notifier build tag is " +
"required when waiting for chain notifier " +
"readiness")
}

log.Infof("Waiting for chain notifier RPC to be ready")

err := services.waitForChainNotifier(
cfg.CallerCtx, timeout, cfg.ChainSyncPollInterval,
)
if err != nil {
cleanup()

return nil, fmt.Errorf("error waiting for chain "+
"notifier readiness: %w", err)
}

log.Infof("Chain notifier RPC is ready")
}

return services, nil
}

Expand Down Expand Up @@ -533,6 +565,60 @@ func (s *GrpcLndServices) waitForChainSync(ctx context.Context,
return <-update
}

// waitForChainNotifier blocks until the ChainNotifier RPC accepts block epoch
// subscriptions and delivers at least one block height.
func (s *GrpcLndServices) waitForChainNotifier(ctx context.Context,
timeout, pollInterval time.Duration) error {

mainCtx := ctx
if mainCtx == nil {
mainCtx = context.Background()
}

register := s.ChainNotifier.RegisterBlockEpochNtfn

for {
// Make new RegisterBlockEpochNtfn call.
subCtx, cancel := context.WithTimeout(mainCtx, timeout)
blockChan, errChan, err := register(subCtx)
if err != nil {
cancel()

return fmt.Errorf("register block epoch ntfn: %w", err)
}

// Wait for block height notification, which indicates success.
select {
case <-mainCtx.Done():
cancel()

return mainCtx.Err()

case err := <-errChan:
cancel()

// If chainNotifier is not ready yet, retry.
if isChainNotifierStartingErr(err) {
select {
case <-time.After(pollInterval):
continue

case <-mainCtx.Done():
return mainCtx.Err()
}
}

return err

// We got a block height. Success!
case <-blockChan:
cancel()

return nil
}
}
}

// getLndInfo queries lnd for information about the node it is connected to.
// If the waitForUnlocked boolean is set, it will examine any errors returned
// and back off if the failure is due to lnd currently being locked. Otherwise,
Expand Down Expand Up @@ -676,6 +762,37 @@ func IsUnlockError(err error) bool {
return false
}

// chainNotifierStartupMessage matches the error string returned by lnd
// v0.20.0-rc3+ when a ChainNotifier RPC is invoked before the sub-server
// finishes initialization.
const chainNotifierStartupMessage = "chain notifier RPC is still in the " +
"process of starting"

// isChainNotifierStartingErr reports whether err is due to the lnd
// ChainNotifier sub-server still starting up. Starting with lnd v0.20.0-rc3
// the notifier is initialised later in the daemon lifecycle, and the RPC layer
// surfaces this as an Unknown gRPC status that contains the message defined in
// chainNotifierStartupMessage. There is a PR in LND to return code Unavailable
// instead of Unknown: https://github.com/lightningnetwork/lnd/pull/10352
func isChainNotifierStartingErr(err error) bool {
if err == nil {
return false
}

// gRPC code Unavailable means "the server can't handle this request
// now, retry later". LND's chain notifier returns this error when
// the server is starting.
// See https://github.com/lightningnetwork/lnd/pull/10352
st, ok := status.FromError(err)
if ok && st.Code() == codes.Unavailable {
return true
}

// TODO(ln-v0.20.0) remove the string fallback once lndclient depends on
// a version of lnd that returns codes.Unavailable for this condition.
return strings.Contains(err.Error(), chainNotifierStartupMessage)
}

// checkLndCompatibility makes sure the connected lnd instance is running on the
// correct network, has the version RPC implemented, is the correct minimal
// version and supports all required build tags/subservers.
Expand Down Expand Up @@ -809,6 +926,22 @@ func assertBuildTagsEnabled(actual *verrpc.Version,
return nil
}

// hasBuildTag reports whether the given version advertises the specified
// build tag.
func hasBuildTag(version *verrpc.Version, tag string) bool {
if version == nil {
return false
}

for _, t := range version.BuildTags {
if t == tag {
return true
}
}

return false
}

var (
defaultRPCPort = "10009"
defaultLndDir = btcutil.AppDataDir("lnd", false)
Expand Down
24 changes: 24 additions & 0 deletions lnd_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,3 +361,27 @@ func TestCustomMacaroonHex(t *testing.T) {
_, err = NewLndServices(testCfg)
require.Error(t, err, "must set only one")
}

// TestIsChainNotifierStartingErr ensures we correctly detect the startup lag
// error returned by lnd v0.20.0-rc3+.
func TestIsChainNotifierStartingErr(t *testing.T) {
t.Parallel()

require.True(t, isChainNotifierStartingErr(
status.Error(codes.Unavailable, chainNotifierStartupMessage),
))

require.True(t, isChainNotifierStartingErr(
status.Error(codes.Unknown, chainNotifierStartupMessage),
))

require.True(t, isChainNotifierStartingErr(
status.Error(codes.Unavailable, "some other error"),
))

require.False(t, isChainNotifierStartingErr(nil))

require.False(t, isChainNotifierStartingErr(
status.Error(codes.Unknown, "some other error"),
))
}