56 changes: 2 additions & 54 deletions util/db/dbutil.go
@@ -204,12 +204,12 @@ func (db *Accessor) IsSharedCacheConnection() bool {
// Atomic executes a piece of code with respect to the database atomically.
// For transactions where readOnly is false, sync determines whether or not to wait for the result.
func (db *Accessor) Atomic(fn idemFn, extras ...interface{}) (err error) {
return db.atomic(fn, nil, extras...)
return db.atomic(fn, extras...)
}

// Atomic executes a piece of code with respect to the database atomically.
// For transactions where readOnly is false, sync determines whether or not to wait for the result.
func (db *Accessor) atomic(fn idemFn, commitLocker sync.Locker, extras ...interface{}) (err error) {
func (db *Accessor) atomic(fn idemFn, extras ...interface{}) (err error) {
Contributor commented:
you should also eliminate atomic and leave just Atomic at this point.

atomicDeadline := time.Now().Add(time.Second)

// note that the sql library will drop panics inside an active transaction
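Following the review suggestion above, the unexported atomic could be folded into Atomic so only one entry point remains. A minimal sketch of that shape, using stand-in Accessor and idemFn types so it compiles on its own (the real types live in util/db/dbutil.go and differ from these, and the real body keeps the connection, retry, and commit logic shown in the rest of this diff):

```go
package main

import (
	"fmt"
	"time"
)

// Stand-ins so this sketch is self-contained; the real definitions are in
// util/db/dbutil.go and differ from these.
type Accessor struct{}
type idemFn func() error

// Atomic with the former atomic wrapper folded in: one exported function,
// no commitLocker parameter.
func (db *Accessor) Atomic(fn idemFn, extras ...interface{}) (err error) {
	atomicDeadline := time.Now().Add(time.Second)

	// ... connection setup, retry loop, and tx.Commit() would go here ...
	err = fn()

	if time.Now().After(atomicDeadline) {
		fmt.Printf("tx surpassed expected deadline by %v\n", time.Since(atomicDeadline))
	}
	return
}

func main() {
	var db Accessor
	fmt.Println(db.Atomic(func() error { return nil }))
}
```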
@@ -232,29 +232,6 @@ func (db *Accessor) atomic(fn idemFn, commitLocker sync.Locker, extras ...interf
var conn *sql.Conn
ctx := context.Background()

commitWriteLockTaken := false
if commitLocker != nil && db.IsSharedCacheConnection() {
// When we're using in memory database, the sqlite implementation forces us to use a shared cache
// mode so that multiple connections ( i.e. read and write ) could share the database instance.
// ( it would also create issues between precompiled statements and regular atomic calls, as the former
// would generate a connection on the fly).
// when using a shared cache, we have to be aware that there are additional locking mechanisms that are
// internal to the sqlite. Two of them which play a role here are the sqlite_unlock_notify which
// prevents shared cache locks from returning a "database is busy" error and would block instead, and
// table level locks, which ensure that at any one time, a single table may have any number of active
// read-locks or a single active write lock.
// see https://www.sqlite.org/sharedcache.html for more details.
// These shared cache constraints are more strict than the WAL based concurrency limitations, which allow
// one writer and multiple readers at the same time.
// In particular, the shared cache limitation means that since a connection could become a writer, any synchronization
// operation that would prevent this operation from completing could result in a deadlock.
// This is the reason why for shared cache connections, we'll take the lock before starting the write transaction,
// and would keep holding it. It will cause degraded performance when using a shared cache connection
// compared to a private cache connection, but would guarantee correct locking semantics.
commitLocker.Lock()
commitWriteLockTaken = true
}
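The shared-cache behaviour described in the removed comment comes from opening the in-memory database with SQLite's shared-cache URI. The exact DSN the Accessor uses is not part of this diff; as a generic illustration (assuming the mattn/go-sqlite3 driver), opening such a connection looks like this:

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/mattn/go-sqlite3" // assumes the go-sqlite3 driver; registers "sqlite3"
)

func main() {
	// Shared-cache, in-memory SQLite: every connection in the pool sees the
	// same database, at the cost of the table-level shared-cache locking
	// described in the comment above (see sqlite.org/sharedcache.html).
	db, err := sql.Open("sqlite3", "file::memory:?cache=shared")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if _, err := db.Exec("CREATE TABLE t (k INTEGER PRIMARY KEY, v TEXT)"); err != nil {
		log.Fatal(err)
	}
}
```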

for i := 0; (i == 0) || dbretry(err); i++ {
if i > 0 {
if i < infoTxRetries {
@@ -271,21 +248,11 @@ func (db *Accessor) atomic(fn idemFn, commitLocker sync.Locker, extras ...interf

if err != nil {
// fail case - unable to create database connection
if commitLocker != nil && commitWriteLockTaken {
commitLocker.Unlock()
}
return
}
defer conn.Close()

for i := 0; ; i++ {
// check if the lock was taken in previous iteration
if commitLocker != nil && (!db.IsSharedCacheConnection()) && commitWriteLockTaken {
// undo the lock.
commitLocker.Unlock()
commitWriteLockTaken = false
}

if i > 0 {
if i < infoTxRetries {
db.getDecoratedLogger(fn, extras).Infof("db.atomic: %d retries (last err: %v)", i, err)
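The surrounding loops retry the whole operation while dbretry(err) classifies the failure as transient (for example a busy or locked database), logging once the retry count passes the first attempt. A self-contained sketch of the same retry shape, with a stand-in retryable check in place of dbretry:

```go
package main

import (
	"errors"
	"fmt"
)

// errBusy stands in for a transient "database is locked" failure; the real
// code classifies sqlite3 error codes via dbretry(err).
var errBusy = errors.New("database is locked")

func retryable(err error) bool { return errors.Is(err, errBusy) }

func main() {
	attempts := 0
	op := func() error {
		attempts++
		if attempts < 3 {
			return errBusy // fail transiently twice, then succeed
		}
		return nil
	}

	// Same shape as the loops in atomic: run once, keep retrying while the
	// error is transient, and log each retry after the first attempt.
	var err error
	for i := 0; (i == 0) || retryable(err); i++ {
		if i > 0 {
			fmt.Printf("retry %d (last err: %v)\n", i, err)
		}
		err = op()
	}
	fmt.Println("final err:", err)
}
```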
Expand Down Expand Up @@ -319,12 +286,6 @@ func (db *Accessor) atomic(fn idemFn, commitLocker sync.Locker, extras ...interf
}
}

// if everything went well, take the lock, as we're going to attempt to commit the transaction to the database.
if commitLocker != nil && (!commitWriteLockTaken) && (!db.IsSharedCacheConnection()) {
commitLocker.Lock()
commitWriteLockTaken = true
}

err = tx.Commit()
if err == nil {
// update the deadline, as it might have been updated.
@@ -335,11 +296,6 @@ func (db *Accessor) atomic(fn idemFn, commitLocker sync.Locker, extras ...interf
}
}

// if we've errored, make sure to unlock the commitLocker ( if there is any )
if err != nil && commitLocker != nil && commitWriteLockTaken {
commitLocker.Unlock()
}

if time.Now().After(atomicDeadline) {
db.getDecoratedLogger(fn, extras).Warnf("dbatomic: tx surpassed expected deadline by %v", time.Now().Sub(atomicDeadline))
}
@@ -362,14 +318,6 @@ func ResetTransactionWarnDeadline(ctx context.Context, tx *sql.Tx, deadline time
return
}
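The atomic function gives each transaction a one-second warning budget (the atomicDeadline set at the top) and, as shown above, warns if the commit lands past it; ResetTransactionWarnDeadline lets the callback extend that budget for legitimately long transactions. A self-contained sketch of just the budget-and-warn mechanic:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirrors the warning logic in atomic: the transaction gets a one-second
	// budget, and if work finishes after that deadline a warning is emitted
	// with how far the deadline was overshot.
	deadline := time.Now().Add(time.Second)

	time.Sleep(1200 * time.Millisecond) // simulate a slow transaction body

	if now := time.Now(); now.After(deadline) {
		fmt.Printf("tx surpassed expected deadline by %v\n", now.Sub(deadline))
	}
}
```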

// AtomicCommitWriteLock executes a piece of code with respect to the database atomically.
// For transactions where readOnly is false, sync determines whether or not to wait for the result.
// The commitLocker is taken before the transaction is committed. In case of an error, the lock is released.
// On all success cases ( i.e. err == nil ) the lock remains held; on all failure cases, the lock is released.
func (db *Accessor) AtomicCommitWriteLock(fn idemFn, commitLocker sync.Locker, extras ...interface{}) (err error) {
return db.atomic(fn, commitLocker, extras...)
}
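With AtomicCommitWriteLock removed, a caller that needs to keep some in-memory state in step with a committed transaction has to manage its own lock around the Atomic call. A minimal sketch with hypothetical caller-side names; note that this holds the lock for the whole transaction (as the shared-cache path already did), which is coarser than the removed lock-just-before-commit behaviour:

```go
package main

import (
	"fmt"
	"sync"
)

// Hypothetical stand-in so the sketch compiles on its own; the real
// Accessor.Atomic lives in util/db/dbutil.go.
type accessor struct{}

func (a *accessor) Atomic(fn func() error) error { return fn() }

func main() {
	var db accessor
	var commitLock sync.Mutex

	commitLock.Lock()
	err := db.Atomic(func() error {
		// ... write statements would go here ...
		return nil
	})
	if err == nil {
		// update any in-memory state that must stay in step with the database
	}
	commitLock.Unlock()

	fmt.Println("atomic returned:", err)
}
```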

// Vacuum performs a full-vacuum on the given database. In order for the vacuum to succeed, the storage needs to have
// double the amount of the current database size ( roughly ), and we cannot have any other transaction ( either read
// or write ) being active.
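The Accessor's Vacuum implementation is not part of this diff; as a generic illustration of the operation the comment above describes (assuming the mattn/go-sqlite3 driver and a hypothetical ledger.sqlite file):

```go
package main

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/mattn/go-sqlite3" // assumes the go-sqlite3 driver
)

func main() {
	db, err := sql.Open("sqlite3", "ledger.sqlite") // hypothetical database file
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// VACUUM rewrites the whole database file, so it needs roughly double the
	// current database size in free storage and cannot run while any other
	// read or write transaction is active.
	if _, err := db.ExecContext(context.Background(), "VACUUM"); err != nil {
		log.Fatal(err)
	}
}
```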