Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(SPV-1038): decouple sync job from SyncTx #701

Merged
merged 16 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions actions/transactions/broadcast_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,19 @@ import (
// broadcastCallback will handle a broadcastCallback call from the broadcast api
func broadcastCallback(c *gin.Context) {
logger := reqctx.Logger(c)
var resp *broadcast.SubmittedTx
var callbackResp *broadcast.SubmittedTx

err := c.Bind(&resp)
err := c.Bind(&callbackResp)
if err != nil {
spverrors.ErrorResponse(c, spverrors.ErrCannotBindRequest, logger)
return
}

err = reqctx.Engine(c).UpdateTransaction(c.Request.Context(), resp)
err = reqctx.Engine(c).HandleTxCallback(c.Request.Context(), callbackResp)
if err != nil {
logger.Err(err).Msgf("failed to update transaction - tx: %v", resp)
logger.Err(err).Msgf("failed to update transaction - tx: %v", callbackResp)
dorzepowski marked this conversation as resolved.
Show resolved Hide resolved
spverrors.ErrorResponse(c, err, logger)
// TODO Does ARC cares about our errors?
return
}

Expand Down
61 changes: 17 additions & 44 deletions engine/action_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package engine

import (
"context"
"errors"
"math"
"time"

"github.com/bitcoin-sv/go-broadcast-client/broadcast"
"github.com/bitcoin-sv/spv-wallet/engine/chainstate"
"github.com/bitcoin-sv/spv-wallet/engine/datastore"
"github.com/bitcoin-sv/spv-wallet/engine/spverrors"
"github.com/bitcoin-sv/spv-wallet/engine/utils"
Expand Down Expand Up @@ -256,9 +254,6 @@ func (c *Client) UpdateTransactionMetadata(ctx context.Context, xPubID, id strin
// yet been synced on-chain and the utxos have not been spent.
// All utxos that are reverted will be marked as deleted (and spent)
func (c *Client) RevertTransaction(ctx context.Context, id string) error {
// Check for existing NewRelic transaction
ctx = c.GetOrStartTxn(ctx, "revert_transaction_by_id")

// Get the transaction
transaction, err := c.GetTransaction(ctx, "", id)
if err != nil {
Expand All @@ -270,25 +265,14 @@ func (c *Client) RevertTransaction(ctx context.Context, id string) error {
return spverrors.ErrTxRevertEmptyDraftID
}

var draftTransaction *DraftTransaction
if draftTransaction, err = c.GetDraftTransactionByID(ctx, transaction.DraftID, c.DefaultModelOptions()...); err != nil {
draftTransaction, err := c.GetDraftTransactionByID(ctx, transaction.DraftID, c.DefaultModelOptions()...)
if err != nil {
return err
}
if draftTransaction == nil {
return spverrors.ErrTxRevertCouldNotFindDraftTx
}

// check whether transaction is not already on chain
var info *chainstate.TransactionInfo
if info, err = c.Chainstate().QueryTransaction(ctx, transaction.ID, chainstate.RequiredInMempool, 30*time.Second); err != nil {
if !errors.Is(err, spverrors.ErrCouldNotFindTransaction) {
return spverrors.Wrapf(err, "failed to query transaction %s on chain", transaction.ID)
}
}
if info != nil {
return spverrors.ErrTxRevertNotFoundOnChain
}

// check that the utxos of this transaction have not been spent
// this transaction needs to be the tip of the chain
conditions := map[string]interface{}{
Expand Down Expand Up @@ -356,16 +340,6 @@ func (c *Client) RevertTransaction(ctx context.Context, id string) error {
return err
}

// cancel sync transaction
var syncTransaction *SyncTransaction
if syncTransaction, err = GetSyncTransactionByID(ctx, transaction.ID, c.DefaultModelOptions()...); err != nil {
return err
}
syncTransaction.SyncStatus = SyncStatusCanceled
if err = syncTransaction.Save(ctx); err != nil {
return err
}

// revert transaction
// this takes the transaction out of any possible list view of the owners of the xpubs,
// but keeps a record of what went down
Expand All @@ -380,41 +354,40 @@ func (c *Client) RevertTransaction(ctx context.Context, id string) error {
transaction.XpubOutputValue = XpubOutputValue{"reverted": 0}
transaction.DeletedAt.Valid = true
transaction.DeletedAt.Time = time.Now()
transaction.TxStatus = TxStatusReverted

err = transaction.Save(ctx) // update existing record

return err
}

// UpdateTransaction will update the broadcast callback transaction info, like: block height, block hash, status, bump.
func (c *Client) UpdateTransaction(ctx context.Context, callbackResp *broadcast.SubmittedTx) error {
// HandleTxCallback will update the broadcast callback transaction info, like: block height, block hash, status, bump.
func (c *Client) HandleTxCallback(ctx context.Context, callbackResp *broadcast.SubmittedTx) error {
logger := c.options.logger
bump, err := bc.NewBUMPFromStr(callbackResp.MerklePath)
if err != nil {
c.options.logger.Err(err).Msgf("failed to parse merkle path from broadcast callback - tx: %v", callbackResp)
logger.Err(err).Msgf("failed to parse merkle path from broadcast callback - tx: %v", callbackResp)
return spverrors.Wrapf(err, "failed to parse merkle path from broadcast callback - tx: %v", callbackResp)
}

txInfo := &chainstate.TransactionInfo{
BlockHash: callbackResp.BlockHash,
BlockHeight: callbackResp.BlockHeight,
ID: callbackResp.TxID,
TxStatus: callbackResp.TxStatus,
BUMP: bump,
}
txID := callbackResp.TxID

tx, err := c.GetTransaction(ctx, "", txInfo.ID)
tx, err := c.GetTransaction(ctx, "", txID)
if err != nil {
c.options.logger.Err(err).Msgf("failed to get transaction by id: %v", txInfo.ID)
logger.Warn().Err(err).Msgf("failed to get transaction by id: %v", txID)
return err
}

syncTx, err := GetSyncTransactionByTxID(ctx, txInfo.ID, c.DefaultModelOptions()...)
if err != nil {
c.options.logger.Err(err).Msgf("failed to get sync transaction by tx id: %v", txInfo.ID)
tx.BlockHash = callbackResp.BlockHash
tx.BlockHeight = uint64(callbackResp.BlockHeight)
tx.SetBUMP(bump)
tx.UpdateFromBroadcastStatus(callbackResp.TxStatus)

if err := tx.Save(ctx); err != nil {
chris-4chain marked this conversation as resolved.
Show resolved Hide resolved
return err
}

return processSyncTxSave(ctx, txInfo, syncTx, tx)
return nil
}

func generateTxIDFilterConditions(txIDs []string) map[string]interface{} {
Expand Down
1 change: 0 additions & 1 deletion engine/action_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ func initRevertTransactionData(t *testing.T, clientOpts ...ClientOps) (context.C
require.NoError(t, err)

transaction.draftTransaction = draftTransaction
_hydrateOutgoingWithSync(transaction)

err = transaction.processUtxos(ctx)
require.NoError(t, err)
Expand Down
7 changes: 1 addition & 6 deletions engine/chainstate/chainstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,7 @@ func (c *Client) QueryTransaction(
return nil, spverrors.ErrInvalidRequirements
}

chris-4chain marked this conversation as resolved.
Show resolved Hide resolved
// Try all providers and return the "first" valid response
info := c.query(ctx, id, requiredIn, timeout)
if info == nil {
return nil, spverrors.ErrCouldNotFindTransaction
}
return info, nil
return c.query(ctx, id, requiredIn, timeout)
}

// QueryTransactionFastest will get the transaction info from ALL provider(s) returning the "fastest" valid result
Expand Down
61 changes: 39 additions & 22 deletions engine/chainstate/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,35 @@ package chainstate

import (
"context"
"errors"
"strings"
"sync"
"time"

"github.com/bitcoin-sv/go-broadcast-client/broadcast"
"github.com/bitcoin-sv/spv-wallet/engine/spverrors"
"github.com/libsv/go-bc"
)

// query will try ALL providers in order and return the first "valid" response based on requirements
func (c *Client) query(ctx context.Context, id string, requiredIn RequiredIn,
timeout time.Duration,
) *TransactionInfo {
) (*TransactionInfo, error) {
// Create a context (to cancel or timeout)
ctxWithCancel, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

resp, err := queryBroadcastClient(
txInfo, err := queryBroadcastClient(
ctxWithCancel, c, id,
)
if err == nil && checkRequirementArc(requiredIn, id, resp) {
return resp
if err != nil {
return nil, err
}
if !checkRequirementArc(requiredIn, id, txInfo) {
return nil, spverrors.ErrCouldNotFindTransaction
}

return nil // No transaction information found
return txInfo, nil
}

// fastestQuery will try ALL providers on once and return the fastest "valid" response based on requirements
Expand Down Expand Up @@ -65,23 +70,35 @@ func (c *Client) fastestQuery(ctx context.Context, id string, requiredIn Require

// queryBroadcastClient will submit a query transaction request to a go-broadcast-client
func queryBroadcastClient(ctx context.Context, client ClientInterface, id string) (*TransactionInfo, error) {
client.DebugLog("executing request using " + ProviderBroadcastClient)
if resp, failure := client.BroadcastClient().QueryTransaction(ctx, id); failure != nil {
client.DebugLog("error executing request using " + ProviderBroadcastClient + " failed: " + failure.Error())
return nil, spverrors.Wrapf(failure, "failed to query transaction using %s", ProviderBroadcastClient)
} else if resp != nil && strings.EqualFold(resp.TxID, id) {
bump, err := bc.NewBUMPFromStr(resp.BaseTxResponse.MerklePath)
if err != nil {
return nil, spverrors.Wrapf(err, "failed to parse BUMP from response: %s", resp.BaseTxResponse.MerklePath)
resp, err := client.BroadcastClient().QueryTransaction(ctx, id)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return nil, spverrors.ErrBroadcastUnreachable
}
return &TransactionInfo{
BlockHash: resp.BlockHash,
BlockHeight: resp.BlockHeight,
ID: resp.TxID,
Provider: resp.Miner,
TxStatus: resp.TxStatus,
BUMP: bump,
}, nil
var arcError *broadcast.ArcError
if errors.As(err, &arcError) {
if arcError.IsRejectedTransaction() {
return nil, spverrors.ErrBroadcastRejectedTransaction.Wrap(err)
}
}
return nil, spverrors.ErrCouldNotFindTransaction.Wrap(err)
}

if resp == nil || !strings.EqualFold(resp.TxID, id) {
return nil, spverrors.ErrTransactionIDMismatch
}

bump, err := bc.NewBUMPFromStr(resp.BaseTxResponse.MerklePath)
if err != nil {
return nil, spverrors.ErrBroadcastWrongBUMPResponse.Wrap(err)
}
return nil, spverrors.ErrTransactionIDMismatch

return &TransactionInfo{
BlockHash: resp.BlockHash,
BlockHeight: resp.BlockHeight,
ID: resp.TxID,
Provider: resp.Miner,
TxStatus: resp.TxStatus,
BUMP: bump,
}, nil
}
5 changes: 0 additions & 5 deletions engine/chainstate/transaction_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,3 @@ type TransactionInfo struct {
BUMP *bc.BUMP `json:"bump,omitempty"` // Merkle proof in BUMP format
TxStatus broadcast.TxStatus `json:"tx_status,omitempty"` // Status of the transaction
}

// Valid validates TransactionInfo by checking if it contains BlockHash and BUMP
func (t *TransactionInfo) Valid() bool {
return t.BlockHash != "" && t.BUMP != nil
}
9 changes: 4 additions & 5 deletions engine/cron_job_declarations.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ import (

// Cron job names to be used in WithCronCustomPeriod
const (
CronJobNameDraftTransactionCleanUp = "draft_transaction_clean_up"
CronJobNameSyncTransactionBroadcast = "sync_transaction_broadcast"
CronJobNameSyncTransactionSync = "sync_transaction_sync"
CronJobNameCalculateMetrics = "calculate_metrics"
CronJobNameDraftTransactionCleanUp = "draft_transaction_clean_up"
CronJobNameSyncTransaction = "sync_transaction"
CronJobNameCalculateMetrics = "calculate_metrics"
)

type cronJobHandler func(ctx context.Context, client *Client) error
Expand Down Expand Up @@ -47,7 +46,7 @@ func (c *Client) cronJobs() taskmanager.CronJobs {
taskCleanupDraftTransactions,
)
addJob(
CronJobNameSyncTransactionSync,
CronJobNameSyncTransaction,
5*time.Minute,
taskSyncTransactions,
)
Expand Down
17 changes: 2 additions & 15 deletions engine/cron_job_definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,8 @@ func taskSyncTransactions(ctx context.Context, client *Client) error {
logClient := client.Logger()
logClient.Info().Msg("running sync transaction(s) task...")

// Prevent concurrent running
unlock, err := newWriteLock(
ctx, lockKeyProcessSyncTx, client.Cachestore(),
)
defer unlock()
if err != nil {
logClient.Warn().Msg("cannot run sync transaction(s) task, previous run is not complete yet...")
return nil //nolint:nilerr // previous run is not complete yet
}

err = processSyncTransactions(ctx, 100, WithClient(client))
if err == nil || errors.Is(err, datastore.ErrNoResults) {
return nil
}
return err
processSyncTransactions(ctx, client)
return nil
}

func taskCalculateMetrics(ctx context.Context, client *Client) error {
Expand Down
5 changes: 5 additions & 0 deletions engine/datastore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,8 @@ func (c *Client) IsDebug() bool {
func (c *Client) IsNewRelicEnabled() bool {
return c.options.newRelicEnabled
}

// DB returns ready to use gorm instance
func (c *Client) DB() *gorm.DB {
return c.options.db
}
1 change: 1 addition & 0 deletions engine/datastore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type StorageService interface {
NewRawTx() (*Transaction, error)
Raw(query string) *gorm.DB
SaveModel(ctx context.Context, model interface{}, tx *Transaction, newRecord, commitTx bool) error
DB() *gorm.DB
}

// GetterInterface is the getter methods
Expand Down
7 changes: 1 addition & 6 deletions engine/db_model_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,6 @@ func (m *Transaction) ChildModels() (childModels []ModelInterface) {
childModels = append(childModels, &m.utxos[index])
}

// Add the broadcast transaction record
if m.syncTransaction != nil {
childModels = append(childModels, m.syncTransaction)
}

return
}

Expand Down Expand Up @@ -188,7 +183,7 @@ func (m *Transaction) notify() {
XPubID: m.XPubID,
},
TransactionID: m.ID,
Status: m.TxStatus,
Status: string(m.TxStatus),
XpubOutputValue: m.XpubOutputValue,
})
}
Expand Down
2 changes: 1 addition & 1 deletion engine/examples/client/custom_cron/custom_cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func main() {
client, err := engine.NewClient(
context.Background(), // Set context
engine.WithCronCustomPeriod(engine.CronJobNameDraftTransactionCleanUp, 2*time.Second),
engine.WithCronCustomPeriod(engine.CronJobNameSyncTransactionSync, 4*time.Second),
engine.WithCronCustomPeriod(engine.CronJobNameSyncTransaction, 4*time.Second),
)
if err != nil {
log.Fatalln("error: " + err.Error())
Expand Down
2 changes: 1 addition & 1 deletion engine/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ type TransactionService interface {
RecordTransaction(ctx context.Context, xPubKey, txHex, draftID string,
opts ...ModelOps) (*Transaction, error)
RecordRawTransaction(ctx context.Context, txHex string, opts ...ModelOps) (*Transaction, error)
UpdateTransaction(ctx context.Context, txInfo *broadcast.SubmittedTx) error
HandleTxCallback(ctx context.Context, callbackResp *broadcast.SubmittedTx) error
UpdateTransactionMetadata(ctx context.Context, xPubID, id string, metadata Metadata) (*Transaction, error)
RevertTransaction(ctx context.Context, id string) error
}
Expand Down
7 changes: 6 additions & 1 deletion engine/model_bump.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,14 @@ func (bump *BUMP) Scan(value interface{}) error {
return spverrors.Wrapf(err, "failed to parse BUMP from JSON, data: %v", value)
}

// Empty returns true if BUMP is empty (all fields are zero values)
func (bump BUMP) Empty() bool {
chris-4chain marked this conversation as resolved.
Show resolved Hide resolved
return reflect.DeepEqual(bump, BUMP{})
}

// Value return json value, implement driver.Valuer interface
func (bump BUMP) Value() (driver.Value, error) {
if reflect.DeepEqual(bump, BUMP{}) {
if bump.Empty() {
return nil, nil
}
marshal, err := json.Marshal(bump)
Expand Down
Loading
Loading