Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/db: prune archived orders #2975

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions client/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@ import (
)

const (
defaultRPCCertFile = "rpc.cert"
defaultRPCKeyFile = "rpc.key"
defaultMainnetHost = "127.0.0.1"
defaultTestnetHost = "127.0.0.2"
defaultSimnetHost = "127.0.0.3"
walletPairOneHost = "127.0.0.6"
walletPairTwoHost = "127.0.0.7"
defaultRPCPort = "5757"
defaultWebPort = "5758"
defaultLogLevel = "debug"
configFilename = "dexc.conf"
defaultRPCCertFile = "rpc.cert"
defaultRPCKeyFile = "rpc.key"
defaultMainnetHost = "127.0.0.1"
defaultTestnetHost = "127.0.0.2"
defaultSimnetHost = "127.0.0.3"
walletPairOneHost = "127.0.0.6"
walletPairTwoHost = "127.0.0.7"
defaultRPCPort = "5757"
defaultWebPort = "5758"
defaultLogLevel = "debug"
configFilename = "dexc.conf"
defaultArchiveSizeLimit = 1000
)

var (
Expand Down Expand Up @@ -106,6 +107,8 @@ type CoreConfig struct {
UnlockCoinsOnLogin bool `long:"release-wallet-coins" description:"On login or wallet creation, instruct the wallet to release any coins that it may have locked."`

ExtensionModeFile string `long:"extension-mode-file" description:"path to a file that specifies options for running core as an extension."`

ArchiveSizeLimit uint64 `long:"archivesize" description:"the maximum number of orders to be archived before deleting the oldest"`
Comment on lines +110 to +111
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could mention it cannot be zero, or error somewhere if it is.

}

// WebConfig encapsulates the configuration needed for the web server.
Expand Down Expand Up @@ -213,6 +216,7 @@ func (cfg *Config) Core(log dex.Logger) *core.Config {
NoAutoWalletLock: cfg.NoAutoWalletLock,
NoAutoDBBackup: cfg.NoAutoDBBackup,
ExtensionModeFile: cfg.ExtensionModeFile,
ArchiveSizeLimit: cfg.ArchiveSizeLimit,
}
}

Expand All @@ -223,6 +227,9 @@ var DefaultConfig = Config{
RPCConfig: RPCConfig{
CertHosts: []string{defaultTestnetHost, defaultSimnetHost, defaultMainnetHost},
},
CoreConfig: CoreConfig{
ArchiveSizeLimit: defaultArchiveSizeLimit,
},
}

// ParseCLIConfig parses the command-line arguments into the provided struct
Expand Down
4 changes: 4 additions & 0 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -1433,6 +1433,9 @@ type Config struct {
// for running core in extension mode, which gives the caller options for
// e.g. limiting the ability to configure wallets.
ExtensionModeFile string
// ArchiveSizeLimit is the maximum number of orders that will be archived
// before we start deleting the oldest.
ArchiveSizeLimit uint64
}

// locale is data associated with the currently selected language.
Expand Down Expand Up @@ -1515,6 +1518,7 @@ func New(cfg *Config) (*Core, error) {
}
dbOpts := bolt.Opts{
BackupOnShutdown: !cfg.NoAutoDBBackup,
ArchiveSizeLimit: cfg.ArchiveSizeLimit,
}
boltDB, err := bolt.NewDB(cfg.DBPath, cfg.Logger.SubLogger("DB"), dbOpts)
if err != nil {
Expand Down
146 changes: 146 additions & 0 deletions client/db/bolt/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"path/filepath"
"sort"
"strings"
"sync"
"time"

"decred.org/dcrdex/client/db"
Expand Down Expand Up @@ -138,6 +139,7 @@ var (
// Opts is a set of options for the DB.
type Opts struct {
BackupOnShutdown bool // default is true
ArchiveSizeLimit uint64
}

var defaultOpts = Opts{
Expand Down Expand Up @@ -234,8 +236,29 @@ func (db *BoltDB) fileSize(path string) int64 {

// Run waits for context cancellation and closes the database.
func (db *BoltDB) Run(ctx context.Context) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
tick := time.After(time.Minute)
for {
select {
case <-tick:
case <-ctx.Done():
return
}
if err := db.pruneArchivedOrders(); err != nil {
db.log.Errorf("Error cleaning archive: %v", err)
}
tick = time.After(time.Minute * 30)
}
}()

<-ctx.Done() // wait for shutdown to backup and compact

// Wait for archive cleaner to exit.
wg.Wait()

// Create a backup in the backups folder.
if db.opts.BackupOnShutdown {
db.log.Infof("Backing up database...")
Expand Down Expand Up @@ -408,6 +431,129 @@ func (db *BoltDB) SetPrimaryCredentials(creds *dexdb.PrimaryCredentials) error {
})
}

func (db *BoltDB) pruneArchivedOrders() error {
var archiveSizeLimit uint64 = 1000
if db.opts.ArchiveSizeLimit != 0 {
archiveSizeLimit = db.opts.ArchiveSizeLimit
}

return db.Update(func(tx *bbolt.Tx) error {
archivedOB := tx.Bucket(archivedOrdersBucket)
if archivedOB == nil {
return fmt.Errorf("failed to open %s bucket", string(archivedOrdersBucket))
}

// We won't delete any orders with active matches.
activeMatches := tx.Bucket(activeMatchesBucket)
if activeMatches == nil {
return fmt.Errorf("failed to open %s bucket", string(activeMatchesBucket))
}
oidsWithActiveMatches := make(map[order.OrderID]struct{}, 0)
if err := activeMatches.ForEach(func(k, _ []byte) error {
mBkt := activeMatches.Bucket(k)
if mBkt == nil {
return fmt.Errorf("error getting match bucket %x", k)
}
var oid order.OrderID
copy(oid[:], mBkt.Get(orderIDKey))
oidsWithActiveMatches[oid] = struct{}{}
return nil
}); err != nil {
return fmt.Errorf("error building active match order ID index: %w", err)
}

nOrds := uint64(archivedOB.Stats().BucketN - 1 /* BucketN includes top bucket */)
if nOrds <= archiveSizeLimit {
return nil
}
Comment on lines +465 to +468
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could do this first and maybe avoid collecting the active orders sometimes.


toClear := int(nOrds - archiveSizeLimit)

type orderStamp struct {
oid []byte
stamp uint64
}
deletes := make([]*orderStamp, 0, toClear)
sortDeletes := func() {
sort.Slice(deletes, func(i, j int) bool {
return deletes[i].stamp < deletes[j].stamp
})
}
if err := archivedOB.ForEach(func(oidB, v []byte) error {
var oid order.OrderID
copy(oid[:], oidB)
if _, found := oidsWithActiveMatches[oid]; found {
return nil
}
Comment on lines +485 to +487
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if instead of just not deleting the oids with active matches you don't delete them with any matches. I think the problem is that the archives get clogged up with orders that just get created and cancelled after the market moves. Orders that actually matched shouldn't get deleted because people need to keep records of those. You could even just check the swap fees here, and not delete any trade that has swap fees > 0.

oBkt := archivedOB.Bucket(oidB)
if oBkt == nil {
return fmt.Errorf("no order bucket iterated order %x", oidB)
}
stampB := oBkt.Get(updateTimeKey)
if stampB == nil {
// Highly improbable.
stampB = make([]byte, 8)
}
stamp := intCoder.Uint64(stampB)
if len(deletes) < toClear {
deletes = append(deletes, &orderStamp{
stamp: stamp,
oid: oidB,
})
sortDeletes()
return nil
}
if stamp > deletes[len(deletes)-1].stamp {
return nil
}
deletes[len(deletes)-1] = &orderStamp{
stamp: stamp,
oid: oidB,
}
sortDeletes()
return nil
}); err != nil {
return fmt.Errorf("archive iteration error: %v", err)
}

deletedOrders := make(map[order.OrderID]struct{})
for _, del := range deletes {
var oid order.OrderID
copy(oid[:], del.oid)
deletedOrders[oid] = struct{}{}
if err := archivedOB.DeleteBucket(del.oid); err != nil {
return fmt.Errorf("error deleting archived order %q: %v", del.oid, err)
}
}

matchesToDelete := make([][]byte, 0, archiveSizeLimit /* just avoid some allocs if we can */)
archivedMatches := tx.Bucket(archivedMatchesBucket)
if archivedMatches == nil {
return errors.New("no archived match bucket")
}
if err := archivedMatches.ForEach(func(k, _ []byte) error {
matchBkt := archivedMatches.Bucket(k)
if matchBkt == nil {
return fmt.Errorf("no bucket found for %x during iteration", k)
}
var oid order.OrderID
copy(oid[:], matchBkt.Get(orderIDKey))
if _, found := deletedOrders[oid]; found {
matchesToDelete = append(matchesToDelete, k)
}
return nil
}); err != nil {
return fmt.Errorf("error finding matches to prune: %w", err)
}
for i := range matchesToDelete {
if err := archivedMatches.DeleteBucket(matchesToDelete[i]); err != nil {
return fmt.Errorf("error deleting pruned match %x: %w", matchesToDelete[i], err)
}
}
return nil
})
}

// validateCreds checks that the PrimaryCredentials fields are properly
// populated.
func validateCreds(creds *dexdb.PrimaryCredentials) error {
Expand Down
Loading
Loading