Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: subscribe for payments and invoices #281

Merged
merged 12 commits into from
Jul 19, 2024
184 changes: 140 additions & 44 deletions lnclient/lnd/lnd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
decodepay "github.com/nbd-wtf/ln-decodepay"

"github.com/getAlby/hub/events"
"github.com/getAlby/hub/lnclient"
"github.com/getAlby/hub/lnclient/lnd/wrapper"
"github.com/getAlby/hub/logger"
Expand All @@ -24,13 +25,13 @@ import (
// "gorm.io/gorm"

"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
)

// wrap it again :sweat_smile:
// todo: drop dependency on lndhub package
type LNDService struct {
client *wrapper.LNDWrapper
pubkey string
cancel context.CancelFunc
}

func (svc *LNDService) GetBalance(ctx context.Context) (balance int64, err error) {
Expand Down Expand Up @@ -77,47 +78,13 @@ func (svc *LNDService) ListTransactions(ctx context.Context, from, until, limit,
// this will cause retrieved amount to be less than limit
continue
}
var paymentRequest decodepay.Bolt11
var expiresAt *int64
var description string
var descriptionHash string
if payment.PaymentRequest != "" {
paymentRequest, err = decodepay.Decodepay(strings.ToLower(payment.PaymentRequest))
if err != nil {
logger.Logger.WithFields(logrus.Fields{
"bolt11": payment.PaymentRequest,
}).Errorf("Failed to decode bolt11 invoice: %v", err)

return nil, err
}
expiresAtUnix := time.UnixMilli(int64(paymentRequest.CreatedAt) * 1000).Add(time.Duration(paymentRequest.Expiry) * time.Second).Unix()
expiresAt = &expiresAtUnix
description = paymentRequest.Description
descriptionHash = paymentRequest.DescriptionHash
}

var settledAt *int64
if payment.Status == lnrpc.Payment_SUCCEEDED {
// FIXME: how to get the actual settled at time?
settledAtUnix := time.Unix(0, payment.CreationTimeNs).Unix()
settledAt = &settledAtUnix
transaction, err := lndPaymentToTransaction(payment)
if err != nil {
return nil, err
}

transaction := lnclient.Transaction{
Type: "outgoing",
Invoice: payment.PaymentRequest,
Preimage: payment.PaymentPreimage,
PaymentHash: payment.PaymentHash,
Amount: payment.ValueMsat,
FeesPaid: payment.FeeMsat,
CreatedAt: time.Unix(0, payment.CreationTimeNs).Unix(),
Description: description,
DescriptionHash: descriptionHash,
ExpiresAt: expiresAt,
SettledAt: settledAt,
//TODO: Metadata: (e.g. keysend),
}
transactions = append(transactions, transaction)
transactions = append(transactions, *transaction)
}

// sort by created date descending
Expand Down Expand Up @@ -397,7 +364,7 @@ func (svc *LNDService) SendKeysend(ctx context.Context, amount uint64, destinati
return paymentHash, "", 0, errors.New(resp.PaymentError)
}
respPreimage := hex.EncodeToString(resp.PaymentPreimage)
if respPreimage == preimage {
if respPreimage != preimage {
logger.Logger.WithFields(logrus.Fields{
"amount": amount,
"payeePubkey": destination,
Expand Down Expand Up @@ -429,7 +396,7 @@ func makePreimageHex() ([]byte, error) {
return bytes, nil
}

func NewLNDService(ctx context.Context, lndAddress, lndCertHex, lndMacaroonHex string) (result lnclient.LNClient, err error) {
func NewLNDService(ctx context.Context, eventPublisher events.EventPublisher, lndAddress, lndCertHex, lndMacaroonHex string) (result lnclient.LNClient, err error) {
if lndAddress == "" || lndCertHex == "" || lndMacaroonHex == "" {
return nil, errors.New("one or more required LND configuration are missing")
}
Expand All @@ -448,14 +415,102 @@ func NewLNDService(ctx context.Context, lndAddress, lndCertHex, lndMacaroonHex s
return nil, err
}

lndService := &LNDService{client: lndClient, pubkey: info.IdentityPubkey}
lndCtx, cancel := context.WithCancel(ctx)

lndService := &LNDService{client: lndClient, pubkey: info.IdentityPubkey, cancel: cancel}

// Subscribe to payments
go func() {
for {
paymentStream, err := lndClient.SubscribePayments(lndCtx, &routerrpc.TrackPaymentsRequest{
NoInflightUpdates: true,
})
if err != nil {
logger.Logger.WithError(err).Error("Error subscribing to payments")
continue
}
for {
select {
case <-lndCtx.Done():
return
default:
payment, err := paymentStream.Recv()
if err != nil {
logger.Logger.WithError(err).Error("Failed to receive payment")
continue
}

var eventName string
switch payment.Status {
case lnrpc.Payment_FAILED:
eventName = "nwc_payment_failed_async"
case lnrpc.Payment_SUCCEEDED:
eventName = "nwc_payment_sent"
default:
continue
}

logger.Logger.WithFields(logrus.Fields{
"payment": payment,
}).Info("Received new payment")

transaction, err := lndPaymentToTransaction(payment)
if err != nil {
continue
}

eventPublisher.Publish(&events.Event{
Event: eventName,
Properties: transaction,
})
}
}
}
}()

// Subscribe to invoices
go func() {
for {
invoiceStream, err := lndClient.SubscribeInvoices(lndCtx, &lnrpc.InvoiceSubscription{})
if err != nil {
logger.Logger.WithError(err).Error("Error subscribing to invoices")
continue
}
for {
select {
case <-lndCtx.Done():
return
default:
invoice, err := invoiceStream.Recv()
if err != nil {
logger.Logger.WithError(err).Error("Failed to receive invoice")
continue
}
if invoice.State != lnrpc.Invoice_SETTLED {
continue
}

logger.Logger.WithFields(logrus.Fields{
"invoice": invoice,
}).Info("Received new invoice")

eventPublisher.Publish(&events.Event{
Event: "nwc_payment_received",
Properties: lndInvoiceToTransaction(invoice),
})
}
}
}
}()

logger.Logger.Infof("Connected to LND - alias %s", info.Alias)

return lndService, nil
}

func (svc *LNDService) Shutdown() error {
logger.Logger.Info("cancelling LND context")
svc.cancel()
return nil
}

Expand Down Expand Up @@ -693,6 +748,47 @@ func (svc *LNDService) GetBalances(ctx context.Context) (*lnclient.BalancesRespo
}, nil
}

func lndPaymentToTransaction(payment *lnrpc.Payment) (*lnclient.Transaction, error) {
var expiresAt *int64
var description string
var descriptionHash string
if payment.PaymentRequest != "" {
paymentRequest, err := decodepay.Decodepay(strings.ToLower(payment.PaymentRequest))
if err != nil {
logger.Logger.WithFields(logrus.Fields{
"bolt11": payment.PaymentRequest,
}).Errorf("Failed to decode bolt11 invoice: %v", err)
return nil, err
}
expiresAtUnix := time.UnixMilli(int64(paymentRequest.CreatedAt) * 1000).Add(time.Duration(paymentRequest.Expiry) * time.Second).Unix()
expiresAt = &expiresAtUnix
description = paymentRequest.Description
descriptionHash = paymentRequest.DescriptionHash
}

var settledAt *int64
if payment.Status == lnrpc.Payment_SUCCEEDED {
// FIXME: how to get the actual settled at time?
settledAtUnix := time.Unix(0, payment.CreationTimeNs).Unix()
settledAt = &settledAtUnix
}

return &lnclient.Transaction{
Type: "outgoing",
Invoice: payment.PaymentRequest,
Preimage: payment.PaymentPreimage,
PaymentHash: payment.PaymentHash,
Amount: payment.ValueMsat,
FeesPaid: payment.FeeMsat,
CreatedAt: time.Unix(0, payment.CreationTimeNs).Unix(),
Description: description,
DescriptionHash: descriptionHash,
ExpiresAt: expiresAt,
SettledAt: settledAt,
//TODO: Metadata: (e.g. keysend),
rolznz marked this conversation as resolved.
Show resolved Hide resolved
}, nil
}

func lndInvoiceToTransaction(invoice *lnrpc.Invoice) *lnclient.Transaction {
var settledAt *int64
preimage := hex.EncodeToString(invoice.RPreimage)
Expand Down Expand Up @@ -810,7 +906,7 @@ func (svc *LNDService) GetSupportedNIP47Methods() []string {
}

func (svc *LNDService) GetSupportedNIP47NotificationTypes() []string {
return []string{}
return []string{"payment_received", "payment_sent"}
}

func (svc *LNDService) GetPubkey() string {
Expand Down
4 changes: 4 additions & 0 deletions lnclient/lnd/wrapper/lnd.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ func (wrapper *LNDWrapper) SubscribeInvoices(ctx context.Context, req *lnrpc.Inv
return wrapper.client.SubscribeInvoices(ctx, req, options...)
}

func (wrapper *LNDWrapper) SubscribePayments(ctx context.Context, req *routerrpc.TrackPaymentsRequest, options ...grpc.CallOption) (routerrpc.Router_TrackPaymentsClient, error) {
return wrapper.routerClient.TrackPayments(ctx, req, options...)
}

func (wrapper *LNDWrapper) ListInvoices(ctx context.Context, req *lnrpc.ListInvoiceRequest, options ...grpc.CallOption) (*lnrpc.ListInvoiceResponse, error) {
return wrapper.client.ListInvoices(ctx, req, options...)
}
Expand Down
2 changes: 1 addition & 1 deletion service/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (svc *service) launchLNBackend(ctx context.Context, encryptionKey string) e
LNDAddress, _ := svc.cfg.Get("LNDAddress", encryptionKey)
LNDCertHex, _ := svc.cfg.Get("LNDCertHex", encryptionKey)
LNDMacaroonHex, _ := svc.cfg.Get("LNDMacaroonHex", encryptionKey)
lnClient, err = lnd.NewLNDService(ctx, LNDAddress, LNDCertHex, LNDMacaroonHex)
lnClient, err = lnd.NewLNDService(ctx, svc.eventPublisher, LNDAddress, LNDCertHex, LNDMacaroonHex)
case config.LDKBackendType:
Mnemonic, _ := svc.cfg.Get("Mnemonic", encryptionKey)
LDKWorkdir := path.Join(svc.cfg.GetEnv().Workdir, "ldk")
Expand Down
Loading