Skip to content

Commit

Permalink
feat(SPV-1038): decouple sync job from SyncTx (#701)
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-4chain authored Sep 23, 2024
1 parent 4ded60a commit 9ce0fac
Show file tree
Hide file tree
Showing 33 changed files with 460 additions and 614 deletions.
11 changes: 4 additions & 7 deletions actions/transactions/broadcast_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,18 @@ import (
// broadcastCallback will handle a broadcastCallback call from the broadcast api
func broadcastCallback(c *gin.Context) {
logger := reqctx.Logger(c)
var resp *broadcast.SubmittedTx
var callbackResp *broadcast.SubmittedTx

err := c.Bind(&resp)
err := c.Bind(&callbackResp)
if err != nil {
spverrors.ErrorResponse(c, spverrors.ErrCannotBindRequest, logger)
return
}

err = reqctx.Engine(c).UpdateTransaction(c.Request.Context(), resp)
err = reqctx.Engine(c).HandleTxCallback(c.Request.Context(), callbackResp)
if err != nil {
logger.Err(err).Msgf("failed to update transaction - tx: %v", resp)
spverrors.ErrorResponse(c, err, logger)
return
logger.Err(err).Msgf("failed to update transaction - tx: %v", callbackResp)
}

// Return response
c.Status(http.StatusOK)
}
63 changes: 18 additions & 45 deletions engine/action_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package engine

import (
"context"
"errors"
"math"
"time"

"github.com/bitcoin-sv/go-broadcast-client/broadcast"
"github.com/bitcoin-sv/spv-wallet/engine/chainstate"
"github.com/bitcoin-sv/spv-wallet/engine/datastore"
"github.com/bitcoin-sv/spv-wallet/engine/spverrors"
"github.com/bitcoin-sv/spv-wallet/engine/utils"
Expand Down Expand Up @@ -256,9 +254,6 @@ func (c *Client) UpdateTransactionMetadata(ctx context.Context, xPubID, id strin
// yet been synced on-chain and the utxos have not been spent.
// All utxos that are reverted will be marked as deleted (and spent)
func (c *Client) RevertTransaction(ctx context.Context, id string) error {
// Check for existing NewRelic transaction
ctx = c.GetOrStartTxn(ctx, "revert_transaction_by_id")

// Get the transaction
transaction, err := c.GetTransaction(ctx, "", id)
if err != nil {
Expand All @@ -270,25 +265,14 @@ func (c *Client) RevertTransaction(ctx context.Context, id string) error {
return spverrors.ErrTxRevertEmptyDraftID
}

var draftTransaction *DraftTransaction
if draftTransaction, err = c.GetDraftTransactionByID(ctx, transaction.DraftID, c.DefaultModelOptions()...); err != nil {
draftTransaction, err := c.GetDraftTransactionByID(ctx, transaction.DraftID, c.DefaultModelOptions()...)
if err != nil {
return err
}
if draftTransaction == nil {
return spverrors.ErrTxRevertCouldNotFindDraftTx
}

// check whether transaction is not already on chain
var info *chainstate.TransactionInfo
if info, err = c.Chainstate().QueryTransaction(ctx, transaction.ID, chainstate.RequiredInMempool, 30*time.Second); err != nil {
if !errors.Is(err, spverrors.ErrCouldNotFindTransaction) {
return spverrors.Wrapf(err, "failed to query transaction %s on chain", transaction.ID)
}
}
if info != nil {
return spverrors.ErrTxRevertNotFoundOnChain
}

// check that the utxos of this transaction have not been spent
// this transaction needs to be the tip of the chain
conditions := map[string]interface{}{
Expand Down Expand Up @@ -356,16 +340,6 @@ func (c *Client) RevertTransaction(ctx context.Context, id string) error {
return err
}

// cancel sync transaction
var syncTransaction *SyncTransaction
if syncTransaction, err = GetSyncTransactionByID(ctx, transaction.ID, c.DefaultModelOptions()...); err != nil {
return err
}
syncTransaction.SyncStatus = SyncStatusCanceled
if err = syncTransaction.Save(ctx); err != nil {
return err
}

// revert transaction
// this takes the transaction out of any possible list view of the owners of the xpubs,
// but keeps a record of what went down
Expand All @@ -380,41 +354,40 @@ func (c *Client) RevertTransaction(ctx context.Context, id string) error {
transaction.XpubOutputValue = XpubOutputValue{"reverted": 0}
transaction.DeletedAt.Valid = true
transaction.DeletedAt.Time = time.Now()
transaction.TxStatus = TxStatusReverted

err = transaction.Save(ctx) // update existing record

return err
}

// UpdateTransaction will update the broadcast callback transaction info, like: block height, block hash, status, bump.
func (c *Client) UpdateTransaction(ctx context.Context, callbackResp *broadcast.SubmittedTx) error {
// HandleTxCallback will update the broadcast callback transaction info, like: block height, block hash, status, bump.
func (c *Client) HandleTxCallback(ctx context.Context, callbackResp *broadcast.SubmittedTx) error {
logger := c.options.logger
bump, err := bc.NewBUMPFromStr(callbackResp.MerklePath)
if err != nil {
c.options.logger.Err(err).Msgf("failed to parse merkle path from broadcast callback - tx: %v", callbackResp)
logger.Err(err).Msgf("failed to parse merkle path from broadcast callback - tx: %v", callbackResp)
return spverrors.Wrapf(err, "failed to parse merkle path from broadcast callback - tx: %v", callbackResp)
}

txInfo := &chainstate.TransactionInfo{
BlockHash: callbackResp.BlockHash,
BlockHeight: callbackResp.BlockHeight,
ID: callbackResp.TxID,
TxStatus: callbackResp.TxStatus,
BUMP: bump,
}
txID := callbackResp.TxID

tx, err := c.GetTransaction(ctx, "", txInfo.ID)
tx, err := c.GetTransaction(ctx, "", txID)
if err != nil {
c.options.logger.Err(err).Msgf("failed to get transaction by id: %v", txInfo.ID)
logger.Warn().Err(err).Msgf("failed to get transaction by id: %v", txID)
return err
}

syncTx, err := GetSyncTransactionByTxID(ctx, txInfo.ID, c.DefaultModelOptions()...)
if err != nil {
c.options.logger.Err(err).Msgf("failed to get sync transaction by tx id: %v", txInfo.ID)
return err
tx.BlockHash = callbackResp.BlockHash
tx.BlockHeight = uint64(callbackResp.BlockHeight)
tx.SetBUMP(bump)
tx.UpdateFromBroadcastStatus(callbackResp.TxStatus)

if err := tx.Save(ctx); err != nil {
return spverrors.ErrDuringSaveTx.Wrap(err)
}

return processSyncTxSave(ctx, txInfo, syncTx, tx)
return nil
}

func generateTxIDFilterConditions(txIDs []string) map[string]interface{} {
Expand Down
1 change: 0 additions & 1 deletion engine/action_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ func initRevertTransactionData(t *testing.T, clientOpts ...ClientOps) (context.C
require.NoError(t, err)

transaction.draftTransaction = draftTransaction
_hydrateOutgoingWithSync(transaction)

err = transaction.processUtxos(ctx)
require.NoError(t, err)
Expand Down
7 changes: 1 addition & 6 deletions engine/chainstate/chainstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,5 @@ func (c *Client) QueryTransaction(
return nil, spverrors.ErrInvalidRequirements
}

// Try all providers and return the "first" valid response
info := c.query(ctx, id, requiredIn, timeout)
if info == nil {
return nil, spverrors.ErrCouldNotFindTransaction
}
return info, nil
return c.query(ctx, id, requiredIn, timeout)
}
61 changes: 39 additions & 22 deletions engine/chainstate/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,67 @@ package chainstate

import (
"context"
"errors"
"strings"
"time"

"github.com/bitcoin-sv/go-broadcast-client/broadcast"
"github.com/bitcoin-sv/spv-wallet/engine/spverrors"
"github.com/libsv/go-bc"
)

// query will try ALL providers in order and return the first "valid" response based on requirements
func (c *Client) query(ctx context.Context, id string, requiredIn RequiredIn,
timeout time.Duration,
) *TransactionInfo {
) (*TransactionInfo, error) {
// Create a context (to cancel or timeout)
ctxWithCancel, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

resp, err := queryBroadcastClient(
txInfo, err := queryBroadcastClient(
ctxWithCancel, c, id,
)
if err == nil && checkRequirementArc(requiredIn, id, resp) {
return resp
if err != nil {
return nil, err
}
if !checkRequirementArc(requiredIn, id, txInfo) {
return nil, spverrors.ErrCouldNotFindTransaction
}

return nil // No transaction information found
return txInfo, nil
}

// queryBroadcastClient will submit a query transaction request to a go-broadcast-client
func queryBroadcastClient(ctx context.Context, client ClientInterface, id string) (*TransactionInfo, error) {
client.DebugLog("executing request using " + ProviderBroadcastClient)
if resp, failure := client.BroadcastClient().QueryTransaction(ctx, id); failure != nil {
client.DebugLog("error executing request using " + ProviderBroadcastClient + " failed: " + failure.Error())
return nil, spverrors.Wrapf(failure, "failed to query transaction using %s", ProviderBroadcastClient)
} else if resp != nil && strings.EqualFold(resp.TxID, id) {
bump, err := bc.NewBUMPFromStr(resp.BaseTxResponse.MerklePath)
if err != nil {
return nil, spverrors.Wrapf(err, "failed to parse BUMP from response: %s", resp.BaseTxResponse.MerklePath)
resp, err := client.BroadcastClient().QueryTransaction(ctx, id)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return nil, spverrors.ErrBroadcastUnreachable
}
var arcError *broadcast.ArcError
if errors.As(err, &arcError) {
if arcError.IsRejectedTransaction() {
return nil, spverrors.ErrBroadcastRejectedTransaction.Wrap(err)
}
}
return &TransactionInfo{
BlockHash: resp.BlockHash,
BlockHeight: resp.BlockHeight,
ID: resp.TxID,
Provider: resp.Miner,
TxStatus: resp.TxStatus,
BUMP: bump,
}, nil
return nil, spverrors.ErrCouldNotFindTransaction.Wrap(err)
}

if resp == nil || !strings.EqualFold(resp.TxID, id) {
return nil, spverrors.ErrTransactionIDMismatch
}
return nil, spverrors.ErrTransactionIDMismatch

bump, err := bc.NewBUMPFromStr(resp.BaseTxResponse.MerklePath)
if err != nil {
return nil, spverrors.ErrBroadcastWrongBUMPResponse.Wrap(err)
}

return &TransactionInfo{
BlockHash: resp.BlockHash,
BlockHeight: resp.BlockHeight,
ID: resp.TxID,
Provider: resp.Miner,
TxStatus: resp.TxStatus,
BUMP: bump,
}, nil
}
5 changes: 0 additions & 5 deletions engine/chainstate/transaction_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,3 @@ type TransactionInfo struct {
BUMP *bc.BUMP `json:"bump,omitempty"` // Merkle proof in BUMP format
TxStatus broadcast.TxStatus `json:"tx_status,omitempty"` // Status of the transaction
}

// Valid validates TransactionInfo by checking if it contains BlockHash and BUMP
func (t *TransactionInfo) Valid() bool {
return t.BlockHash != "" && t.BUMP != nil
}
9 changes: 4 additions & 5 deletions engine/cron_job_declarations.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ import (

// Cron job names to be used in WithCronCustomPeriod
const (
CronJobNameDraftTransactionCleanUp = "draft_transaction_clean_up"
CronJobNameSyncTransactionBroadcast = "sync_transaction_broadcast"
CronJobNameSyncTransactionSync = "sync_transaction_sync"
CronJobNameCalculateMetrics = "calculate_metrics"
CronJobNameDraftTransactionCleanUp = "draft_transaction_clean_up"
CronJobNameSyncTransaction = "sync_transaction"
CronJobNameCalculateMetrics = "calculate_metrics"
)

type cronJobHandler func(ctx context.Context, client *Client) error
Expand Down Expand Up @@ -47,7 +46,7 @@ func (c *Client) cronJobs() taskmanager.CronJobs {
taskCleanupDraftTransactions,
)
addJob(
CronJobNameSyncTransactionSync,
CronJobNameSyncTransaction,
5*time.Minute,
taskSyncTransactions,
)
Expand Down
17 changes: 2 additions & 15 deletions engine/cron_job_definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,8 @@ func taskSyncTransactions(ctx context.Context, client *Client) error {
logClient := client.Logger()
logClient.Info().Msg("running sync transaction(s) task...")

// Prevent concurrent running
unlock, err := newWriteLock(
ctx, lockKeyProcessSyncTx, client.Cachestore(),
)
defer unlock()
if err != nil {
logClient.Warn().Msg("cannot run sync transaction(s) task, previous run is not complete yet...")
return nil //nolint:nilerr // previous run is not complete yet
}

err = processSyncTransactions(ctx, 100, WithClient(client))
if err == nil || errors.Is(err, datastore.ErrNoResults) {
return nil
}
return err
processSyncTransactions(ctx, client)
return nil
}

func taskCalculateMetrics(ctx context.Context, client *Client) error {
Expand Down
5 changes: 5 additions & 0 deletions engine/datastore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,8 @@ func (c *Client) IsDebug() bool {
func (c *Client) IsNewRelicEnabled() bool {
return c.options.newRelicEnabled
}

// DB returns ready to use gorm instance
func (c *Client) DB() *gorm.DB {
return c.options.db
}
1 change: 1 addition & 0 deletions engine/datastore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type StorageService interface {
NewRawTx() (*Transaction, error)
Raw(query string) *gorm.DB
SaveModel(ctx context.Context, model interface{}, tx *Transaction, newRecord, commitTx bool) error
DB() *gorm.DB
}

// GetterInterface is the getter methods
Expand Down
7 changes: 1 addition & 6 deletions engine/db_model_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,6 @@ func (m *Transaction) ChildModels() (childModels []ModelInterface) {
childModels = append(childModels, &m.utxos[index])
}

// Add the broadcast transaction record
if m.syncTransaction != nil {
childModels = append(childModels, m.syncTransaction)
}

return
}

Expand Down Expand Up @@ -188,7 +183,7 @@ func (m *Transaction) notify() {
XPubID: m.XPubID,
},
TransactionID: m.ID,
Status: m.TxStatus,
Status: string(m.TxStatus),
XpubOutputValue: m.XpubOutputValue,
})
}
Expand Down
2 changes: 1 addition & 1 deletion engine/examples/client/custom_cron/custom_cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func main() {
client, err := engine.NewClient(
context.Background(), // Set context
engine.WithCronCustomPeriod(engine.CronJobNameDraftTransactionCleanUp, 2*time.Second),
engine.WithCronCustomPeriod(engine.CronJobNameSyncTransactionSync, 4*time.Second),
engine.WithCronCustomPeriod(engine.CronJobNameSyncTransaction, 4*time.Second),
)
if err != nil {
log.Fatalln("error: " + err.Error())
Expand Down
2 changes: 1 addition & 1 deletion engine/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ type TransactionService interface {
RecordTransaction(ctx context.Context, xPubKey, txHex, draftID string,
opts ...ModelOps) (*Transaction, error)
RecordRawTransaction(ctx context.Context, txHex string, opts ...ModelOps) (*Transaction, error)
UpdateTransaction(ctx context.Context, txInfo *broadcast.SubmittedTx) error
HandleTxCallback(ctx context.Context, callbackResp *broadcast.SubmittedTx) error
UpdateTransactionMetadata(ctx context.Context, xPubID, id string, metadata Metadata) (*Transaction, error)
RevertTransaction(ctx context.Context, id string) error
}
Expand Down
7 changes: 6 additions & 1 deletion engine/model_bump.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,14 @@ func (bump *BUMP) Scan(value interface{}) error {
return spverrors.Wrapf(err, "failed to parse BUMP from JSON, data: %v", value)
}

// IsEmpty returns true if BUMP is empty (all fields are zero values)
func (bump BUMP) IsEmpty() bool {
return reflect.DeepEqual(bump, BUMP{})
}

// Value return json value, implement driver.Valuer interface
func (bump BUMP) Value() (driver.Value, error) {
if reflect.DeepEqual(bump, BUMP{}) {
if bump.IsEmpty() {
return nil, nil
}
marshal, err := json.Marshal(bump)
Expand Down
Loading

0 comments on commit 9ce0fac

Please sign in to comment.