From e5b6320669a48e6bbcd52673eec61821f5e3f5af Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Thu, 7 Dec 2023 16:03:05 +0000 Subject: [PATCH] add log messages Signed-off-by: Benjamin Wang --- bucket.go | 60 +++++++++++++++++++++++++++++++++---- db.go | 89 ++++++++++++++++++++++++++++++++++++++++++++++++------- tx.go | 38 +++++++++++++++++++----- 3 files changed, 163 insertions(+), 24 deletions(-) diff --git a/bucket.go b/bucket.go index 0d60a35e3..d9b384a2a 100644 --- a/bucket.go +++ b/bucket.go @@ -145,7 +145,16 @@ func (b *Bucket) openBucket(value []byte) *Bucket { // CreateBucket creates a new bucket at the given key and returns the new bucket. // Returns an error if the key already exists, if the bucket name is blank, or if the bucket name is too long. // The bucket instance is only valid for the lifetime of the transaction. -func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) { +func (b *Bucket) CreateBucket(key []byte) (rb *Bucket, err error) { + lg := b.tx.db.Logger() + lg.Debugf("Creating bucket %q", string(key)) + defer func() { + if err != nil { + lg.Errorf("Creating bucket %q failed: %v", string(key), err) + } else { + lg.Debugf("Creating bucket %q successfully", string(key)) + } + }() if b.tx.db == nil { return nil, errors.ErrTxClosed } else if !b.tx.writable { @@ -192,7 +201,17 @@ func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) { // CreateBucketIfNotExists creates a new bucket if it doesn't already exist and returns a reference to it. // Returns an error if the bucket name is blank, or if the bucket name is too long. // The bucket instance is only valid for the lifetime of the transaction. -func (b *Bucket) CreateBucketIfNotExists(key []byte) (*Bucket, error) { +func (b *Bucket) CreateBucketIfNotExists(key []byte) (rb *Bucket, err error) { + lg := b.tx.db.Logger() + lg.Debugf("Creating bucket if not exist %q", string(key)) + defer func() { + if err != nil { + lg.Errorf("Creating bucket if not exist %q failed: %v", string(key), err) + } else { + lg.Debugf("Creating bucket if not exist %q successfully", string(key)) + } + }() + if b.tx.db == nil { return nil, errors.ErrTxClosed } else if !b.tx.writable { @@ -249,7 +268,17 @@ func (b *Bucket) CreateBucketIfNotExists(key []byte) (*Bucket, error) { // DeleteBucket deletes a bucket at the given key. // Returns an error if the bucket does not exist, or if the key represents a non-bucket value. -func (b *Bucket) DeleteBucket(key []byte) error { +func (b *Bucket) DeleteBucket(key []byte) (err error) { + lg := b.tx.db.Logger() + lg.Debugf("Deleting bucket %q", string(key)) + defer func() { + if err != nil { + lg.Errorf("Deleting bucket %q failed: %v", string(key), err) + } else { + lg.Debugf("Deleting bucket %q successfully", string(key)) + } + }() + if b.tx.db == nil { return errors.ErrTxClosed } else if !b.Writable() { @@ -269,7 +298,7 @@ func (b *Bucket) DeleteBucket(key []byte) error { // Recursively delete all child buckets. child := b.Bucket(key) - err := child.ForEachBucket(func(k []byte) error { + err = child.ForEachBucket(func(k []byte) error { if err := child.DeleteBucket(k); err != nil { return fmt.Errorf("delete bucket: %s", err) } @@ -316,7 +345,16 @@ func (b *Bucket) Get(key []byte) []byte { // If the key exist then its previous value will be overwritten. // Supplied value must remain valid for the life of the transaction. // Returns an error if the bucket was created from a read-only transaction, if the key is blank, if the key is too large, or if the value is too large. -func (b *Bucket) Put(key []byte, value []byte) error { +func (b *Bucket) Put(key []byte, value []byte) (err error) { + lg := b.tx.db.Logger() + lg.Debugf("Putting key %q", string(key)) + defer func() { + if err != nil { + lg.Errorf("Putting key %q failed: %v", string(key), err) + } else { + lg.Debugf("Putting key %q successfully", string(key)) + } + }() if b.tx.db == nil { return errors.ErrTxClosed } else if !b.Writable() { @@ -353,7 +391,17 @@ func (b *Bucket) Put(key []byte, value []byte) error { // Delete removes a key from the bucket. // If the key does not exist then nothing is done and a nil error is returned. // Returns an error if the bucket was created from a read-only transaction. -func (b *Bucket) Delete(key []byte) error { +func (b *Bucket) Delete(key []byte) (err error) { + lg := b.tx.db.Logger() + lg.Debugf("Deleting key %q", string(key)) + defer func() { + if err != nil { + lg.Errorf("Deleting key %q failed: %v", string(key), err) + } else { + lg.Debugf("Deleting key %q successfully", string(key)) + } + }() + if b.tx.db == nil { return errors.ErrTxClosed } else if !b.Writable() { diff --git a/db.go b/db.go index 2c5c694b3..fec355171 100644 --- a/db.go +++ b/db.go @@ -116,8 +116,7 @@ type DB struct { // Supported only on Unix via mlock/munlock syscalls. Mlock bool - // Logger is the logger used for bbolt. - Logger Logger + logger Logger path string openFile func(string, int, os.FileMode) (*os.File, error) @@ -176,10 +175,11 @@ func (db *DB) String() string { // If the file does not exist then it will be created automatically with a given file mode. // Passing in nil options will cause Bolt to open the database with the default options. // Note: For read/write transactions, ensure the owner has write permission on the created/opened database file, e.g. 0600 -func Open(path string, mode os.FileMode, options *Options) (*DB, error) { - db := &DB{ +func Open(path string, mode os.FileMode, options *Options) (db *DB, err error) { + db = &DB{ opened: true, } + // Set default options if no options are provided. if options == nil { options = DefaultOptions @@ -198,11 +198,21 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) { db.AllocSize = common.DefaultAllocSize if options.Logger == nil { - db.Logger = getDiscardLogger() + db.logger = getDiscardLogger() } else { - db.Logger = options.Logger + db.logger = options.Logger } + lg := db.Logger() + lg.Infof("Opening db file (%s) with mode %x and with options: %s", path, mode, options) + defer func() { + if err != nil { + lg.Errorf("Opening bbolt db (%s) failed: %v", path, err) + } else { + lg.Infof("Opening bbolt db (%s) successfully", path) + } + }() + flag := os.O_RDWR if options.ReadOnly { flag = os.O_RDONLY @@ -219,9 +229,9 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) { } // Open data file and separate sync handler for metadata writes. - var err error if db.file, err = db.openFile(path, flag, mode); err != nil { _ = db.close() + lg.Errorf("failed to open db file (%s): %v", path, err) return nil, err } db.path = db.file.Name() @@ -235,6 +245,7 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) { // hold a lock at the same time) otherwise (options.ReadOnly is set). if err := flock(db, !db.readOnly, options.Timeout); err != nil { _ = db.close() + lg.Errorf("failed to lock db file (%s), readonly: %t, error: %v", path, db.readOnly, err) return nil, err } @@ -249,12 +260,14 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) { // Initialize the database if it doesn't exist. if info, err := db.file.Stat(); err != nil { _ = db.close() + lg.Errorf("failed to get db file's stats (%s): %v", path, err) return nil, err } else if info.Size() == 0 { // Initialize new files with meta pages. if err := db.init(); err != nil { // clean up file descriptor on initialization fail _ = db.close() + lg.Errorf("failed to initialize db file (%s): %v", path, err) return nil, err } } else { @@ -263,7 +276,8 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) { db.pageSize = pgSize } else { _ = db.close() - return nil, berrors.ErrInvalid + lg.Errorf("failed to get page size from db file (%s): %v", path, err) + return nil, err } } @@ -277,6 +291,7 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) { // Memory map the data file. if err := db.mmap(options.InitialMmapSize); err != nil { _ = db.close() + lg.Errorf("failed to map db file (%s): %v", path, err) return nil, err } @@ -296,13 +311,13 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) { err = tx.Commit() } if err != nil { + lg.Errorf("starting readwrite transaction failed: %v", err) _ = db.close() return nil, err } } // Mark the database as opened and return. - db.Logger.Debug("bbolt opened successfully") return db, nil } @@ -435,9 +450,12 @@ func (db *DB) mmap(minsz int) (err error) { db.mmaplock.Lock() defer db.mmaplock.Unlock() + lg := db.Logger() + // Ensure the size is at least the minimum size. fileSize, err := db.fileSize() if err != nil { + lg.Errorf("getting file size failed: %w", err) return err } var size = fileSize @@ -446,6 +464,7 @@ func (db *DB) mmap(minsz int) (err error) { } size, err = db.mmapSize(size) if err != nil { + lg.Errorf("getting map size failed: %w", err) return err } @@ -470,6 +489,7 @@ func (db *DB) mmap(minsz int) (err error) { // gofail: var mapError string // return errors.New(mapError) if err = mmap(db, size); err != nil { + lg.Errorf("[GOOS: %s, GOARCH: %s] mmap failed, size: %d, error: %v", runtime.GOOS, runtime.GOARCH, size, err) return err } @@ -500,6 +520,7 @@ func (db *DB) mmap(minsz int) (err error) { err0 := db.meta0.Validate() err1 := db.meta1.Validate() if err0 != nil && err1 != nil { + lg.Errorf("both meta pages are invalid, meta0: %v, meta1: %v", err0, err1) return err0 } @@ -522,6 +543,7 @@ func (db *DB) munmap() error { // gofail: var unmapError string // return errors.New(unmapError) if err := munmap(db); err != nil { + db.Logger().Errorf("[GOOS: %s, GOARCH: %s] munmap failed, db.datasz: %d, error: %v", runtime.GOOS, runtime.GOARCH, db.datasz, err) return fmt.Errorf("unmap error: " + err.Error()) } @@ -569,6 +591,7 @@ func (db *DB) munlock(fileSize int) error { // gofail: var munlockError string // return errors.New(munlockError) if err := munlock(db, fileSize); err != nil { + db.Logger().Errorf("[GOOS: %s, GOARCH: %s] munlock failed, fileSize: %d, db.datasz: %d, error: %v", runtime.GOOS, runtime.GOARCH, fileSize, db.datasz, err) return fmt.Errorf("munlock error: " + err.Error()) } return nil @@ -578,6 +601,7 @@ func (db *DB) mlock(fileSize int) error { // gofail: var mlockError string // return errors.New(mlockError) if err := mlock(db, fileSize); err != nil { + db.Logger().Errorf("[GOOS: %s, GOARCH: %s] mlock failed, fileSize: %d, db.datasz: %d, error: %v", runtime.GOOS, runtime.GOARCH, fileSize, db.datasz, err) return fmt.Errorf("mlock error: " + err.Error()) } return nil @@ -628,9 +652,11 @@ func (db *DB) init() error { // Write the buffer to our data file. if _, err := db.ops.writeAt(buf, 0); err != nil { + db.Logger().Errorf("writeAt failed: %w", err) return err } if err := fdatasync(db); err != nil { + db.Logger().Errorf("[GOOS: %s, GOARCH: %s] fdatasync failed: %w", runtime.GOOS, runtime.GOARCH, err) return err } @@ -713,13 +739,29 @@ func (db *DB) close() error { // // IMPORTANT: You must close read-only transactions after you are finished or // else the database will not reclaim old pages. -func (db *DB) Begin(writable bool) (*Tx, error) { +func (db *DB) Begin(writable bool) (t *Tx, err error) { + db.Logger().Debugf("Starting a new transaction [writable: %t]", writable) + defer func() { + if err != nil { + db.Logger().Errorf("Starting a new transaction [writable: %t] failed: %v", writable, err) + } else { + db.Logger().Debugf("Starting a new transaction [writable: %t] successfully", writable) + } + }() + if writable { return db.beginRWTx() } return db.beginTx() } +func (db *DB) Logger() Logger { + if db == nil || db.logger == nil { + return getDiscardLogger() + } + return db.logger +} + func (db *DB) beginTx() (*Tx, error) { // Lock the meta pages while we initialize the transaction. We obtain // the meta lock before the mmap lock because that's the order that the @@ -1053,7 +1095,18 @@ func safelyCall(fn func(*Tx) error, tx *Tx) (err error) { // // This is not necessary under normal operation, however, if you use NoSync // then it allows you to force the database file to sync against the disk. -func (db *DB) Sync() error { return fdatasync(db) } +func (db *DB) Sync() (err error) { + db.Logger().Debug("Syncing bbolt db (%s)", db.path) + defer func() { + if err != nil { + db.Logger().Errorf("[GOOS: %s, GOARCH: %s] syncing bbolt db (%s) failed: %v", runtime.GOOS, runtime.GOARCH, db.path, err) + } else { + db.Logger().Debugf("Syncing bbolt db (%s) successfully", db.path) + } + }() + + return fdatasync(db) +} // Stats retrieves ongoing performance stats for the database. // This is only updated when a transaction closes. @@ -1142,8 +1195,10 @@ func (db *DB) allocate(txid common.Txid, count int) (*common.Page, error) { // grow grows the size of the database to the given sz. func (db *DB) grow(sz int) error { // Ignore if the new size is less than available file size. + lg := db.Logger() fileSize, err := db.fileSize() if err != nil { + lg.Errorf("getting file size failed: %w", err) return err } if sz <= fileSize { @@ -1165,10 +1220,12 @@ func (db *DB) grow(sz int) error { // gofail: var resizeFileError string // return errors.New(resizeFileError) if err := db.file.Truncate(int64(sz)); err != nil { + lg.Errorf("[GOOS: %s, GOARCH: %s] truncating file failed, size: %d, db.datasz: %d, error: %v", runtime.GOOS, runtime.GOARCH, sz, db.datasz, err) return fmt.Errorf("file resize error: %s", err) } } if err := db.file.Sync(); err != nil { + lg.Errorf("[GOOS: %s, GOARCH: %s] syncing file failed, db.datasz: %d, error: %v", runtime.GOOS, runtime.GOARCH, db.datasz, err) return fmt.Errorf("file sync error: %s", err) } if db.Mlock { @@ -1283,6 +1340,16 @@ type Options struct { Logger Logger } +func (o *Options) String() string { + if o == nil { + return "{}" + } + + return fmt.Sprintf("{Timeout: %s, NoGrowSync: %t, NoFreelistSync: %t, PreLoadFreelist: %t, FreelistType: %s, ReadOnly: %t, MmapFlags: %x, InitialMmapSize: %d, PageSize: %d, NoSync: %t, OpenFile: %p, Mlock: %t, Logger: %p}", + o.Timeout, o.NoGrowSync, o.NoFreelistSync, o.PreLoadFreelist, o.FreelistType, o.ReadOnly, o.MmapFlags, o.InitialMmapSize, o.PageSize, o.NoSync, o.OpenFile, o.Mlock, o.Logger) + +} + // DefaultOptions represent the options used if nil options are passed into Open(). // No timeout is used which will cause Bolt to wait indefinitely for a lock. var DefaultOptions = &Options{ diff --git a/tx.go b/tx.go index f6cebf720..502f62683 100644 --- a/tx.go +++ b/tx.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "os" + "runtime" "sort" "strings" "sync/atomic" @@ -32,7 +33,6 @@ type Tx struct { pages map[common.Pgid]*common.Page stats TxStats commitHandlers []func() - Logger Logger // WriteFlag specifies the flag for write-related methods like WriteTo(). // Tx opens the database file with the specified flag to copy the data. @@ -57,8 +57,6 @@ func (tx *Tx) init(db *DB) { tx.root.InBucket = &common.InBucket{} *tx.root.InBucket = *(tx.meta.RootBucket()) - tx.Logger = db.Logger - // Increment the transaction id and add a page cache for writable transactions. if tx.writable { tx.pages = make(map[common.Pgid]*common.Page) @@ -68,6 +66,9 @@ func (tx *Tx) init(db *DB) { // ID returns the transaction id. func (tx *Tx) ID() int { + if tx == nil || tx.meta == nil { + return -1 + } return int(tx.meta.Txid()) } @@ -143,7 +144,18 @@ func (tx *Tx) OnCommit(fn func()) { // Commit writes all changes to disk, updates the meta page and closes the transaction. // Returns an error if a disk write error occurs, or if Commit is // called on a read-only transaction. -func (tx *Tx) Commit() error { +func (tx *Tx) Commit() (err error) { + txId := tx.ID() + lg := tx.db.Logger() + lg.Debugf("Committing transaction %d", txId) + defer func() { + if err != nil { + lg.Errorf("Committing transaction failed: %v", err) + } else { + lg.Debugf("Committing transaction %d successfully", txId) + } + }() + common.Assert(!tx.managed, "managed tx commit not allowed") if tx.db == nil { return berrors.ErrTxClosed @@ -151,7 +163,6 @@ func (tx *Tx) Commit() error { return berrors.ErrTxNotWritable } - tx.Logger.Infof("Committing transaction %d", tx.ID()) // TODO(benbjohnson): Use vectorized I/O to write out dirty pages. // Rebalance nodes which have had deletions. @@ -166,6 +177,7 @@ func (tx *Tx) Commit() error { // spill data onto dirty pages. startTime = time.Now() if err := tx.root.spill(); err != nil { + lg.Errorf("spilling data onto dirty pages failed: %v", err) tx.rollback() return err } @@ -182,6 +194,7 @@ func (tx *Tx) Commit() error { if !tx.db.NoFreelistSync { err := tx.commitFreelist() if err != nil { + lg.Errorf("committing freelist failed: %v", err) return err } } else { @@ -195,6 +208,7 @@ func (tx *Tx) Commit() error { // tx.rollback() // return errors.New(lackOfDiskSpace) if err := tx.db.grow(int(tx.meta.Pgid()+1) * tx.db.pageSize); err != nil { + lg.Errorf("growing db size failed, pgid: %d, pagesize: %d, error: %v", tx.meta.Pgid(), tx.db.pageSize, err) tx.rollback() return err } @@ -202,7 +216,8 @@ func (tx *Tx) Commit() error { // Write dirty pages to disk. startTime = time.Now() - if err := tx.write(); err != nil { + if err = tx.write(); err != nil { + lg.Errorf("writing data failed: %v", err) tx.rollback() return err } @@ -224,7 +239,8 @@ func (tx *Tx) Commit() error { } // Write meta to disk. - if err := tx.writeMeta(); err != nil { + if err = tx.writeMeta(); err != nil { + lg.Errorf("writeMeta failed: %v", err) tx.rollback() return err } @@ -418,8 +434,10 @@ func (tx *Tx) CopyFile(path string, mode os.FileMode) error { // allocate returns a contiguous block of memory starting at a given page. func (tx *Tx) allocate(count int) (*common.Page, error) { + lg := tx.db.Logger() p, err := tx.db.allocate(tx.meta.Txid(), count) if err != nil { + lg.Errorf("allocating failed, txid: %d, count: %d, error: %v", tx.meta.Txid(), count, err) return nil, err } @@ -436,6 +454,7 @@ func (tx *Tx) allocate(count int) (*common.Page, error) { // write writes any dirty pages to disk. func (tx *Tx) write() error { // Sort pages by id. + lg := tx.db.Logger() pages := make(common.Pages, 0, len(tx.pages)) for _, p := range tx.pages { pages = append(pages, p) @@ -459,6 +478,7 @@ func (tx *Tx) write() error { buf := common.UnsafeByteSlice(unsafe.Pointer(p), written, 0, int(sz)) if _, err := tx.db.ops.writeAt(buf, offset); err != nil { + lg.Errorf("writeAt failed, offset: %d: %w", offset, err) return err } @@ -481,6 +501,7 @@ func (tx *Tx) write() error { if !tx.db.NoSync || common.IgnoreNoSync { // gofail: var beforeSyncDataPages struct{} if err := fdatasync(tx.db); err != nil { + lg.Errorf("[GOOS: %s, GOARCH: %s] fdatasync failed: %w", runtime.GOOS, runtime.GOARCH, err) return err } } @@ -508,17 +529,20 @@ func (tx *Tx) write() error { // writeMeta writes the meta to the disk. func (tx *Tx) writeMeta() error { // Create a temporary buffer for the meta page. + lg := tx.db.Logger() buf := make([]byte, tx.db.pageSize) p := tx.db.pageInBuffer(buf, 0) tx.meta.Write(p) // Write the meta page to file. if _, err := tx.db.ops.writeAt(buf, int64(p.Id())*int64(tx.db.pageSize)); err != nil { + lg.Errorf("writeAt failed, pgid: %d, pageSize: %d, error: %v", p.Id(), tx.db.pageSize, err) return err } if !tx.db.NoSync || common.IgnoreNoSync { // gofail: var beforeSyncMetaPage struct{} if err := fdatasync(tx.db); err != nil { + lg.Errorf("[GOOS: %s, GOARCH: %s] fdatasync failed: %w", runtime.GOOS, runtime.GOARCH, err) return err } }